internal/events/redis.go (view raw)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | package events import ( "context" "crypto/tls" "fmt" "strconv" "time" "github.com/redis/go-redis/v9" "gitlab.com/tozd/go/errors" "go.alanpearce.eu/x/log" "go.uber.org/zap" ) const db = 0 const key = "run_id" const fallbackRunID uint64 = 210 type RedisOptions struct { Address string Username string `conf:"default:default"` Password string `conf:"default:default"` TLSEnabled bool `conf:"default:true"` } type Logger struct { log *zap.SugaredLogger } func (l *Logger) Printf(_ context.Context, format string, v ...interface{}) { l.log.Infof(format, v...) } type RedisListener struct { client *redis.Client log *log.Logger } func NewRedisListener(opts *RedisOptions, log *log.Logger) (*RedisListener, errors.E) { clientConfig := &redis.Options{ Addr: opts.Address, Username: opts.Username, Password: opts.Password, } if opts.TLSEnabled { clientConfig.TLSConfig = &tls.Config{ InsecureSkipVerify: false, MinVersion: tls.VersionTLS13, } } redis.SetLogger(&Logger{log: log.GetLogger().Sugar()}) client := redis.NewClient(clientConfig) log.Debug( "connecting to redis", "address", opts.Address, "username", opts.Username, "tls", opts.TLSEnabled, ) return &RedisListener{ client: client, log: log, }, nil } func (rl *RedisListener) GetLatestRunID() (uint64, errors.E) { ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Second) defer cancel() payload, err := rl.client.Get(ctx, key).Result() if err != nil { return fallbackRunID, errors.WithMessage(err, "could not get latest run ID") } runID, err := strconv.ParseUint(payload, 10, 64) if err != nil { return fallbackRunID, errors.WithMessage(err, "could not parse latest run ID") } rl.log.Debug("redis response", "payload", payload, "run_id", runID) return runID, nil } // requires `redis-cli config set notify-keyspace-events KEA` func getKeyspaceName(key string) string { return fmt.Sprintf("__keyspace@%d__:%s", db, key) } func (rl *RedisListener) Subscribe() (<-chan Event, errors.E) { events := make(chan Event, 1) ctx := context.TODO() channel := getKeyspaceName(key) pubsub := rl.client.Subscribe(ctx, channel) rl.log.Debug("subscribing", "channel", channel) _, err := pubsub.Receive(ctx) if err != nil { return nil, errors.WithMessage(err, "could not subscribe to channel") } go func(ch <-chan *redis.Message) { for msg := range ch { rl.log.Debug("got event", "payload", msg.Payload) runID, err := rl.GetLatestRunID() if err != nil { rl.log.Warn("could not get latest run ID") } events <- Event{ CIEvent: CIEvent{ RunID: runID, }, } } }(pubsub.Channel()) return events, nil } |