about summary refs log tree commit diff stats
path: root/internal/index/indexer.go
diff options
context:
space:
mode:
authorAlan Pearce2024-05-23 11:45:38 +0200
committerAlan Pearce2024-05-23 11:45:38 +0200
commit3053e41b1528ef898cccd44e056e4d167619af6b (patch)
treea83da2e8cfcaf824a57d26e9e5ef55b95f14e603 /internal/index/indexer.go
parent3149660f0fcbad484a71703cf2f7f715abc130ba (diff)
downloadsearchix-3053e41b1528ef898cccd44e056e4d167619af6b.tar.lz
searchix-3053e41b1528ef898cccd44e056e4d167619af6b.tar.zst
searchix-3053e41b1528ef898cccd44e056e4d167619af6b.zip
fix: abort import of source on batch processing errors
Diffstat (limited to 'internal/index/indexer.go')
-rw-r--r--internal/index/indexer.go19
1 files changed, 18 insertions, 1 deletions
diff --git a/internal/index/indexer.go b/internal/index/indexer.go
index 3a146c3..1f93c06 100644
--- a/internal/index/indexer.go
+++ b/internal/index/indexer.go
@@ -32,6 +32,14 @@ type WriteIndex struct {
 	meta  *Meta
 }
 
+type BatchError struct {
+	error
+}
+
+func (e *BatchError) Error() string {
+	return e.error.Error()
+}
+
 const batchSize = 10_000
 
 func createIndexMapping() (mapping.IndexMapping, error) {
@@ -292,6 +300,8 @@ func (i *WriteIndex) Import(
 				err = i.Flush(batch)
 				if err != nil {
 					errs <- err
+
+					return
 				}
 			}
 		}
@@ -307,11 +317,18 @@ func (i *WriteIndex) Import(
 
 func (i *WriteIndex) Flush(batch *bleve.Batch) error {
 	size := batch.Size()
+	if size == 0 {
+		return &BatchError{
+			error: errors.New("no documents to flush"),
+		}
+	}
 	slog.Debug("flushing batch", "size", size)
 
 	err := i.index.Batch(batch)
 	if err != nil {
-		return errors.WithMessagef(err, "could not flush batch")
+		return &BatchError{
+			error: errors.WithMessagef(err, "could not flush batch"),
+		}
 	}
 
 	batch.Reset()