refactor: separate fetch and import logic
Alan Pearce alan@alanpearce.eu
Thu, 16 May 2024 23:41:57 +0200
18 files changed, 517 insertions(+), 417 deletions(-)
jump to
- internal/config/config.go
- internal/config/fetcher.go
- internal/config/importer-type.go
- internal/config/repository.go
- internal/config/source.go
- internal/fetcher/channel.go
- internal/fetcher/download.go
- internal/fetcher/main.go
- internal/fetcher/nixpkgs-channel.go
- internal/fetcher/http.go
- internal/importer/importer.go
- internal/importer/options.go
- internal/importer/main.go
- internal/importer/package.go
- internal/importer/utils.go
M internal/config/config.go → internal/config/config.go
@@ -115,12 +115,13 @@ "nixos": { Name: "NixOS", Key: "nixos", Enable: true, - Type: Channel, + Importer: Options, + Fetcher: Channel, Channel: "nixpkgs", URL: "https://channels.nixos.org/nixos-unstable/nixexprs.tar.xz", ImportPath: "nixos/release.nix", Attribute: "options", - OutputPath: "share/doc/nixos/options.json", + OutputPath: "share/doc/nixos", FetchTimeout: 5 * time.Minute, ImportTimeout: 15 * time.Minute, Repo: nixpkgs, @@ -129,12 +130,13 @@ "darwin": { Name: "Darwin", Key: "darwin", Enable: false, - Type: Channel, + Importer: Options, + Fetcher: Channel, Channel: "darwin", URL: "https://github.com/LnL7/nix-darwin/archive/master.tar.gz", ImportPath: "release.nix", Attribute: "options", - OutputPath: "share/doc/darwin/options.json", + OutputPath: "share/doc/darwin", FetchTimeout: 5 * time.Minute, ImportTimeout: 15 * time.Minute, Repo: Repository{ @@ -147,12 +149,13 @@ "home-manager": { Name: "Home Manager", Key: "home-manager", Enable: false, + Importer: Options, Channel: "home-manager", URL: "https://github.com/nix-community/home-manager/archive/master.tar.gz", - Type: Channel, + Fetcher: Channel, ImportPath: "default.nix", Attribute: "docs.json", - OutputPath: "share/doc/home-manager/options.json", + OutputPath: "share/doc/home-manager", FetchTimeout: 5 * time.Minute, ImportTimeout: 15 * time.Minute, Repo: Repository{ @@ -165,7 +168,9 @@ "nixpkgs": { Name: "Nix Packages", Key: "nixpkgs", Enable: true, - Type: ChannelNixpkgs, + Importer: Packages, + Fetcher: ChannelNixpkgs, + Channel: "nixos-unstable", OutputPath: "packages.json.br", FetchTimeout: 5 * time.Minute, ImportTimeout: 15 * time.Minute,
A internal/config/fetcher.go
@@ -0,0 +1,49 @@+package config + +import ( + "fmt" + + "github.com/stoewer/go-strcase" +) + +type Fetcher int + +const ( + UnknownFetcher = iota + Channel + ChannelNixpkgs + Download +) + +func (f Fetcher) String() string { + switch f { + case Channel: + return "channel" + case ChannelNixpkgs: + return "channel-nixpkgs" + case Download: + return "download" + } + + return fmt.Sprintf("Fetcher(%d)", f) +} + +func parseFetcher(name string) (Fetcher, error) { + switch strcase.KebabCase(name) { + case "channel": + return Channel, nil + case "channel-nixpkgs": + return ChannelNixpkgs, nil + case "download": + return Download, nil + default: + return UnknownFetcher, fmt.Errorf("unsupported fetcher %s", name) + } +} + +func (f *Fetcher) UnmarshalText(text []byte) error { + var err error + *f, err = parseFetcher(string(text)) + + return err +}
A internal/config/importer-type.go
@@ -0,0 +1,44 @@+package config + +import ( + "fmt" + + "github.com/stoewer/go-strcase" +) + +type ImporterType int + +const ( + UnknownType = iota + Packages + Options +) + +func (i ImporterType) String() string { + switch i { + case Packages: + return "packages" + case Options: + return "options" + } + + return fmt.Sprintf("Type(%d)", i) +} + +func parseType(name string) (ImporterType, error) { + switch strcase.KebabCase(name) { + case "packages": + return Packages, nil + case "options": + return Options, nil + default: + return UnknownType, fmt.Errorf("unsupported importer %s", name) + } +} + +func (i *ImporterType) UnmarshalText(text []byte) error { + var err error + *i, err = parseType(string(text)) + + return err +}
M internal/config/repository.go → internal/config/repository.go
@@ -8,7 +8,8 @@ type RepoType int const ( - GitHub = iota + 1 + UnknownRepoType = iota + GitHub ) type Repository struct { @@ -32,7 +33,7 @@ switch strings.ToLower(name) { case "github": return GitHub, nil default: - return Unknown, fmt.Errorf("unsupported repo type %s", name) + return UnknownRepoType, fmt.Errorf("unsupported repo type %s", name) } }
M internal/config/source.go → internal/config/source.go
@@ -1,59 +1,15 @@ package config import ( - "fmt" "time" - - "github.com/stoewer/go-strcase" -) - -type Type int - -const ( - Unknown = iota - Channel - ChannelNixpkgs - DownloadOptions ) -func (f Type) String() string { - switch f { - case Channel: - return "channel" - case ChannelNixpkgs: - return "channel-nixpkgs" - case DownloadOptions: - return "download-options" - } - - return fmt.Sprintf("Fetcher(%d)", f) -} - -func parseType(name string) (Type, error) { - switch strcase.KebabCase(name) { - case "channel": - return Channel, nil - case "channel-nixpkgs": - return ChannelNixpkgs, nil - case "download-options": - return DownloadOptions, nil - default: - return Unknown, fmt.Errorf("unsupported fetcher %s", name) - } -} - -func (f *Type) UnmarshalText(text []byte) error { - var err error - *f, err = parseType(string(text)) - - return err -} - type Source struct { Name string Key string Enable bool - Type Type + Fetcher Fetcher + Importer ImporterType Channel string URL string Attribute string
A internal/fetcher/channel.go
@@ -0,0 +1,99 @@+package fetcher + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "path" + "searchix/internal/config" + "searchix/internal/file" + "strconv" + "strings" + + "github.com/pkg/errors" +) + +type ChannelFetcher struct { + DataPath string + Source *config.Source + SourceFile string + Logger *slog.Logger +} + +func (i *ChannelFetcher) FetchIfNeeded( + parent context.Context, +) (f FetchedFiles, updated bool, err error) { + ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) + defer cancel() + + dest := i.DataPath + + var before string + before, err = os.Readlink(dest) + if file.NeedNotExist(err) != nil { + err = errors.WithMessagef(err, "could not call readlink on file %s", dest) + + return + } + i.Logger.Debug("stat before", "name", before) + + args := []string{ + "--no-build-output", + "--timeout", + strconv.Itoa(int(i.Source.FetchTimeout.Seconds() - 1)), + fmt.Sprintf("<%s/%s>", i.Source.Channel, i.Source.ImportPath), + "--attr", + i.Source.Attribute, + "--out-link", + dest, + } + + if i.Source.URL != "" { + args = append(args, "-I", fmt.Sprintf("%s=%s", i.Source.Channel, i.Source.URL)) + } + + i.Logger.Debug("nix-build command", "args", args) + cmd := exec.CommandContext(ctx, "nix-build", args...) + var out []byte + out, err = cmd.Output() + if err != nil { + err = errors.WithMessage(err, "failed to run nix-build (--dry-run)") + + return + } + i.Logger.Debug("nix-build", "output", strings.TrimSpace(string(out))) + + outPath := path.Join(dest, i.Source.OutputPath) + i.Logger.Debug( + "checking output path", + "outputPath", + outPath, + "dest", + dest, + "source", + i.Source.OutputPath, + ) + var after string + after, err = os.Readlink(dest) + if err = file.NeedNotExist(err); err != nil { + err = errors.WithMessagef( + err, + "failed to stat output file from nix-build, filename: %s", + outPath, + ) + + return + } + i.Logger.Debug("stat after", "name", after) + + updated = before != after + + f = FetchedFiles{ + Options: path.Join(dest, i.Source.OutputPath, "options.json"), + Packages: path.Join(dest, i.Source.OutputPath, "packages.json"), + } + + return +}
A internal/fetcher/download.go
@@ -0,0 +1,75 @@+package fetcher + +import ( + "context" + "log/slog" + "net/url" + "path" + "searchix/internal/config" + "searchix/internal/file" + + "github.com/pkg/errors" +) + +type DownloadFetcher struct { + DataPath string + Source *config.Source + SourceFile string + Logger *slog.Logger +} + +var files = map[string]string{ + "revision": "revision", + "options": "options.json", +} + +func (i *DownloadFetcher) FetchIfNeeded( + parent context.Context, +) (f FetchedFiles, updated bool, err error) { + ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) + defer cancel() + + root := i.DataPath + + err = file.Mkdirp(root) + if err != nil { + err = errors.WithMessagef(err, "error creating directory for data: %s", root) + + return + } + + var fetchURL string + for _, filename := range files { + fetchURL, err = url.JoinPath(i.Source.URL, filename) + if err != nil { + err = errors.WithMessagef( + err, + "could not build URL with elements %s and %s", + i.Source.URL, + filename, + ) + + return + } + + outPath := path.Join(root, filename) + + i.Logger.Debug("preparing to fetch URL", "url", fetchURL, "path", outPath) + + updated, err = fetchFileIfNeeded(ctx, outPath, fetchURL) + if err != nil { + return + } + // don't bother to issue requests for the later files + if !updated { + return + } + } + + f = FetchedFiles{ + Revision: path.Join(root, "revision"), + Options: path.Join(root, "options.json"), + } + + return +}
A internal/fetcher/main.go
@@ -0,0 +1,74 @@+package fetcher + +import ( + "context" + "log/slog" + "searchix/internal/config" + + "github.com/pkg/errors" +) + +type FetchedFiles struct { + Revision string + Options string + Packages string +} + +type Fetcher interface { + FetchIfNeeded(context.Context) (FetchedFiles, bool, error) +} + +func NewNixpkgsChannelFetcher( + source *config.Source, + dataPath string, + logger *slog.Logger, +) *NixpkgsChannelFetcher { + return &NixpkgsChannelFetcher{ + DataPath: dataPath, + Source: source, + Logger: logger, + } +} + +func NewChannelFetcher( + source *config.Source, + dataPath string, + logger *slog.Logger, +) *ChannelFetcher { + return &ChannelFetcher{ + DataPath: dataPath, + Source: source, + Logger: logger, + } +} + +func NewDownloadFetcher( + source *config.Source, + dataPath string, + logger *slog.Logger, +) *DownloadFetcher { + return &DownloadFetcher{ + DataPath: dataPath, + Source: source, + Logger: logger, + } +} + +func New( + source *config.Source, + fetcherDataPath string, + logger *slog.Logger, +) (fetcher Fetcher, err error) { + switch source.Fetcher { + case config.ChannelNixpkgs: + fetcher = NewNixpkgsChannelFetcher(source, fetcherDataPath, logger) + case config.Channel: + fetcher = NewChannelFetcher(source, fetcherDataPath, logger) + case config.Download: + fetcher = NewDownloadFetcher(source, fetcherDataPath, logger) + default: + err = errors.Errorf("unsupported fetcher type %s", source.Fetcher.String()) + } + + return +}
A internal/fetcher/nixpkgs-channel.go
@@ -0,0 +1,74 @@+package fetcher + +import ( + "context" + "log/slog" + "net/url" + "path" + "searchix/internal/config" + "searchix/internal/file" + + "github.com/pkg/errors" +) + +type NixpkgsChannelFetcher struct { + DataPath string + Source *config.Source + Logger *slog.Logger +} + +func makeChannelURL(channel string, subPath string) (string, error) { + url, err := url.JoinPath("https://channels.nixos.org/", channel, subPath) + + return url, errors.WithMessagef(err, "error creating URL") +} + +var filesToFetch = map[string]string{ + "revision": "git-revision", + "options": "options.json.br", + "packages": "packages.json.br", +} + +func (i *NixpkgsChannelFetcher) FetchIfNeeded( + parent context.Context, +) (f FetchedFiles, updated bool, err error) { + ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) + defer cancel() + + root := i.DataPath + + err = file.Mkdirp(root) + if err != nil { + err = errors.WithMessagef(err, "error creating directory for data: %s", root) + + return + } + + var fetchURL string + for _, filename := range filesToFetch { + fetchURL, err = makeChannelURL(i.Source.Channel, filename) + if err != nil { + return + } + + outPath := path.Join(root, filename) + + i.Logger.Debug("attempting to fetch file", "url", fetchURL, "outPath", outPath) + updated, err = fetchFileIfNeeded(ctx, outPath, fetchURL) + if err != nil { + return + } + // don't bother to issue requests for the later files + if !updated { + return + } + } + + f = FetchedFiles{ + Revision: path.Join(root, "git-revision"), + Options: path.Join(root, "options.json.br"), + Packages: path.Join(root, "packages.json.br"), + } + + return +}
D internal/importer/channel.go
@@ -1,97 +0,0 @@-package importer - -import ( - "context" - "fmt" - "log/slog" - "os" - "os/exec" - "path" - "searchix/internal/config" - "searchix/internal/file" - "searchix/internal/index" - "strconv" - "strings" - - "github.com/pkg/errors" -) - -type ChannelImporter struct { - DataPath string - Source *config.Source - SourceFile string - Logger *slog.Logger -} - -func (i *ChannelImporter) FetchIfNeeded(parent context.Context) (bool, error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) - defer cancel() - - dest := i.DataPath - - before, err := os.Readlink(dest) - if file.NeedNotExist(err) != nil { - return false, errors.WithMessagef(err, "could not call readlink on file %s", dest) - } - i.Logger.Debug("stat before", "name", before) - - args := []string{ - "--no-build-output", - "--timeout", - strconv.Itoa(int(i.Source.FetchTimeout.Seconds() - 1)), - fmt.Sprintf("<%s/%s>", i.Source.Channel, i.Source.ImportPath), - "--attr", - i.Source.Attribute, - "--out-link", - dest, - } - - if i.Source.URL != "" { - args = append(args, "-I", fmt.Sprintf("%s=%s", i.Source.Channel, i.Source.URL)) - } - - i.Logger.Debug("nix-build command", "args", args) - cmd := exec.CommandContext(ctx, "nix-build", args...) - out, err := cmd.Output() - if err != nil { - return false, errors.WithMessage(err, "failed to run nix-build (--dry-run)") - } - i.Logger.Debug("nix-build", "output", strings.TrimSpace(string(out))) - - outPath := path.Join(dest, i.Source.OutputPath) - i.Logger.Debug( - "checking output path", - "outputPath", - outPath, - "dest", - dest, - "source", - i.Source.OutputPath, - ) - after, err := os.Readlink(dest) - if err := file.NeedNotExist(err); err != nil { - return false, errors.WithMessagef( - err, - "failed to stat output file from nix-build, filename: %s", - outPath, - ) - } - i.Logger.Debug("stat after", "name", after) - - return before != after, nil -} - -func (i *ChannelImporter) Import(parent context.Context, indexer *index.WriteIndex) (bool, error) { - if i.Source.OutputPath == "" { - return false, errors.New("no output path specified") - } - - filename := path.Join(i.DataPath, i.SourceFile, i.Source.OutputPath) - i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) - - return processOptions(parent, indexer, &importConfig{ - Source: i.Source, - Filename: filename, - Logger: i.Logger, - }) -}
D internal/importer/download-options.go
@@ -1,87 +0,0 @@-package importer - -import ( - "bytes" - "context" - "log/slog" - "net/url" - "os" - "path" - "searchix/internal/config" - "searchix/internal/file" - "searchix/internal/index" - - "github.com/pkg/errors" -) - -type DownloadOptionsImporter struct { - DataPath string - Source *config.Source - SourceFile string - Logger *slog.Logger -} - -var optionsFiles = map[string]string{ - "revision": "revision", - "options": "options.json", -} - -func (i *DownloadOptionsImporter) FetchIfNeeded(parent context.Context) (bool, error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) - defer cancel() - - root := i.DataPath - - err := file.Mkdirp(root) - if err != nil { - return false, errors.WithMessagef(err, "error creating directory for data: %s", root) - } - - var updated bool - for _, filename := range optionsFiles { - url, err := url.JoinPath(i.Source.URL, filename) - if err != nil { - return false, errors.WithMessagef( - err, - "could not build URL with elements %s and %s", - i.Source.URL, - filename, - ) - } - - path := path.Join(root, filename) - - i.Logger.Debug("preparing to fetch URL", "url", url, "path", path) - - updated, err = fetchFileIfNeeded(ctx, path, url) - if err != nil { - return false, err - } - // don't bother to issue requests for the later files - if !updated { - return false, err - } - } - - return updated, nil -} - -func (i *DownloadOptionsImporter) Import( - parent context.Context, - indexer *index.WriteIndex, -) (bool, error) { - filename := path.Join(i.DataPath, optionsFiles["options"]) - revFilename := path.Join(i.DataPath, optionsFiles["revision"]) - bits, err := os.ReadFile(revFilename) - if err != nil { - return false, errors.WithMessagef(err, "unable to read revision file at %s", revFilename) - } - i.Source.Repo.Revision = string(bytes.TrimSpace(bits)) - i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) - - return processOptions(parent, indexer, &importConfig{ - Source: i.Source, - Filename: filename, - Logger: i.Logger, - }) -}
M internal/importer/http.go → internal/fetcher/http.go
@@ -1,4 +1,4 @@-package importer +package fetcher import ( "context"
M internal/importer/importer.go → internal/importer/importer.go
@@ -3,79 +3,32 @@ import ( "context" "log/slog" - "searchix/internal/config" "searchix/internal/index" + "searchix/internal/nix" "sync" ) type Importer interface { - FetchIfNeeded(context.Context) (bool, error) Import(context.Context, *index.WriteIndex) (bool, error) } -func NewNixpkgsChannelImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *NixpkgsChannelImporter { - return &NixpkgsChannelImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } +type Processor interface { + Process(context.Context) (<-chan nix.Importable, <-chan error) } -func NewChannelImporter( - source *config.Source, - dataPath string, +func process( + ctx context.Context, + indexer *index.WriteIndex, + processor Processor, logger *slog.Logger, -) *ChannelImporter { - return &ChannelImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -func NewDownloadOptionsImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *DownloadOptionsImporter { - return &DownloadOptionsImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -type importConfig struct { - Filename string - Source *config.Source - Logger *slog.Logger -} - -func processOptions( - parent context.Context, - indexer *index.WriteIndex, - conf *importConfig, -) (bool, error) { - ctx, cancel := context.WithTimeout(parent, conf.Source.ImportTimeout) - defer cancel() - - conf.Logger.Debug("creating option processor", "filename", conf.Filename) - processor, err := NewOptionProcessor(conf.Filename, conf.Source) - if err != nil { - return true, err - } - +) bool { wg := sync.WaitGroup{} wg.Add(1) - options, pErrs := processor.Process(ctx) + objects, pErrs := processor.Process(ctx) wg.Add(1) - iErrs := indexer.Import(ctx, options) + iErrs := indexer.Import(ctx, objects) var hadErrors bool go func() { @@ -85,29 +38,29 @@ case err, running := <-iErrs: if !running { wg.Done() iErrs = nil - conf.Logger.Info("ingest completed") + logger.Debug("ingest completed") continue } hadErrors = true - conf.Logger.Warn("error ingesting option", "error", err) + logger.Warn("error ingesting object", "error", err) case err, running := <-pErrs: if !running { wg.Done() pErrs = nil - conf.Logger.Debug("processing completed") + logger.Debug("processing completed") continue } hadErrors = true - conf.Logger.Warn("error processing option", "error", err) + logger.Warn("error processing object", "error", err) } } }() - conf.Logger.Debug("options processing", "state", "waiting") + logger.Debug("object processing", "state", "waiting") wg.Wait() - conf.Logger.Debug("options processing", "state", "complete") + logger.Debug("object processing", "state", "complete") - return hadErrors, nil + return hadErrors }
M internal/importer/ingest.go → internal/importer/options.go
@@ -91,9 +91,8 @@ return &i, nil } -func (i *OptionIngester) Process( - ctx context.Context, -) (<-chan nix.Importable, <-chan error) { +func (i *OptionIngester) Process(parent context.Context) (<-chan nix.Importable, <-chan error) { + ctx, cancel := context.WithTimeout(parent, i.source.ImportTimeout) results := make(chan nix.Importable) errs := make(chan error) @@ -101,6 +100,7 @@ go func() { defer i.infile.Close() defer close(results) defer close(errs) + defer cancel() slog.Debug("starting decoder stream") outer:
M internal/importer/main.go → internal/importer/main.go
@@ -2,14 +2,15 @@ package importer import ( "context" - "errors" - "log" "log/slog" "os/exec" "path" "searchix/internal/config" + "searchix/internal/fetcher" "searchix/internal/index" "strings" + + "github.com/pkg/errors" ) func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error { @@ -22,27 +23,20 @@ ctx, cancel := context.WithTimeout(context.Background(), cfg.Importer.Timeout.Duration) defer cancel() - var imp Importer for name, source := range cfg.Importer.Sources { - logger := slog.With("name", name, "importer", source.Type.String()) - logger.Debug("starting importer") + logger := slog.With("name", name, "fetcher", source.Fetcher.String()) + logger.Debug("starting fetcher") - importerDataPath := path.Join(cfg.DataPath, "sources", source.Channel) + fetcherDataPath := path.Join(cfg.DataPath, "sources", source.Key) - switch source.Type { - case config.ChannelNixpkgs: - imp = NewNixpkgsChannelImporter(source, importerDataPath, logger) - case config.Channel: - imp = NewChannelImporter(source, importerDataPath, logger) - case config.DownloadOptions: - imp = NewDownloadOptionsImporter(source, importerDataPath, logger) - default: - log.Printf("unsupported importer type %s", source.Type.String()) + fetcher, err := fetcher.New(source, fetcherDataPath, logger) + if err != nil { + logger.Warn("error creating fetcher", "error", err) continue } - updated, err := imp.FetchIfNeeded(ctx) + files, updated, err := fetcher.FetchIfNeeded(ctx) if err != nil { var exerr *exec.ExitError @@ -60,16 +54,38 @@ } logger.Info("importer fetch succeeded", "updated", updated) if updated || replace { - hadWarnings, err := imp.Import(ctx, indexer) - + err = setRepoRevision(files.Revision, source) if err != nil { - msg := err.Error() - for _, line := range strings.Split(strings.TrimSpace(msg), "\n") { - logger.Error("importer init failed", "error", line) - } + logger.Warn("could not set source repo revision", "error", err) + } - continue + var processor Processor + switch source.Importer { + case config.Options: + logger.Debug( + "creating processor", + "filename", + files.Options, + "revision", + source.Repo.Revision, + ) + processor, err = NewOptionProcessor(files.Options, source) + case config.Packages: + logger.Debug( + "creating processor", + "filename", + files.Packages, + "revision", + source.Repo.Revision, + ) + processor, err = NewPackageProcessor(files.Packages, source) + } + if err != nil { + logger.Warn("failed to create processor", "type", source.Importer, "error", err) } + + hadWarnings := process(ctx, indexer, processor, logger) + if hadWarnings { logger.Warn("importer succeeded, but with warnings/errors") } else {
D internal/importer/nixpkgs-channel.go
@@ -1,84 +0,0 @@-package importer - -import ( - "bytes" - "context" - "log/slog" - "net/url" - "os" - "path" - "searchix/internal/config" - "searchix/internal/file" - "searchix/internal/index" - - "github.com/pkg/errors" -) - -type NixpkgsChannelImporter struct { - DataPath string - Source *config.Source - Logger *slog.Logger -} - -func makeChannelURL(channel string, subPath string) (string, error) { - url, err := url.JoinPath("https://channels.nixos.org/", channel, subPath) - - return url, errors.WithMessagef(err, "error creating URL") -} - -var filesToFetch = map[string]string{ - "revision": "git-revision", - "options": "options.json.br", -} - -func (i *NixpkgsChannelImporter) FetchIfNeeded(parent context.Context) (bool, error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) - defer cancel() - - root := i.DataPath - - err := file.Mkdirp(root) - if err != nil { - return false, errors.WithMessagef(err, "error creating directory for data: %s", root) - } - - for _, filename := range filesToFetch { - url, err := makeChannelURL(i.Source.Channel, filename) - if err != nil { - return false, err - } - - path := path.Join(root, filename) - - updated, err := fetchFileIfNeeded(ctx, path, url) - if err != nil { - return false, err - } - // don't bother to issue requests for the later files - if !updated { - return false, err - } - } - - return true, nil -} - -func (i *NixpkgsChannelImporter) Import( - parent context.Context, - indexer *index.WriteIndex, -) (bool, error) { - filename := path.Join(i.DataPath, filesToFetch["options"]) - revFilename := path.Join(i.DataPath, filesToFetch["revision"]) - bits, err := os.ReadFile(revFilename) - if err != nil { - return false, errors.WithMessagef(err, "unable to read revision file at %s", revFilename) - } - i.Source.Repo.Revision = string(bytes.TrimSpace(bits)) - i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) - - return processOptions(parent, indexer, &importConfig{ - Source: i.Source, - Filename: filename, - Logger: i.Logger, - }) -}
M internal/importer/package.go → internal/importer/package.go
@@ -109,9 +109,8 @@ return l } -func (i *PackageIngester) Process( - ctx context.Context, -) (<-chan nix.Importable, <-chan error) { +func (i *PackageIngester) Process(parent context.Context) (<-chan nix.Importable, <-chan error) { + ctx, cancel := context.WithTimeout(parent, i.source.ImportTimeout) results := make(chan nix.Importable) errs := make(chan error) @@ -119,6 +118,7 @@ go func() { defer i.infile.Close() defer close(results) defer close(errs) + defer cancel() userRepo := i.source.Repo.Owner + "/" + i.source.Repo.Repo slog.Debug("starting decoder stream")
M internal/importer/utils.go → internal/importer/utils.go
@@ -1,11 +1,15 @@ package importer import ( + "bytes" "fmt" "net/url" + "os" + "searchix/internal/config" "searchix/internal/nix" "github.com/bcicen/jstream" + "github.com/pkg/errors" ) func ValueTypeToString(valueType jstream.ValueType) string { @@ -58,3 +62,21 @@ Name: fmt.Sprintf("<%s/%s>", channel, subPath), URL: makeGitHubFileURL(channelRepoMap[channel], ref, subPath, ""), }, nil } + +func setRepoRevision(filename string, source *config.Source) error { + if filename != "" { + bits, err := os.ReadFile(filename) + if err != nil { + return errors.WithMessagef( + err, + "unable to read revision file at %s", + filename, + ) + } + + source.Repo.Revision = string(bytes.TrimSpace(bits)) + + } + + return nil +}