about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorAlan Pearce2024-05-17 11:22:54 +0200
committerAlan Pearce2024-05-17 12:21:19 +0200
commit8eb869f10a08e93f67a7feb50dc73600bc5304fb (patch)
tree16f2a27d5df4c74f88035417af061daba639440d
parentc5c9f726cae4cf4488312338da9599da53ac8b6c (diff)
downloadsearchix-8eb869f10a08e93f67a7feb50dc73600bc5304fb.tar.lz
searchix-8eb869f10a08e93f67a7feb50dc73600bc5304fb.tar.zst
searchix-8eb869f10a08e93f67a7feb50dc73600bc5304fb.zip
perf: flush index batch in groups of 10000
-rw-r--r--internal/index/indexer.go31
1 files changed, 26 insertions, 5 deletions
diff --git a/internal/index/indexer.go b/internal/index/indexer.go
index 3665785..da2fab3 100644
--- a/internal/index/indexer.go
+++ b/internal/index/indexer.go
@@ -31,6 +31,8 @@ type WriteIndex struct {
 	meta  *Meta
 }
 
+const batchSize = 10_000
+
 func createIndexMapping() (mapping.IndexMapping, error) {
 	indexMapping := bleve.NewIndexMapping()
 	indexMapping.StoreDynamic = false
@@ -233,6 +235,7 @@ func (i *WriteIndex) Import(
 
 	go func() {
 		defer close(errs)
+		k := 0
 		batch := i.index.NewBatch()
 		indexMapping := i.index.Mapping()
 
@@ -273,20 +276,38 @@ func (i *WriteIndex) Import(
 
 				continue
 			}
-		}
 
-		size := batch.Size()
-		slog.Debug("flushing batch", "size", size)
+			if k++; k%batchSize == 0 {
+				err = i.Flush(batch)
+				if err != nil {
+					errs <- err
+				}
+			}
+		}
 
-		err := i.index.Batch(batch)
+		err := i.Flush(batch)
 		if err != nil {
-			errs <- errors.WithMessagef(err, "could not flush batch")
+			errs <- err
 		}
 	}()
 
 	return errs
 }
 
+func (i *WriteIndex) Flush(batch *bleve.Batch) error {
+	size := batch.Size()
+	slog.Debug("flushing batch", "size", size)
+
+	err := i.index.Batch(batch)
+	if err != nil {
+		return errors.WithMessagef(err, "could not flush batch")
+	}
+
+	batch.Reset()
+
+	return nil
+}
+
 func (i *WriteIndex) Close() error {
 	err := i.index.Close()
 	if err != nil {