about summary refs log tree commit diff stats
path: root/internal/importer
diff options
context:
space:
mode:
authorAlan Pearce2024-05-16 23:41:57 +0200
committerAlan Pearce2024-05-16 23:41:57 +0200
commita5e758d41c151c17ed03b39454470ba8dd0c3b99 (patch)
tree386333b5020477eabcf490773113b029e47a21ef /internal/importer
parentd558039919b6198a246a6a3fd007276191cb4b2f (diff)
downloadsearchix-a5e758d41c151c17ed03b39454470ba8dd0c3b99.tar.lz
searchix-a5e758d41c151c17ed03b39454470ba8dd0c3b99.tar.zst
searchix-a5e758d41c151c17ed03b39454470ba8dd0c3b99.zip
refactor: separate fetch and import logic
Diffstat (limited to 'internal/importer')
-rw-r--r--internal/importer/channel.go97
-rw-r--r--internal/importer/download-options.go87
-rw-r--r--internal/importer/http.go70
-rw-r--r--internal/importer/importer.go81
-rw-r--r--internal/importer/main.go62
-rw-r--r--internal/importer/nixpkgs-channel.go84
-rw-r--r--internal/importer/options.go (renamed from internal/importer/ingest.go)6
-rw-r--r--internal/importer/package.go6
-rw-r--r--internal/importer/utils.go22
9 files changed, 84 insertions, 431 deletions
diff --git a/internal/importer/channel.go b/internal/importer/channel.go
deleted file mode 100644
index 1bce1b0..0000000
--- a/internal/importer/channel.go
+++ /dev/null
@@ -1,97 +0,0 @@
-package importer
-
-import (
-	"context"
-	"fmt"
-	"log/slog"
-	"os"
-	"os/exec"
-	"path"
-	"searchix/internal/config"
-	"searchix/internal/file"
-	"searchix/internal/index"
-	"strconv"
-	"strings"
-
-	"github.com/pkg/errors"
-)
-
-type ChannelImporter struct {
-	DataPath   string
-	Source     *config.Source
-	SourceFile string
-	Logger     *slog.Logger
-}
-
-func (i *ChannelImporter) FetchIfNeeded(parent context.Context) (bool, error) {
-	ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout)
-	defer cancel()
-
-	dest := i.DataPath
-
-	before, err := os.Readlink(dest)
-	if file.NeedNotExist(err) != nil {
-		return false, errors.WithMessagef(err, "could not call readlink on file %s", dest)
-	}
-	i.Logger.Debug("stat before", "name", before)
-
-	args := []string{
-		"--no-build-output",
-		"--timeout",
-		strconv.Itoa(int(i.Source.FetchTimeout.Seconds() - 1)),
-		fmt.Sprintf("<%s/%s>", i.Source.Channel, i.Source.ImportPath),
-		"--attr",
-		i.Source.Attribute,
-		"--out-link",
-		dest,
-	}
-
-	if i.Source.URL != "" {
-		args = append(args, "-I", fmt.Sprintf("%s=%s", i.Source.Channel, i.Source.URL))
-	}
-
-	i.Logger.Debug("nix-build command", "args", args)
-	cmd := exec.CommandContext(ctx, "nix-build", args...)
-	out, err := cmd.Output()
-	if err != nil {
-		return false, errors.WithMessage(err, "failed to run nix-build (--dry-run)")
-	}
-	i.Logger.Debug("nix-build", "output", strings.TrimSpace(string(out)))
-
-	outPath := path.Join(dest, i.Source.OutputPath)
-	i.Logger.Debug(
-		"checking output path",
-		"outputPath",
-		outPath,
-		"dest",
-		dest,
-		"source",
-		i.Source.OutputPath,
-	)
-	after, err := os.Readlink(dest)
-	if err := file.NeedNotExist(err); err != nil {
-		return false, errors.WithMessagef(
-			err,
-			"failed to stat output file from nix-build, filename: %s",
-			outPath,
-		)
-	}
-	i.Logger.Debug("stat after", "name", after)
-
-	return before != after, nil
-}
-
-func (i *ChannelImporter) Import(parent context.Context, indexer *index.WriteIndex) (bool, error) {
-	if i.Source.OutputPath == "" {
-		return false, errors.New("no output path specified")
-	}
-
-	filename := path.Join(i.DataPath, i.SourceFile, i.Source.OutputPath)
-	i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename)
-
-	return processOptions(parent, indexer, &importConfig{
-		Source:   i.Source,
-		Filename: filename,
-		Logger:   i.Logger,
-	})
-}
diff --git a/internal/importer/download-options.go b/internal/importer/download-options.go
deleted file mode 100644
index 6727138..0000000
--- a/internal/importer/download-options.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package importer
-
-import (
-	"bytes"
-	"context"
-	"log/slog"
-	"net/url"
-	"os"
-	"path"
-	"searchix/internal/config"
-	"searchix/internal/file"
-	"searchix/internal/index"
-
-	"github.com/pkg/errors"
-)
-
-type DownloadOptionsImporter struct {
-	DataPath   string
-	Source     *config.Source
-	SourceFile string
-	Logger     *slog.Logger
-}
-
-var optionsFiles = map[string]string{
-	"revision": "revision",
-	"options":  "options.json",
-}
-
-func (i *DownloadOptionsImporter) FetchIfNeeded(parent context.Context) (bool, error) {
-	ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout)
-	defer cancel()
-
-	root := i.DataPath
-
-	err := file.Mkdirp(root)
-	if err != nil {
-		return false, errors.WithMessagef(err, "error creating directory for data: %s", root)
-	}
-
-	var updated bool
-	for _, filename := range optionsFiles {
-		url, err := url.JoinPath(i.Source.URL, filename)
-		if err != nil {
-			return false, errors.WithMessagef(
-				err,
-				"could not build URL with elements %s and %s",
-				i.Source.URL,
-				filename,
-			)
-		}
-
-		path := path.Join(root, filename)
-
-		i.Logger.Debug("preparing to fetch URL", "url", url, "path", path)
-
-		updated, err = fetchFileIfNeeded(ctx, path, url)
-		if err != nil {
-			return false, err
-		}
-		// don't bother to issue requests for the later files
-		if !updated {
-			return false, err
-		}
-	}
-
-	return updated, nil
-}
-
-func (i *DownloadOptionsImporter) Import(
-	parent context.Context,
-	indexer *index.WriteIndex,
-) (bool, error) {
-	filename := path.Join(i.DataPath, optionsFiles["options"])
-	revFilename := path.Join(i.DataPath, optionsFiles["revision"])
-	bits, err := os.ReadFile(revFilename)
-	if err != nil {
-		return false, errors.WithMessagef(err, "unable to read revision file at %s", revFilename)
-	}
-	i.Source.Repo.Revision = string(bytes.TrimSpace(bits))
-	i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename)
-
-	return processOptions(parent, indexer, &importConfig{
-		Source:   i.Source,
-		Filename: filename,
-		Logger:   i.Logger,
-	})
-}
diff --git a/internal/importer/http.go b/internal/importer/http.go
deleted file mode 100644
index b496177..0000000
--- a/internal/importer/http.go
+++ /dev/null
@@ -1,70 +0,0 @@
-package importer
-
-import (
-	"context"
-	"fmt"
-	"log/slog"
-	"net/http"
-	"os"
-	"searchix/internal/config"
-	"searchix/internal/file"
-	"strings"
-	"time"
-
-	"github.com/pkg/errors"
-)
-
-func fetchFileIfNeeded(ctx context.Context, path string, url string) (needed bool, err error) {
-	stat, err := file.StatIfExists(path)
-	if err != nil {
-		return false, errors.WithMessagef(err, "could not stat file %s", path)
-	}
-
-	var mtime string
-	if stat != nil {
-		mtime = strings.Replace(stat.ModTime().UTC().Format(time.RFC1123), "UTC", "GMT", 1)
-	}
-
-	req, err := http.NewRequestWithContext(ctx, "GET", url, http.NoBody)
-	if err != nil {
-		return false, errors.WithMessagef(err, "could not create HTTP request for %s", url)
-	}
-
-	req.Header.Set("User-Agent", fmt.Sprintf("Searchix %s", config.ShortSHA))
-
-	if mtime != "" {
-		req.Header.Set("If-Modified-Since", mtime)
-	}
-	res, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return false, errors.WithMessagef(err, "could not make HTTP request to %s", url)
-	}
-	defer res.Body.Close()
-
-	switch res.StatusCode {
-	case http.StatusNotModified:
-		needed = false
-	case http.StatusOK:
-		newMtime, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified"))
-		if err != nil {
-			slog.Warn(
-				"could not parse Last-Modified header from response",
-				"value",
-				res.Header.Get("Last-Modified"),
-			)
-		}
-		err = file.WriteToFile(path, res.Body)
-		if err != nil {
-			return false, errors.WithMessagef(err, "could not write response body to file %s", path)
-		}
-		err = os.Chtimes(path, time.Time{}, newMtime)
-		if err != nil {
-			slog.Warn("could not update mtime on file", "file", path)
-		}
-		needed = true
-	default:
-		return false, fmt.Errorf("got response code %d, don't know what to do", res.StatusCode)
-	}
-
-	return needed, nil
-}
diff --git a/internal/importer/importer.go b/internal/importer/importer.go
index 255f70e..57118c8 100644
--- a/internal/importer/importer.go
+++ b/internal/importer/importer.go
@@ -3,79 +3,32 @@ package importer
 import (
 	"context"
 	"log/slog"
-	"searchix/internal/config"
 	"searchix/internal/index"
+	"searchix/internal/nix"
 	"sync"
 )
 
 type Importer interface {
-	FetchIfNeeded(context.Context) (bool, error)
 	Import(context.Context, *index.WriteIndex) (bool, error)
 }
 
-func NewNixpkgsChannelImporter(
-	source *config.Source,
-	dataPath string,
-	logger *slog.Logger,
-) *NixpkgsChannelImporter {
-	return &NixpkgsChannelImporter{
-		DataPath: dataPath,
-		Source:   source,
-		Logger:   logger,
-	}
-}
-
-func NewChannelImporter(
-	source *config.Source,
-	dataPath string,
-	logger *slog.Logger,
-) *ChannelImporter {
-	return &ChannelImporter{
-		DataPath: dataPath,
-		Source:   source,
-		Logger:   logger,
-	}
-}
-
-func NewDownloadOptionsImporter(
-	source *config.Source,
-	dataPath string,
-	logger *slog.Logger,
-) *DownloadOptionsImporter {
-	return &DownloadOptionsImporter{
-		DataPath: dataPath,
-		Source:   source,
-		Logger:   logger,
-	}
-}
-
-type importConfig struct {
-	Filename string
-	Source   *config.Source
-	Logger   *slog.Logger
+type Processor interface {
+	Process(context.Context) (<-chan nix.Importable, <-chan error)
 }
 
-func processOptions(
-	parent context.Context,
+func process(
+	ctx context.Context,
 	indexer *index.WriteIndex,
-	conf *importConfig,
-) (bool, error) {
-	ctx, cancel := context.WithTimeout(parent, conf.Source.ImportTimeout)
-	defer cancel()
-
-	conf.Logger.Debug("creating option processor", "filename", conf.Filename)
-	processor, err := NewOptionProcessor(conf.Filename, conf.Source)
-	if err != nil {
-		return true, err
-	}
-
+	processor Processor,
+	logger *slog.Logger,
+) bool {
 	wg := sync.WaitGroup{}
 
 	wg.Add(1)
-	options, pErrs := processor.Process(ctx)
+	objects, pErrs := processor.Process(ctx)
 
 	wg.Add(1)
-	iErrs := indexer.Import(ctx, options)
+	iErrs := indexer.Import(ctx, objects)
 
 	var hadErrors bool
 	go func() {
@@ -85,29 +38,29 @@ func processOptions(
 				if !running {
 					wg.Done()
 					iErrs = nil
-					conf.Logger.Info("ingest completed")
+					logger.Debug("ingest completed")
 
 					continue
 				}
 				hadErrors = true
-				conf.Logger.Warn("error ingesting option", "error", err)
+				logger.Warn("error ingesting object", "error", err)
 			case err, running := <-pErrs:
 				if !running {
 					wg.Done()
 					pErrs = nil
-					conf.Logger.Debug("processing completed")
+					logger.Debug("processing completed")
 
 					continue
 				}
 				hadErrors = true
-				conf.Logger.Warn("error processing option", "error", err)
+				logger.Warn("error processing object", "error", err)
 			}
 		}
 	}()
 
-	conf.Logger.Debug("options processing", "state", "waiting")
+	logger.Debug("object processing", "state", "waiting")
 	wg.Wait()
-	conf.Logger.Debug("options processing", "state", "complete")
+	logger.Debug("object processing", "state", "complete")
 
-	return hadErrors, nil
+	return hadErrors
 }
diff --git a/internal/importer/main.go b/internal/importer/main.go
index 0b7a99d..d2b66e1 100644
--- a/internal/importer/main.go
+++ b/internal/importer/main.go
@@ -2,14 +2,15 @@ package importer
 
 import (
 	"context"
-	"errors"
-	"log"
 	"log/slog"
 	"os/exec"
 	"path"
 	"searchix/internal/config"
+	"searchix/internal/fetcher"
 	"searchix/internal/index"
 	"strings"
+
+	"github.com/pkg/errors"
 )
 
 func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error {
@@ -22,27 +23,20 @@ func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error {
 	ctx, cancel := context.WithTimeout(context.Background(), cfg.Importer.Timeout.Duration)
 	defer cancel()
 
-	var imp Importer
 	for name, source := range cfg.Importer.Sources {
-		logger := slog.With("name", name, "importer", source.Type.String())
-		logger.Debug("starting importer")
+		logger := slog.With("name", name, "fetcher", source.Fetcher.String())
+		logger.Debug("starting fetcher")
 
-		importerDataPath := path.Join(cfg.DataPath, "sources", source.Channel)
+		fetcherDataPath := path.Join(cfg.DataPath, "sources", source.Key)
 
-		switch source.Type {
-		case config.ChannelNixpkgs:
-			imp = NewNixpkgsChannelImporter(source, importerDataPath, logger)
-		case config.Channel:
-			imp = NewChannelImporter(source, importerDataPath, logger)
-		case config.DownloadOptions:
-			imp = NewDownloadOptionsImporter(source, importerDataPath, logger)
-		default:
-			log.Printf("unsupported importer type %s", source.Type.String())
+		fetcher, err := fetcher.New(source, fetcherDataPath, logger)
+		if err != nil {
+			logger.Warn("error creating fetcher", "error", err)
 
 			continue
 		}
 
-		updated, err := imp.FetchIfNeeded(ctx)
+		files, updated, err := fetcher.FetchIfNeeded(ctx)
 
 		if err != nil {
 			var exerr *exec.ExitError
@@ -60,16 +54,38 @@ func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error {
 		logger.Info("importer fetch succeeded", "updated", updated)
 
 		if updated || replace {
-			hadWarnings, err := imp.Import(ctx, indexer)
-
+			err = setRepoRevision(files.Revision, source)
 			if err != nil {
-				msg := err.Error()
-				for _, line := range strings.Split(strings.TrimSpace(msg), "\n") {
-					logger.Error("importer init failed", "error", line)
-				}
+				logger.Warn("could not set source repo revision", "error", err)
+			}
 
-				continue
+			var processor Processor
+			switch source.Importer {
+			case config.Options:
+				logger.Debug(
+					"creating processor",
+					"filename",
+					files.Options,
+					"revision",
+					source.Repo.Revision,
+				)
+				processor, err = NewOptionProcessor(files.Options, source)
+			case config.Packages:
+				logger.Debug(
+					"creating processor",
+					"filename",
+					files.Packages,
+					"revision",
+					source.Repo.Revision,
+				)
+				processor, err = NewPackageProcessor(files.Packages, source)
 			}
+			if err != nil {
+				logger.Warn("failed to create processor", "type", source.Importer, "error", err)
+			}
+
+			hadWarnings := process(ctx, indexer, processor, logger)
+
 			if hadWarnings {
 				logger.Warn("importer succeeded, but with warnings/errors")
 			} else {
diff --git a/internal/importer/nixpkgs-channel.go b/internal/importer/nixpkgs-channel.go
deleted file mode 100644
index d302154..0000000
--- a/internal/importer/nixpkgs-channel.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package importer
-
-import (
-	"bytes"
-	"context"
-	"log/slog"
-	"net/url"
-	"os"
-	"path"
-	"searchix/internal/config"
-	"searchix/internal/file"
-	"searchix/internal/index"
-
-	"github.com/pkg/errors"
-)
-
-type NixpkgsChannelImporter struct {
-	DataPath string
-	Source   *config.Source
-	Logger   *slog.Logger
-}
-
-func makeChannelURL(channel string, subPath string) (string, error) {
-	url, err := url.JoinPath("https://channels.nixos.org/", channel, subPath)
-
-	return url, errors.WithMessagef(err, "error creating URL")
-}
-
-var filesToFetch = map[string]string{
-	"revision": "git-revision",
-	"options":  "options.json.br",
-}
-
-func (i *NixpkgsChannelImporter) FetchIfNeeded(parent context.Context) (bool, error) {
-	ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout)
-	defer cancel()
-
-	root := i.DataPath
-
-	err := file.Mkdirp(root)
-	if err != nil {
-		return false, errors.WithMessagef(err, "error creating directory for data: %s", root)
-	}
-
-	for _, filename := range filesToFetch {
-		url, err := makeChannelURL(i.Source.Channel, filename)
-		if err != nil {
-			return false, err
-		}
-
-		path := path.Join(root, filename)
-
-		updated, err := fetchFileIfNeeded(ctx, path, url)
-		if err != nil {
-			return false, err
-		}
-		// don't bother to issue requests for the later files
-		if !updated {
-			return false, err
-		}
-	}
-
-	return true, nil
-}
-
-func (i *NixpkgsChannelImporter) Import(
-	parent context.Context,
-	indexer *index.WriteIndex,
-) (bool, error) {
-	filename := path.Join(i.DataPath, filesToFetch["options"])
-	revFilename := path.Join(i.DataPath, filesToFetch["revision"])
-	bits, err := os.ReadFile(revFilename)
-	if err != nil {
-		return false, errors.WithMessagef(err, "unable to read revision file at %s", revFilename)
-	}
-	i.Source.Repo.Revision = string(bytes.TrimSpace(bits))
-	i.Logger.Debug("preparing import run", "revision", i.Source.Repo.Revision, "filename", filename)
-
-	return processOptions(parent, indexer, &importConfig{
-		Source:   i.Source,
-		Filename: filename,
-		Logger:   i.Logger,
-	})
-}
diff --git a/internal/importer/ingest.go b/internal/importer/options.go
index 9b92ae8..ec2c20f 100644
--- a/internal/importer/ingest.go
+++ b/internal/importer/options.go
@@ -91,9 +91,8 @@ func NewOptionProcessor(inpath string, source *config.Source) (*OptionIngester,
 	return &i, nil
 }
 
-func (i *OptionIngester) Process(
-	ctx context.Context,
-) (<-chan nix.Importable, <-chan error) {
+func (i *OptionIngester) Process(parent context.Context) (<-chan nix.Importable, <-chan error) {
+	ctx, cancel := context.WithTimeout(parent, i.source.ImportTimeout)
 	results := make(chan nix.Importable)
 	errs := make(chan error)
 
@@ -101,6 +100,7 @@ func (i *OptionIngester) Process(
 		defer i.infile.Close()
 		defer close(results)
 		defer close(errs)
+		defer cancel()
 
 		slog.Debug("starting decoder stream")
 	outer:
diff --git a/internal/importer/package.go b/internal/importer/package.go
index 49e313d..3e0ec83 100644
--- a/internal/importer/package.go
+++ b/internal/importer/package.go
@@ -109,9 +109,8 @@ func convertToLicense(in map[string]any) *nix.License {
 	return l
 }
 
-func (i *PackageIngester) Process(
-	ctx context.Context,
-) (<-chan nix.Importable, <-chan error) {
+func (i *PackageIngester) Process(parent context.Context) (<-chan nix.Importable, <-chan error) {
+	ctx, cancel := context.WithTimeout(parent, i.source.ImportTimeout)
 	results := make(chan nix.Importable)
 	errs := make(chan error)
 
@@ -119,6 +118,7 @@ func (i *PackageIngester) Process(
 		defer i.infile.Close()
 		defer close(results)
 		defer close(errs)
+		defer cancel()
 
 		userRepo := i.source.Repo.Owner + "/" + i.source.Repo.Repo
 		slog.Debug("starting decoder stream")
diff --git a/internal/importer/utils.go b/internal/importer/utils.go
index 13d4702..3eb034f 100644
--- a/internal/importer/utils.go
+++ b/internal/importer/utils.go
@@ -1,11 +1,15 @@
 package importer
 
 import (
+	"bytes"
 	"fmt"
 	"net/url"
+	"os"
+	"searchix/internal/config"
 	"searchix/internal/nix"
 
 	"github.com/bcicen/jstream"
+	"github.com/pkg/errors"
 )
 
 func ValueTypeToString(valueType jstream.ValueType) string {
@@ -58,3 +62,21 @@ func MakeChannelLink(channel string, ref string, subPath string) (*nix.Link, err
 		URL:  makeGitHubFileURL(channelRepoMap[channel], ref, subPath, ""),
 	}, nil
 }
+
+func setRepoRevision(filename string, source *config.Source) error {
+	if filename != "" {
+		bits, err := os.ReadFile(filename)
+		if err != nil {
+			return errors.WithMessagef(
+				err,
+				"unable to read revision file at %s",
+				filename,
+			)
+		}
+
+		source.Repo.Revision = string(bytes.TrimSpace(bits))
+
+	}
+
+	return nil
+}