internal/importer/importer.go (view raw)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | package importer import ( "context" "log/slog" "searchix/internal/index" "searchix/internal/nix" "sync" ) type Importer interface { Import(context.Context, *index.WriteIndex) (bool, error) } type Processor interface { Process(context.Context) (<-chan nix.Importable, <-chan error) } func process( ctx context.Context, indexer *index.WriteIndex, processor Processor, logger *slog.Logger, ) bool { wg := sync.WaitGroup{} wg.Add(1) objects, pErrs := processor.Process(ctx) wg.Add(1) iErrs := indexer.Import(ctx, objects) var hadErrors bool go func() { for { select { case err, running := <-iErrs: if !running { wg.Done() iErrs = nil logger.Debug("ingest completed") continue } hadErrors = true logger.Warn("error ingesting object", "error", err) case err, running := <-pErrs: if !running { wg.Done() pErrs = nil logger.Debug("processing completed") continue } hadErrors = true logger.Warn("error processing object", "error", err) } } }() wg.Wait() return hadErrors } |