From 0dbfe37fbddb95c184d845c79bbe014597d55fe8 Mon Sep 17 00:00:00 2001
From: Alan Pearce
Date: Thu, 23 May 2024 13:14:45 +0200
Subject: feat: stream files directly from fetcher to importer

Use IndexMeta to store the information relevant to making conditional
updates in future runs.
---
 internal/importer/main.go  | 174 +++++++++++++++++++++++++--------------------
 internal/importer/utils.go |  57 +++------------
 2 files changed, 106 insertions(+), 125 deletions(-)

(limited to 'internal/importer')

diff --git a/internal/importer/main.go b/internal/importer/main.go
index 6f462c3..6be5b45 100644
--- a/internal/importer/main.go
+++ b/internal/importer/main.go
@@ -2,56 +2,49 @@ package importer
 
 import (
 	"context"
-	"io"
+	"fmt"
 	"log/slog"
 	"os/exec"
-	"path"
 	"searchix/internal/config"
 	"searchix/internal/fetcher"
 	"searchix/internal/index"
 	"slices"
 	"strings"
+	"time"
 
 	"github.com/pkg/errors"
 )
 
-func Start(
-	cfg *config.Config,
+func createSourceImporter(
+	parent context.Context,
+	meta *index.Meta,
 	indexer *index.WriteIndex,
 	forceUpdate bool,
-	onlyUpdateSources *[]string,
-) error {
-	if len(cfg.Importer.Sources) == 0 {
-		slog.Info("No sources enabled")
-
-		return nil
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), cfg.Importer.Timeout.Duration)
-	defer cancel()
-
-	forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0)
-
-	for name, source := range cfg.Importer.Sources {
-		if onlyUpdateSources != nil && len(*onlyUpdateSources) > 0 {
-			if !slices.Contains(*onlyUpdateSources, name) {
-				continue
-			}
-		}
-
-		logger := slog.With("name", name, "fetcher", source.Fetcher.String())
+) func(*config.Source) error {
+	return func(source *config.Source) error {
+		logger := slog.With(
+			"name",
+			source.Key,
+			"fetcher",
+			source.Fetcher.String(),
+			"timeout",
+			source.FetchTimeout.Duration,
+		)
 		logger.Debug("starting fetcher")
 
-		fetcherDataPath := path.Join(cfg.DataPath, "sources", source.Key)
-
-		fetcher, err := fetcher.New(source, fetcherDataPath, logger)
+		fetcher, err := fetcher.New(source, logger)
 		if err != nil {
-			logger.Error("error creating fetcher", "error", err)
-
-			continue
+			return errors.WithMessage(err, "error creating fetcher")
 		}
 
-		files, updated, err := fetcher.FetchIfNeeded(ctx)
+		sourceMeta := meta.GetSourceMeta(source.Key)
+		if forceUpdate {
+			sourceMeta.Updated = time.Time{}
+		}
+		previousUpdate := sourceMeta.Updated
+		ctx, cancel := context.WithTimeout(parent, source.FetchTimeout.Duration)
+		defer cancel()
+		files, err := fetcher.FetchIfNeeded(ctx, &sourceMeta)
 		if err != nil {
 			var exerr *exec.ExitError
@@ -65,70 +58,52 @@ func Start(
 						"status", exerr.ExitCode(),
 					)
-
-					continue
 				}
-			} else {
-				logger.Error("importer fetch failed", "error", err)
-
-				continue
 			}
 
-			continue
+			return errors.WithMessage(err, "importer fetch failed")
 		}
-		logger.Info("importer fetch succeeded", "updated", updated)
+		logger.Info(
+			"importer fetch succeeded",
+			"previous",
+			previousUpdate,
+			"current",
+			sourceMeta.Updated,
+			"is_updated",
+			sourceMeta.Updated.After(previousUpdate),
+			"update_force",
+			forceUpdate,
+		)
+
+		if sourceMeta.Updated.After(previousUpdate) || forceUpdate {
 
-		if updated || forceUpdate {
 			err = setRepoRevision(files.Revision, source)
 			if err != nil {
 				logger.Warn("could not set source repo revision", "error", err)
 			}
 
-			var file io.ReadCloser
 			var processor Processor
+			logger.Debug(
+				"creating processor",
+				"importer_type",
+				source.Importer,
+				"revision",
+				source.Repo.Revision,
+			)
 			switch source.Importer {
 			case config.Options:
-				logger.Debug(
-					"creating processor",
-					"filename",
-					files.Options,
-					"revision",
-					source.Repo.Revision,
-				)
-				file, err = openFileDecoded(files.Options)
-				if err != nil {
-					logger.Error("could not open file", "filename", files.Options, "error", err)
-
-					continue
-				}
-				processor, err = NewOptionProcessor(file, source)
+				logger.Debug("processor created", "file", fmt.Sprintf("%T", files.Options))
+				processor, err = NewOptionProcessor(files.Options, source)
 			case config.Packages:
-				logger.Debug(
-					"creating processor",
-					"filename",
-					files.Packages,
-					"revision",
-					source.Repo.Revision,
-				)
-				file, err = openFileDecoded(files.Packages)
-				if err != nil {
-					logger.Error("could not open file", "filename", files.Packages, "error", err)
-
-					continue
-				}
-				processor, err = NewPackageProcessor(file, source)
+				processor, err = NewPackageProcessor(files.Packages, source)
 			}
 			if err != nil {
-				logger.Error("failed to create processor", "type", source.Importer, "error", err)
-
-				continue
+				return errors.WithMessagef(err, "failed to create processor")
 			}
 
 			hadWarnings, err := process(ctx, indexer, processor, logger)
 			if err != nil {
-				logger.Error("failed to process source", "error", err)
-
-				continue
+				return errors.WithMessagef(err, "failed to process source")
 			}
 
 			if hadWarnings {
@@ -137,6 +112,53 @@ func Start(
 				logger.Info("importer succeeded")
 			}
 		}
+
+		sourceMeta.Rev = source.Repo.Revision
+		meta.SetSourceMeta(source.Key, sourceMeta)
+
+		return nil
+	}
+}
+
+func Start(
+	cfg *config.Config,
+	indexer *index.WriteIndex,
+	forceUpdate bool,
+	onlyUpdateSources *[]string,
+) error {
+	if len(cfg.Importer.Sources) == 0 {
+		slog.Info("No sources enabled")
+
+		return nil
+	}
+
+	slog.Debug("starting importer", "timeout", cfg.Importer.Timeout.Duration)
+	importCtx, cancelImport := context.WithTimeout(
+		context.Background(),
+		cfg.Importer.Timeout.Duration,
+	)
+	defer cancelImport()
+
+	forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0)
+
+	meta := indexer.Meta
+
+	importSource := createSourceImporter(importCtx, meta, indexer, forceUpdate)
+	for name, source := range cfg.Importer.Sources {
+		if onlyUpdateSources != nil && len(*onlyUpdateSources) > 0 {
+			if !slices.Contains(*onlyUpdateSources, name) {
+				continue
+			}
+		}
+		err := importSource(source)
+		if err != nil {
+			slog.Error("import failed", "source", name, "error", err)
+		}
+	}
+
+	err := indexer.SaveMeta()
+	if err != nil {
+		return errors.Wrap(err, "failed to save metadata")
 	}
 
 	return nil
diff --git a/internal/importer/utils.go b/internal/importer/utils.go
index 425b7bd..7c53173 100644
--- a/internal/importer/utils.go
+++ b/internal/importer/utils.go
@@ -1,16 +1,13 @@
 package importer
 
 import (
-	"bytes"
 	"fmt"
 	"io"
 	"net/url"
-	"os"
-	"path"
 	"searchix/internal/config"
 	"searchix/internal/nix"
+	"strings"
 
-	"github.com/andybalholm/brotli"
 	"github.com/bcicen/jstream"
 	"github.com/pkg/errors"
 )
@@ -69,58 +66,20 @@ func MakeChannelLink(repo config.Repository, subPath string) (*nix.Link, error)
 	}, nil
 }
 
-func setRepoRevision(filename string, source *config.Source) error {
-	if filename != "" {
-		bits, err := os.ReadFile(filename)
+func setRepoRevision(file io.ReadCloser, source *config.Source) error {
+	if file != nil {
+		defer file.Close()
+		var str strings.Builder
+		_, err := io.Copy(&str, file)
 		if err != nil {
 			return errors.WithMessagef(
 				err,
-				"unable to read revision file at %s",
-				filename,
+				"unable to read revision file",
 			)
 		}
-		source.Repo.Revision = string(bytes.TrimSpace(bits))
-
+		source.Repo.Revision = strings.TrimSpace(str.String())
 	}
 
 	return nil
 }
-
-type brotliReadCloser struct {
-	src io.ReadCloser
-	*brotli.Reader
-}
-
-func newBrotliReader(src io.ReadCloser) *brotliReadCloser {
-	return &brotliReadCloser{
-		src:    src,
-		Reader: brotli.NewReader(src),
-	}
-}
-
-func (r *brotliReadCloser) Close() error {
-	return errors.Wrap(r.src.Close(), "failed to call close on underlying reader")
-}
-
-func openFileDecoded(filename string) (io.ReadCloser, error) {
-	var reader io.ReadCloser
-	var err error
-	ext := path.Ext(filename)
-	reader, err = os.Open(filename)
-	if err != nil {
-		return nil, errors.WithMessagef(err, "failed to open file %s", filename)
-	}
-	switch ext {
-	case ".json":
-		// nothing to do
-	case ".br":
-		reader = newBrotliReader(reader)
-	default:
-		reader.Close()
-
-		return nil, errors.Errorf("invalid file extension %s", ext)
-	}
-
-	return reader, nil
-}
-- 
cgit 1.4.1
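
The core of this change is visible in createSourceImporter: each source now carries a small metadata record (its last update time and repository revision) held in index.Meta, updated through fetcher.FetchIfNeeded, written back with meta.SetSourceMeta, and persisted via indexer.SaveMeta(), so a later run can skip sources that have not changed instead of re-downloading and re-indexing them. Below is a minimal, self-contained sketch of that conditional-update pattern using a plain HTTP If-Modified-Since request; the names (sourceMeta, fetchIfNeeded), the HTTP mechanism, and the URL are assumptions for illustration only, not the project's actual fetcher or index.Meta API.

// Hypothetical sketch only — not code from the patch above. It shows the same
// "remember when a source was last updated, fetch conditionally next time" idea.
package main

import (
	"fmt"
	"net/http"
	"time"
)

// sourceMeta plays the role of a per-source entry in the index metadata:
// just enough state to decide whether the next run needs to re-import.
type sourceMeta struct {
	Updated time.Time
	Rev     string
}

// fetchIfNeeded performs a conditional GET. When the server answers
// 304 Not Modified, it reports false and the caller can skip importing.
func fetchIfNeeded(rawURL string, meta *sourceMeta) (bool, error) {
	req, err := http.NewRequest(http.MethodGet, rawURL, nil)
	if err != nil {
		return false, err
	}
	if !meta.Updated.IsZero() {
		req.Header.Set("If-Modified-Since", meta.Updated.Format(http.TimeFormat))
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotModified {
		return false, nil // nothing new since the last run
	}

	// A real importer would stream resp.Body straight into its processor here.
	meta.Updated = time.Now()

	return true, nil
}

func main() {
	meta := &sourceMeta{}
	updated, err := fetchIfNeeded("https://example.com/options.json", meta)
	if err != nil {
		fmt.Println("fetch failed:", err)

		return
	}
	fmt.Println("updated:", updated, "as of", meta.Updated)
}

In the patch itself the equivalent decision is the comparison of sourceMeta.Updated before and after fetcher.FetchIfNeeded: a processor is only created and run when the timestamp moved forward or forceUpdate is set.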