diff options
Diffstat (limited to 'internal/importer/main.go')
-rw-r--r-- | internal/importer/main.go | 196 |
1 files changed, 182 insertions, 14 deletions
diff --git a/internal/importer/main.go b/internal/importer/main.go index e2c222c..d129cd3 100644 --- a/internal/importer/main.go +++ b/internal/importer/main.go @@ -3,11 +3,14 @@ package importer import ( "context" "fmt" + "maps" + "math" "os/exec" "slices" "strings" "time" + "github.com/getsentry/sentry-go" "go.alanpearce.eu/searchix/internal/config" "go.alanpearce.eu/searchix/internal/fetcher" "go.alanpearce.eu/searchix/internal/index" @@ -17,11 +20,43 @@ import ( "gitlab.com/tozd/go/errors" ) +type Options struct { + LowMemory bool + Logger *log.Logger + WriteIndex *index.WriteIndex +} + +var Job struct { + InProgress bool + StartedAt time.Time + FinishedAt time.Time + NextRun time.Time +} + +func SetNextRun(nextRun time.Time) { + Job.NextRun = nextRun +} + +func SetLastUpdated(last time.Time) { + Job.FinishedAt = last +} + +func MarkIndexingStarted() { + Job.StartedAt = time.Now() + Job.InProgress = true +} + +func MarkIndexingFinished(nextRun time.Time) { + Job.FinishedAt = time.Now() + Job.InProgress = false + Job.NextRun = nextRun +} + func createSourceImporter( parent context.Context, log *log.Logger, meta *index.Meta, - indexer *index.WriteIndex, + writeIndex *index.WriteIndex, forceUpdate bool, ) func(*config.Source) errors.E { return func(source *config.Source) errors.E { @@ -124,7 +159,7 @@ func createSourceImporter( return errors.WithMessagef(err, "failed to create processor") } - hadWarnings, err := process(ctx, indexer, processor, logger) + hadWarnings, err := process(ctx, writeIndex, processor, logger) if err != nil { return errors.WithMessagef(err, "failed to process source") } @@ -149,18 +184,6 @@ type Importer struct { indexer *index.WriteIndex } -func New( - cfg *config.Config, - log *log.Logger, - indexer *index.WriteIndex, -) *Importer { - return &Importer{ - config: cfg, - log: log, - indexer: indexer, - } -} - func (imp *Importer) Start( ctx context.Context, forceUpdate bool, @@ -201,5 +224,150 @@ func (imp *Importer) Start( return errors.Wrap(err, "failed to save metadata") } + SetLastUpdated(time.Now()) + return nil } + +func New( + cfg *config.Config, + options *Options, +) (*Importer, errors.E) { + + return &Importer{ + config: cfg, + log: options.Logger, + indexer: options.WriteIndex, + }, nil +} + +func (imp *Importer) EnsureSourcesIndexed( + ctx context.Context, + read *index.ReadIndex, +) errors.E { + cfgEnabledSources := slices.Collect(maps.Keys(imp.config.Importer.Sources)) + slices.Sort(cfgEnabledSources) + + indexedSources, err := read.GetEnabledSources() + if err != nil { + return errors.Wrap(err, "Failed to get enabled sources from index") + } + slices.Sort(indexedSources) + if !slices.Equal(cfgEnabledSources, indexedSources) { + newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { + return slices.Contains(indexedSources, s) + }) + retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { + return slices.Contains(cfgEnabledSources, s) + }) + if len(newSources) > 0 { + imp.log.Info("adding new sources", "sources", newSources) + err := imp.Start( + ctx, + false, + &newSources, + ) + if err != nil { + return errors.Wrap(err, "Failed to update index with new sources") + } + } + if len(retiredSources) > 0 { + imp.log.Info("removing retired sources", "sources", retiredSources) + for _, s := range retiredSources { + err := imp.indexer.DeleteBySource(s) + if err != nil { + return errors.Wrapf(err, "Failed to remove retired source %s", s) + } + } + } + } + + return nil +} + +func (imp *Importer) StartUpdateTimer( + parentCtx context.Context, + localHub *sentry.Hub, +) { + const monitorSlug = "import" + localHub.WithScope(func(scope *sentry.Scope) { + var err errors.E + scope.SetContext("monitor", sentry.Context{"slug": monitorSlug}) + monitorConfig := &sentry.MonitorConfig{ + Schedule: sentry.IntervalSchedule(1, sentry.MonitorScheduleUnitDay), + MaxRuntime: int64(math.Ceil(imp.config.Importer.Timeout.Minutes())), + CheckInMargin: 5, + Timezone: time.Local.String(), + } + + var nextRun time.Time + if Job.FinishedAt.Before(time.Now().Add(-24 * time.Hour)) { + imp.log.Info("indexing last ran more than 24 hours ago, scheduling immediate update") + nextRun = time.Now() + } else { + nextRun = nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt) + } + SetNextRun(nextRun) + for { + select { + case <-parentCtx.Done(): + imp.log.Debug("stopping scheduler") + + return + case <-time.After(time.Until(nextRun)): + } + imp.log.Info("updating index") + + eventID := localHub.CaptureCheckIn(&sentry.CheckIn{ + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusInProgress, + }, monitorConfig) + MarkIndexingStarted() + + ctx, cancel := context.WithTimeout(parentCtx, imp.config.Importer.Timeout.Duration) + err = imp.Start(ctx, false, nil) + cancel() + + if err != nil { + imp.log.Warn("error updating index", "error", err) + + localHub.CaptureException(err) + localHub.CaptureCheckIn(&sentry.CheckIn{ + ID: *eventID, + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusError, + }, monitorConfig) + } else { + imp.log.Info("update complete") + + localHub.CaptureCheckIn(&sentry.CheckIn{ + ID: *eventID, + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusOK, + }, monitorConfig) + } + nextRun = nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt) + MarkIndexingFinished(nextRun) + imp.log.Info("scheduling next run", "next-run", nextRun.Format(time.DateTime)) + } + }) +} + +func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time { + now := time.Now() + nextRun := time.Date( + now.Year(), + now.Month(), + now.Day(), + dayTime.Hour, + dayTime.Minute, + dayTime.Second, + 0, + time.UTC, + ) + if nextRun.Before(now) { + return nextRun.AddDate(0, 0, 1) + } + + return nextRun +} |