diff options
author | Alan Pearce | 2024-05-16 23:41:57 +0200 |
---|---|---|
committer | Alan Pearce | 2024-05-16 23:41:57 +0200 |
commit | a5e758d41c151c17ed03b39454470ba8dd0c3b99 (patch) | |
tree | 386333b5020477eabcf490773113b029e47a21ef /internal/importer/importer.go | |
parent | d558039919b6198a246a6a3fd007276191cb4b2f (diff) | |
download | searchix-a5e758d41c151c17ed03b39454470ba8dd0c3b99.tar.lz searchix-a5e758d41c151c17ed03b39454470ba8dd0c3b99.tar.zst searchix-a5e758d41c151c17ed03b39454470ba8dd0c3b99.zip |
refactor: separate fetch and import logic
Diffstat (limited to 'internal/importer/importer.go')
-rw-r--r-- | internal/importer/importer.go | 81 |
1 files changed, 17 insertions, 64 deletions
diff --git a/internal/importer/importer.go b/internal/importer/importer.go index 255f70e..57118c8 100644 --- a/internal/importer/importer.go +++ b/internal/importer/importer.go @@ -3,79 +3,32 @@ package importer import ( "context" "log/slog" - "searchix/internal/config" "searchix/internal/index" + "searchix/internal/nix" "sync" ) type Importer interface { - FetchIfNeeded(context.Context) (bool, error) Import(context.Context, *index.WriteIndex) (bool, error) } -func NewNixpkgsChannelImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *NixpkgsChannelImporter { - return &NixpkgsChannelImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -func NewChannelImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *ChannelImporter { - return &ChannelImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -func NewDownloadOptionsImporter( - source *config.Source, - dataPath string, - logger *slog.Logger, -) *DownloadOptionsImporter { - return &DownloadOptionsImporter{ - DataPath: dataPath, - Source: source, - Logger: logger, - } -} - -type importConfig struct { - Filename string - Source *config.Source - Logger *slog.Logger +type Processor interface { + Process(context.Context) (<-chan nix.Importable, <-chan error) } -func processOptions( - parent context.Context, +func process( + ctx context.Context, indexer *index.WriteIndex, - conf *importConfig, -) (bool, error) { - ctx, cancel := context.WithTimeout(parent, conf.Source.ImportTimeout) - defer cancel() - - conf.Logger.Debug("creating option processor", "filename", conf.Filename) - processor, err := NewOptionProcessor(conf.Filename, conf.Source) - if err != nil { - return true, err - } - + processor Processor, + logger *slog.Logger, +) bool { wg := sync.WaitGroup{} wg.Add(1) - options, pErrs := processor.Process(ctx) + objects, pErrs := processor.Process(ctx) wg.Add(1) - iErrs := indexer.Import(ctx, options) + iErrs := indexer.Import(ctx, objects) var hadErrors bool go func() { @@ -85,29 +38,29 @@ func processOptions( if !running { wg.Done() iErrs = nil - conf.Logger.Info("ingest completed") + logger.Debug("ingest completed") continue } hadErrors = true - conf.Logger.Warn("error ingesting option", "error", err) + logger.Warn("error ingesting object", "error", err) case err, running := <-pErrs: if !running { wg.Done() pErrs = nil - conf.Logger.Debug("processing completed") + logger.Debug("processing completed") continue } hadErrors = true - conf.Logger.Warn("error processing option", "error", err) + logger.Warn("error processing object", "error", err) } } }() - conf.Logger.Debug("options processing", "state", "waiting") + logger.Debug("object processing", "state", "waiting") wg.Wait() - conf.Logger.Debug("options processing", "state", "complete") + logger.Debug("object processing", "state", "complete") - return hadErrors, nil + return hadErrors } |