From 0dbfe37fbddb95c184d845c79bbe014597d55fe8 Mon Sep 17 00:00:00 2001 From: Alan Pearce Date: Thu, 23 May 2024 13:14:45 +0200 Subject: feat: stream files directly from fetcher to importer Use IndexMeta to store the information relevant to making conditional updates in future runs. --- internal/index/indexer.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) (limited to 'internal/index/indexer.go') diff --git a/internal/index/indexer.go b/internal/index/indexer.go index 1f93c06..742cf5e 100644 --- a/internal/index/indexer.go +++ b/internal/index/indexer.go @@ -29,7 +29,7 @@ import ( type WriteIndex struct { index bleve.Index - meta *Meta + Meta *Meta } type BatchError struct { @@ -216,10 +216,6 @@ func OpenOrCreate(dataRoot string, force bool) (*ReadIndex, *WriteIndex, bool, e return nil, nil, false, err } - err = meta.Save() - if err != nil { - return nil, nil, false, err - } } else { idx, err = bleve.Open(indexPath) if err != nil { @@ -245,6 +241,10 @@ func OpenOrCreate(dataRoot string, force bool) (*ReadIndex, *WriteIndex, bool, e nil } +func (i *WriteIndex) SaveMeta() error { + return i.Meta.Save() +} + func (i *WriteIndex) Import( ctx context.Context, objects <-chan nix.Importable, @@ -262,7 +262,7 @@ func (i *WriteIndex) Import( for obj := range objects { select { case <-ctx.Done(): - slog.Debug("context cancelled") + slog.Warn("import aborted") break outer default: @@ -336,13 +336,17 @@ func (i *WriteIndex) Flush(batch *bleve.Batch) error { return nil } -func (i *WriteIndex) Close() error { - err := i.index.Close() - if err != nil { - return errors.WithMessagef(err, "could not close index") +func (i *WriteIndex) Close() (err error) { + if e := i.Meta.Save(); e != nil { + // index needs to be closed anyway + err = errors.WithMessage(e, "could not save metadata") } - return nil + if e := i.index.Close(); e != nil { + err = errors.WithMessagef(e, "could not close index") + } + + return err } func (i *WriteIndex) DeleteBySource(source string) error { -- cgit 1.4.1