From a5e758d41c151c17ed03b39454470ba8dd0c3b99 Mon Sep 17 00:00:00 2001 From: Alan Pearce Date: Thu, 16 May 2024 23:41:57 +0200 Subject: refactor: separate fetch and import logic --- internal/importer/channel.go | 97 ------------------ internal/importer/download-options.go | 87 ---------------- internal/importer/http.go | 70 ------------- internal/importer/importer.go | 81 +++------------ internal/importer/ingest.go | 188 ---------------------------------- internal/importer/main.go | 62 ++++++----- internal/importer/nixpkgs-channel.go | 84 --------------- internal/importer/options.go | 188 ++++++++++++++++++++++++++++++++++ internal/importer/package.go | 6 +- internal/importer/utils.go | 22 ++++ 10 files changed, 269 insertions(+), 616 deletions(-) delete mode 100644 internal/importer/channel.go delete mode 100644 internal/importer/download-options.go delete mode 100644 internal/importer/http.go delete mode 100644 internal/importer/ingest.go delete mode 100644 internal/importer/nixpkgs-channel.go create mode 100644 internal/importer/options.go (limited to 'internal/importer') diff --git a/internal/importer/channel.go b/internal/importer/channel.go deleted file mode 100644 index 1bce1b0..0000000 --- a/internal/importer/channel.go +++ /dev/null @@ -1,97 +0,0 @@ -package importer - -import ( - "context" - "fmt" - "log/slog" - "os" - "os/exec" - "path" - "searchix/internal/config" - "searchix/internal/file" - "searchix/internal/index" - "strconv" - "strings" - - "github.com/pkg/errors" -) - -type ChannelImporter struct { - DataPath string - Source *config.Source - SourceFile string - Logger *slog.Logger -} - -func (i *ChannelImporter) FetchIfNeeded(parent context.Context) (bool, error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) - defer cancel() - - dest := i.DataPath - - before, err := os.Readlink(dest) - if file.NeedNotExist(err) != nil { - return false, errors.WithMessagef(err, "could not call readlink on file %s", dest) - } - i.Logger.Debug("stat before", "name", before) - - args := []string{ - "--no-build-output", - "--timeout", - strconv.Itoa(int(i.Source.FetchTimeout.Seconds() - 1)), - fmt.Sprintf("<%s/%s>", i.Source.Channel, i.Source.ImportPath), - "--attr", - i.Source.Attribute, - "--out-link", - dest, - } - - if i.Source.URL != "" { - args = append(args, "-I", fmt.Sprintf("%s=%s", i.Source.Channel, i.Source.URL)) - } - - i.Logger.Debug("nix-build command", "args", args) - cmd := exec.CommandContext(ctx, "nix-build", args...) - out, err := cmd.Output() - if err != nil { - return false, errors.WithMessage(err, "failed to run nix-build (--dry-run)") - } - i.Logger.Debug("nix-build", "output", strings.TrimSpace(string(out))) - - outPath := path.Join(dest, i.Source.OutputPath) - i.Logger.Debug( - "checking output path", - "outputPath", - outPath, - "dest", - dest, - "source", - i.Source.OutputPath, - ) - after, err := os.Readlink(dest) - if err := file.NeedNotExist(err); err != nil { - return false, errors.WithMessagef( - err, - "failed to stat output file from nix-build, filename: %s", - outPath, - ) - } - i.Logger.Debug("stat after", "name", after) - - return before != after, nil -} - -func (i *ChannelImporter) Import(parent context.Context, indexer *index.WriteIndex) (bool, error) { - if i.Source.OutputPath == "" { - return false, errors.New("no output path specified") - } - - filename := path.Join(i.DataPath, i.SourceFile, i.Source.OutputPath) - i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) - - return processOptions(parent, indexer, &importConfig{ - Source: i.Source, - Filename: filename, - Logger: i.Logger, - }) -} diff --git a/internal/importer/download-options.go b/internal/importer/download-options.go deleted file mode 100644 index 6727138..0000000 --- a/internal/importer/download-options.go +++ /dev/null @@ -1,87 +0,0 @@ -package importer - -import ( - "bytes" - "context" - "log/slog" - "net/url" - "os" - "path" - "searchix/internal/config" - "searchix/internal/file" - "searchix/internal/index" - - "github.com/pkg/errors" -) - -type DownloadOptionsImporter struct { - DataPath string - Source *config.Source - SourceFile string - Logger *slog.Logger -} - -var optionsFiles = map[string]string{ - "revision": "revision", - "options": "options.json", -} - -func (i *DownloadOptionsImporter) FetchIfNeeded(parent context.Context) (bool, error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) - defer cancel() - - root := i.DataPath - - err := file.Mkdirp(root) - if err != nil { - return false, errors.WithMessagef(err, "error creating directory for data: %s", root) - } - - var updated bool - for _, filename := range optionsFiles { - url, err := url.JoinPath(i.Source.URL, filename) - if err != nil { - return false, errors.WithMessagef( - err, - "could not build URL with elements %s and %s", - i.Source.URL, - filename, - ) - } - - path := path.Join(root, filename) - - i.Logger.Debug("preparing to fetch URL", "url", url, "path", path) - - updated, err = fetchFileIfNeeded(ctx, path, url) - if err != nil { - return false, err - } - // don't bother to issue requests for the later files - if !updated { - return false, err - } - } - - return updated, nil -} - -func (i *DownloadOptionsImporter) Import( - parent context.Context, - indexer *index.WriteIndex, -) (bool, error) { - filename := path.Join(i.DataPath, optionsFiles["options"]) - revFilename := path.Join(i.DataPath, optionsFiles["revision"]) - bits, err := os.ReadFile(revFilename) - if err != nil { - return false, errors.WithMessagef(err, "unable to read revision file at %s", revFilename) - } - i.Source.Repo.Revision = string(bytes.TrimSpace(bits)) - i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) - - return processOptions(parent, indexer, &importConfig{ - Source: i.Source, - Filename: filename, - Logger: i.Logger, - }) -} diff --git a/internal/importer/http.go b/internal/importer/http.go deleted file mode 100644 index b496177..0000000 --- a/internal/importer/http.go +++ /dev/null @@ -1,70 +0,0 @@ -package importer - -import ( - "context" - "fmt" - "log/slog" - "net/http" - "os" - "searchix/internal/config" - "searchix/internal/file" - "strings" - "time" - - "github.com/pkg/errors" -) - -func fetchFileIfNeeded(ctx context.Context, path string, url string) (needed bool, err error) { - stat, err := file.StatIfExists(path) - if err != nil { - return false, errors.WithMessagef(err, "could not stat file %s", path) - } - - var mtime string - if stat != nil { - mtime = strings.Replace(stat.ModTime().UTC().Format(time.RFC1123), "UTC", "GMT", 1) - } - - req, err := http.NewRequestWithContext(ctx, "GET", url, http.NoBody) - if err != nil { - return false, errors.WithMessagef(err, "could not create HTTP request for %s", url) - } - - req.Header.Set("User-Agent", fmt.Sprintf("Searchix %s", config.ShortSHA)) - - if mtime != "" { - req.Header.Set("If-Modified-Since", mtime) - } - res, err := http.DefaultClient.Do(req) - if err != nil { - return false, errors.WithMessagef(err, "could not make HTTP request to %s", url) - } - defer res.Body.Close() - - switch res.StatusCode { - case http.StatusNotModified: - needed = false - case http.StatusOK: - newMtime, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified")) - if err != nil { - slog.Warn( - "could not parse Last-Modified header from response", - "value", - res.Header.Get("Last-Modified"), - ) - } - err = file.WriteToFile(path, res.Body) - if err != nil { - return false, errors.WithMessagef(err, "could not write response body to file %s", path) - } - err = os.Chtimes(path, time.Time{}, newMtime) - if err != nil { - slog.Warn("could not update mtime on file", "file", path) - } - needed = true - default: - return false, fmt.Errorf("got response code %d, don't know what to do", res.StatusCode) - } - - return needed, nil -} diff --git a/internal/importer/importer.go b/internal/importer/importer.go index 255f70e..57118c8 100644 --- a/internal/importer/importer.go +++ b/internal/importer/importer.go @@ -3,79 +3,32 @@ package importer import ( "context" "log/slog" - "searchix/internal/config" "searchix/internal/index" + "searchix/internal/nix" "sync" ) type Importer interface { - FetchIfNeeded(context.Context) (bool, error) Import(context.Context, *index.WriteIndex) (bool, error) } -func NewNixpkgsChannelImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *NixpkgsChannelImporter { - return &NixpkgsChannelImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -func NewChannelImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *ChannelImporter { - return &ChannelImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -func NewDownloadOptionsImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *DownloadOptionsImporter { - return &DownloadOptionsImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -type importConfig struct { - Filename string - Source *config.Source - Logger *slog.Logger +type Processor interface { + Process(context.Context) (<-chan nix.Importable, <-chan error) } -func processOptions( - parent context.Context, +func process( + ctx context.Context, indexer *index.WriteIndex, - conf *importConfig, -) (bool, error) { - ctx, cancel := context.WithTimeout(parent, conf.Source.ImportTimeout) - defer cancel() - - conf.Logger.Debug("creating option processor", "filename", conf.Filename) - processor, err := NewOptionProcessor(conf.Filename, conf.Source) - if err != nil { - return true, err - } - + processor Processor, + logger *slog.Logger, +) bool { wg := sync.WaitGroup{} wg.Add(1) - options, pErrs := processor.Process(ctx) + objects, pErrs := processor.Process(ctx) wg.Add(1) - iErrs := indexer.Import(ctx, options) + iErrs := indexer.Import(ctx, objects) var hadErrors bool go func() { @@ -85,29 +38,29 @@ func processOptions( if !running { wg.Done() iErrs = nil - conf.Logger.Info("ingest completed") + logger.Debug("ingest completed") continue } hadErrors = true - conf.Logger.Warn("error ingesting option", "error", err) + logger.Warn("error ingesting object", "error", err) case err, running := <-pErrs: if !running { wg.Done() pErrs = nil - conf.Logger.Debug("processing completed") + logger.Debug("processing completed") continue } hadErrors = true - conf.Logger.Warn("error processing option", "error", err) + logger.Warn("error processing object", "error", err) } } }() - conf.Logger.Debug("options processing", "state", "waiting") + logger.Debug("object processing", "state", "waiting") wg.Wait() - conf.Logger.Debug("options processing", "state", "complete") + logger.Debug("object processing", "state", "complete") - return hadErrors, nil + return hadErrors } diff --git a/internal/importer/ingest.go b/internal/importer/ingest.go deleted file mode 100644 index 9b92ae8..0000000 --- a/internal/importer/ingest.go +++ /dev/null @@ -1,188 +0,0 @@ -package importer - -import ( - "context" - "log/slog" - "os" - "reflect" - "searchix/internal/config" - "searchix/internal/nix" - - "github.com/bcicen/jstream" - "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" -) - -type nixValueJSON struct { - Type string `mapstructure:"_type"` - Text string -} - -type linkJSON struct { - Name string - URL string `json:"url"` -} - -type nixOptionJSON struct { - Declarations []linkJSON - Default *nixValueJSON - Description string - Example *nixValueJSON - Loc []string - ReadOnly bool - RelatedPackages string - Type string -} - -func convertValue(nj *nixValueJSON) *nix.Value { - if nj == nil { - return nil - } - switch nj.Type { - case "", "literalExpression": - return &nix.Value{ - Text: nj.Text, - } - case "literalMD": - return &nix.Value{ - Markdown: nix.Markdown(nj.Text), - } - default: - slog.Warn("got unexpected Value type", "type", nj.Type, "text", nj.Text) - - return nil - } -} - -type OptionIngester struct { - dec *jstream.Decoder - ms *mapstructure.Decoder - optJSON nixOptionJSON - infile *os.File - source *config.Source -} - -func NewOptionProcessor(inpath string, source *config.Source) (*OptionIngester, error) { - infile, err := os.Open(inpath) - if err != nil { - return nil, errors.WithMessagef(err, "failed to open input file %s", inpath) - } - i := OptionIngester{ - dec: jstream.NewDecoder(infile, 1).EmitKV(), - optJSON: nixOptionJSON{}, - infile: infile, - source: source, - } - - ms, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ - ErrorUnused: true, - ZeroFields: true, - Result: &i.optJSON, - Squash: true, - DecodeHook: mapstructure.TextUnmarshallerHookFunc(), - }) - if err != nil { - defer infile.Close() - - return nil, errors.WithMessage(err, "could not create mapstructure decoder") - } - i.ms = ms - - return &i, nil -} - -func (i *OptionIngester) Process( - ctx context.Context, -) (<-chan nix.Importable, <-chan error) { - results := make(chan nix.Importable) - errs := make(chan error) - - go func() { - defer i.infile.Close() - defer close(results) - defer close(errs) - - slog.Debug("starting decoder stream") - outer: - for mv := range i.dec.Stream() { - select { - case <-ctx.Done(): - break outer - default: - } - if err := i.dec.Err(); err != nil { - errs <- errors.WithMessage(err, "could not decode JSON") - - continue - } - if mv.ValueType != jstream.Object { - errs <- errors.Errorf("unexpected object type %s", ValueTypeToString(mv.ValueType)) - - continue - } - kv := mv.Value.(jstream.KV) - x := kv.Value.(map[string]interface{}) - - var decls []*nix.Link - for _, decl := range x["declarations"].([]interface{}) { - i.optJSON = nixOptionJSON{} - - switch decl := reflect.ValueOf(decl); decl.Kind() { - case reflect.String: - s := decl.String() - link, err := MakeChannelLink(i.source.Channel, i.source.Repo.Revision, s) - if err != nil { - errs <- errors.WithMessagef(err, - "could not make a channel link for channel %s, revision %s and subpath %s", - i.source.Channel, i.source.Repo.Revision, s, - ) - - continue - } - decls = append(decls, link) - case reflect.Map: - v := decl.Interface().(map[string]interface{}) - link := nix.Link{ - Name: v["name"].(string), - URL: v["url"].(string), - } - decls = append(decls, &link) - default: - errs <- errors.Errorf("unexpected declaration type %s", decl.Kind().String()) - - continue - } - } - if len(decls) > 0 { - x["declarations"] = decls - } - - err := i.ms.Decode(x) // stores in optJSON - if err != nil { - errs <- errors.WithMessagef(err, "failed to decode option %#v", x) - - continue - } - - var decs = make([]nix.Link, len(i.optJSON.Declarations)) - for i, d := range i.optJSON.Declarations { - decs[i] = nix.Link(d) - } - - // slog.Debug("sending option", "name", kv.Key) - results <- nix.Option{ - Name: kv.Key, - Source: i.source.Key, - Declarations: decs, - Default: convertValue(i.optJSON.Default), - Description: nix.Markdown(i.optJSON.Description), - Example: convertValue(i.optJSON.Example), - RelatedPackages: nix.Markdown(i.optJSON.RelatedPackages), - Loc: i.optJSON.Loc, - Type: i.optJSON.Type, - } - } - }() - - return results, errs -} diff --git a/internal/importer/main.go b/internal/importer/main.go index 0b7a99d..d2b66e1 100644 --- a/internal/importer/main.go +++ b/internal/importer/main.go @@ -2,14 +2,15 @@ package importer import ( "context" - "errors" - "log" "log/slog" "os/exec" "path" "searchix/internal/config" + "searchix/internal/fetcher" "searchix/internal/index" "strings" + + "github.com/pkg/errors" ) func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error { @@ -22,27 +23,20 @@ func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error { ctx, cancel := context.WithTimeout(context.Background(), cfg.Importer.Timeout.Duration) defer cancel() - var imp Importer for name, source := range cfg.Importer.Sources { - logger := slog.With("name", name, "importer", source.Type.String()) - logger.Debug("starting importer") + logger := slog.With("name", name, "fetcher", source.Fetcher.String()) + logger.Debug("starting fetcher") - importerDataPath := path.Join(cfg.DataPath, "sources", source.Channel) + fetcherDataPath := path.Join(cfg.DataPath, "sources", source.Key) - switch source.Type { - case config.ChannelNixpkgs: - imp = NewNixpkgsChannelImporter(source, importerDataPath, logger) - case config.Channel: - imp = NewChannelImporter(source, importerDataPath, logger) - case config.DownloadOptions: - imp = NewDownloadOptionsImporter(source, importerDataPath, logger) - default: - log.Printf("unsupported importer type %s", source.Type.String()) + fetcher, err := fetcher.New(source, fetcherDataPath, logger) + if err != nil { + logger.Warn("error creating fetcher", "error", err) continue } - updated, err := imp.FetchIfNeeded(ctx) + files, updated, err := fetcher.FetchIfNeeded(ctx) if err != nil { var exerr *exec.ExitError @@ -60,16 +54,38 @@ func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error { logger.Info("importer fetch succeeded", "updated", updated) if updated || replace { - hadWarnings, err := imp.Import(ctx, indexer) - + err = setRepoRevision(files.Revision, source) if err != nil { - msg := err.Error() - for _, line := range strings.Split(strings.TrimSpace(msg), "\n") { - logger.Error("importer init failed", "error", line) - } + logger.Warn("could not set source repo revision", "error", err) + } - continue + var processor Processor + switch source.Importer { + case config.Options: + logger.Debug( + "creating processor", + "filename", + files.Options, + "revision", + source.Repo.Revision, + ) + processor, err = NewOptionProcessor(files.Options, source) + case config.Packages: + logger.Debug( + "creating processor", + "filename", + files.Packages, + "revision", + source.Repo.Revision, + ) + processor, err = NewPackageProcessor(files.Packages, source) } + if err != nil { + logger.Warn("failed to create processor", "type", source.Importer, "error", err) + } + + hadWarnings := process(ctx, indexer, processor, logger) + if hadWarnings { logger.Warn("importer succeeded, but with warnings/errors") } else { diff --git a/internal/importer/nixpkgs-channel.go b/internal/importer/nixpkgs-channel.go deleted file mode 100644 index d302154..0000000 --- a/internal/importer/nixpkgs-channel.go +++ /dev/null @@ -1,84 +0,0 @@ -package importer - -import ( - "bytes" - "context" - "log/slog" - "net/url" - "os" - "path" - "searchix/internal/config" - "searchix/internal/file" - "searchix/internal/index" - - "github.com/pkg/errors" -) - -type NixpkgsChannelImporter struct { - DataPath string - Source *config.Source - Logger *slog.Logger -} - -func makeChannelURL(channel string, subPath string) (string, error) { - url, err := url.JoinPath("https://channels.nixos.org/", channel, subPath) - - return url, errors.WithMessagef(err, "error creating URL") -} - -var filesToFetch = map[string]string{ - "revision": "git-revision", - "options": "options.json.br", -} - -func (i *NixpkgsChannelImporter) FetchIfNeeded(parent context.Context) (bool, error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) - defer cancel() - - root := i.DataPath - - err := file.Mkdirp(root) - if err != nil { - return false, errors.WithMessagef(err, "error creating directory for data: %s", root) - } - - for _, filename := range filesToFetch { - url, err := makeChannelURL(i.Source.Channel, filename) - if err != nil { - return false, err - } - - path := path.Join(root, filename) - - updated, err := fetchFileIfNeeded(ctx, path, url) - if err != nil { - return false, err - } - // don't bother to issue requests for the later files - if !updated { - return false, err - } - } - - return true, nil -} - -func (i *NixpkgsChannelImporter) Import( - parent context.Context, - indexer *index.WriteIndex, -) (bool, error) { - filename := path.Join(i.DataPath, filesToFetch["options"]) - revFilename := path.Join(i.DataPath, filesToFetch["revision"]) - bits, err := os.ReadFile(revFilename) - if err != nil { - return false, errors.WithMessagef(err, "unable to read revision file at %s", revFilename) - } - i.Source.Repo.Revision = string(bytes.TrimSpace(bits)) - i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) - - return processOptions(parent, indexer, &importConfig{ - Source: i.Source, - Filename: filename, - Logger: i.Logger, - }) -} diff --git a/internal/importer/options.go b/internal/importer/options.go new file mode 100644 index 0000000..ec2c20f --- /dev/null +++ b/internal/importer/options.go @@ -0,0 +1,188 @@ +package importer + +import ( + "context" + "log/slog" + "os" + "reflect" + "searchix/internal/config" + "searchix/internal/nix" + + "github.com/bcicen/jstream" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" +) + +type nixValueJSON struct { + Type string `mapstructure:"_type"` + Text string +} + +type linkJSON struct { + Name string + URL string `json:"url"` +} + +type nixOptionJSON struct { + Declarations []linkJSON + Default *nixValueJSON + Description string + Example *nixValueJSON + Loc []string + ReadOnly bool + RelatedPackages string + Type string +} + +func convertValue(nj *nixValueJSON) *nix.Value { + if nj == nil { + return nil + } + switch nj.Type { + case "", "literalExpression": + return &nix.Value{ + Text: nj.Text, + } + case "literalMD": + return &nix.Value{ + Markdown: nix.Markdown(nj.Text), + } + default: + slog.Warn("got unexpected Value type", "type", nj.Type, "text", nj.Text) + + return nil + } +} + +type OptionIngester struct { + dec *jstream.Decoder + ms *mapstructure.Decoder + optJSON nixOptionJSON + infile *os.File + source *config.Source +} + +func NewOptionProcessor(inpath string, source *config.Source) (*OptionIngester, error) { + infile, err := os.Open(inpath) + if err != nil { + return nil, errors.WithMessagef(err, "failed to open input file %s", inpath) + } + i := OptionIngester{ + dec: jstream.NewDecoder(infile, 1).EmitKV(), + optJSON: nixOptionJSON{}, + infile: infile, + source: source, + } + + ms, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + ErrorUnused: true, + ZeroFields: true, + Result: &i.optJSON, + Squash: true, + DecodeHook: mapstructure.TextUnmarshallerHookFunc(), + }) + if err != nil { + defer infile.Close() + + return nil, errors.WithMessage(err, "could not create mapstructure decoder") + } + i.ms = ms + + return &i, nil +} + +func (i *OptionIngester) Process(parent context.Context) (<-chan nix.Importable, <-chan error) { + ctx, cancel := context.WithTimeout(parent, i.source.ImportTimeout) + results := make(chan nix.Importable) + errs := make(chan error) + + go func() { + defer i.infile.Close() + defer close(results) + defer close(errs) + defer cancel() + + slog.Debug("starting decoder stream") + outer: + for mv := range i.dec.Stream() { + select { + case <-ctx.Done(): + break outer + default: + } + if err := i.dec.Err(); err != nil { + errs <- errors.WithMessage(err, "could not decode JSON") + + continue + } + if mv.ValueType != jstream.Object { + errs <- errors.Errorf("unexpected object type %s", ValueTypeToString(mv.ValueType)) + + continue + } + kv := mv.Value.(jstream.KV) + x := kv.Value.(map[string]interface{}) + + var decls []*nix.Link + for _, decl := range x["declarations"].([]interface{}) { + i.optJSON = nixOptionJSON{} + + switch decl := reflect.ValueOf(decl); decl.Kind() { + case reflect.String: + s := decl.String() + link, err := MakeChannelLink(i.source.Channel, i.source.Repo.Revision, s) + if err != nil { + errs <- errors.WithMessagef(err, + "could not make a channel link for channel %s, revision %s and subpath %s", + i.source.Channel, i.source.Repo.Revision, s, + ) + + continue + } + decls = append(decls, link) + case reflect.Map: + v := decl.Interface().(map[string]interface{}) + link := nix.Link{ + Name: v["name"].(string), + URL: v["url"].(string), + } + decls = append(decls, &link) + default: + errs <- errors.Errorf("unexpected declaration type %s", decl.Kind().String()) + + continue + } + } + if len(decls) > 0 { + x["declarations"] = decls + } + + err := i.ms.Decode(x) // stores in optJSON + if err != nil { + errs <- errors.WithMessagef(err, "failed to decode option %#v", x) + + continue + } + + var decs = make([]nix.Link, len(i.optJSON.Declarations)) + for i, d := range i.optJSON.Declarations { + decs[i] = nix.Link(d) + } + + // slog.Debug("sending option", "name", kv.Key) + results <- nix.Option{ + Name: kv.Key, + Source: i.source.Key, + Declarations: decs, + Default: convertValue(i.optJSON.Default), + Description: nix.Markdown(i.optJSON.Description), + Example: convertValue(i.optJSON.Example), + RelatedPackages: nix.Markdown(i.optJSON.RelatedPackages), + Loc: i.optJSON.Loc, + Type: i.optJSON.Type, + } + } + }() + + return results, errs +} diff --git a/internal/importer/package.go b/internal/importer/package.go index 49e313d..3e0ec83 100644 --- a/internal/importer/package.go +++ b/internal/importer/package.go @@ -109,9 +109,8 @@ func convertToLicense(in map[string]any) *nix.License { return l } -func (i *PackageIngester) Process( - ctx context.Context, -) (<-chan nix.Importable, <-chan error) { +func (i *PackageIngester) Process(parent context.Context) (<-chan nix.Importable, <-chan error) { + ctx, cancel := context.WithTimeout(parent, i.source.ImportTimeout) results := make(chan nix.Importable) errs := make(chan error) @@ -119,6 +118,7 @@ func (i *PackageIngester) Process( defer i.infile.Close() defer close(results) defer close(errs) + defer cancel() userRepo := i.source.Repo.Owner + "/" + i.source.Repo.Repo slog.Debug("starting decoder stream") diff --git a/internal/importer/utils.go b/internal/importer/utils.go index 13d4702..3eb034f 100644 --- a/internal/importer/utils.go +++ b/internal/importer/utils.go @@ -1,11 +1,15 @@ package importer import ( + "bytes" "fmt" "net/url" + "os" + "searchix/internal/config" "searchix/internal/nix" "github.com/bcicen/jstream" + "github.com/pkg/errors" ) func ValueTypeToString(valueType jstream.ValueType) string { @@ -58,3 +62,21 @@ func MakeChannelLink(channel string, ref string, subPath string) (*nix.Link, err URL: makeGitHubFileURL(channelRepoMap[channel], ref, subPath, ""), }, nil } + +func setRepoRevision(filename string, source *config.Source) error { + if filename != "" { + bits, err := os.ReadFile(filename) + if err != nil { + return errors.WithMessagef( + err, + "unable to read revision file at %s", + filename, + ) + } + + source.Repo.Revision = string(bytes.TrimSpace(bits)) + + } + + return nil +} -- cgit 1.4.1