From 0dbfe37fbddb95c184d845c79bbe014597d55fe8 Mon Sep 17 00:00:00 2001 From: Alan Pearce Date: Thu, 23 May 2024 13:14:45 +0200 Subject: feat: stream files directly from fetcher to importer Use IndexMeta to store the information relevant to making conditional updates in future runs. --- internal/fetcher/channel.go | 61 ++++--------- internal/fetcher/download.go | 56 +++++------- internal/fetcher/http.go | 74 +++++++++------ internal/fetcher/main.go | 17 ++-- internal/fetcher/nixpkgs-channel.go | 60 ++++++------- internal/importer/main.go | 174 ++++++++++++++++++++---------------- internal/importer/utils.go | 57 ++---------- internal/index/index_meta.go | 50 +++++++++-- internal/index/indexer.go | 26 +++--- 9 files changed, 288 insertions(+), 287 deletions(-) diff --git a/internal/fetcher/channel.go b/internal/fetcher/channel.go index fd7427c..3756012 100644 --- a/internal/fetcher/channel.go +++ b/internal/fetcher/channel.go @@ -8,15 +8,15 @@ import ( "os/exec" "path" "searchix/internal/config" - "searchix/internal/file" + "searchix/internal/index" "strconv" "strings" + "time" "github.com/pkg/errors" ) type ChannelFetcher struct { - DataPath string Source *config.Source SourceFile string Logger *slog.Logger @@ -24,15 +24,13 @@ type ChannelFetcher struct { func NewChannelFetcher( source *config.Source, - dataPath string, logger *slog.Logger, ) (*ChannelFetcher, error) { switch source.Importer { case config.Options: return &ChannelFetcher{ - DataPath: dataPath, - Source: source, - Logger: logger, + Source: source, + Logger: logger, }, nil default: return nil, fmt.Errorf("unsupported importer type %s", source.Importer) @@ -40,22 +38,9 @@ func NewChannelFetcher( } func (i *ChannelFetcher) FetchIfNeeded( - parent context.Context, -) (f FetchedFiles, updated bool, err error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout.Duration) - defer cancel() - - dest := i.DataPath - - var before string - before, err = os.Readlink(dest) - if file.NeedNotExist(err) != nil { - err = errors.WithMessagef(err, "could not call readlink on file %s", dest) - - return - } - i.Logger.Debug("stat before", "name", before) - + ctx context.Context, + sourceMeta *index.SourceMeta, +) (f FetchedFiles, err error) { args := []string{ "--no-build-output", "--timeout", @@ -63,8 +48,7 @@ func (i *ChannelFetcher) FetchIfNeeded( fmt.Sprintf("<%s/%s>", i.Source.Channel, i.Source.ImportPath), "--attr", i.Source.Attribute, - "--out-link", - dest, + "--no-out-link", } if i.Source.URL != "" { @@ -80,35 +64,28 @@ func (i *ChannelFetcher) FetchIfNeeded( return } - i.Logger.Debug("nix-build", "output", strings.TrimSpace(string(out))) - outPath := path.Join(dest, i.Source.OutputPath) + outPath := path.Join(strings.TrimSpace(string(out)), i.Source.OutputPath, "options.json") i.Logger.Debug( "checking output path", "outputPath", outPath, - "dest", - dest, - "source", - i.Source.OutputPath, ) - var after string - after, err = os.Readlink(dest) - if err = file.NeedNotExist(err); err != nil { - err = errors.WithMessagef( - err, - "failed to stat output file from nix-build, filename: %s", - outPath, - ) - return + if outPath != sourceMeta.Path { + sourceMeta.Path = outPath + sourceMeta.Updated = time.Now().Truncate(time.Second) } - i.Logger.Debug("stat after", "name", after) - updated = before != after + file, err := os.Open(outPath) + if err != nil { + err = errors.WithMessage(err, "failed to open options.json") + + return + } f = FetchedFiles{ - Options: path.Join(dest, i.Source.OutputPath, "options.json"), + Options: file, } return diff --git a/internal/fetcher/download.go b/internal/fetcher/download.go index 2c7b8fd..59ef8d1 100644 --- a/internal/fetcher/download.go +++ b/internal/fetcher/download.go @@ -5,15 +5,13 @@ import ( "fmt" "log/slog" "net/url" - "path" "searchix/internal/config" - "searchix/internal/file" + "searchix/internal/index" "github.com/pkg/errors" ) type DownloadFetcher struct { - DataPath string Source *config.Source SourceFile string Logger *slog.Logger @@ -21,15 +19,13 @@ type DownloadFetcher struct { func NewDownloadFetcher( source *config.Source, - dataPath string, logger *slog.Logger, ) (*DownloadFetcher, error) { switch source.Importer { case config.Options: return &DownloadFetcher{ - DataPath: dataPath, - Source: source, - Logger: logger, + Source: source, + Logger: logger, }, nil default: return nil, fmt.Errorf("unsupported importer type %s", source.Importer) @@ -42,22 +38,11 @@ var files = map[string]string{ } func (i *DownloadFetcher) FetchIfNeeded( - parent context.Context, -) (f FetchedFiles, updated bool, err error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout.Duration) - defer cancel() - - root := i.DataPath - - err = file.Mkdirp(root) - if err != nil { - err = errors.WithMessagef(err, "error creating directory for data: %s", root) - - return - } - + ctx context.Context, + sourceMeta *index.SourceMeta, +) (f FetchedFiles, err error) { var fetchURL string - for _, filename := range files { + for key, filename := range files { fetchURL, err = url.JoinPath(i.Source.URL, filename) if err != nil { err = errors.WithMessagef( @@ -70,23 +55,28 @@ func (i *DownloadFetcher) FetchIfNeeded( return } - outPath := path.Join(root, filename) - - i.Logger.Debug("preparing to fetch URL", "url", fetchURL, "path", outPath) + i.Logger.Debug("preparing to fetch URL", "url", fetchURL) - updated, err = fetchFileIfNeeded(ctx, outPath, fetchURL) + body, mtime, err := fetchFileIfNeeded(ctx, sourceMeta.Updated, fetchURL) if err != nil { - return + i.Logger.Warn("failed to fetch file", "url", fetchURL, "error", err) + + return f, err } // don't bother to issue requests for the later files - if !updated { + if mtime.Before(sourceMeta.Updated) { break } - } - - f = FetchedFiles{ - Revision: path.Join(root, "revision"), - Options: path.Join(root, "options.json"), + sourceMeta.Updated = mtime + + switch key { + case "revision": + f.Revision = body + case "options": + f.Options = body + default: + return f, errors.Errorf("unknown file kind %s", key) + } } return diff --git a/internal/fetcher/http.go b/internal/fetcher/http.go index 9afbbc0..675c3b3 100644 --- a/internal/fetcher/http.go +++ b/internal/fetcher/http.go @@ -3,68 +3,90 @@ package fetcher import ( "context" "fmt" + "io" "log/slog" "net/http" - "os" "searchix/internal/config" - "searchix/internal/file" "strings" "time" + "github.com/andybalholm/brotli" "github.com/pkg/errors" ) -func fetchFileIfNeeded(ctx context.Context, path string, url string) (needed bool, err error) { - stat, err := file.StatIfExists(path) - if err != nil { - return false, errors.WithMessagef(err, "could not stat file %s", path) +type brotliReadCloser struct { + src io.ReadCloser + *brotli.Reader +} + +func newBrotliReader(src io.ReadCloser) *brotliReadCloser { + return &brotliReadCloser{ + src: src, + Reader: brotli.NewReader(src), } +} - var mtime string - if stat != nil { - mtime = strings.Replace(stat.ModTime().UTC().Format(time.RFC1123), "UTC", "GMT", 1) +func (r *brotliReadCloser) Close() error { + return errors.Wrap(r.src.Close(), "failed to call close on underlying reader") +} + +func fetchFileIfNeeded( + ctx context.Context, + mtime time.Time, + url string, +) (body io.ReadCloser, newMtime time.Time, err error) { + var ifModifiedSince string + if !mtime.IsZero() { + ifModifiedSince = strings.Replace(mtime.UTC().Format(time.RFC1123), "UTC", "GMT", 1) } req, err := http.NewRequestWithContext(ctx, "GET", url, http.NoBody) if err != nil { - return false, errors.WithMessagef(err, "could not create HTTP request for %s", url) + err = errors.WithMessagef(err, "could not create HTTP request for %s", url) + + return } req.Header.Set("User-Agent", fmt.Sprintf("Searchix %s", config.ShortSHA)) - if mtime != "" { - req.Header.Set("If-Modified-Since", mtime) + if ifModifiedSince != "" { + req.Header.Set("If-Modified-Since", ifModifiedSince) } res, err := http.DefaultClient.Do(req) if err != nil { - return false, errors.WithMessagef(err, "could not make HTTP request to %s", url) + err = errors.WithMessagef(err, "could not make HTTP request to %s", url) + + return } - defer res.Body.Close() switch res.StatusCode { case http.StatusNotModified: - needed = false + newMtime = mtime + + return case http.StatusOK: - newMtime, err := time.Parse(time.RFC1123, res.Header.Get("Last-Modified")) + newMtime, err = time.Parse(time.RFC1123, res.Header.Get("Last-Modified")) if err != nil { slog.Warn( "could not parse Last-Modified header from response", "value", res.Header.Get("Last-Modified"), ) + newMtime = time.Now() } - err = file.WriteToFile(path, res.Body) - if err != nil { - return false, errors.WithMessagef(err, "could not write response body to file %s", path) - } - err = os.Chtimes(path, time.Time{}, newMtime) - if err != nil { - slog.Warn("could not update mtime on file", "file", path) + + switch ce := res.Header.Get("Content-Encoding"); ce { + case "br": + slog.Debug("using brotli encoding") + body = newBrotliReader(res.Body) + case "", "identity", "gzip": + body = res.Body + default: + err = fmt.Errorf("cannot handle a body with content-encoding %s", ce) } - needed = true default: - return false, fmt.Errorf("got response code %d, don't know what to do", res.StatusCode) + err = fmt.Errorf("got response code %d, don't know what to do", res.StatusCode) } - return needed, nil + return } diff --git a/internal/fetcher/main.go b/internal/fetcher/main.go index 65f62db..7ea0b03 100644 --- a/internal/fetcher/main.go +++ b/internal/fetcher/main.go @@ -2,34 +2,35 @@ package fetcher import ( "context" + "io" "log/slog" "searchix/internal/config" + "searchix/internal/index" "github.com/pkg/errors" ) type FetchedFiles struct { - Revision string - Options string - Packages string + Revision io.ReadCloser + Options io.ReadCloser + Packages io.ReadCloser } type Fetcher interface { - FetchIfNeeded(context.Context) (FetchedFiles, bool, error) + FetchIfNeeded(context.Context, *index.SourceMeta) (FetchedFiles, error) } func New( source *config.Source, - fetcherDataPath string, logger *slog.Logger, ) (fetcher Fetcher, err error) { switch source.Fetcher { case config.ChannelNixpkgs: - fetcher, err = NewNixpkgsChannelFetcher(source, fetcherDataPath, logger) + fetcher, err = NewNixpkgsChannelFetcher(source, logger) case config.Channel: - fetcher, err = NewChannelFetcher(source, fetcherDataPath, logger) + fetcher, err = NewChannelFetcher(source, logger) case config.Download: - fetcher, err = NewDownloadFetcher(source, fetcherDataPath, logger) + fetcher, err = NewDownloadFetcher(source, logger) default: err = errors.Errorf("unsupported fetcher type %s", source.Fetcher.String()) } diff --git a/internal/fetcher/nixpkgs-channel.go b/internal/fetcher/nixpkgs-channel.go index 033b577..62fea13 100644 --- a/internal/fetcher/nixpkgs-channel.go +++ b/internal/fetcher/nixpkgs-channel.go @@ -5,17 +5,15 @@ import ( "fmt" "log/slog" "net/url" - "path" "searchix/internal/config" - "searchix/internal/file" + "searchix/internal/index" "github.com/pkg/errors" ) type NixpkgsChannelFetcher struct { - DataPath string - Source *config.Source - Logger *slog.Logger + Source *config.Source + Logger *slog.Logger } func makeChannelURL(channel string, subPath string) (string, error) { @@ -26,15 +24,13 @@ func makeChannelURL(channel string, subPath string) (string, error) { func NewNixpkgsChannelFetcher( source *config.Source, - dataPath string, logger *slog.Logger, ) (*NixpkgsChannelFetcher, error) { switch source.Importer { case config.Options, config.Packages: return &NixpkgsChannelFetcher{ - DataPath: dataPath, - Source: source, - Logger: logger, + Source: source, + Logger: logger, }, nil default: return nil, fmt.Errorf("unsupported importer type %s", source.Importer) @@ -48,19 +44,9 @@ const ( ) func (i *NixpkgsChannelFetcher) FetchIfNeeded( - parent context.Context, -) (f FetchedFiles, updated bool, err error) { - ctx, cancel := context.WithTimeout(parent, i.Source.FetchTimeout.Duration) - defer cancel() - - root := i.DataPath - - err = file.Mkdirp(root) - if err != nil { - err = errors.WithMessagef(err, "error creating directory for data: %s", root) - - return - } + ctx context.Context, + sourceMeta *index.SourceMeta, +) (f FetchedFiles, err error) { filesToFetch := make([]string, 2) filesToFetch[0] = revisionFilename @@ -78,23 +64,29 @@ func (i *NixpkgsChannelFetcher) FetchIfNeeded( return } - outPath := path.Join(root, filename) - - i.Logger.Debug("attempting to fetch file", "url", fetchURL, "outPath", outPath) - updated, err = fetchFileIfNeeded(ctx, outPath, fetchURL) + i.Logger.Debug("attempting to fetch file", "url", fetchURL) + body, mtime, err := fetchFileIfNeeded(ctx, sourceMeta.Updated, fetchURL) if err != nil { - return + i.Logger.Warn("failed to fetch file", "url", fetchURL, "error", err) + + return f, err } // don't bother to issue requests for the later files - if !updated { + if mtime.Before(sourceMeta.Updated) { break } - } - - f = FetchedFiles{ - Revision: path.Join(root, "git-revision"), - Options: path.Join(root, "options.json.br"), - Packages: path.Join(root, "packages.json.br"), + sourceMeta.Updated = mtime + + switch filename { + case revisionFilename: + f.Revision = body + case optionsFilename: + f.Options = body + case packagesFileName: + f.Packages = body + default: + return f, errors.Errorf("unknown file kind %s", filename) + } } return diff --git a/internal/importer/main.go b/internal/importer/main.go index 6f462c3..6be5b45 100644 --- a/internal/importer/main.go +++ b/internal/importer/main.go @@ -2,56 +2,49 @@ package importer import ( "context" - "io" + "fmt" "log/slog" "os/exec" - "path" "searchix/internal/config" "searchix/internal/fetcher" "searchix/internal/index" "slices" "strings" + "time" "github.com/pkg/errors" ) -func Start( - cfg *config.Config, +func createSourceImporter( + parent context.Context, + meta *index.Meta, indexer *index.WriteIndex, forceUpdate bool, - onlyUpdateSources *[]string, -) error { - if len(cfg.Importer.Sources) == 0 { - slog.Info("No sources enabled") - - return nil - } - - ctx, cancel := context.WithTimeout(context.Background(), cfg.Importer.Timeout.Duration) - defer cancel() - - forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0) - - for name, source := range cfg.Importer.Sources { - if onlyUpdateSources != nil && len(*onlyUpdateSources) > 0 { - if !slices.Contains(*onlyUpdateSources, name) { - continue - } - } - - logger := slog.With("name", name, "fetcher", source.Fetcher.String()) +) func(*config.Source) error { + return func(source *config.Source) error { + logger := slog.With( + "name", + source.Key, + "fetcher", + source.Fetcher.String(), + "timeout", + source.FetchTimeout.Duration, + ) logger.Debug("starting fetcher") - fetcherDataPath := path.Join(cfg.DataPath, "sources", source.Key) - - fetcher, err := fetcher.New(source, fetcherDataPath, logger) + fetcher, err := fetcher.New(source, logger) if err != nil { - logger.Error("error creating fetcher", "error", err) - - continue + return errors.WithMessage(err, "error creating fetcher") } - files, updated, err := fetcher.FetchIfNeeded(ctx) + sourceMeta := meta.GetSourceMeta(source.Key) + if forceUpdate { + sourceMeta.Updated = time.Time{} + } + previousUpdate := sourceMeta.Updated + ctx, cancel := context.WithTimeout(parent, source.FetchTimeout.Duration) + defer cancel() + files, err := fetcher.FetchIfNeeded(ctx, &sourceMeta) if err != nil { var exerr *exec.ExitError @@ -65,70 +58,52 @@ func Start( "status", exerr.ExitCode(), ) - - continue } - } else { - logger.Error("importer fetch failed", "error", err) - - continue } - continue + return errors.WithMessage(err, "importer fetch failed") } - logger.Info("importer fetch succeeded", "updated", updated) + logger.Info( + "importer fetch succeeded", + "previous", + previousUpdate, + "current", + sourceMeta.Updated, + "is_updated", + sourceMeta.Updated.After(previousUpdate), + "update_force", + forceUpdate, + ) + + if sourceMeta.Updated.After(previousUpdate) || forceUpdate { - if updated || forceUpdate { err = setRepoRevision(files.Revision, source) if err != nil { logger.Warn("could not set source repo revision", "error", err) } - var file io.ReadCloser var processor Processor + logger.Debug( + "creating processor", + "importer_type", + source.Importer, + "revision", + source.Repo.Revision, + ) switch source.Importer { case config.Options: - logger.Debug( - "creating processor", - "filename", - files.Options, - "revision", - source.Repo.Revision, - ) - file, err = openFileDecoded(files.Options) - if err != nil { - logger.Error("could not open file", "filename", files.Options, "error", err) - - continue - } - processor, err = NewOptionProcessor(file, source) + logger.Debug("processor created", "file", fmt.Sprintf("%T", files.Options)) + processor, err = NewOptionProcessor(files.Options, source) case config.Packages: - logger.Debug( - "creating processor", - "filename", - files.Packages, - "revision", - source.Repo.Revision, - ) - file, err = openFileDecoded(files.Packages) - if err != nil { - logger.Error("could not open file", "filename", files.Packages, "error", err) - - continue - } - processor, err = NewPackageProcessor(file, source) + processor, err = NewPackageProcessor(files.Packages, source) } if err != nil { - logger.Error("failed to create processor", "type", source.Importer, "error", err) - - continue + return errors.WithMessagef(err, "failed to create processor") } hadWarnings, err := process(ctx, indexer, processor, logger) if err != nil { - logger.Error("failed to process source", "error", err) - - continue + return errors.WithMessagef(err, "failed to process source") } if hadWarnings { @@ -137,6 +112,53 @@ func Start( logger.Info("importer succeeded") } } + + sourceMeta.Rev = source.Repo.Revision + meta.SetSourceMeta(source.Key, sourceMeta) + + return nil + } +} + +func Start( + cfg *config.Config, + indexer *index.WriteIndex, + forceUpdate bool, + onlyUpdateSources *[]string, +) error { + if len(cfg.Importer.Sources) == 0 { + slog.Info("No sources enabled") + + return nil + } + + slog.Debug("starting importer", "timeout", cfg.Importer.Timeout.Duration) + importCtx, cancelImport := context.WithTimeout( + context.Background(), + cfg.Importer.Timeout.Duration, + ) + defer cancelImport() + + forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0) + + meta := indexer.Meta + + importSource := createSourceImporter(importCtx, meta, indexer, forceUpdate) + for name, source := range cfg.Importer.Sources { + if onlyUpdateSources != nil && len(*onlyUpdateSources) > 0 { + if !slices.Contains(*onlyUpdateSources, name) { + continue + } + } + err := importSource(source) + if err != nil { + slog.Error("import failed", "source", name, "error", err) + } + } + + err := indexer.SaveMeta() + if err != nil { + return errors.Wrap(err, "failed to save metadata") } return nil diff --git a/internal/importer/utils.go b/internal/importer/utils.go index 425b7bd..7c53173 100644 --- a/internal/importer/utils.go +++ b/internal/importer/utils.go @@ -1,16 +1,13 @@ package importer import ( - "bytes" "fmt" "io" "net/url" - "os" - "path" "searchix/internal/config" "searchix/internal/nix" + "strings" - "github.com/andybalholm/brotli" "github.com/bcicen/jstream" "github.com/pkg/errors" ) @@ -69,58 +66,20 @@ func MakeChannelLink(repo config.Repository, subPath string) (*nix.Link, error) }, nil } -func setRepoRevision(filename string, source *config.Source) error { - if filename != "" { - bits, err := os.ReadFile(filename) +func setRepoRevision(file io.ReadCloser, source *config.Source) error { + if file != nil { + defer file.Close() + var str strings.Builder + _, err := io.Copy(&str, file) if err != nil { return errors.WithMessagef( err, - "unable to read revision file at %s", - filename, + "unable to read revision file", ) } - source.Repo.Revision = string(bytes.TrimSpace(bits)) - + source.Repo.Revision = strings.TrimSpace(str.String()) } return nil } - -type brotliReadCloser struct { - src io.ReadCloser - *brotli.Reader -} - -func newBrotliReader(src io.ReadCloser) *brotliReadCloser { - return &brotliReadCloser{ - src: src, - Reader: brotli.NewReader(src), - } -} - -func (r *brotliReadCloser) Close() error { - return errors.Wrap(r.src.Close(), "failed to call close on underlying reader") -} - -func openFileDecoded(filename string) (io.ReadCloser, error) { - var reader io.ReadCloser - var err error - ext := path.Ext(filename) - reader, err = os.Open(filename) - if err != nil { - return nil, errors.WithMessagef(err, "failed to open file %s", filename) - } - switch ext { - case ".json": - // nothing to do - case ".br": - reader = newBrotliReader(reader) - default: - reader.Close() - - return nil, errors.Errorf("invalid file extension %s", ext) - } - - return reader, nil -} diff --git a/internal/index/index_meta.go b/internal/index/index_meta.go index e24cd3b..f28abc4 100644 --- a/internal/index/index_meta.go +++ b/internal/index/index_meta.go @@ -5,15 +5,27 @@ import ( "log/slog" "os" "searchix/internal/file" + "time" "github.com/pkg/errors" ) -const CurrentSchemaVersion = 1 +const CurrentSchemaVersion = 2 -type Meta struct { - path string +type SourceMeta struct { + Updated time.Time + Path string + Rev string +} + +type data struct { SchemaVersion int + Sources map[string]*SourceMeta +} + +type Meta struct { + path string + data } func createMeta(path string) (*Meta, error) { @@ -26,8 +38,10 @@ func createMeta(path string) (*Meta, error) { } return &Meta{ - path: path, - SchemaVersion: CurrentSchemaVersion, + path: path, + data: data{ + SchemaVersion: CurrentSchemaVersion, + }, }, nil } @@ -36,8 +50,10 @@ func openMeta(path string) (*Meta, error) { if err != nil { return nil, errors.WithMessage(err, "could not open index metadata file") } - var meta Meta - err = json.Unmarshal(j, &meta) + meta := Meta{ + path: path, + } + err = json.Unmarshal(j, &meta.data) if err != nil { return nil, errors.WithMessage(err, "index metadata is corrupt, try replacing the index") } @@ -60,10 +76,12 @@ func (i *Meta) checkSchemaVersion() { } func (i *Meta) Save() error { - j, err := json.Marshal(i) + i.SchemaVersion = CurrentSchemaVersion + j, err := json.Marshal(i.data) if err != nil { return errors.WithMessage(err, "could not prepare index metadata for saving") } + slog.Debug("saving index metadata", "path", i.path) err = os.WriteFile(i.path, j, 0o600) if err != nil { return errors.WithMessage(err, "could not save index metadata") @@ -71,3 +89,19 @@ func (i *Meta) Save() error { return nil } + +func (i *Meta) GetSourceMeta(source string) SourceMeta { + sourceMeta := i.data.Sources[source] + if sourceMeta == nil { + return SourceMeta{} + } + + return *sourceMeta +} + +func (i *Meta) SetSourceMeta(source string, meta SourceMeta) { + if i.data.Sources == nil { + i.data.Sources = make(map[string]*SourceMeta) + } + i.data.Sources[source] = &meta +} diff --git a/internal/index/indexer.go b/internal/index/indexer.go index 1f93c06..742cf5e 100644 --- a/internal/index/indexer.go +++ b/internal/index/indexer.go @@ -29,7 +29,7 @@ import ( type WriteIndex struct { index bleve.Index - meta *Meta + Meta *Meta } type BatchError struct { @@ -216,10 +216,6 @@ func OpenOrCreate(dataRoot string, force bool) (*ReadIndex, *WriteIndex, bool, e return nil, nil, false, err } - err = meta.Save() - if err != nil { - return nil, nil, false, err - } } else { idx, err = bleve.Open(indexPath) if err != nil { @@ -245,6 +241,10 @@ func OpenOrCreate(dataRoot string, force bool) (*ReadIndex, *WriteIndex, bool, e nil } +func (i *WriteIndex) SaveMeta() error { + return i.Meta.Save() +} + func (i *WriteIndex) Import( ctx context.Context, objects <-chan nix.Importable, @@ -262,7 +262,7 @@ func (i *WriteIndex) Import( for obj := range objects { select { case <-ctx.Done(): - slog.Debug("context cancelled") + slog.Warn("import aborted") break outer default: @@ -336,13 +336,17 @@ func (i *WriteIndex) Flush(batch *bleve.Batch) error { return nil } -func (i *WriteIndex) Close() error { - err := i.index.Close() - if err != nil { - return errors.WithMessagef(err, "could not close index") +func (i *WriteIndex) Close() (err error) { + if e := i.Meta.Save(); e != nil { + // index needs to be closed anyway + err = errors.WithMessage(e, "could not save metadata") } - return nil + if e := i.index.Close(); e != nil { + err = errors.WithMessagef(e, "could not close index") + } + + return err } func (i *WriteIndex) DeleteBySource(source string) error { -- cgit 1.4.1