about summary refs log tree commit diff stats
path: root/internal/fetcher
diff options
context:
space:
mode:
author Alan Pearce 2024-05-23 13:14:45 +0200
committer Alan Pearce 2024-05-23 13:14:45 +0200
commit 0dbfe37fbddb95c184d845c79bbe014597d55fe8 (patch)
tree e68a2db861211ceebe4c357a059a4cb511f707a9 /internal/fetcher
parent 3053e41b1528ef898cccd44e056e4d167619af6b (diff)
download searchix-0dbfe37fbddb95c184d845c79bbe014597d55fe8.tar.lz
searchix-0dbfe37fbddb95c184d845c79bbe014597d55fe8.tar.zst
searchix-0dbfe37fbddb95c184d845c79bbe014597d55fe8.zip
feat: stream files directly from fetcher to importer
Use IndexMeta to store the information relevant to making conditional
updates in future runs.
Diffstat (limited to 'internal/fetcher')
-rw-r--r-- internal/fetcher/channel.go 61
-rw-r--r-- internal/fetcher/download.go 56
-rw-r--r-- internal/fetcher/http.go 74
-rw-r--r-- internal/fetcher/main.go 17
-rw-r--r-- internal/fetcher/nixpkgs-channel.go 60
5 files changed, 125 insertions, 143 deletions
diff --git a/internal/fetcher/channel.go b/internal/fetcher/channel.go
index fd7427c..3756012 100644
--- a/internal/fetcher/channel.go
+++ b/internal/fetcher/channel.go
@@ -8,15 +8,15 @@ import (
 	"os/exec"
 	"path"
 	"searchix/internal/config"
-	"searchix/internal/file"
+	"searchix/internal/index"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/pkg/errors"
 )
 
 type ChannelFetcher struct {
-	DataPath   string
 	Source     *config.Source
 	SourceFile string
 	Logger     *slog.Logger
@@ -24,15 +24,13 @@ type ChannelFetcher struct {
 
 func NewChannelFetcher(
 	source *config.Source,
-	dataPath string,
 	logger *slog.Logger,
 ) (*ChannelFetcher, error) {
 	switch source.Importer {
 	case config.Options:
 		return &ChannelFetcher{
-			DataPath: dataPath,
-			Source:   source,
-			Logger:   logger,
+			Source: source,
+			Logger: logger,
 		}, nil
 	default:
 		return nil, fmt.Errorf("unsupported importer type %s", source.Importer)
@@ -40,22 +38,9 @@ func NewChannelFetcher(
 }
 
 func (i *ChannelFetcher) FetchIfNeeded(
-	parent context.Context,
-) (f FetchedFiles, updated bool, err error) {
-	ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout.Duration)
-	defer cancel()
-
-	dest := i.DataPath
-
-	var before string
-	before, err = os.Readlink(dest)
-	if file.NeedNotExist(err) != nil {
-		err = errors.WithMessagef(err, "could not call readlink on file %s", dest)
-
-		return
-	}
-	i.Logger.Debug("stat before", "name", before)
-
+	ctx context.Context,
+	sourceMeta *index.SourceMeta,
+) (f FetchedFiles, err error) {
 	args := []string{
 		"--no-build-output",
 		"--timeout",
@@ -63,8 +48,7 @@ func (i *ChannelFetcher) FetchIfNeeded(
 		fmt.Sprintf("<%s/%s>", i.Source.Channel, i.Source.ImportPath),
 		"--attr",
 		i.Source.Attribute,
-		"--out-link",
-		dest,
+		"--no-out-link",
 	}
 
 	if i.Source.URL != "" {
@@ -80,35 +64,28 @@ func (i *ChannelFetcher) FetchIfNeeded(
 
 		return
 	}
-	i.Logger.Debug("nix-build", "output", strings.TrimSpace(string(out)))
 
-	outPath := path.Join(dest, i.Source.OutputPath)
+	outPath := path.Join(strings.TrimSpace(string(out)), i.Source.OutputPath, "options.json")
 	i.Logger.Debug(
 		"checking output path",
 		"outputPath",
 		outPath,
-		"dest",
-		dest,
-		"source",
-		i.Source.OutputPath,
 	)
-	var after string
-	after, err = os.Readlink(dest)
-	if err = file.NeedNotExist(err); err != nil {
-		err = errors.WithMessagef(
-			err,
-			"failed to stat output file from nix-build, filename: %s",
-			outPath,
-		)
 
-		return
+	if outPath != sourceMeta.Path {
+		sourceMeta.Path = outPath
+		sourceMeta.Updated = time.Now().Truncate(time.Second)
 	}
-	i.Logger.Debug("stat after", "name", after)
 
-	updated = before != after
+	file, err := os.Open(outPath)
+	if err != nil {
+		err = errors.WithMessage(err, "failed to open options.json")
+
+		return
+	}
 
 	f = FetchedFiles{
-		Options: path.Join(dest, i.Source.OutputPath, "options.json"),
+		Options: file,
 	}
 
 	return
diff --git a/internal/fetcher/download.go b/internal/fetcher/download.go
index 2c7b8fd..59ef8d1 100644
--- a/internal/fetcher/download.go
+++ b/internal/fetcher/download.go
@@ -5,15 +5,13 @@ import (
 	"fmt"
 	"log/slog"
 	"net/url"
-	"path"
 	"searchix/internal/config"
-	"searchix/internal/file"
+	"searchix/internal/index"
 
 	"github.com/pkg/errors"
 )
 
 type DownloadFetcher struct {
-	DataPath   string
 	Source     *config.Source
 	SourceFile string
 	Logger     *slog.Logger
@@ -21,15 +19,13 @@ type DownloadFetcher struct {
 
 func NewDownloadFetcher(
 	source *config.Source,
-	dataPath string,
 	logger *slog.Logger,
 ) (*DownloadFetcher, error) {
 	switch source.Importer {
 	case config.Options:
 		return &DownloadFetcher{
-			DataPath: dataPath,
-			Source:   source,
-			Logger:   logger,
+			Source: source,
+			Logger: logger,
 		}, nil
 	default:
 		return nil, fmt.Errorf("unsupported importer type %s", source.Importer)
@@ -42,22 +38,11 @@ var files = map[string]string{
 }
 
 func (i *DownloadFetcher) FetchIfNeeded(
-	parent context.Context,
-) (f FetchedFiles, updated bool, err error) {
-	ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout.Duration)
-	defer cancel()
-
-	root := i.DataPath
-
-	err = file.Mkdirp(root)
-	if err != nil {
-		err = errors.WithMessagef(err, "error creating directory for data: %s", root)
-
-		return
-	}
-
+	ctx context.Context,
+	sourceMeta *index.SourceMeta,
+) (f FetchedFiles, err error) {
 	var fetchURL string
-	for _, filename := range files {
+	for key, filename := range files {
 		fetchURL, err = url.JoinPath(i.Source.URL, filename)
 		if err != nil {
 			err = errors.WithMessagef(
@@ -70,23 +55,28 @@ func (i *DownloadFetcher) FetchIfNeeded(
 			return
 		}
 
-		outPath := path.Join(root, filename)
-
-		i.Logger.Debug("preparing to fetch URL", "url", fetchURL, "path", outPath)
+		i.Logger.Debug("preparing to fetch URL", "url", fetchURL)
 
-		updated, err = fetchFileIfNeeded(ctx, outPath, fetchURL)
+		body, mtime, err := fetchFileIfNeeded(ctx, sourceMeta.Updated, fetchURL)
 		if err != nil {
-			return
+			i.Logger.Warn("failed to fetch file", "url", fetchURL, "error", err)
+
+			return f, err
 		}
 		// don't bother to issue requests for the later files
-		if !updated {
+		if mtime.Before(sourceMeta.Updated) {
 			break
 		}
-	}
-
-	f = FetchedFiles{
-		Revision: path.Join(root, "revision"),
-		Options:  path.Join(root, "options.json"),
+		sourceMeta.Updated = mtime
+
+		switch key {
+		case "revision":
+			f.Revision = body
+		case "options":
+			f.Options = body
+		default:
+			return f, errors.Errorf("unknown file kind %s", key)
+		}
 	}
 
 	return
diff --git a/internal/fetcher/http.go b/internal/fetcher/http.go
index 9afbbc0..675c3b3 100644
--- a/internal/fetcher/http.go
+++ b/internal/fetcher/http.go
@@ -3,68 +3,90 @@ package fetcher
 import (
 	"context"
 	"fmt"
+	"io"
 	"log/slog"
 	"net/http"
-	"os"
 	"searchix/internal/config"
-	"searchix/internal/file"
 	"strings"
 	"time"
 
+	"github.com/andybalholm/brotli"
 	"github.com/pkg/errors"
 )
 
-func fetchFileIfNeeded(ctx context.Context, path string, url string) (needed bool, err error) {
-	stat, err := file.StatIfExists(path)
-	if err != nil {
-		return false, errors.WithMessagef(err, "could not stat file %s", path)
+type brotliReadCloser struct {
+	src io.ReadCloser
+	*brotli.Reader
+}
+
+func newBrotliReader(src io.ReadCloser) *brotliReadCloser {
+	return &brotliReadCloser{
+		src:    src,
+		Reader: brotli.NewReader(src),
 	}
+}
 
-	var mtime string
-	if stat != nil {
-		mtime = strings.Replace(stat.ModTime().UTC().Format(time.RFC1123), "UTC", "GMT", 1)
+func (r *brotliReadCloser) Close() error {
+	return errors.Wrap(r.src.Close(), "failed to call close on underlying reader")
+}
+
+func fetchFileIfNeeded(
+	ctx context.Context,
+	mtime time.Time,
+	url string,
+) (body io.ReadCloser, newMtime time.Time, err error) {
+	var ifModifiedSince string
+	if !mtime.IsZero() {
+		ifModifiedSince = strings.Replace(mtime.UTC().Format(time.RFC1123), "UTC", "GMT", 1)
 	}
 
 	req, err := http.NewRequestWithContext(ctx, "GET", url, http.NoBody)
 	if err != nil {
-		return false, errors.WithMessagef(err, "could not create HTTP request for %s", url)
+		err = errors.WithMessagef(err, "could not create HTTP request for %s", url)
+
+		return
 	}
 
 	req.Header.Set("User-Agent", fmt.Sprintf("Searchix %s", config.ShortSHA))
 
-	if mtime != "" {
-		req.Header.Set("If-Modified-Since", mtime)
+	if ifModifiedSince != "" {
+		req.Header.Set("If-Modified-Since", ifModifiedSince)
 	}
 	res, err := http.DefaultClient.Do(req)
 	if err != nil {
-		return false, errors.WithMessagef(err, "could not make HTTP request to %s", url)
+		err = errors.WithMessagef(err, "could not make HTTP request to %s", url)
+
+		return
 	}
-	defer res.Body.Close()
 
 	switch res.StatusCode {
 	case http.StatusNotModified:
-		needed = false
+		newMtime = mtime
+
+		return
 	case http.StatusOK:
-		newMtime, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified"))
+		newMtime, err = time.Parse(time.RFC1123, res.Header.Get("Last-Modified"))
 		if err != nil {
 			slog.Warn(
 				"could not parse Last-Modified header from response",
 				"value",
 				res.Header.Get("Last-Modified"),
 			)
+			newMtime = time.Now()
 		}
-		err = file.WriteToFile(path, res.Body)
-		if err != nil {
-			return false, errors.WithMessagef(err, "could not write response body to file %s", path)
-		}
-		err = os.Chtimes(path, time.Time{}, newMtime)
-		if err != nil {
-			slog.Warn("could not update mtime on file", "file", path)
+
+		switch ce := res.Header.Get("Content-Encoding"); ce {
+		case "br":
+			slog.Debug("using brotli encoding")
+			body = newBrotliReader(res.Body)
+		case "", "identity", "gzip":
+			body = res.Body
+		default:
+			err = fmt.Errorf("cannot handle a body with content-encoding %s", ce)
 		}
-		needed = true
 	default:
-		return false, fmt.Errorf("got response code %d, don't know what to do", res.StatusCode)
+		err = fmt.Errorf("got response code %d, don't know what to do", res.StatusCode)
 	}
 
-	return needed, nil
+	return
 }
diff --git a/internal/fetcher/main.go b/internal/fetcher/main.go
index 65f62db..7ea0b03 100644
--- a/internal/fetcher/main.go
+++ b/internal/fetcher/main.go
@@ -2,34 +2,35 @@ package fetcher
 
 import (
 	"context"
+	"io"
 	"log/slog"
 	"searchix/internal/config"
+	"searchix/internal/index"
 
 	"github.com/pkg/errors"
 )
 
 type FetchedFiles struct {
-	Revision string
-	Options  string
-	Packages string
+	Revision io.ReadCloser
+	Options  io.ReadCloser
+	Packages io.ReadCloser
 }
 
 type Fetcher interface {
-	FetchIfNeeded(context.Context) (FetchedFiles, bool, error)
+	FetchIfNeeded(context.Context, *index.SourceMeta) (FetchedFiles, error)
 }
 
 func New(
 	source *config.Source,
-	fetcherDataPath string,
 	logger *slog.Logger,
 ) (fetcher Fetcher, err error) {
 	switch source.Fetcher {
 	case config.ChannelNixpkgs:
-		fetcher, err = NewNixpkgsChannelFetcher(source, fetcherDataPath, logger)
+		fetcher, err = NewNixpkgsChannelFetcher(source, logger)
 	case config.Channel:
-		fetcher, err = NewChannelFetcher(source, fetcherDataPath, logger)
+		fetcher, err = NewChannelFetcher(source, logger)
 	case config.Download:
-		fetcher, err = NewDownloadFetcher(source, fetcherDataPath, logger)
+		fetcher, err = NewDownloadFetcher(source, logger)
 	default:
 		err = errors.Errorf("unsupported fetcher type %s", source.Fetcher.String())
 	}
diff --git a/internal/fetcher/nixpkgs-channel.go b/internal/fetcher/nixpkgs-channel.go
index 033b577..62fea13 100644
--- a/internal/fetcher/nixpkgs-channel.go
+++ b/internal/fetcher/nixpkgs-channel.go
@@ -5,17 +5,15 @@ import (
 	"fmt"
 	"log/slog"
 	"net/url"
-	"path"
 	"searchix/internal/config"
-	"searchix/internal/file"
+	"searchix/internal/index"
 
 	"github.com/pkg/errors"
 )
 
 type NixpkgsChannelFetcher struct {
-	DataPath string
-	Source   *config.Source
-	Logger   *slog.Logger
+	Source *config.Source
+	Logger *slog.Logger
 }
 
 func makeChannelURL(channel string, subPath string) (string, error) {
@@ -26,15 +24,13 @@ func makeChannelURL(channel string, subPath string) (string, error) {
 
 func NewNixpkgsChannelFetcher(
 	source *config.Source,
-	dataPath string,
 	logger *slog.Logger,
 ) (*NixpkgsChannelFetcher, error) {
 	switch source.Importer {
 	case config.Options, config.Packages:
 		return &NixpkgsChannelFetcher{
-			DataPath: dataPath,
-			Source:   source,
-			Logger:   logger,
+			Source: source,
+			Logger: logger,
 		}, nil
 	default:
 		return nil, fmt.Errorf("unsupported importer type %s", source.Importer)
@@ -48,19 +44,9 @@ const (
 )
 
 func (i *NixpkgsChannelFetcher) FetchIfNeeded(
-	parent context.Context,
-) (f FetchedFiles, updated bool, err error) {
-	ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout.Duration)
-	defer cancel()
-
-	root := i.DataPath
-
-	err = file.Mkdirp(root)
-	if err != nil {
-		err = errors.WithMessagef(err, "error creating directory for data: %s", root)
-
-		return
-	}
+	ctx context.Context,
+	sourceMeta *index.SourceMeta,
+) (f FetchedFiles, err error) {
 
 	filesToFetch := make([]string, 2)
 	filesToFetch[0] = revisionFilename
@@ -78,23 +64,29 @@ func (i *NixpkgsChannelFetcher) FetchIfNeeded(
 			return
 		}
 
-		outPath := path.Join(root, filename)
-
-		i.Logger.Debug("attempting to fetch file", "url", fetchURL, "outPath", outPath)
-		updated, err = fetchFileIfNeeded(ctx, outPath, fetchURL)
+		i.Logger.Debug("attempting to fetch file", "url", fetchURL)
+		body, mtime, err := fetchFileIfNeeded(ctx, sourceMeta.Updated, fetchURL)
 		if err != nil {
-			return
+			i.Logger.Warn("failed to fetch file", "url", fetchURL, "error", err)
+
+			return f, err
 		}
 		// don't bother to issue requests for the later files
-		if !updated {
+		if mtime.Before(sourceMeta.Updated) {
 			break
 		}
-	}
-
-	f = FetchedFiles{
-		Revision: path.Join(root, "git-revision"),
-		Options:  path.Join(root, "options.json.br"),
-		Packages: path.Join(root, "packages.json.br"),
+		sourceMeta.Updated = mtime
+
+		switch filename {
+		case revisionFilename:
+			f.Revision = body
+		case optionsFilename:
+			f.Options = body
+		case packagesFileName:
+			f.Packages = body
+		default:
+			return f, errors.Errorf("unknown file kind %s", filename)
+		}
 	}
 
 	return