handle no redis connection at startup gracefully
2 files changed, 39 insertions(+), 11 deletions(-)
M internal/events/redis.go → internal/events/redis.go
@@ -96,13 +96,6 @@ func (rl *RedisListener) Subscribe() (<-chan Event, errors.E) { events := make(chan Event, 1) ctx := context.TODO() channel := getKeyspaceName(key) - pubsub := rl.client.Subscribe(ctx, channel) - rl.log.Debug("subscribing", "channel", channel) - - _, err := pubsub.Receive(ctx) - if err != nil { - return nil, errors.WithMessage(err, "could not subscribe to channel") - } go func(ch <-chan *redis.Message) { for msg := range ch {@@ -119,7 +112,7 @@ RunID: runID, }, } } - }(pubsub.Channel()) + }(rl.client.Subscribe(ctx, channel).Channel()) return events, nil }
M internal/fetcher/fetcher.go → internal/fetcher/fetcher.go
@@ -4,6 +4,7 @@ import ( "context" "io" "io/fs" + "math" "net/http" "os" "path/filepath"@@ -199,7 +200,6 @@ return f.current, nil } func (f *Fetcher) Subscribe() (<-chan string, errors.E) { - ch := make(chan string, 1) err := f.checkFolder() if err != nil { return nil, err@@ -220,9 +220,40 @@ return nil, err } root = f.path(runID) } + + ch := make(chan string, 1) + go func() { + var err error + var attempt uint + for { + err = f.connect(root, ch) + if err == nil { + return + } + + next := expBackoff(attempt) + attempt++ + f.log.Warn( + "could not connect to update listener", + "error", + err, + "attempt", + attempt, + "next_try", + next, + ) + + <-time.After(next) + } + }() + + return ch, nil +} + +func (f *Fetcher) connect(root string, ch chan string) errors.E { updates, err := f.updater.Subscribe() if err != nil { - return nil, errors.WithMessage(err, "could not subscribe to updates") + return errors.WithMessage(err, "could not subscribe to updates") } go func() {@@ -250,9 +281,13 @@ } } }() - return ch, nil + return nil } func (f *Fetcher) path(runID uint64) string { return filepath.Join(f.options.Root, strconv.FormatUint(runID, 10)) } + +func expBackoff(attempt uint) time.Duration { + return time.Duration(math.Exp2(float64(attempt))) * time.Second +}