about summary refs log tree commit diff stats
path: root/internal/importer/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/importer/main.go')
-rw-r--r-- internal/importer/main.go 196
1 files changed, 182 insertions, 14 deletions
diff --git a/internal/importer/main.go b/internal/importer/main.go
index e2c222c..d129cd3 100644
--- a/internal/importer/main.go
+++ b/internal/importer/main.go
@@ -3,11 +3,14 @@ package importer
 import (
 	"context"
 	"fmt"
+	"maps"
+	"math"
 	"os/exec"
 	"slices"
 	"strings"
 	"time"
 
+	"github.com/getsentry/sentry-go"
 	"go.alanpearce.eu/searchix/internal/config"
 	"go.alanpearce.eu/searchix/internal/fetcher"
 	"go.alanpearce.eu/searchix/internal/index"
@@ -17,11 +20,43 @@ import (
 	"gitlab.com/tozd/go/errors"
 )
 
+type Options struct { // Options carries the settings New reads when building an Importer.
+	LowMemory  bool              // not read by any code visible in this diff
+	Logger     *log.Logger       // New stores this as the Importer's log
+	WriteIndex *index.WriteIndex // New stores this as the Importer's indexer
+}
+
+var Job struct { // Job is package-level state describing the latest and the next index update.
+	InProgress bool      // set by MarkIndexingStarted, cleared by MarkIndexingFinished
+	StartedAt  time.Time // time.Now() at MarkIndexingStarted
+	FinishedAt time.Time // written by SetLastUpdated and MarkIndexingFinished
+	NextRun    time.Time // written by SetNextRun and MarkIndexingFinished
+} // NOTE(review): no synchronization is visible here; confirm readers and writers cannot race.
+
+// SetNextRun stores nextRun in Job.NextRun, the time at which the next index
+// update is scheduled to start.
+func SetNextRun(nextRun time.Time) {
+	job := &Job
+	job.NextRun = nextRun
+}
+
+// SetLastUpdated stores last in Job.FinishedAt. Importer.Start calls it with
+// the current time once the index metadata has been saved.
+func SetLastUpdated(last time.Time) {
+	job := &Job
+	job.FinishedAt = last
+}
+
+// MarkIndexingStarted stamps Job.StartedAt with the current time and flags
+// the job as in progress.
+func MarkIndexingStarted() {
+	Job.StartedAt, Job.InProgress = time.Now(), true
+}
+
+// MarkIndexingFinished stamps Job.FinishedAt with the current time, clears
+// the in-progress flag and records nextRun as the next scheduled update.
+func MarkIndexingFinished(nextRun time.Time) {
+	Job.FinishedAt, Job.InProgress, Job.NextRun = time.Now(), false, nextRun
+}
+
 func createSourceImporter(
 	parent context.Context,
 	log *log.Logger,
 	meta *index.Meta,
-	indexer *index.WriteIndex,
+	writeIndex *index.WriteIndex,
 	forceUpdate bool,
 ) func(*config.Source) errors.E {
 	return func(source *config.Source) errors.E {
@@ -124,7 +159,7 @@ func createSourceImporter(
 				return errors.WithMessagef(err, "failed to create processor")
 			}
 
-			hadWarnings, err := process(ctx, indexer, processor, logger)
+			hadWarnings, err := process(ctx, writeIndex, processor, logger)
 			if err != nil {
 				return errors.WithMessagef(err, "failed to process source")
 			}
@@ -149,18 +184,6 @@ type Importer struct {
 	indexer *index.WriteIndex
 }
 
-func New(
-	cfg *config.Config,
-	log *log.Logger,
-	indexer *index.WriteIndex,
-) *Importer {
-	return &Importer{
-		config:  cfg,
-		log:     log,
-		indexer: indexer,
-	}
-}
-
 func (imp *Importer) Start(
 	ctx context.Context,
 	forceUpdate bool,
@@ -201,5 +224,150 @@ func (imp *Importer) Start(
 		return errors.Wrap(err, "failed to save metadata")
 	}
 
+	SetLastUpdated(time.Now())
+
 	return nil
 }
+
+// New builds an Importer from cfg and options: options.Logger becomes the
+// importer's logger and options.WriteIndex the index it writes to.
+//
+// It returns an error, rather than panicking on a nil dereference, when
+// options is nil.
+func New(
+	cfg *config.Config,
+	options *Options,
+) (*Importer, errors.E) {
+	if options == nil {
+		return nil, errors.New("importer options must not be nil")
+	}
+
+	return &Importer{
+		config:  cfg,
+		log:     options.Logger,
+		indexer: options.WriteIndex,
+	}, nil
+}
+
+func (imp *Importer) EnsureSourcesIndexed(
+	ctx context.Context,
+	read *index.ReadIndex,
+) errors.E {
+	cfgEnabledSources := slices.Collect(maps.Keys(imp.config.Importer.Sources))
+	slices.Sort(cfgEnabledSources)
+
+	indexedSources, err := read.GetEnabledSources()
+	if err != nil {
+		return errors.Wrap(err, "Failed to get enabled sources from index")
+	}
+	slices.Sort(indexedSources)
+	if !slices.Equal(cfgEnabledSources, indexedSources) {
+		newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool {
+			return slices.Contains(indexedSources, s)
+		})
+		retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool {
+			return slices.Contains(cfgEnabledSources, s)
+		})
+		if len(newSources) > 0 {
+			imp.log.Info("adding new sources", "sources", newSources)
+			err := imp.Start(
+				ctx,
+				false,
+				&newSources,
+			)
+			if err != nil {
+				return errors.Wrap(err, "Failed to update index with new sources")
+			}
+		}
+		if len(retiredSources) > 0 {
+			imp.log.Info("removing retired sources", "sources", retiredSources)
+			for _, s := range retiredSources {
+				err := imp.indexer.DeleteBySource(s)
+				if err != nil {
+					return errors.Wrapf(err, "Failed to remove retired source %s", s)
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+// StartUpdateTimer runs the periodic index update loop until parentCtx is
+// cancelled, reporting every run to Sentry as a check-in for the "import"
+// monitor through localHub. The loop runs inside the localHub.WithScope
+// callback.
+//
+// The first run is immediate when Job.FinishedAt lies more than 24 hours in
+// the past; otherwise, and after every run, the next run is scheduled for the
+// next occurrence of the configured UpdateAt time. Each run is bounded by the
+// configured importer timeout. Failures are logged and reported to Sentry but
+// do not stop the loop.
+func (imp *Importer) StartUpdateTimer(
+	parentCtx context.Context,
+	localHub *sentry.Hub,
+) {
+	const monitorSlug = "import"
+	localHub.WithScope(func(scope *sentry.Scope) {
+		scope.SetContext("monitor", sentry.Context{"slug": monitorSlug})
+		monitorConfig := &sentry.MonitorConfig{
+			Schedule:      sentry.IntervalSchedule(1, sentry.MonitorScheduleUnitDay),
+			MaxRuntime:    int64(math.Ceil(imp.config.Importer.Timeout.Minutes())),
+			CheckInMargin: 5,
+			// NOTE(review): time.Local.String() is typically "Local" rather
+			// than an IANA zone name; confirm this is what Sentry expects.
+			Timezone: time.Local.String(),
+		}
+
+		// finishCheckIn closes the check-in opened for the current run.
+		// CaptureCheckIn returns a nil ID when nothing was captured (for
+		// example when the hub has no client), so there is no open check-in
+		// to close in that case and the ID must not be dereferenced.
+		finishCheckIn := func(id *sentry.EventID, status sentry.CheckInStatus) {
+			if id == nil {
+				return
+			}
+			localHub.CaptureCheckIn(&sentry.CheckIn{
+				ID:          *id,
+				MonitorSlug: monitorSlug,
+				Status:      status,
+			}, monitorConfig)
+		}
+
+		var nextRun time.Time
+		if Job.FinishedAt.Before(time.Now().Add(-24 * time.Hour)) {
+			imp.log.Info("indexing last ran more than 24 hours ago, scheduling immediate update")
+			nextRun = time.Now()
+		} else {
+			nextRun = nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt)
+		}
+		SetNextRun(nextRun)
+		for {
+			// Sleep until the scheduled time, waking early on cancellation.
+			select {
+			case <-parentCtx.Done():
+				imp.log.Debug("stopping scheduler")
+
+				return
+			case <-time.After(time.Until(nextRun)):
+			}
+			imp.log.Info("updating index")
+
+			eventID := localHub.CaptureCheckIn(&sentry.CheckIn{
+				MonitorSlug: monitorSlug,
+				Status:      sentry.CheckInStatusInProgress,
+			}, monitorConfig)
+			MarkIndexingStarted()
+
+			// cancel is called right after the run rather than deferred so
+			// the per-run context is released on every iteration.
+			ctx, cancel := context.WithTimeout(parentCtx, imp.config.Importer.Timeout.Duration)
+			err := imp.Start(ctx, false, nil)
+			cancel()
+
+			if err != nil {
+				imp.log.Warn("error updating index", "error", err)
+
+				localHub.CaptureException(err)
+				finishCheckIn(eventID, sentry.CheckInStatusError)
+			} else {
+				imp.log.Info("update complete")
+
+				finishCheckIn(eventID, sentry.CheckInStatusOK)
+			}
+			nextRun = nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt)
+			MarkIndexingFinished(nextRun)
+			imp.log.Info("scheduling next run", "next-run", nextRun.Format(time.DateTime))
+		}
+	})
+}
+
+// nextUTCOccurrenceOfTime returns the next instant, expressed in UTC, at
+// which the UTC wall clock reads dayTime: today at that time when it has not
+// passed yet, otherwise the same time tomorrow.
+//
+// The calendar date is taken from the current time converted to UTC so that
+// it matches the zone the result is built in. Using the local date would,
+// whenever the local and UTC dates differ, yield a result that is a day late
+// or one that is still in the past even after adding a day.
+func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time {
+	now := time.Now().UTC()
+	nextRun := time.Date(
+		now.Year(),
+		now.Month(),
+		now.Day(),
+		dayTime.Hour,
+		dayTime.Minute,
+		dayTime.Second,
+		0,
+		time.UTC,
+	)
+	// Today's slot has already passed; move to the same time tomorrow.
+	if nextRun.Before(now) {
+		return nextRun.AddDate(0, 0, 1)
+	}
+
+	return nextRun
+}