about summary refs log tree commit diff stats
path: root/internal/index
diff options
context:
space:
mode:
Diffstat (limited to 'internal/index')
-rw-r--r--internal/index/indexer.go31
1 files changed, 26 insertions, 5 deletions
diff --git a/internal/index/indexer.go b/internal/index/indexer.go
index 3665785..da2fab3 100644
--- a/internal/index/indexer.go
+++ b/internal/index/indexer.go
@@ -31,6 +31,8 @@ type WriteIndex struct {
 	meta  *Meta
 }
 
+const batchSize = 10_000
+
 func createIndexMapping() (mapping.IndexMapping, error) {
 	indexMapping := bleve.NewIndexMapping()
 	indexMapping.StoreDynamic = false
@@ -233,6 +235,7 @@ func (i *WriteIndex) Import(
 
 	go func() {
 		defer close(errs)
+		k := 0
 		batch := i.index.NewBatch()
 		indexMapping := i.index.Mapping()
 
@@ -273,20 +276,38 @@ func (i *WriteIndex) Import(
 
 				continue
 			}
-		}
 
-		size := batch.Size()
-		slog.Debug("flushing batch", "size", size)
+			if k++; k%batchSize == 0 {
+				err = i.Flush(batch)
+				if err != nil {
+					errs <- err
+				}
+			}
+		}
 
-		err := i.index.Batch(batch)
+		err := i.Flush(batch)
 		if err != nil {
-			errs <- errors.WithMessagef(err, "could not flush batch")
+			errs <- err
 		}
 	}()
 
 	return errs
 }
 
+func (i *WriteIndex) Flush(batch *bleve.Batch) error {
+	size := batch.Size()
+	slog.Debug("flushing batch", "size", size)
+
+	err := i.index.Batch(batch)
+	if err != nil {
+		return errors.WithMessagef(err, "could not flush batch")
+	}
+
+	batch.Reset()
+
+	return nil
+}
+
 func (i *WriteIndex) Close() error {
 	err := i.index.Close()
 	if err != nil {