From e062ca72b222b890e345548bd8422d5df98e9fef Mon Sep 17 00:00:00 2001 From: Alan Pearce Date: Thu, 9 May 2024 16:47:41 +0200 Subject: feat: import sources from configuration in go code and index options --- internal/importer/channel.go | 82 ++++++++++++ internal/importer/http.go | 63 ++++++++++ internal/importer/importer.go | 112 +++++++++++++++++ internal/importer/ingest.go | 237 +++++++++++++++++++++++++++++++++++ internal/importer/nixpkgs-channel.go | 82 ++++++++++++ internal/importer/repository.go | 44 +++++++ internal/importer/source-type.go | 44 +++++++ 7 files changed, 664 insertions(+) create mode 100644 internal/importer/channel.go create mode 100644 internal/importer/http.go create mode 100644 internal/importer/importer.go create mode 100644 internal/importer/ingest.go create mode 100644 internal/importer/nixpkgs-channel.go create mode 100644 internal/importer/repository.go create mode 100644 internal/importer/source-type.go (limited to 'internal/importer') diff --git a/internal/importer/channel.go b/internal/importer/channel.go new file mode 100644 index 0000000..4d051cc --- /dev/null +++ b/internal/importer/channel.go @@ -0,0 +1,82 @@ +package importer + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "path" + "searchix/internal/file" + "searchix/internal/search" + "strconv" + "strings" + + "github.com/pkg/errors" +) + +type ChannelImporter struct { + DataPath string + Source Source + SourceFile string + Logger *slog.Logger + indexPath string +} + +func (i *ChannelImporter) FetchIfNeeded(parent context.Context) (bool, error) { + ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) + defer cancel() + + dest := i.DataPath + + before, err := os.Readlink(dest) + if file.NeedNotExist(err) != nil { + return false, errors.WithMessagef(err, "could not call readlink on file %s", dest) + } + i.Logger.Debug("stat before", "name", before) + + args := []string{ + "--no-build-output", + "--timeout", + strconv.Itoa(int(i.Source.FetchTimeout.Seconds() - 1)), + fmt.Sprintf("<%s/%s>", i.Source.Channel, i.Source.ImportPath), + "--attr", + i.Source.Attribute, + "--out-link", + dest, + } + + i.Logger.Debug("nix-build command", "args", args) + cmd := exec.CommandContext(ctx, "nix-build", args...) + out, err := cmd.Output() + if err != nil { + return false, errors.WithMessage(err, "failed to run nix-build (--dry-run)") + } + i.Logger.Debug("nix-build", "output", strings.TrimSpace(string(out))) + + outPath := path.Join(dest, i.Source.OutputPath) + i.Logger.Debug("checking output path", "outputPath", outPath, "dest", dest, "source", i.Source.OutputPath) + after, err := os.Readlink(dest) + if err := file.NeedNotExist(err); err != nil { + return false, errors.WithMessagef(err, "failed to stat output file from nix-build, filename: %s", outPath) + } + i.Logger.Debug("stat after", "name", after) + + return before != after, nil +} + +func (i *ChannelImporter) Import(parent context.Context, indexer *search.WriteIndex) (bool, error) { + if i.Source.OutputPath == "" { + return false, errors.New("no output path specified") + } + + filename := path.Join(i.DataPath, i.SourceFile, i.Source.OutputPath) + i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) + + return processOptions(parent, indexer, &importConfig{ + IndexPath: i.indexPath, + Source: i.Source, + Filename: filename, + Logger: i.Logger, + }) +} diff --git a/internal/importer/http.go b/internal/importer/http.go new file mode 100644 index 0000000..1bf2428 --- /dev/null +++ b/internal/importer/http.go @@ -0,0 +1,63 @@ +package importer + +import ( + "context" + "fmt" + "log/slog" + "net/http" + "os" + "searchix/internal/file" + "strings" + "time" + + "github.com/pkg/errors" +) + +func fetchFileIfNeeded(ctx context.Context, path string, url string) (needed bool, err error) { + stat, err := file.StatIfExists(path) + if err != nil { + return false, errors.WithMessagef(err, "could not stat file %s", path) + } + + var mtime string + if stat != nil { + mtime = strings.Replace(stat.ModTime().UTC().Format(time.RFC1123), "UTC", "GMT", 1) + } + + req, err := http.NewRequestWithContext(ctx, "GET", url, http.NoBody) + if err != nil { + return false, errors.WithMessagef(err, "could not create HTTP request for %s", url) + } + + if mtime != "" { + req.Header.Set("If-Modified-Since", mtime) + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return false, errors.WithMessagef(err, "could not make HTTP request to %s", url) + } + defer res.Body.Close() + + switch res.StatusCode { + case http.StatusNotModified: + needed = false + case http.StatusOK: + newMtime, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified")) + if err != nil { + slog.Warn("could not parse Last-Modified header from response", "value", res.Header.Get("Last-Modified")) + } + err = file.WriteToFile(path, res.Body) + if err != nil { + return false, errors.WithMessagef(err, "could not write response body to file %s", path) + } + err = os.Chtimes(path, time.Time{}, newMtime) + if err != nil { + slog.Warn("could not update mtime on file", "file", path) + } + needed = true + default: + return false, fmt.Errorf("got response code %d, don't know what to do", res.StatusCode) + } + + return needed, nil +} diff --git a/internal/importer/importer.go b/internal/importer/importer.go new file mode 100644 index 0000000..2318fe4 --- /dev/null +++ b/internal/importer/importer.go @@ -0,0 +1,112 @@ +package importer + +import ( + "context" + "log/slog" + "path" + "searchix/internal/search" + "sync" + "time" +) + +type Source struct { + Name string + Enable bool + Type Type + Channel string + Attribute string + ImportPath string `toml:"import-path"` + FetchTimeout time.Duration `toml:"fetch-timeout"` + ImportTimeout time.Duration `toml:"import-timeout"` + OutputPath string `toml:"output-path"` + Repo Repository +} + +type Importer interface { + FetchIfNeeded(context.Context) (bool, error) + Import(context.Context, *search.WriteIndex) (bool, error) +} + +func NewNixpkgsChannelImporter(source Source, dataPath string, logger *slog.Logger) *NixpkgsChannelImporter { + indexPath := dataPath + fullpath := path.Join(dataPath, source.Channel) + + return &NixpkgsChannelImporter{ + DataPath: fullpath, + Source: source, + Logger: logger, + indexPath: indexPath, + } +} + +func NewChannelImporter(source Source, dataPath string, logger *slog.Logger) *ChannelImporter { + indexPath := dataPath + fullpath := path.Join(dataPath, source.Channel) + + return &ChannelImporter{ + DataPath: fullpath, + Source: source, + Logger: logger, + indexPath: indexPath, + } +} + +type importConfig struct { + IndexPath string + Filename string + Source Source + Logger *slog.Logger +} + +func processOptions(parent context.Context, indexer *search.WriteIndex, conf *importConfig) (bool, error) { + ctx, cancel := context.WithTimeout(parent, conf.Source.ImportTimeout) + defer cancel() + + conf.Logger.Debug("creating option processor", "filename", conf.Filename) + processor, err := NewOptionProcessor(conf.Filename, conf.Source) + if err != nil { + return true, err + } + + wg := sync.WaitGroup{} + + wg.Add(1) + options, pErrs := processor.Process(ctx) + + wg.Add(1) + iErrs := indexer.ImportOptions(ctx, options) + + var hadErrors bool + go func() { + for { + select { + case err, running := <-iErrs: + if !running { + wg.Done() + iErrs = nil + slog.Info("ingest completed") + + continue + } + hadErrors = true + conf.Logger.Warn("error ingesting option", "error", err) + case err, running := <-pErrs: + if !running { + wg.Done() + pErrs = nil + slog.Info("processing completed") + + continue + } + hadErrors = true + conf.Logger.Warn("error processing option", "error", err) + } + } + }() + + slog.Debug("options processing", "state", "waiting") + wg.Wait() + slog.Debug("options processing", "state", "complete") + + return hadErrors, nil +} diff --git a/internal/importer/ingest.go b/internal/importer/ingest.go new file mode 100644 index 0000000..b9db80c --- /dev/null +++ b/internal/importer/ingest.go @@ -0,0 +1,237 @@ +package importer + +import ( + "context" + "fmt" + "log/slog" + "net/url" + "os" + "reflect" + "searchix/internal/options" + "strings" + + "github.com/bcicen/jstream" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" +) + +type nixValueJSON struct { + Type string `mapstructure:"_type"` + Text string +} + +type linkJSON struct { + Name string + URL string `json:"url"` +} + +type nixOptionJSON struct { + Declarations []linkJSON + Default *nixValueJSON + Description string + Example *nixValueJSON + Loc []string + ReadOnly bool + RelatedPackages string + Type string +} + +func ValueTypeToString(valueType jstream.ValueType) string { + switch valueType { + case jstream.Unknown: + return "unknown" + case jstream.Null: + return "null" + case jstream.String: + return "string" + case jstream.Number: + return "number" + case jstream.Boolean: + return "boolean" + case jstream.Array: + return "array" + case jstream.Object: + return "object" + } + + return "very strange" +} + +func makeGitHubFileURL(userRepo string, ref string, subPath string) string { + url, _ := url.JoinPath("https://github.com/", userRepo, "blob", ref, subPath) + + return url +} + +// make configurable? +var channelRepoMap = map[string]string{ + "nixpkgs": "NixOS/nixpkgs", + "nix-darwin": "LnL7/nix-darwin", + "home-manager": "nix-community/home-manager", +} + +func MakeChannelLink(channel string, ref string, subPath string) (*options.Link, error) { + if channelRepoMap[channel] == "" { + return nil, fmt.Errorf("don't know what repository relates to channel <%s>", channel) + } + + return &options.Link{ + Name: fmt.Sprintf("<%s/%s>", channel, subPath), + URL: makeGitHubFileURL(channelRepoMap[channel], ref, subPath), + }, nil +} + +func convertNixValue(nj *nixValueJSON) *options.NixValue { + if nj == nil { + return nil + } + switch nj.Type { + case "", "literalExpression": + return &options.NixValue{ + Text: nj.Text, + } + case "literalMD": + return &options.NixValue{ + Markdown: options.Markdown(nj.Text), + } + default: + slog.Warn("got unexpected NixValue type", "type", nj.Type, "text", nj.Text) + + return nil + } +} + +type OptionIngester struct { + dec *jstream.Decoder + ms *mapstructure.Decoder + optJSON nixOptionJSON + infile *os.File + source Source +} + +type Ingester[T options.NixOption] interface { + Process() (<-chan *T, <-chan error) +} + +func NewOptionProcessor(inpath string, source Source) (*OptionIngester, error) { + infile, err := os.Open(inpath) + if err != nil { + return nil, errors.WithMessagef(err, "failed to open input file %s", inpath) + } + i := OptionIngester{ + dec: jstream.NewDecoder(infile, 1).EmitKV(), + optJSON: nixOptionJSON{}, + infile: infile, + source: source, + } + + ms, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + ErrorUnused: true, + ZeroFields: true, + Result: &i.optJSON, + Squash: true, + DecodeHook: mapstructure.TextUnmarshallerHookFunc(), + }) + if err != nil { + defer infile.Close() + + return nil, errors.WithMessage(err, "could not create mapstructure decoder") + } + i.ms = ms + + return &i, nil +} + +func (i *OptionIngester) Process(ctx context.Context) (<-chan *options.NixOption, <-chan error) { + results := make(chan *options.NixOption) + errs := make(chan error) + + go func() { + defer i.infile.Close() + defer close(results) + defer close(errs) + + slog.Debug("starting decoder stream") + outer: + for mv := range i.dec.Stream() { + select { + case <-ctx.Done(): + break outer + default: + } + if err := i.dec.Err(); err != nil { + errs <- errors.WithMessage(err, "could not decode JSON") + + continue + } + if mv.ValueType != jstream.Object { + errs <- errors.Errorf("unexpected object type %s", ValueTypeToString(mv.ValueType)) + + continue + } + kv := mv.Value.(jstream.KV) + x := kv.Value.(map[string]interface{}) + + var decls []*options.Link + for _, decl := range x["declarations"].([]interface{}) { + i.optJSON = nixOptionJSON{} + + switch decl := reflect.ValueOf(decl); decl.Kind() { + case reflect.String: + s := decl.String() + link, err := MakeChannelLink(i.source.Channel, i.source.Repo.Revision, s) + if err != nil { + errs <- errors.WithMessagef(err, + "could not make a channel link for channel %s, revision %s and subpath %s", + i.source.Channel, i.source.Repo.Revision, s, + ) + + continue + } + decls = append(decls, link) + case reflect.Map: + v := decl.Interface().(map[string]interface{}) + link := options.Link{ + Name: v["name"].(string), + URL: v["url"].(string), + } + decls = append(decls, &link) + default: + errs <- errors.Errorf("unexpected declaration type %s", decl.Kind().String()) + + continue + } + } + if len(decls) > 0 { + x["declarations"] = decls + } + + err := i.ms.Decode(x) // stores in optJSON + if err != nil { + errs <- errors.WithMessagef(err, "failed to decode option %#v", x) + + continue + } + + var decs = make([]options.Link, len(i.optJSON.Declarations)) + for i, d := range i.optJSON.Declarations { + decs[i] = options.Link(d) + } + + // slog.Debug("sending option", "name", kv.Key) + results <- &options.NixOption{ + Option: kv.Key, + Source: strings.ToLower(i.source.Name), + Declarations: decs, + Default: convertNixValue(i.optJSON.Default), + Description: options.Markdown(i.optJSON.Description), + Example: convertNixValue(i.optJSON.Example), + RelatedPackages: options.Markdown(i.optJSON.RelatedPackages), + Loc: i.optJSON.Loc, + Type: i.optJSON.Type, + } + } + }() + + return results, errs +} diff --git a/internal/importer/nixpkgs-channel.go b/internal/importer/nixpkgs-channel.go new file mode 100644 index 0000000..0e5be62 --- /dev/null +++ b/internal/importer/nixpkgs-channel.go @@ -0,0 +1,82 @@ +package importer + +import ( + "bytes" + "context" + "log/slog" + "net/url" + "os" + "path" + "searchix/internal/file" + "searchix/internal/search" + + "github.com/pkg/errors" +) + +type NixpkgsChannelImporter struct { + DataPath string + Source Source + Logger *slog.Logger + indexPath string +} + +func makeChannelURL(channel string, subPath string) (string, error) { + url, err := url.JoinPath("https://channels.nixos.org/", channel, subPath) + + return url, errors.WithMessagef(err, "error creating URL") +} + +var filesToFetch = map[string]string{ + "revision": "git-revision", + "options": "options.json.br", +} + +func (i *NixpkgsChannelImporter) FetchIfNeeded(parent context.Context) (bool, error) { + ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout) + defer cancel() + + root := i.DataPath + + err := file.Mkdirp(root) + if err != nil { + return false, errors.WithMessagef(err, "error creating directory for data: %s", root) + } + + for _, filename := range filesToFetch { + url, err := makeChannelURL(i.Source.Channel, filename) + if err != nil { + return false, err + } + + path := path.Join(root, filename) + + updated, err := fetchFileIfNeeded(ctx, path, url) + if err != nil { + return false, err + } + // don't bother to issue requests for the later files + if !updated { + return false, err + } + } + + return true, nil +} + +func (i *NixpkgsChannelImporter) Import(parent context.Context, indexer *search.WriteIndex) (bool, error) { + filename := path.Join(i.DataPath, filesToFetch["options"]) + revFilename := path.Join(i.DataPath, filesToFetch["revision"]) + bits, err := os.ReadFile(revFilename) + if err != nil { + return false, errors.WithMessagef(err, "unable to read revision file at %s", revFilename) + } + i.Source.Repo.Revision = string(bytes.TrimSpace(bits)) + i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename) + + return processOptions(parent, indexer, &importConfig{ + IndexPath: i.indexPath, + Source: i.Source, + Filename: filename, + Logger: i.Logger, + }) +} diff --git a/internal/importer/repository.go b/internal/importer/repository.go new file mode 100644 index 0000000..6cfd55e --- /dev/null +++ b/internal/importer/repository.go @@ -0,0 +1,44 @@ +package importer + +import ( + "fmt" + "strings" +) + +type RepoType int + +const ( + GitHub = iota + 1 +) + +type Repository struct { + Type string `default:"github"` + Owner string + Repo string + Revision string +} + +func (f RepoType) String() string { + switch f { + case GitHub: + return "github" + default: + return fmt.Sprintf("RepoType(%d)", f) + } +} + +func parseRepoType(name string) (RepoType, error) { + switch strings.ToLower(name) { + case "github": + return GitHub, nil + default: + return Unknown, fmt.Errorf("unsupported repo type %s", name) + } +} + +func (f *RepoType) UnmarshalText(text []byte) error { + var err error + *f, err = parseRepoType(string(text)) + + return err +} diff --git a/internal/importer/source-type.go b/internal/importer/source-type.go new file mode 100644 index 0000000..5d84547 --- /dev/null +++ b/internal/importer/source-type.go @@ -0,0 +1,44 @@ +package importer + +import ( + "fmt" + + "github.com/stoewer/go-strcase" +) + +type Type int + +const ( + Unknown = iota + Channel + ChannelNixpkgs +) + +func (f Type) String() string { + switch f { + case Channel: + return "channel" + case ChannelNixpkgs: + return "channel-nixpkgs" + } + + return fmt.Sprintf("Fetcher(%d)", f) +} + +func parseType(name string) (Type, error) { + switch strcase.KebabCase(name) { + case "channel": + return Channel, nil + case "channel-nixpkgs": + return ChannelNixpkgs, nil + default: + return Unknown, fmt.Errorf("unsupported fetcher %s", name) + } +} + +func (f *Type) UnmarshalText(text []byte) error { + var err error + *f, err = parseType(string(text)) + + return err +} -- cgit 1.4.1