diff options
-rw-r--r-- | internal/index/indexer.go | 31 |
1 files changed, 26 insertions, 5 deletions
diff --git a/internal/index/indexer.go b/internal/index/indexer.go index 3665785..da2fab3 100644 --- a/internal/index/indexer.go +++ b/internal/index/indexer.go @@ -31,6 +31,8 @@ type WriteIndex struct { meta *Meta } +const batchSize = 10_000 + func createIndexMapping() (mapping.IndexMapping, error) { indexMapping := bleve.NewIndexMapping() indexMapping.StoreDynamic = false @@ -233,6 +235,7 @@ func (i *WriteIndex) Import( go func() { defer close(errs) + k := 0 batch := i.index.NewBatch() indexMapping := i.index.Mapping() @@ -273,20 +276,38 @@ func (i *WriteIndex) Import( continue } - } - size := batch.Size() - slog.Debug("flushing batch", "size", size) + if k++; k%batchSize == 0 { + err = i.Flush(batch) + if err != nil { + errs <- err + } + } + } - err := i.index.Batch(batch) + err := i.Flush(batch) if err != nil { - errs <- errors.WithMessagef(err, "could not flush batch") + errs <- err } }() return errs } +func (i *WriteIndex) Flush(batch *bleve.Batch) error { + size := batch.Size() + slog.Debug("flushing batch", "size", size) + + err := i.index.Batch(batch) + if err != nil { + return errors.WithMessagef(err, "could not flush batch") + } + + batch.Reset() + + return nil +} + func (i *WriteIndex) Close() error { err := i.index.Close() if err != nil { |