From 0dbfe37fbddb95c184d845c79bbe014597d55fe8 Mon Sep 17 00:00:00 2001 From: Alan Pearce Date: Thu, 23 May 2024 13:14:45 +0200 Subject: feat: stream files directly from fetcher to importer Use IndexMeta to store the information relevant to making conditional updates in future runs. --- internal/index/index_meta.go | 50 +++++++++++++++++++++++++++++++++++++------- internal/index/indexer.go | 26 +++++++++++++---------- 2 files changed, 57 insertions(+), 19 deletions(-) (limited to 'internal/index') diff --git a/internal/index/index_meta.go b/internal/index/index_meta.go index e24cd3b..f28abc4 100644 --- a/internal/index/index_meta.go +++ b/internal/index/index_meta.go @@ -5,15 +5,27 @@ import ( "log/slog" "os" "searchix/internal/file" + "time" "github.com/pkg/errors" ) -const CurrentSchemaVersion = 1 +const CurrentSchemaVersion = 2 -type Meta struct { - path string +type SourceMeta struct { + Updated time.Time + Path string + Rev string +} + +type data struct { SchemaVersion int + Sources map[string]*SourceMeta +} + +type Meta struct { + path string + data } func createMeta(path string) (*Meta, error) { @@ -26,8 +38,10 @@ func createMeta(path string) (*Meta, error) { } return &Meta{ - path: path, - SchemaVersion: CurrentSchemaVersion, + path: path, + data: data{ + SchemaVersion: CurrentSchemaVersion, + }, }, nil } @@ -36,8 +50,10 @@ func openMeta(path string) (*Meta, error) { if err != nil { return nil, errors.WithMessage(err, "could not open index metadata file") } - var meta Meta - err = json.Unmarshal(j, &meta) + meta := Meta{ + path: path, + } + err = json.Unmarshal(j, &meta.data) if err != nil { return nil, errors.WithMessage(err, "index metadata is corrupt, try replacing the index") } @@ -60,10 +76,12 @@ func (i *Meta) checkSchemaVersion() { } func (i *Meta) Save() error { - j, err := json.Marshal(i) + i.SchemaVersion = CurrentSchemaVersion + j, err := json.Marshal(i.data) if err != nil { return errors.WithMessage(err, "could not prepare index metadata for saving") } + slog.Debug("saving index metadata", "path", i.path) err = os.WriteFile(i.path, j, 0o600) if err != nil { return errors.WithMessage(err, "could not save index metadata") @@ -71,3 +89,19 @@ func (i *Meta) Save() error { return nil } + +func (i *Meta) GetSourceMeta(source string) SourceMeta { + sourceMeta := i.data.Sources[source] + if sourceMeta == nil { + return SourceMeta{} + } + + return *sourceMeta +} + +func (i *Meta) SetSourceMeta(source string, meta SourceMeta) { + if i.data.Sources == nil { + i.data.Sources = make(map[string]*SourceMeta) + } + i.data.Sources[source] = &meta +} diff --git a/internal/index/indexer.go b/internal/index/indexer.go index 1f93c06..742cf5e 100644 --- a/internal/index/indexer.go +++ b/internal/index/indexer.go @@ -29,7 +29,7 @@ import ( type WriteIndex struct { index bleve.Index - meta *Meta + Meta *Meta } type BatchError struct { @@ -216,10 +216,6 @@ func OpenOrCreate(dataRoot string, force bool) (*ReadIndex, *WriteIndex, bool, e return nil, nil, false, err } - err = meta.Save() - if err != nil { - return nil, nil, false, err - } } else { idx, err = bleve.Open(indexPath) if err != nil { @@ -245,6 +241,10 @@ func OpenOrCreate(dataRoot string, force bool) (*ReadIndex, *WriteIndex, bool, e nil } +func (i *WriteIndex) SaveMeta() error { + return i.Meta.Save() +} + func (i *WriteIndex) Import( ctx context.Context, objects <-chan nix.Importable, @@ -262,7 +262,7 @@ func (i *WriteIndex) Import( for obj := range objects { select { case <-ctx.Done(): - slog.Debug("context cancelled") + slog.Warn("import aborted") break outer default: @@ -336,13 +336,17 @@ func (i *WriteIndex) Flush(batch *bleve.Batch) error { return nil } -func (i *WriteIndex) Close() error { - err := i.index.Close() - if err != nil { - return errors.WithMessagef(err, "could not close index") +func (i *WriteIndex) Close() (err error) { + if e := i.Meta.Save(); e != nil { + // index needs to be closed anyway + err = errors.WithMessage(e, "could not save metadata") } - return nil + if e := i.index.Close(); e != nil { + err = errors.WithMessagef(e, "could not close index") + } + + return err } func (i *WriteIndex) DeleteBySource(source string) error { -- cgit 1.4.1