about summary refs log tree commit diff stats
path: root/internal/importer
diff options
context:
space:
mode:
author: Alan Pearce, 2025-03-24 10:35:21 +0100
committer: Alan Pearce, 2025-03-24 14:22:04 +0100
commit: 523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38 (patch)
tree: fbfc0d8323f3e58cbf46f79500244356e779fc7a /internal/importer
parent: f23d67df63defd5f6fe6773789851dd63f3ac829 (diff)
download: searchix-523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38.tar.lz
searchix-523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38.tar.zst
searchix-523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38.zip
refactor: split out importer code from searchix/web
Diffstat (limited to 'internal/importer')
-rw-r--r-- internal/importer/main.go 219
-rw-r--r-- internal/importer/main_test.go 10
2 files changed, 140 insertions, 89 deletions
diff --git a/internal/importer/main.go b/internal/importer/main.go
index 172c504..71606e9 100644
--- a/internal/importer/main.go
+++ b/internal/importer/main.go
@@ -3,11 +3,14 @@ package importer
 import (
 	"context"
 	"fmt"
+	"maps"
+	"math"
 	"os/exec"
 	"slices"
 	"strings"
 	"time"
 
+	"github.com/getsentry/sentry-go"
 	"go.alanpearce.eu/searchix/internal/config"
 	"go.alanpearce.eu/searchix/internal/fetcher"
 	"go.alanpearce.eu/searchix/internal/index"
@@ -18,10 +21,9 @@ import (
 )
 
 type Options struct {
-	Update    bool
-	Replace   bool
-	LowMemory bool
-	Logger    *log.Logger
+	LowMemory  bool
+	Logger     *log.Logger
+	WriteIndex *index.WriteIndex
 }
 
 var Job struct {
@@ -54,7 +56,7 @@ func createSourceImporter(
 	parent context.Context,
 	log *log.Logger,
 	meta *index.Meta,
-	indexer *index.WriteIndex,
+	writeIndex *index.WriteIndex,
 	forceUpdate bool,
 ) func(*config.Source) errors.E {
 	return func(source *config.Source) errors.E {
@@ -157,7 +159,7 @@ func createSourceImporter(
 				return errors.WithMessagef(err, "failed to create processor")
 			}
 
-			hadWarnings, err := process(ctx, indexer, processor, logger)
+			hadWarnings, err := process(ctx, writeIndex, processor, logger)
 			if err != nil {
 				return errors.WithMessagef(err, "failed to process source")
 			}
@@ -182,18 +184,6 @@ type Importer struct {
 	indexer *index.WriteIndex
 }
 
-func New(
-	cfg *config.Config,
-	log *log.Logger,
-	indexer *index.WriteIndex,
-) *Importer {
-	return &Importer{
-		config:  cfg,
-		log:     log,
-		indexer: indexer,
-	}
-}
-
 func (imp *Importer) Start(
 	ctx context.Context,
 	forceUpdate bool,
@@ -234,89 +224,142 @@ func (imp *Importer) Start(
 		return errors.Wrap(err, "failed to save metadata")
 	}
 
+	SetLastUpdated(time.Now())
+
 	return nil
 }
 
-func SetupIndex(ctx context.Context, cfg *config.Config, options *Options) errors.E {
-	var i uint
-	cfgEnabledSources := make([]string, len(cfg.Importer.Sources))
-	for key := range cfg.Importer.Sources {
-		cfgEnabledSources[i] = key
-		i++
-	}
+func New(
+	cfg *config.Config,
+	options *Options,
+) (*Importer, errors.E) {
+
+	return &Importer{
+		config:  cfg,
+		log:     options.Logger,
+		indexer: options.WriteIndex,
+	}, nil
+}
+
+func (imp *Importer) EnsureSourcesIndexed(
+	ctx context.Context,
+	read *index.ReadIndex,
+	write *index.WriteIndex,
+) errors.E {
+	cfgEnabledSources := slices.Collect(maps.Keys(imp.config.Importer.Sources))
 	slices.Sort(cfgEnabledSources)
 
-	read, write, exists, err := index.OpenOrCreate(
-		cfg.DataPath,
-		options.Replace,
-		&index.Options{
-			LowMemory: options.LowMemory,
-			Logger:    options.Logger.Named("index"),
-		},
-	)
+	indexedSources, err := read.GetEnabledSources()
 	if err != nil {
-		return errors.Wrap(err, "Failed to open or create index")
+		return errors.Wrap(err, "Failed to get enabled sources from index")
 	}
-
-	if !exists || options.Replace || options.Update {
-		options.Logger.Info(
-			"Starting build job",
-			"new",
-			!exists,
-			"replace",
-			options.Replace,
-			"update",
-			options.Update,
-		)
-		imp := New(cfg, options.Logger.Named("importer"), write)
-		err = imp.Start(
-			ctx,
-			options.Replace || options.Update,
-			nil,
-		)
-		if err != nil {
-			return errors.Wrap(err, "Failed to build index")
-		}
-		if options.Replace || options.Update {
-			return nil
-		}
-	} else {
-		indexedSources, err := read.GetEnabledSources()
-		if err != nil {
-			return errors.Wrap(err, "Failed to get enabled sources from index")
+	slices.Sort(indexedSources)
+	if !slices.Equal(cfgEnabledSources, indexedSources) {
+		newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool {
+			return slices.Contains(indexedSources, s)
+		})
+		retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool {
+			return slices.Contains(cfgEnabledSources, s)
+		})
+		if len(newSources) > 0 {
+			imp.log.Info("adding new sources", "sources", newSources)
+			err := imp.Start(
+				ctx,
+				false,
+				&newSources,
+			)
+			if err != nil {
+				return errors.Wrap(err, "Failed to update index with new sources")
+			}
 		}
-		slices.Sort(indexedSources)
-		if !slices.Equal(cfgEnabledSources, indexedSources) {
-			newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool {
-				return slices.Contains(indexedSources, s)
-			})
-			retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool {
-				return slices.Contains(cfgEnabledSources, s)
-			})
-			if len(newSources) > 0 {
-				options.Logger.Info("adding new sources", "sources", newSources)
-				imp := New(cfg, options.Logger.Named("importer"), write)
-				err := imp.Start(
-					ctx,
-					false,
-					&newSources,
-				)
+		if len(retiredSources) > 0 {
+			imp.log.Info("removing retired sources", "sources", retiredSources)
+			for _, s := range retiredSources {
+				err := write.DeleteBySource(s)
 				if err != nil {
-					return errors.Wrap(err, "Failed to update index with new sources")
-				}
-			}
-			if len(retiredSources) > 0 {
-				options.Logger.Info("removing retired sources", "sources", retiredSources)
-				for _, s := range retiredSources {
-					err := write.DeleteBySource(s)
-					if err != nil {
-						return errors.Wrapf(err, "Failed to remove retired source %s", s)
-					}
+					return errors.Wrapf(err, "Failed to remove retired source %s", s)
 				}
 			}
 		}
 	}
-	SetLastUpdated(read.LastUpdated())
 
 	return nil
 }
+
+func (imp *Importer) StartUpdateTimer(
+	ctx context.Context,
+	localHub *sentry.Hub,
+) {
+	const monitorSlug = "import"
+	localHub.WithScope(func(scope *sentry.Scope) {
+		var err errors.E
+		scope.SetContext("monitor", sentry.Context{"slug": monitorSlug})
+		monitorConfig := &sentry.MonitorConfig{
+			Schedule:      sentry.IntervalSchedule(1, sentry.MonitorScheduleUnitDay),
+			MaxRuntime:    int64(math.Ceil(imp.config.Importer.Timeout.Minutes())),
+			CheckInMargin: 5,
+			Timezone:      time.Local.String(),
+		}
+
+		nextRun := nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt)
+		SetNextRun(nextRun)
+		for {
+			imp.log.Debug("scheduling next run", "next-run", nextRun)
+			select {
+			case <-ctx.Done():
+				imp.log.Debug("stopping scheduler")
+
+				return
+			case <-time.After(time.Until(nextRun)):
+			}
+			imp.log.Info("updating index")
+
+			eventID := localHub.CaptureCheckIn(&sentry.CheckIn{
+				MonitorSlug: monitorSlug,
+				Status:      sentry.CheckInStatusInProgress,
+			}, monitorConfig)
+			MarkIndexingStarted()
+
+			err = imp.Start(ctx, false, nil)
+			if err != nil {
+				imp.log.Warn("error updating index", "error", err)
+
+				localHub.CaptureException(err)
+				localHub.CaptureCheckIn(&sentry.CheckIn{
+					ID:          *eventID,
+					MonitorSlug: monitorSlug,
+					Status:      sentry.CheckInStatusError,
+				}, monitorConfig)
+			} else {
+				imp.log.Info("update complete")
+
+				localHub.CaptureCheckIn(&sentry.CheckIn{
+					ID:          *eventID,
+					MonitorSlug: monitorSlug,
+					Status:      sentry.CheckInStatusOK,
+				}, monitorConfig)
+			}
+			nextRun = nextRun.AddDate(0, 0, 1)
+			MarkIndexingFinished(nextRun)
+		}
+	})
+}
+
+func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time {
+	now := time.Now()
+	nextRun := time.Date(
+		now.Year(),
+		now.Month(),
+		now.Day(),
+		dayTime.Hour,
+		dayTime.Minute,
+		dayTime.Second,
+		0,
+		time.UTC,
+	)
+	if nextRun.Before(now) {
+		return nextRun.AddDate(0, 0, 1)
+	}
+
+	return nextRun
+}
diff --git a/internal/importer/main_test.go b/internal/importer/main_test.go
index 576d681..84f6adf 100644
--- a/internal/importer/main_test.go
+++ b/internal/importer/main_test.go
@@ -22,7 +22,15 @@ func BenchmarkImporterLowMemory(b *testing.B) {
 		b.Fatal(err)
 	}
 
-	imp := New(&cfg, logger.Named("importer"), write)
+	imp, err := New(&cfg, &Options{
+		Logger:     logger.Named("importer"),
+		LowMemory:  true,
+		WriteIndex: write,
+	})
+	if err != nil {
+		b.Fatal(err)
+	}
+
 	err = imp.Start(
 		context.Background(),
 		false,