package fetcher

import (
	"context"
	"io"
	"io/fs"
	"math"
	"net/http"
	"os"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/google/renameio/v2"
	"gitlab.com/tozd/go/errors"
	"go.alanpearce.eu/homestead/internal/config"
	"go.alanpearce.eu/homestead/internal/events"
	"go.alanpearce.eu/x/log"
)

// files lists the artefact basenames downloaded for every run.
var files = []string{"config.toml", "site.db"}

// numericFilename matches any name containing a run of three or more digits.
// The pattern is unanchored, so names with other characters around the digits
// match as well.
var numericFilename = regexp.MustCompile("[0-9]{3,}")

// timeout bounds each individual artefact download request.
var timeout = 10 * time.Second

// Fetcher downloads run artefacts into a root directory and tracks which run
// is the current one.
type Fetcher struct {
	options *Options
	log     *log.Logger
	updater events.Listener

	// current is the run ID last read from the "current" symlink or last
	// fetched by getArtefacts.
	current uint64
}

// Options configures a Fetcher.
type Options struct {
	// Root is the directory holding one sub-directory per run ID and the
	// "current" symlink.
	Root string

	// RedisEnabled selects whether Subscribe asks the listener for the latest
	// run ID before serving, and whether zero run IDs in updates are ignored.
	RedisEnabled bool

	// FetchURL is the base URL; the run ID and file name are appended to it
	// as path segments.
	FetchURL config.URL

	// Listener supplies the latest run ID and the stream of updates.
	Listener events.Listener
}

// New returns a Fetcher that logs to log and is configured by options.
// options must be non-nil because its Listener is read immediately.
func New(log *log.Logger, options *Options) Fetcher {
	return Fetcher{
		log:     log,
		options: options,
		updater: options.Listener,
	}
}

// getArtefacts downloads every entry of files for the given run into
// <Root>/<run>, records run as the current run, and then creates or updates
// the "current" symlink so that it points at the run directory.
func (f *Fetcher) getArtefacts(run uint64) errors.E {
	runID := strconv.FormatUint(run, 10)
	f.log.Debug("getting artefacts", "run_id", runID)

	err := os.MkdirAll(filepath.Join(f.options.Root, runID), 0o750)
	if err != nil {
		return errors.WithMessage(err, "could not create directory")
	}

	for _, file := range files {
		err := f.getFile(runID, file)
		if err != nil {
			return errors.WithMessage(err, "could not fetch file")
		}
	}

	f.current = run
	// The link target is relative (just the run ID), so the root directory
	// can be moved without breaking the link.
	err = renameio.Symlink(runID, filepath.Join(f.options.Root, "current"))
	if err != nil {
		return errors.WithMessage(err, "could not create/update symlink")
	}

	return nil
}

// checkFolder verifies that the root directory contains only the "current"
// entry and entries whose names match numericFilename. It returns an error
// naming every other entry found.
func (f *Fetcher) checkFolder() errors.E {
	contents, err := os.ReadDir(f.options.Root)
	if err != nil {
		return errors.WithMessage(err, "could not read root directory")
	}

	var badFiles []string
	for _, entry := range contents {
		name := entry.Name()
		if name != "current" && !numericFilename.MatchString(name) {
			badFiles = append(badFiles, name)
		}
	}

	if len(badFiles) > 0 {
		return errors.WithStack(
			errors.Basef("unexpected files in root directory: %s", strings.Join(badFiles, ", ")),
		)
	}

	return nil
}

// CleanOldRevisions removes every revision directory in the root whose run ID
// is lower than the current run's immediate predecessor, i.e. it keeps the
// current run and the one numbered directly before it. Entries that do not
// match numericFilename are left alone; a matching name that cannot be parsed
// as an unsigned integer aborts the clean-up with an error.
func (f *Fetcher) CleanOldRevisions() errors.E {
	contents, err := os.ReadDir(f.options.Root)
	if err != nil {
		return errors.WithMessage(err, "could not read root directory")
	}

	for _, file := range contents {
		name := file.Name()
		if name == "current" || !numericFilename.MatchString(name) {
			continue
		}

		v, err := strconv.ParseUint(name, 10, 64)
		if err != nil {
			return errors.WithMessagef(err, "could not parse numeric filename %s", name)
		}

		// With current == 0 the subtraction below would wrap around to the
		// maximum uint64 and classify every revision as old, so nothing is
		// removed in that case.
		if f.current == 0 || v >= f.current-1 {
			continue
		}

		if err := os.RemoveAll(filepath.Join(f.options.Root, name)); err != nil {
			return errors.WithMessage(err, "could not remove folder")
		}
	}

	return nil
}

// cleanOldRevisionsAsync runs CleanOldRevisions in a new goroutine and logs a
// warning if it fails; the caller does not wait for it.
func (f *Fetcher) cleanOldRevisionsAsync() {
	go func() {
		if err := f.CleanOldRevisions(); err != nil {
			f.log.Warn("error cleaning up old revisions", "error", err)
		}
	}()
}

// getFile downloads <FetchURL>/<runID>/<basename> into
// <Root>/<runID>/<basename>, bounded by timeout, and syncs the file to disk.
// A response with a status other than 200 OK is reported as an error and
// nothing is written.
func (f *Fetcher) getFile(runID, basename string) errors.E {
	filename := filepath.Join(f.options.Root, runID, basename)
	url := f.options.FetchURL.JoinPath(runID, basename).String()
	f.log.Debug("getting file", "filename", filename, "url", url)

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return errors.WithMessage(err, "could not create request")
	}

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return errors.WithMessage(err, "could not issue request")
	}
	// The body must always be closed so the underlying connection is released.
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return errors.WithStack(
			errors.Basef("unexpected response status: %s", res.Status),
		)
	}

	// The destination is opened only once a usable response is available, and
	// it is truncated so that a shorter download cannot leave stale bytes from
	// a previous attempt at the end of the file.
	file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
	if err != nil {
		return errors.WithMessage(err, "could not open file")
	}
	defer file.Close()

	if _, err = io.Copy(file, res.Body); err != nil {
		return errors.WithMessage(err, "could not write file")
	}

	if err = file.Sync(); err != nil {
		return errors.WithMessage(err, "could not sync file")
	}

	return nil
}

// getCurrentVersion reads the "current" symlink in the root directory, parses
// its target as a run ID, stores it in f.current and returns it. On any
// failure it returns 0 and an error, and f.current is left untouched.
func (f *Fetcher) getCurrentVersion() (uint64, errors.E) {
	target, err := os.Readlink(filepath.Join(f.options.Root, "current"))
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			return 0, errors.WithMessage(err, "could not stat current link")
		}

		// Any other failure (e.g. the entry is not a symlink) is reported
		// directly instead of surfacing later as a parse error on an empty
		// target.
		return 0, errors.WithMessage(err, "could not read current link")
	}

	current, err := strconv.ParseUint(target, 10, 64)
	if err != nil {
		return 0, errors.WithMessagef(err, "unexpected symlink target (current -> %s)", target)
	}
	f.current = current

	return f.current, nil
}
func (f *Fetcher) initialiseStorage() (uint64, errors.E) { latest, err := f.updater.GetLatestRunID() if err != nil { f.log.Warn("could not get latest run ID, using fallback", "error", err) } f.log.Debug("versions", "current", f.current, "latest", latest) defer f.cleanOldRevisionsAsync() if latest > f.current { err = f.getArtefacts(latest) if err != nil { return latest, errors.WithMessage(err, "could not fetch artefacts") } return latest, nil } return f.current, nil } func (f *Fetcher) Subscribe() (<-chan string, errors.E) { err := f.checkFolder() if err != nil { return nil, err } var root string f.current, err = f.getCurrentVersion() if err != nil { f.log.Warn("could not get current version", "error", err) } if !f.options.RedisEnabled { root = f.path(f.current) } else { runID, err := f.initialiseStorage() if err != nil { return nil, err } root = f.path(runID) } ch := make(chan string, 1) go func() { var err error var attempt uint for { err = f.connect(root, ch) if err == nil { return } next := expBackoff(attempt) attempt++ f.log.Warn( "could not connect to update listener", "error", err, "attempt", attempt, "next_try", next, ) <-time.After(next) } }() return ch, nil } func (f *Fetcher) connect(root string, ch chan string) errors.E { updates, err := f.updater.Subscribe() if err != nil { return errors.WithMessage(err, "could not subscribe to updates") } go func() { ch <- root for update := range updates { if update.RunID == 0 { if f.options.RedisEnabled { f.log.Warn("got zero runID") continue } ch <- f.path(f.current) } else { err := f.getArtefacts(update.RunID) if err != nil { f.log.Warn("could not get artefacts for version", "run_id", update.RunID, "error", err) continue } ch <- f.path(update.RunID) } } }() return nil } func (f *Fetcher) path(runID uint64) string { return filepath.Join(f.options.Root, strconv.FormatUint(runID, 10)) } func expBackoff(attempt uint) time.Duration { return time.Duration(math.Exp2(float64(attempt))) * time.Second }