package fetcher import ( "context" "io" "io/fs" "net/http" "os" "path/filepath" "regexp" "strconv" "strings" "time" "github.com/google/renameio/v2" "gitlab.com/tozd/go/errors" "go.alanpearce.eu/homestead/internal/config" "go.alanpearce.eu/homestead/internal/events" "go.alanpearce.eu/x/log" ) var files = []string{"config.toml", "site.db"} var numericFilename = regexp.MustCompile("[0-9]{3,}") var timeout = 10 * time.Second type Fetcher struct { options *Options log *log.Logger updater events.Listener } type Options struct { Root string RedisEnabled bool FetchURL config.URL Listener events.Listener } func New(log *log.Logger, options *Options) Fetcher { return Fetcher{ log: log, options: options, updater: options.Listener, } } func (f *Fetcher) getArtefacts(run uint64) error { runID := strconv.FormatUint(run, 10) f.log.Debug("getting artefacts", "run_id", runID) err := os.MkdirAll(filepath.Join(f.options.Root, runID), 0755) if err != nil { return errors.WithMessage(err, "could not create directory") } for _, file := range files { err := f.getFile(runID, file) if err != nil { return errors.WithMessage(err, "could not fetch file") } } err = renameio.Symlink(runID, filepath.Join(f.options.Root, "current")) if err != nil { return errors.WithMessage(err, "could not create/update symlink") } return nil } func (f *Fetcher) checkFolder() error { contents, err := os.ReadDir(f.options.Root) if err != nil { return errors.WithMessage(err, "could not read root directory") } var badFiles []string for _, f := range contents { name := f.Name() if !(name == "current" || numericFilename.MatchString(name)) { badFiles = append(badFiles, name) } } if len(badFiles) > 0 { return errors.Basef("unexpected files in root directory: %s", strings.Join(badFiles, ", ")) } return nil } func (f *Fetcher) getFile(runID, basename string) error { filename := filepath.Join(f.options.Root, runID, basename) url := f.options.FetchURL.JoinPath(runID, basename).String() f.log.Debug("getting file", "filename", filename, "url", url) file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return errors.WithMessage(err, "could not open file") } defer file.Close() ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return errors.WithMessage(err, "could not create request") } res, err := http.DefaultClient.Do(req) if err != nil { return errors.WithMessage(err, "could not issue request") } _, err = io.Copy(file, res.Body) if err != nil { return errors.WithMessage(err, "could not write file") } err = file.Sync() if err != nil { return errors.WithMessage(err, "could not sync file") } return nil } func (f *Fetcher) getCurrentVersion() (uint64, error) { target, err := os.Readlink(filepath.Join(f.options.Root, "current")) if err != nil && errors.Is(err, fs.ErrNotExist) { return 0, errors.WithMessage(err, "could not stat current link") } runID, err := strconv.ParseUint(target, 10, 64) if err != nil { return 0, errors.WithMessagef(err, "unexpected symlink target (current -> %s)", target) } return runID, nil } func (f *Fetcher) initialiseStorage() (uint64, error) { latest, err := f.updater.GetLatestRunID() if err != nil { f.log.Warn("could not get latest run ID, using fallback", "error", err) } current, err := f.getCurrentVersion() if err != nil { f.log.Warn("could not get current version", "error", err) } f.log.Debug("versions", "current", current, "latest", latest) if latest > current { err = f.getArtefacts(latest) if err != nil { return latest, errors.WithMessage(err, "could not fetch artefacts") } return latest, nil } return current, nil } func (f *Fetcher) Subscribe() (<-chan string, error) { ch := make(chan string, 1) err := f.checkFolder() if err != nil { return nil, err } var root string if f.options.RedisEnabled { root = f.path("current") } else { runID, err := f.initialiseStorage() if err != nil { return nil, err } root = f.path(strconv.FormatUint(runID, 10)) } updates, err := f.updater.Subscribe() if err != nil { return nil, errors.WithMessage(err, "could not subscribe to updates") } go func() { ch <- root for update := range updates { if update.RunID == 0 { if !f.options.RedisEnabled { f.log.Warn("got zero runID") continue } ch <- f.path("current") } else { err := f.getArtefacts(update.RunID) if err != nil { f.log.Warn("could not get artefacts for version", "run_id", update.RunID, "error", err) continue } ch <- f.path(strconv.FormatUint(update.RunID, 10)) } } }() return ch, nil } func (f *Fetcher) path(runID string) string { return filepath.Join(f.options.Root, runID) }