about summary refs log tree commit diff stats
path: root/internal/importer/main.go
diff options
context:
space:
mode:
authorAlan Pearce2024-05-23 13:14:45 +0200
committerAlan Pearce2024-05-23 13:14:45 +0200
commit0dbfe37fbddb95c184d845c79bbe014597d55fe8 (patch)
treee68a2db861211ceebe4c357a059a4cb511f707a9 /internal/importer/main.go
parent3053e41b1528ef898cccd44e056e4d167619af6b (diff)
downloadsearchix-0dbfe37fbddb95c184d845c79bbe014597d55fe8.tar.lz
searchix-0dbfe37fbddb95c184d845c79bbe014597d55fe8.tar.zst
searchix-0dbfe37fbddb95c184d845c79bbe014597d55fe8.zip
feat: stream files directly from fetcher to importer
Use IndexMeta to store the information relevant to making conditional
updates in future runs.
Diffstat (limited to 'internal/importer/main.go')
-rw-r--r--internal/importer/main.go174
1 files changed, 98 insertions, 76 deletions
diff --git a/internal/importer/main.go b/internal/importer/main.go
index 6f462c3..6be5b45 100644
--- a/internal/importer/main.go
+++ b/internal/importer/main.go
@@ -2,56 +2,49 @@ package importer
 
 import (
 	"context"
-	"io"
+	"fmt"
 	"log/slog"
 	"os/exec"
-	"path"
 	"searchix/internal/config"
 	"searchix/internal/fetcher"
 	"searchix/internal/index"
 	"slices"
 	"strings"
+	"time"
 
 	"github.com/pkg/errors"
 )
 
-func Start(
-	cfg *config.Config,
+func createSourceImporter(
+	parent context.Context,
+	meta *index.Meta,
 	indexer *index.WriteIndex,
 	forceUpdate bool,
-	onlyUpdateSources *[]string,
-) error {
-	if len(cfg.Importer.Sources) == 0 {
-		slog.Info("No sources enabled")
-
-		return nil
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), cfg.Importer.Timeout.Duration)
-	defer cancel()
-
-	forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0)
-
-	for name, source := range cfg.Importer.Sources {
-		if onlyUpdateSources != nil && len(*onlyUpdateSources) > 0 {
-			if !slices.Contains(*onlyUpdateSources, name) {
-				continue
-			}
-		}
-
-		logger := slog.With("name", name, "fetcher", source.Fetcher.String())
+) func(*config.Source) error {
+	return func(source *config.Source) error {
+		logger := slog.With(
+			"name",
+			source.Key,
+			"fetcher",
+			source.Fetcher.String(),
+			"timeout",
+			source.FetchTimeout.Duration,
+		)
 		logger.Debug("starting fetcher")
 
-		fetcherDataPath := path.Join(cfg.DataPath, "sources", source.Key)
-
-		fetcher, err := fetcher.New(source, fetcherDataPath, logger)
+		fetcher, err := fetcher.New(source, logger)
 		if err != nil {
-			logger.Error("error creating fetcher", "error", err)
-
-			continue
+			return errors.WithMessage(err, "error creating fetcher")
 		}
 
-		files, updated, err := fetcher.FetchIfNeeded(ctx)
+		sourceMeta := meta.GetSourceMeta(source.Key)
+		if forceUpdate {
+			sourceMeta.Updated = time.Time{}
+		}
+		previousUpdate := sourceMeta.Updated
+		ctx, cancel := context.WithTimeout(parent, source.FetchTimeout.Duration)
+		defer cancel()
+		files, err := fetcher.FetchIfNeeded(ctx, &sourceMeta)
 
 		if err != nil {
 			var exerr *exec.ExitError
@@ -65,70 +58,52 @@ func Start(
 						"status",
 						exerr.ExitCode(),
 					)
-
-					continue
 				}
-			} else {
-				logger.Error("importer fetch failed", "error", err)
-
-				continue
 			}
 
-			continue
+			return errors.WithMessage(err, "importer fetch failed")
 		}
-		logger.Info("importer fetch succeeded", "updated", updated)
+		logger.Info(
+			"importer fetch succeeded",
+			"previous",
+			previousUpdate,
+			"current",
+			sourceMeta.Updated,
+			"is_updated",
+			sourceMeta.Updated.After(previousUpdate),
+			"update_force",
+			forceUpdate,
+		)
+
+		if sourceMeta.Updated.After(previousUpdate) || forceUpdate {
 
-		if updated || forceUpdate {
 			err = setRepoRevision(files.Revision, source)
 			if err != nil {
 				logger.Warn("could not set source repo revision", "error", err)
 			}
 
-			var file io.ReadCloser
 			var processor Processor
+			logger.Debug(
+				"creating processor",
+				"importer_type",
+				source.Importer,
+				"revision",
+				source.Repo.Revision,
+			)
 			switch source.Importer {
 			case config.Options:
-				logger.Debug(
-					"creating processor",
-					"filename",
-					files.Options,
-					"revision",
-					source.Repo.Revision,
-				)
-				file, err = openFileDecoded(files.Options)
-				if err != nil {
-					logger.Error("could not open file", "filename", files.Options, "error", err)
-
-					continue
-				}
-				processor, err = NewOptionProcessor(file, source)
+				logger.Debug("processor created", "file", fmt.Sprintf("%T", files.Options))
+				processor, err = NewOptionProcessor(files.Options, source)
 			case config.Packages:
-				logger.Debug(
-					"creating processor",
-					"filename",
-					files.Packages,
-					"revision",
-					source.Repo.Revision,
-				)
-				file, err = openFileDecoded(files.Packages)
-				if err != nil {
-					logger.Error("could not open file", "filename", files.Packages, "error", err)
-
-					continue
-				}
-				processor, err = NewPackageProcessor(file, source)
+				processor, err = NewPackageProcessor(files.Packages, source)
 			}
 			if err != nil {
-				logger.Error("failed to create processor", "type", source.Importer, "error", err)
-
-				continue
+				return errors.WithMessagef(err, "failed to create processor")
 			}
 
 			hadWarnings, err := process(ctx, indexer, processor, logger)
 			if err != nil {
-				logger.Error("failed to process source", "error", err)
-
-				continue
+				return errors.WithMessagef(err, "failed to process source")
 			}
 
 			if hadWarnings {
@@ -137,6 +112,53 @@ func Start(
 				logger.Info("importer succeeded")
 			}
 		}
+
+		sourceMeta.Rev = source.Repo.Revision
+		meta.SetSourceMeta(source.Key, sourceMeta)
+
+		return nil
+	}
+}
+
+func Start(
+	cfg *config.Config,
+	indexer *index.WriteIndex,
+	forceUpdate bool,
+	onlyUpdateSources *[]string,
+) error {
+	if len(cfg.Importer.Sources) == 0 {
+		slog.Info("No sources enabled")
+
+		return nil
+	}
+
+	slog.Debug("starting importer", "timeout", cfg.Importer.Timeout.Duration)
+	importCtx, cancelImport := context.WithTimeout(
+		context.Background(),
+		cfg.Importer.Timeout.Duration,
+	)
+	defer cancelImport()
+
+	forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0)
+
+	meta := indexer.Meta
+
+	importSource := createSourceImporter(importCtx, meta, indexer, forceUpdate)
+	for name, source := range cfg.Importer.Sources {
+		if onlyUpdateSources != nil && len(*onlyUpdateSources) > 0 {
+			if !slices.Contains(*onlyUpdateSources, name) {
+				continue
+			}
+		}
+		err := importSource(source)
+		if err != nil {
+			slog.Error("import failed", "source", name, "error", err)
+		}
+	}
+
+	err := indexer.SaveMeta()
+	if err != nil {
+		return errors.Wrap(err, "failed to save metadata")
 	}
 
 	return nil