all repos — searchix @ 3053e41b1528ef898cccd44e056e4d167619af6b

Search engine for NixOS, nix-darwin, home-manager and NUR users

fix: abort import of source on batch processing errors
Alan Pearce alan@alanpearce.eu
Thu, 23 May 2024 11:45:38 +0200
commit

3053e41b1528ef898cccd44e056e4d167619af6b

parent

3149660f0fcbad484a71703cf2f7f715abc130ba

3 files changed, 36 insertions(+), 7 deletions(-)

jump to
M internal/importer/importer.go
@@ -21,7 +21,7 @@ ctx context.Context, 	indexer *index.WriteIndex,
 	processor Processor,
 	logger *slog.Logger,
-) bool {
+) (bool, error) {
 	wg := sync.WaitGroup{}
 
 	wg.Add(1)
@@ -30,7 +30,8 @@ 	wg.Add(1)
 	iErrs := indexer.Import(ctx, objects)
 
-	var hadErrors bool
+	var hadObjectErrors bool
+	var criticalError error
 	go func() {
 		for {
 			select {
@@ -42,7 +43,13 @@ logger.Debug("ingest completed") 
 					continue
 				}
-				hadErrors = true
+				be, isBatchError := err.(*index.BatchError)
+				if isBatchError {
+					criticalError = be
+
+					break
+				}
+				hadObjectErrors = true
 				logger.Warn("error ingesting object", "error", err)
 			case err, running := <-pErrs:
 				if !running {
@@ -52,7 +59,7 @@ logger.Debug("processing completed") 
 					continue
 				}
-				hadErrors = true
+				hadObjectErrors = true
 				logger.Warn("error processing object", "error", err)
 			}
 		}
@@ -60,5 +67,5 @@ }() 
 	wg.Wait()
 
-	return hadErrors
+	return hadObjectErrors, criticalError
 }
M internal/importer/main.go
@@ -124,7 +124,12 @@ 				continue
 			}
 
-			hadWarnings := process(ctx, indexer, processor, logger)
+			hadWarnings, err := process(ctx, indexer, processor, logger)
+			if err != nil {
+				logger.Error("failed to process source", "error", err)
+
+				continue
+			}
 
 			if hadWarnings {
 				logger.Warn("importer succeeded, but with warnings/errors")
M internal/index/indexer.go
@@ -32,6 +32,14 @@ index bleve.Index 	meta  *Meta
 }
 
+type BatchError struct {
+	error
+}
+
+func (e *BatchError) Error() string {
+	return e.error.Error()
+}
+
 const batchSize = 10_000
 
 func createIndexMapping() (mapping.IndexMapping, error) {
@@ -292,6 +300,8 @@ if k++; k%batchSize == 0 { 				err = i.Flush(batch)
 				if err != nil {
 					errs <- err
+
+					return
 				}
 			}
 		}
@@ -307,11 +317,18 @@ } 
 func (i *WriteIndex) Flush(batch *bleve.Batch) error {
 	size := batch.Size()
+	if size == 0 {
+		return &BatchError{
+			error: errors.New("no documents to flush"),
+		}
+	}
 	slog.Debug("flushing batch", "size", size)
 
 	err := i.index.Batch(batch)
 	if err != nil {
-		return errors.WithMessagef(err, "could not flush batch")
+		return &BatchError{
+			error: errors.WithMessagef(err, "could not flush batch"),
+		}
 	}
 
 	batch.Reset()