diff options
author | Alan Pearce | 2024-05-16 23:41:57 +0200 |
---|---|---|
committer | Alan Pearce | 2024-05-16 23:41:57 +0200 |
commit | a5e758d41c151c17ed03b39454470ba8dd0c3b99 (patch) | |
tree | 386333b5020477eabcf490773113b029e47a21ef /internal/importer | |
parent | d558039919b6198a246a6a3fd007276191cb4b2f (diff) | |
download | searchix-a5e758d41c151c17ed03b39454470ba8dd0c3b99.tar.lz searchix-a5e758d41c151c17ed03b39454470ba8dd0c3b99.tar.zst searchix-a5e758d41c151c17ed03b39454470ba8dd0c3b99.zip |
refactor: separate fetch and import logic
Diffstat (limited to 'internal/importer')
-rw-r--r-- | internal/importer/channel.go | 97 | ||||
-rw-r--r-- | internal/importer/download-options.go | 87 | ||||
-rw-r--r-- | internal/importer/http.go | 70 | ||||
-rw-r--r-- | internal/importer/importer.go | 81 | ||||
-rw-r--r-- | internal/importer/main.go | 62 | ||||
-rw-r--r-- | internal/importer/nixpkgs-channel.go | 84 | ||||
-rw-r--r-- | internal/importer/options.go (renamed from internal/importer/ingest.go) | 6 | ||||
-rw-r--r-- | internal/importer/package.go | 6 | ||||
-rw-r--r-- | internal/importer/utils.go | 22 |
9 files changed, 84 insertions, 431 deletions
diff --git a/internal/importer/channel.go b/internal/importer/channel.go deleted file mode 100644 index 1bce1b0..0000000 --- a/internal/importer/channel.go +++ /dev/null @@ -1,97 +0,0 @@ -package importer - -import ( - "context" - "fmt" - "log/slog" - "os" - "os/exec" - "path" - "searchix/internal/config" - "searchix/internal/file" - "searchix/internal/index" - "strconv" - "strings" - - "github.com/pkg/errors" -) - -type ChannelImporter struct { - DataPath string - Source *config.Source - SourceFile string - Logger *slog.Logger -} - -func (i *ChannelImporter) FetchIfNeeded(parent context.Context) (bool, error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) - defer cancel() - - dest := i.DataPath - - before, err := os.Readlink(dest) - if file.NeedNotExist(err) != nil { - return false, errors.WithMessagef(err, "could not call readlink on file %s", dest) - } - i.Logger.Debug("stat before", "name", before) - - args := []string{ - "--no-build-output", - "--timeout", - strconv.Itoa(int(i.Source.FetchTimeout.Seconds() - 1)), - fmt.Sprintf("<%s/%s>", i.Source.Channel, i.Source.ImportPath), - "--attr", - i.Source.Attribute, - "--out-link", - dest, - } - - if i.Source.URL != "" { - args = append(args, "-I", fmt.Sprintf("%s=%s", i.Source.Channel, i.Source.URL)) - } - - i.Logger.Debug("nix-build command", "args", args) - cmd := exec.CommandContext(ctx, "nix-build", args...) - out, err := cmd.Output() - if err != nil { - return false, errors.WithMessage(err, "failed to run nix-build (--dry-run)") - } - i.Logger.Debug("nix-build", "output", strings.TrimSpace(string(out))) - - outPath := path.Join(dest, i.Source.OutputPath) - i.Logger.Debug( - "checking output path", - "outputPath", - outPath, - "dest", - dest, - "source", - i.Source.OutputPath, - ) - after, err := os.Readlink(dest) - if err := file.NeedNotExist(err); err != nil { - return false, errors.WithMessagef( - err, - "failed to stat output file from nix-build, filename: %s", - outPath, - ) - } - i.Logger.Debug("stat after", "name", after) - - return before != after, nil -} - -func (i *ChannelImporter) Import(parent context.Context, indexer *index.WriteIndex) (bool, error) { - if i.Source.OutputPath == "" { - return false, errors.New("no output path specified") - } - - filename := path.Join(i.DataPath, i.SourceFile, i.Source.OutputPath) - i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) - - return processOptions(parent, indexer, &importConfig{ - Source: i.Source, - Filename: filename, - Logger: i.Logger, - }) -} diff --git a/internal/importer/download-options.go b/internal/importer/download-options.go deleted file mode 100644 index 6727138..0000000 --- a/internal/importer/download-options.go +++ /dev/null @@ -1,87 +0,0 @@ -package importer - -import ( - "bytes" - "context" - "log/slog" - "net/url" - "os" - "path" - "searchix/internal/config" - "searchix/internal/file" - "searchix/internal/index" - - "github.com/pkg/errors" -) - -type DownloadOptionsImporter struct { - DataPath string - Source *config.Source - SourceFile string - Logger *slog.Logger -} - -var optionsFiles = map[string]string{ - "revision": "revision", - "options": "options.json", -} - -func (i *DownloadOptionsImporter) FetchIfNeeded(parent context.Context) (bool, error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) - defer cancel() - - root := i.DataPath - - err := file.Mkdirp(root) - if err != nil { - return false, errors.WithMessagef(err, "error creating directory for data: %s", root) - } - - var updated bool - for _, filename := range optionsFiles { - url, err := url.JoinPath(i.Source.URL, filename) - if err != nil { - return false, errors.WithMessagef( - err, - "could not build URL with elements %s and %s", - i.Source.URL, - filename, - ) - } - - path := path.Join(root, filename) - - i.Logger.Debug("preparing to fetch URL", "url", url, "path", path) - - updated, err = fetchFileIfNeeded(ctx, path, url) - if err != nil { - return false, err - } - // don't bother to issue requests for the later files - if !updated { - return false, err - } - } - - return updated, nil -} - -func (i *DownloadOptionsImporter) Import( - parent context.Context, - indexer *index.WriteIndex, -) (bool, error) { - filename := path.Join(i.DataPath, optionsFiles["options"]) - revFilename := path.Join(i.DataPath, optionsFiles["revision"]) - bits, err := os.ReadFile(revFilename) - if err != nil { - return false, errors.WithMessagef(err, "unable to read revision file at %s", revFilename) - } - i.Source.Repo.Revision = string(bytes.TrimSpace(bits)) - i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) - - return processOptions(parent, indexer, &importConfig{ - Source: i.Source, - Filename: filename, - Logger: i.Logger, - }) -} diff --git a/internal/importer/http.go b/internal/importer/http.go deleted file mode 100644 index b496177..0000000 --- a/internal/importer/http.go +++ /dev/null @@ -1,70 +0,0 @@ -package importer - -import ( - "context" - "fmt" - "log/slog" - "net/http" - "os" - "searchix/internal/config" - "searchix/internal/file" - "strings" - "time" - - "github.com/pkg/errors" -) - -func fetchFileIfNeeded(ctx context.Context, path string, url string) (needed bool, err error) { - stat, err := file.StatIfExists(path) - if err != nil { - return false, errors.WithMessagef(err, "could not stat file %s", path) - } - - var mtime string - if stat != nil { - mtime = strings.Replace(stat.ModTime().UTC().Format(time.RFC1123), "UTC", "GMT", 1) - } - - req, err := http.NewRequestWithContext(ctx, "GET", url, http.NoBody) - if err != nil { - return false, errors.WithMessagef(err, "could not create HTTP request for %s", url) - } - - req.Header.Set("User-Agent", fmt.Sprintf("Searchix %s", config.ShortSHA)) - - if mtime != "" { - req.Header.Set("If-Modified-Since", mtime) - } - res, err := http.DefaultClient.Do(req) - if err != nil { - return false, errors.WithMessagef(err, "could not make HTTP request to %s", url) - } - defer res.Body.Close() - - switch res.StatusCode { - case http.StatusNotModified: - needed = false - case http.StatusOK: - newMtime, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified")) - if err != nil { - slog.Warn( - "could not parse Last-Modified header from response", - "value", - res.Header.Get("Last-Modified"), - ) - } - err = file.WriteToFile(path, res.Body) - if err != nil { - return false, errors.WithMessagef(err, "could not write response body to file %s", path) - } - err = os.Chtimes(path, time.Time{}, newMtime) - if err != nil { - slog.Warn("could not update mtime on file", "file", path) - } - needed = true - default: - return false, fmt.Errorf("got response code %d, don't know what to do", res.StatusCode) - } - - return needed, nil -} diff --git a/internal/importer/importer.go b/internal/importer/importer.go index 255f70e..57118c8 100644 --- a/internal/importer/importer.go +++ b/internal/importer/importer.go @@ -3,79 +3,32 @@ package importer import ( "context" "log/slog" - "searchix/internal/config" "searchix/internal/index" + "searchix/internal/nix" "sync" ) type Importer interface { - FetchIfNeeded(context.Context) (bool, error) Import(context.Context, *index.WriteIndex) (bool, error) } -func NewNixpkgsChannelImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *NixpkgsChannelImporter { - return &NixpkgsChannelImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -func NewChannelImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *ChannelImporter { - return &ChannelImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -func NewDownloadOptionsImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *DownloadOptionsImporter { - return &DownloadOptionsImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -type importConfig struct { - Filename string - Source *config.Source - Logger *slog.Logger +type Processor interface { + Process(context.Context) (<-chan nix.Importable, <-chan error) } -func processOptions( - parent context.Context, +func process( + ctx context.Context, indexer *index.WriteIndex, - conf *importConfig, -) (bool, error) { - ctx, cancel := context.WithTimeout(parent, conf.Source.ImportTimeout) - defer cancel() - - conf.Logger.Debug("creating option processor", "filename", conf.Filename) - processor, err := NewOptionProcessor(conf.Filename, conf.Source) - if err != nil { - return true, err - } - + processor Processor, + logger *slog.Logger, +) bool { wg := sync.WaitGroup{} wg.Add(1) - options, pErrs := processor.Process(ctx) + objects, pErrs := processor.Process(ctx) wg.Add(1) - iErrs := indexer.Import(ctx, options) + iErrs := indexer.Import(ctx, objects) var hadErrors bool go func() { @@ -85,29 +38,29 @@ func processOptions( if !running { wg.Done() iErrs = nil - conf.Logger.Info("ingest completed") + logger.Debug("ingest completed") continue } hadErrors = true - conf.Logger.Warn("error ingesting option", "error", err) + logger.Warn("error ingesting object", "error", err) case err, running := <-pErrs: if !running { wg.Done() pErrs = nil - conf.Logger.Debug("processing completed") + logger.Debug("processing completed") continue } hadErrors = true - conf.Logger.Warn("error processing option", "error", err) + logger.Warn("error processing object", "error", err) } } }() - conf.Logger.Debug("options processing", "state", "waiting") + logger.Debug("object processing", "state", "waiting") wg.Wait() - conf.Logger.Debug("options processing", "state", "complete") + logger.Debug("object processing", "state", "complete") - return hadErrors, nil + return hadErrors } diff --git a/internal/importer/main.go b/internal/importer/main.go index 0b7a99d..d2b66e1 100644 --- a/internal/importer/main.go +++ b/internal/importer/main.go @@ -2,14 +2,15 @@ package importer import ( "context" - "errors" - "log" "log/slog" "os/exec" "path" "searchix/internal/config" + "searchix/internal/fetcher" "searchix/internal/index" "strings" + + "github.com/pkg/errors" ) func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error { @@ -22,27 +23,20 @@ func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error { ctx, cancel := context.WithTimeout(context.Background(), cfg.Importer.Timeout.Duration) defer cancel() - var imp Importer for name, source := range cfg.Importer.Sources { - logger := slog.With("name", name, "importer", source.Type.String()) - logger.Debug("starting importer") + logger := slog.With("name", name, "fetcher", source.Fetcher.String()) + logger.Debug("starting fetcher") - importerDataPath := path.Join(cfg.DataPath, "sources", source.Channel) + fetcherDataPath := path.Join(cfg.DataPath, "sources", source.Key) - switch source.Type { - case config.ChannelNixpkgs: - imp = NewNixpkgsChannelImporter(source, importerDataPath, logger) - case config.Channel: - imp = NewChannelImporter(source, importerDataPath, logger) - case config.DownloadOptions: - imp = NewDownloadOptionsImporter(source, importerDataPath, logger) - default: - log.Printf("unsupported importer type %s", source.Type.String()) + fetcher, err := fetcher.New(source, fetcherDataPath, logger) + if err != nil { + logger.Warn("error creating fetcher", "error", err) continue } - updated, err := imp.FetchIfNeeded(ctx) + files, updated, err := fetcher.FetchIfNeeded(ctx) if err != nil { var exerr *exec.ExitError @@ -60,16 +54,38 @@ func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error { logger.Info("importer fetch succeeded", "updated", updated) if updated || replace { - hadWarnings, err := imp.Import(ctx, indexer) - + err = setRepoRevision(files.Revision, source) if err != nil { - msg := err.Error() - for _, line := range strings.Split(strings.TrimSpace(msg), "\n") { - logger.Error("importer init failed", "error", line) - } + logger.Warn("could not set source repo revision", "error", err) + } - continue + var processor Processor + switch source.Importer { + case config.Options: + logger.Debug( + "creating processor", + "filename", + files.Options, + "revision", + source.Repo.Revision, + ) + processor, err = NewOptionProcessor(files.Options, source) + case config.Packages: + logger.Debug( + "creating processor", + "filename", + files.Packages, + "revision", + source.Repo.Revision, + ) + processor, err = NewPackageProcessor(files.Packages, source) } + if err != nil { + logger.Warn("failed to create processor", "type", source.Importer, "error", err) + } + + hadWarnings := process(ctx, indexer, processor, logger) + if hadWarnings { logger.Warn("importer succeeded, but with warnings/errors") } else { diff --git a/internal/importer/nixpkgs-channel.go b/internal/importer/nixpkgs-channel.go deleted file mode 100644 index d302154..0000000 --- a/internal/importer/nixpkgs-channel.go +++ /dev/null @@ -1,84 +0,0 @@ -package importer - -import ( - "bytes" - "context" - "log/slog" - "net/url" - "os" - "path" - "searchix/internal/config" - "searchix/internal/file" - "searchix/internal/index" - - "github.com/pkg/errors" -) - -type NixpkgsChannelImporter struct { - DataPath string - Source *config.Source - Logger *slog.Logger -} - -func makeChannelURL(channel string, subPath string) (string, error) { - url, err := url.JoinPath("https://channels.nixos.org/", channel, subPath) - - return url, errors.WithMessagef(err, "error creating URL") -} - -var filesToFetch = map[string]string{ - "revision": "git-revision", - "options": "options.json.br", -} - -func (i *NixpkgsChannelImporter) FetchIfNeeded(parent context.Context) (bool, error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) - defer cancel() - - root := i.DataPath - - err := file.Mkdirp(root) - if err != nil { - return false, errors.WithMessagef(err, "error creating directory for data: %s", root) - } - - for _, filename := range filesToFetch { - url, err := makeChannelURL(i.Source.Channel, filename) - if err != nil { - return false, err - } - - path := path.Join(root, filename) - - updated, err := fetchFileIfNeeded(ctx, path, url) - if err != nil { - return false, err - } - // don't bother to issue requests for the later files - if !updated { - return false, err - } - } - - return true, nil -} - -func (i *NixpkgsChannelImporter) Import( - parent context.Context, - indexer *index.WriteIndex, -) (bool, error) { - filename := path.Join(i.DataPath, filesToFetch["options"]) - revFilename := path.Join(i.DataPath, filesToFetch["revision"]) - bits, err := os.ReadFile(revFilename) - if err != nil { - return false, errors.WithMessagef(err, "unable to read revision file at %s", revFilename) - } - i.Source.Repo.Revision = string(bytes.TrimSpace(bits)) - i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) - - return processOptions(parent, indexer, &importConfig{ - Source: i.Source, - Filename: filename, - Logger: i.Logger, - }) -} diff --git a/internal/importer/ingest.go b/internal/importer/options.go index 9b92ae8..ec2c20f 100644 --- a/internal/importer/ingest.go +++ b/internal/importer/options.go @@ -91,9 +91,8 @@ func NewOptionProcessor(inpath string, source *config.Source) (*OptionIngester, return &i, nil } -func (i *OptionIngester) Process( - ctx context.Context, -) (<-chan nix.Importable, <-chan error) { +func (i *OptionIngester) Process(parent context.Context) (<-chan nix.Importable, <-chan error) { + ctx, cancel := context.WithTimeout(parent, i.source.ImportTimeout) results := make(chan nix.Importable) errs := make(chan error) @@ -101,6 +100,7 @@ func (i *OptionIngester) Process( defer i.infile.Close() defer close(results) defer close(errs) + defer cancel() slog.Debug("starting decoder stream") outer: diff --git a/internal/importer/package.go b/internal/importer/package.go index 49e313d..3e0ec83 100644 --- a/internal/importer/package.go +++ b/internal/importer/package.go @@ -109,9 +109,8 @@ func convertToLicense(in map[string]any) *nix.License { return l } -func (i *PackageIngester) Process( - ctx context.Context, -) (<-chan nix.Importable, <-chan error) { +func (i *PackageIngester) Process(parent context.Context) (<-chan nix.Importable, <-chan error) { + ctx, cancel := context.WithTimeout(parent, i.source.ImportTimeout) results := make(chan nix.Importable) errs := make(chan error) @@ -119,6 +118,7 @@ func (i *PackageIngester) Process( defer i.infile.Close() defer close(results) defer close(errs) + defer cancel() userRepo := i.source.Repo.Owner + "/" + i.source.Repo.Repo slog.Debug("starting decoder stream") diff --git a/internal/importer/utils.go b/internal/importer/utils.go index 13d4702..3eb034f 100644 --- a/internal/importer/utils.go +++ b/internal/importer/utils.go @@ -1,11 +1,15 @@ package importer import ( + "bytes" "fmt" "net/url" + "os" + "searchix/internal/config" "searchix/internal/nix" "github.com/bcicen/jstream" + "github.com/pkg/errors" ) func ValueTypeToString(valueType jstream.ValueType) string { @@ -58,3 +62,21 @@ func MakeChannelLink(channel string, ref string, subPath string) (*nix.Link, err URL: makeGitHubFileURL(channelRepoMap[channel], ref, subPath, ""), }, nil } + +func setRepoRevision(filename string, source *config.Source) error { + if filename != "" { + bits, err := os.ReadFile(filename) + if err != nil { + return errors.WithMessagef( + err, + "unable to read revision file at %s", + filename, + ) + } + + source.Repo.Revision = string(bytes.TrimSpace(bits)) + + } + + return nil +} |