From 8eb869f10a08e93f67a7feb50dc73600bc5304fb Mon Sep 17 00:00:00 2001 From: Alan Pearce Date: Fri, 17 May 2024 11:22:54 +0200 Subject: perf: flush index batch in groups of 10000 --- internal/index/indexer.go | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) (limited to 'internal/index/indexer.go') diff --git a/internal/index/indexer.go b/internal/index/indexer.go index 3665785..da2fab3 100644 --- a/internal/index/indexer.go +++ b/internal/index/indexer.go @@ -31,6 +31,8 @@ type WriteIndex struct { meta *Meta } +const batchSize = 10_000 + func createIndexMapping() (mapping.IndexMapping, error) { indexMapping := bleve.NewIndexMapping() indexMapping.StoreDynamic = false @@ -233,6 +235,7 @@ func (i *WriteIndex) Import( go func() { defer close(errs) + k := 0 batch := i.index.NewBatch() indexMapping := i.index.Mapping() @@ -273,20 +276,38 @@ func (i *WriteIndex) Import( continue } - } - size := batch.Size() - slog.Debug("flushing batch", "size", size) + if k++; k%batchSize == 0 { + err = i.Flush(batch) + if err != nil { + errs <- err + } + } + } - err := i.index.Batch(batch) + err := i.Flush(batch) if err != nil { - errs <- errors.WithMessagef(err, "could not flush batch") + errs <- err } }() return errs } +func (i *WriteIndex) Flush(batch *bleve.Batch) error { + size := batch.Size() + slog.Debug("flushing batch", "size", size) + + err := i.index.Batch(batch) + if err != nil { + return errors.WithMessagef(err, "could not flush batch") + } + + batch.Reset() + + return nil +} + func (i *WriteIndex) Close() error { err := i.index.Close() if err != nil { -- cgit 1.4.1