diff options
author | Alan Pearce | 2024-05-23 11:45:38 +0200 |
---|---|---|
committer | Alan Pearce | 2024-05-23 11:45:38 +0200 |
commit | 3053e41b1528ef898cccd44e056e4d167619af6b (patch) | |
tree | a83da2e8cfcaf824a57d26e9e5ef55b95f14e603 /internal/index | |
parent | 3149660f0fcbad484a71703cf2f7f715abc130ba (diff) | |
download | searchix-3053e41b1528ef898cccd44e056e4d167619af6b.tar.lz searchix-3053e41b1528ef898cccd44e056e4d167619af6b.tar.zst searchix-3053e41b1528ef898cccd44e056e4d167619af6b.zip |
fix: abort import of source on batch processing errors
Diffstat (limited to 'internal/index')
-rw-r--r-- | internal/index/indexer.go | 19 |
1 files changed, 18 insertions, 1 deletions
diff --git a/internal/index/indexer.go b/internal/index/indexer.go index 3a146c3..1f93c06 100644 --- a/internal/index/indexer.go +++ b/internal/index/indexer.go @@ -32,6 +32,14 @@ type WriteIndex struct { meta *Meta } +type BatchError struct { + error +} + +func (e *BatchError) Error() string { + return e.error.Error() +} + const batchSize = 10_000 func createIndexMapping() (mapping.IndexMapping, error) { @@ -292,6 +300,8 @@ func (i *WriteIndex) Import( err = i.Flush(batch) if err != nil { errs <- err + + return } } } @@ -307,11 +317,18 @@ func (i *WriteIndex) Import( func (i *WriteIndex) Flush(batch *bleve.Batch) error { size := batch.Size() + if size == 0 { + return &BatchError{ + error: errors.New("no documents to flush"), + } + } slog.Debug("flushing batch", "size", size) err := i.index.Batch(batch) if err != nil { - return errors.WithMessagef(err, "could not flush batch") + return &BatchError{ + error: errors.WithMessagef(err, "could not flush batch"), + } } batch.Reset() |