package events import ( "context" "crypto/tls" "fmt" "strconv" "time" "github.com/redis/go-redis/v9" "gitlab.com/tozd/go/errors" "go.alanpearce.eu/x/log" "go.uber.org/zap" ) const db = 0 const key = "run_id" const fallbackRunID uint64 = 210 type RedisOptions struct { Enabled bool `conf:"default:false"` Address string `conf:"default:localhost:6379"` Username string `conf:"default:default"` Password string `conf:"default:default"` TLSEnabled bool `conf:"default:false"` } type Logger struct { log *zap.SugaredLogger } func (l *Logger) Printf(_ context.Context, format string, v ...interface{}) { l.log.Infof(format, v...) } type RedisListener struct { client *redis.Client log *log.Logger } func NewRedisListener(opts *RedisOptions, log *log.Logger) (*RedisListener, error) { clientConfig := &redis.Options{ Addr: opts.Address, Username: opts.Username, Password: opts.Password, } if opts.TLSEnabled { clientConfig.TLSConfig = &tls.Config{ InsecureSkipVerify: false, MinVersion: tls.VersionTLS13, } } redis.SetLogger(&Logger{log: log.GetLogger().Sugar()}) client := redis.NewClient(clientConfig) log.Debug( "connecting to redis", "address", opts.Address, "username", opts.Username, "tls", opts.TLSEnabled, ) return &RedisListener{ client: client, log: log, }, nil } func (rl *RedisListener) GetLatestRunID() (uint64, error) { ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Second) defer cancel() payload, err := rl.client.Get(ctx, key).Result() if err != nil { return fallbackRunID, errors.WithMessage(err, "could not get latest run ID") } runID, err := strconv.ParseUint(payload, 10, 64) if err != nil { return fallbackRunID, errors.WithMessage(err, "could not parse latest run ID") } rl.log.Debug("redis response", "payload", payload, "run_id", runID) return runID, nil } // requires `redis-cli config set notify-keyspace-events KEA` func getKeyspaceName(key string) string { return fmt.Sprintf("__keyspace@%d__:%s", db, key) } func (rl *RedisListener) Subscribe() (<-chan Event, error) { events := make(chan Event, 1) ctx := context.TODO() channel := getKeyspaceName(key) pubsub := rl.client.Subscribe(ctx, channel) rl.log.Debug("subscribing", "channel", channel) _, err := pubsub.Receive(ctx) if err != nil { return nil, errors.WithMessage(err, "could not subscribe to channel") } go func(ch <-chan *redis.Message) { for msg := range ch { rl.log.Debug("got event", "payload", msg.Payload) runID, err := rl.GetLatestRunID() if err != nil { rl.log.Warn("could not get latest run ID") } events <- Event{ CIEvent: CIEvent{ RunID: runID, }, } } }(pubsub.Channel()) return events, nil }