diff options
author | Alan Pearce | 2024-05-23 13:14:45 +0200 |
---|---|---|
committer | Alan Pearce | 2024-05-23 13:14:45 +0200 |
commit | 0dbfe37fbddb95c184d845c79bbe014597d55fe8 (patch) | |
tree | e68a2db861211ceebe4c357a059a4cb511f707a9 /internal/fetcher | |
parent | 3053e41b1528ef898cccd44e056e4d167619af6b (diff) | |
download | searchix-0dbfe37fbddb95c184d845c79bbe014597d55fe8.tar.lz searchix-0dbfe37fbddb95c184d845c79bbe014597d55fe8.tar.zst searchix-0dbfe37fbddb95c184d845c79bbe014597d55fe8.zip |
feat: stream files directly from fetcher to importer
Use IndexMeta to store the information relevant to making conditional updates in future runs.
Diffstat (limited to 'internal/fetcher')
-rw-r--r-- | internal/fetcher/channel.go | 61 | ||||
-rw-r--r-- | internal/fetcher/download.go | 56 | ||||
-rw-r--r-- | internal/fetcher/http.go | 74 | ||||
-rw-r--r-- | internal/fetcher/main.go | 17 | ||||
-rw-r--r-- | internal/fetcher/nixpkgs-channel.go | 60 |
5 files changed, 125 insertions, 143 deletions
diff --git a/internal/fetcher/channel.go b/internal/fetcher/channel.go index fd7427c..3756012 100644 --- a/internal/fetcher/channel.go +++ b/internal/fetcher/channel.go @@ -8,15 +8,15 @@ import ( "os/exec" "path" "searchix/internal/config" - "searchix/internal/file" + "searchix/internal/index" "strconv" "strings" + "time" "github.com/pkg/errors" ) type ChannelFetcher struct { - DataPath string Source *config.Source SourceFile string Logger *slog.Logger @@ -24,15 +24,13 @@ type ChannelFetcher struct { func NewChannelFetcher( source *config.Source, - dataPath string, logger *slog.Logger, ) (*ChannelFetcher, error) { switch source.Importer { case config.Options: return &ChannelFetcher{ - DataPath: dataPath, - Source: source, - Logger: logger, + Source: source, + Logger: logger, }, nil default: return nil, fmt.Errorf("unsupported importer type %s", source.Importer) @@ -40,22 +38,9 @@ func NewChannelFetcher( } func (i *ChannelFetcher) FetchIfNeeded( - parent context.Context, -) (f FetchedFiles, updated bool, err error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout.Duration) - defer cancel() - - dest := i.DataPath - - var before string - before, err = os.Readlink(dest) - if file.NeedNotExist(err) != nil { - err = errors.WithMessagef(err, "could not call readlink on file %s", dest) - - return - } - i.Logger.Debug("stat before", "name", before) - + ctx context.Context, + sourceMeta *index.SourceMeta, +) (f FetchedFiles, err error) { args := []string{ "--no-build-output", "--timeout", @@ -63,8 +48,7 @@ func (i *ChannelFetcher) FetchIfNeeded( fmt.Sprintf("<%s/%s>", i.Source.Channel, i.Source.ImportPath), "--attr", i.Source.Attribute, - "--out-link", - dest, + "--no-out-link", } if i.Source.URL != "" { @@ -80,35 +64,28 @@ func (i *ChannelFetcher) FetchIfNeeded( return } - i.Logger.Debug("nix-build", "output", strings.TrimSpace(string(out))) - outPath := path.Join(dest, i.Source.OutputPath) + outPath := path.Join(strings.TrimSpace(string(out)), i.Source.OutputPath, "options.json") i.Logger.Debug( "checking output path", "outputPath", outPath, - "dest", - dest, - "source", - i.Source.OutputPath, ) - var after string - after, err = os.Readlink(dest) - if err = file.NeedNotExist(err); err != nil { - err = errors.WithMessagef( - err, - "failed to stat output file from nix-build, filename: %s", - outPath, - ) - return + if outPath != sourceMeta.Path { + sourceMeta.Path = outPath + sourceMeta.Updated = time.Now().Truncate(time.Second) } - i.Logger.Debug("stat after", "name", after) - updated = before != after + file, err := os.Open(outPath) + if err != nil { + err = errors.WithMessage(err, "failed to open options.json") + + return + } f = FetchedFiles{ - Options: path.Join(dest, i.Source.OutputPath, "options.json"), + Options: file, } return diff --git a/internal/fetcher/download.go b/internal/fetcher/download.go index 2c7b8fd..59ef8d1 100644 --- a/internal/fetcher/download.go +++ b/internal/fetcher/download.go @@ -5,15 +5,13 @@ import ( "fmt" "log/slog" "net/url" - "path" "searchix/internal/config" - "searchix/internal/file" + "searchix/internal/index" "github.com/pkg/errors" ) type DownloadFetcher struct { - DataPath string Source *config.Source SourceFile string Logger *slog.Logger @@ -21,15 +19,13 @@ type DownloadFetcher struct { func NewDownloadFetcher( source *config.Source, - dataPath string, logger *slog.Logger, ) (*DownloadFetcher, error) { switch source.Importer { case config.Options: return &DownloadFetcher{ - DataPath: dataPath, - Source: source, - Logger: logger, + Source: source, + Logger: logger, }, nil default: return nil, fmt.Errorf("unsupported importer type %s", source.Importer) @@ -42,22 +38,11 @@ var files = map[string]string{ } func (i *DownloadFetcher) FetchIfNeeded( - parent context.Context, -) (f FetchedFiles, updated bool, err error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout.Duration) - defer cancel() - - root := i.DataPath - - err = file.Mkdirp(root) - if err != nil { - err = errors.WithMessagef(err, "error creating directory for data: %s", root) - - return - } - + ctx context.Context, + sourceMeta *index.SourceMeta, +) (f FetchedFiles, err error) { var fetchURL string - for _, filename := range files { + for key, filename := range files { fetchURL, err = url.JoinPath(i.Source.URL, filename) if err != nil { err = errors.WithMessagef( @@ -70,23 +55,28 @@ func (i *DownloadFetcher) FetchIfNeeded( return } - outPath := path.Join(root, filename) - - i.Logger.Debug("preparing to fetch URL", "url", fetchURL, "path", outPath) + i.Logger.Debug("preparing to fetch URL", "url", fetchURL) - updated, err = fetchFileIfNeeded(ctx, outPath, fetchURL) + body, mtime, err := fetchFileIfNeeded(ctx, sourceMeta.Updated, fetchURL) if err != nil { - return + i.Logger.Warn("failed to fetch file", "url", fetchURL, "error", err) + + return f, err } // don't bother to issue requests for the later files - if !updated { + if mtime.Before(sourceMeta.Updated) { break } - } - - f = FetchedFiles{ - Revision: path.Join(root, "revision"), - Options: path.Join(root, "options.json"), + sourceMeta.Updated = mtime + + switch key { + case "revision": + f.Revision = body + case "options": + f.Options = body + default: + return f, errors.Errorf("unknown file kind %s", key) + } } return diff --git a/internal/fetcher/http.go b/internal/fetcher/http.go index 9afbbc0..675c3b3 100644 --- a/internal/fetcher/http.go +++ b/internal/fetcher/http.go @@ -3,68 +3,90 @@ package fetcher import ( "context" "fmt" + "io" "log/slog" "net/http" - "os" "searchix/internal/config" - "searchix/internal/file" "strings" "time" + "github.com/andybalholm/brotli" "github.com/pkg/errors" ) -func fetchFileIfNeeded(ctx context.Context, path string, url string) (needed bool, err error) { - stat, err := file.StatIfExists(path) - if err != nil { - return false, errors.WithMessagef(err, "could not stat file %s", path) +type brotliReadCloser struct { + src io.ReadCloser + *brotli.Reader +} + +func newBrotliReader(src io.ReadCloser) *brotliReadCloser { + return &brotliReadCloser{ + src: src, + Reader: brotli.NewReader(src), } +} - var mtime string - if stat != nil { - mtime = strings.Replace(stat.ModTime().UTC().Format(time.RFC1123), "UTC", "GMT", 1) +func (r *brotliReadCloser) Close() error { + return errors.Wrap(r.src.Close(), "failed to call close on underlying reader") +} + +func fetchFileIfNeeded( + ctx context.Context, + mtime time.Time, + url string, +) (body io.ReadCloser, newMtime time.Time, err error) { + var ifModifiedSince string + if !mtime.IsZero() { + ifModifiedSince = strings.Replace(mtime.UTC().Format(time.RFC1123), "UTC", "GMT", 1) } req, err := http.NewRequestWithContext(ctx, "GET", url, http.NoBody) if err != nil { - return false, errors.WithMessagef(err, "could not create HTTP request for %s", url) + err = errors.WithMessagef(err, "could not create HTTP request for %s", url) + + return } req.Header.Set("User-Agent", fmt.Sprintf("Searchix %s", config.ShortSHA)) - if mtime != "" { - req.Header.Set("If-Modified-Since", mtime) + if ifModifiedSince != "" { + req.Header.Set("If-Modified-Since", ifModifiedSince) } res, err := http.DefaultClient.Do(req) if err != nil { - return false, errors.WithMessagef(err, "could not make HTTP request to %s", url) + err = errors.WithMessagef(err, "could not make HTTP request to %s", url) + + return } - defer res.Body.Close() switch res.StatusCode { case http.StatusNotModified: - needed = false + newMtime = mtime + + return case http.StatusOK: - newMtime, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified")) + newMtime, err = time.Parse(time.RFC1123, res.Header.Get("Last-Modified")) if err != nil { slog.Warn( "could not parse Last-Modified header from response", "value", res.Header.Get("Last-Modified"), ) + newMtime = time.Now() } - err = file.WriteToFile(path, res.Body) - if err != nil { - return false, errors.WithMessagef(err, "could not write response body to file %s", path) - } - err = os.Chtimes(path, time.Time{}, newMtime) - if err != nil { - slog.Warn("could not update mtime on file", "file", path) + + switch ce := res.Header.Get("Content-Encoding"); ce { + case "br": + slog.Debug("using brotli encoding") + body = newBrotliReader(res.Body) + case "", "identity", "gzip": + body = res.Body + default: + err = fmt.Errorf("cannot handle a body with content-encoding %s", ce) } - needed = true default: - return false, fmt.Errorf("got response code %d, don't know what to do", res.StatusCode) + err = fmt.Errorf("got response code %d, don't know what to do", res.StatusCode) } - return needed, nil + return } diff --git a/internal/fetcher/main.go b/internal/fetcher/main.go index 65f62db..7ea0b03 100644 --- a/internal/fetcher/main.go +++ b/internal/fetcher/main.go @@ -2,34 +2,35 @@ package fetcher import ( "context" + "io" "log/slog" "searchix/internal/config" + "searchix/internal/index" "github.com/pkg/errors" ) type FetchedFiles struct { - Revision string - Options string - Packages string + Revision io.ReadCloser + Options io.ReadCloser + Packages io.ReadCloser } type Fetcher interface { - FetchIfNeeded(context.Context) (FetchedFiles, bool, error) + FetchIfNeeded(context.Context, *index.SourceMeta) (FetchedFiles, error) } func New( source *config.Source, - fetcherDataPath string, logger *slog.Logger, ) (fetcher Fetcher, err error) { switch source.Fetcher { case config.ChannelNixpkgs: - fetcher, err = NewNixpkgsChannelFetcher(source, fetcherDataPath, logger) + fetcher, err = NewNixpkgsChannelFetcher(source, logger) case config.Channel: - fetcher, err = NewChannelFetcher(source, fetcherDataPath, logger) + fetcher, err = NewChannelFetcher(source, logger) case config.Download: - fetcher, err = NewDownloadFetcher(source, fetcherDataPath, logger) + fetcher, err = NewDownloadFetcher(source, logger) default: err = errors.Errorf("unsupported fetcher type %s", source.Fetcher.String()) } diff --git a/internal/fetcher/nixpkgs-channel.go b/internal/fetcher/nixpkgs-channel.go index 033b577..62fea13 100644 --- a/internal/fetcher/nixpkgs-channel.go +++ b/internal/fetcher/nixpkgs-channel.go @@ -5,17 +5,15 @@ import ( "fmt" "log/slog" "net/url" - "path" "searchix/internal/config" - "searchix/internal/file" + "searchix/internal/index" "github.com/pkg/errors" ) type NixpkgsChannelFetcher struct { - DataPath string - Source *config.Source - Logger *slog.Logger + Source *config.Source + Logger *slog.Logger } func makeChannelURL(channel string, subPath string) (string, error) { @@ -26,15 +24,13 @@ func makeChannelURL(channel string, subPath string) (string, error) { func NewNixpkgsChannelFetcher( source *config.Source, - dataPath string, logger *slog.Logger, ) (*NixpkgsChannelFetcher, error) { switch source.Importer { case config.Options, config.Packages: return &NixpkgsChannelFetcher{ - DataPath: dataPath, - Source: source, - Logger: logger, + Source: source, + Logger: logger, }, nil default: return nil, fmt.Errorf("unsupported importer type %s", source.Importer) @@ -48,19 +44,9 @@ const ( ) func (i *NixpkgsChannelFetcher) FetchIfNeeded( - parent context.Context, -) (f FetchedFiles, updated bool, err error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout.Duration) - defer cancel() - - root := i.DataPath - - err = file.Mkdirp(root) - if err != nil { - err = errors.WithMessagef(err, "error creating directory for data: %s", root) - - return - } + ctx context.Context, + sourceMeta *index.SourceMeta, +) (f FetchedFiles, err error) { filesToFetch := make([]string, 2) filesToFetch[0] = revisionFilename @@ -78,23 +64,29 @@ func (i *NixpkgsChannelFetcher) FetchIfNeeded( return } - outPath := path.Join(root, filename) - - i.Logger.Debug("attempting to fetch file", "url", fetchURL, "outPath", outPath) - updated, err = fetchFileIfNeeded(ctx, outPath, fetchURL) + i.Logger.Debug("attempting to fetch file", "url", fetchURL) + body, mtime, err := fetchFileIfNeeded(ctx, sourceMeta.Updated, fetchURL) if err != nil { - return + i.Logger.Warn("failed to fetch file", "url", fetchURL, "error", err) + + return f, err } // don't bother to issue requests for the later files - if !updated { + if mtime.Before(sourceMeta.Updated) { break } - } - - f = FetchedFiles{ - Revision: path.Join(root, "git-revision"), - Options: path.Join(root, "options.json.br"), - Packages: path.Join(root, "packages.json.br"), + sourceMeta.Updated = mtime + + switch filename { + case revisionFilename: + f.Revision = body + case optionsFilename: + f.Options = body + case packagesFileName: + f.Packages = body + default: + return f, errors.Errorf("unknown file kind %s", filename) + } } return |