about summary refs log tree commit diff stats
path: root/internal/importer/importer.go
diff options
context:
space:
mode:
authorAlan Pearce2024-05-23 11:45:38 +0200
committerAlan Pearce2024-05-23 11:45:38 +0200
commit3053e41b1528ef898cccd44e056e4d167619af6b (patch)
treea83da2e8cfcaf824a57d26e9e5ef55b95f14e603 /internal/importer/importer.go
parent3149660f0fcbad484a71703cf2f7f715abc130ba (diff)
downloadsearchix-3053e41b1528ef898cccd44e056e4d167619af6b.tar.lz
searchix-3053e41b1528ef898cccd44e056e4d167619af6b.tar.zst
searchix-3053e41b1528ef898cccd44e056e4d167619af6b.zip
fix: abort import of source on batch processing errors
Diffstat (limited to 'internal/importer/importer.go')
-rw-r--r--internal/importer/importer.go17
1 files changed, 12 insertions, 5 deletions
diff --git a/internal/importer/importer.go b/internal/importer/importer.go
index 53a87c9..6803f00 100644
--- a/internal/importer/importer.go
+++ b/internal/importer/importer.go
@@ -21,7 +21,7 @@ func process(
 	indexer *index.WriteIndex,
 	processor Processor,
 	logger *slog.Logger,
-) bool {
+) (bool, error) {
 	wg := sync.WaitGroup{}
 
 	wg.Add(1)
@@ -30,7 +30,8 @@ func process(
 	wg.Add(1)
 	iErrs := indexer.Import(ctx, objects)
 
-	var hadErrors bool
+	var hadObjectErrors bool
+	var criticalError error
 	go func() {
 		for {
 			select {
@@ -42,7 +43,13 @@ func process(
 
 					continue
 				}
-				hadErrors = true
+				be, isBatchError := err.(*index.BatchError)
+				if isBatchError {
+					criticalError = be
+
+					break
+				}
+				hadObjectErrors = true
 				logger.Warn("error ingesting object", "error", err)
 			case err, running := <-pErrs:
 				if !running {
@@ -52,7 +59,7 @@ func process(
 
 					continue
 				}
-				hadErrors = true
+				hadObjectErrors = true
 				logger.Warn("error processing object", "error", err)
 			}
 		}
@@ -60,5 +67,5 @@ func process(
 
 	wg.Wait()
 
-	return hadErrors
+	return hadObjectErrors, criticalError
 }