diff options
author | Alan Pearce | 2025-03-24 10:35:21 +0100 |
---|---|---|
committer | Alan Pearce | 2025-03-24 14:22:04 +0100 |
commit | 523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38 (patch) | |
tree | fbfc0d8323f3e58cbf46f79500244356e779fc7a | |
parent | f23d67df63defd5f6fe6773789851dd63f3ac829 (diff) | |
download | searchix-523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38.tar.lz searchix-523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38.tar.zst searchix-523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38.zip |
refactor: split out importer code from searchix/web
-rw-r--r-- | cmd/searchix-web/main.go | 51 | ||||
-rw-r--r-- | internal/importer/main.go | 219 | ||||
-rw-r--r-- | internal/importer/main_test.go | 10 | ||||
-rw-r--r-- | web/searchix.go | 112 |
4 files changed, 190 insertions, 202 deletions
diff --git a/cmd/searchix-web/main.go b/cmd/searchix-web/main.go index 67b2074..c70b4d3 100644 --- a/cmd/searchix-web/main.go +++ b/cmd/searchix-web/main.go @@ -7,11 +7,14 @@ import ( "os" "os/signal" "runtime/pprof" + "sync" "badc0de.net/pkg/flagutil" + "github.com/getsentry/sentry-go" "go.alanpearce.eu/searchix/internal/config" "go.alanpearce.eu/searchix/internal/importer" + "go.alanpearce.eu/searchix/internal/index" "go.alanpearce.eu/searchix/web" "go.alanpearce.eu/x/log" ) @@ -70,35 +73,65 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel() - s, err := web.New(cfg, logger) + read, write, exists, err := index.OpenOrCreate( + cfg.DataPath, + *replace, + &index.Options{ + LowMemory: cfg.Importer.LowMemory, + Logger: logger.Named("index"), + }, + ) + if err != nil { + logger.Fatal("Failed to open or create index", "error", err) + } + + s, err := web.New(cfg, logger, read) if err != nil { - logger.Fatal("Failed to initialise searchix", "error", err) + logger.Fatal("Failed to initialise searchix-web", "error", err) } - err = importer.SetupIndex(ctx, cfg, &importer.Options{ - Update: *update, - Replace: *replace, + imp, err := importer.New(cfg, &importer.Options{ LowMemory: cfg.Importer.LowMemory, - Logger: logger, + Logger: logger.Named("importer"), }) if err != nil { - logger.Fatal("Failed to setup index", "error", err) + logger.Fatal("Failed to create importer", "error", err) } - if *replace || *update { + if !exists || *replace || *update { + err := imp.Start(ctx, true, nil) + if err != nil { + logger.Fatal("Failed to start importer", "error", err) + } + return } + err = imp.EnsureSourcesIndexed(ctx, read, write) + if err != nil { + logger.Fatal("Failed to setup index", "error", err) + } + + wg := &sync.WaitGroup{} + wg.Add(1) go func() { - err = s.Start(ctx, *dev) + defer wg.Done() + err := s.Start(*dev) if err != nil { // Error starting or closing listener: logger.Fatal("error", "error", err) } }() + wg.Add(1) + go func() { + defer wg.Done() + imp.StartUpdateTimer(ctx, sentry.CurrentHub().Clone()) + }() + <-ctx.Done() logger.Debug("calling stop") s.Stop() + wg.Wait() logger.Debug("done") } diff --git a/internal/importer/main.go b/internal/importer/main.go index 172c504..71606e9 100644 --- a/internal/importer/main.go +++ b/internal/importer/main.go @@ -3,11 +3,14 @@ package importer import ( "context" "fmt" + "maps" + "math" "os/exec" "slices" "strings" "time" + "github.com/getsentry/sentry-go" "go.alanpearce.eu/searchix/internal/config" "go.alanpearce.eu/searchix/internal/fetcher" "go.alanpearce.eu/searchix/internal/index" @@ -18,10 +21,9 @@ import ( ) type Options struct { - Update bool - Replace bool - LowMemory bool - Logger *log.Logger + LowMemory bool + Logger *log.Logger + WriteIndex *index.WriteIndex } var Job struct { @@ -54,7 +56,7 @@ func createSourceImporter( parent context.Context, log *log.Logger, meta *index.Meta, - indexer *index.WriteIndex, + writeIndex *index.WriteIndex, forceUpdate bool, ) func(*config.Source) errors.E { return func(source *config.Source) errors.E { @@ -157,7 +159,7 @@ func createSourceImporter( return errors.WithMessagef(err, "failed to create processor") } - hadWarnings, err := process(ctx, indexer, processor, logger) + hadWarnings, err := process(ctx, writeIndex, processor, logger) if err != nil { return errors.WithMessagef(err, "failed to process source") } @@ -182,18 +184,6 @@ type Importer struct { indexer *index.WriteIndex } -func New( - cfg *config.Config, - log *log.Logger, - indexer *index.WriteIndex, -) *Importer { - return &Importer{ - config: cfg, - log: log, - indexer: indexer, - } -} - func (imp *Importer) Start( ctx context.Context, forceUpdate bool, @@ -234,89 +224,142 @@ func (imp *Importer) Start( return errors.Wrap(err, "failed to save metadata") } + SetLastUpdated(time.Now()) + return nil } -func SetupIndex(ctx context.Context, cfg *config.Config, options *Options) errors.E { - var i uint - cfgEnabledSources := make([]string, len(cfg.Importer.Sources)) - for key := range cfg.Importer.Sources { - cfgEnabledSources[i] = key - i++ - } +func New( + cfg *config.Config, + options *Options, +) (*Importer, errors.E) { + + return &Importer{ + config: cfg, + log: options.Logger, + indexer: options.WriteIndex, + }, nil +} + +func (imp *Importer) EnsureSourcesIndexed( + ctx context.Context, + read *index.ReadIndex, + write *index.WriteIndex, +) errors.E { + cfgEnabledSources := slices.Collect(maps.Keys(imp.config.Importer.Sources)) slices.Sort(cfgEnabledSources) - read, write, exists, err := index.OpenOrCreate( - cfg.DataPath, - options.Replace, - &index.Options{ - LowMemory: options.LowMemory, - Logger: options.Logger.Named("index"), - }, - ) + indexedSources, err := read.GetEnabledSources() if err != nil { - return errors.Wrap(err, "Failed to open or create index") + return errors.Wrap(err, "Failed to get enabled sources from index") } - - if !exists || options.Replace || options.Update { - options.Logger.Info( - "Starting build job", - "new", - !exists, - "replace", - options.Replace, - "update", - options.Update, - ) - imp := New(cfg, options.Logger.Named("importer"), write) - err = imp.Start( - ctx, - options.Replace || options.Update, - nil, - ) - if err != nil { - return errors.Wrap(err, "Failed to build index") - } - if options.Replace || options.Update { - return nil - } - } else { - indexedSources, err := read.GetEnabledSources() - if err != nil { - return errors.Wrap(err, "Failed to get enabled sources from index") + slices.Sort(indexedSources) + if !slices.Equal(cfgEnabledSources, indexedSources) { + newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { + return slices.Contains(indexedSources, s) + }) + retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { + return slices.Contains(cfgEnabledSources, s) + }) + if len(newSources) > 0 { + imp.log.Info("adding new sources", "sources", newSources) + err := imp.Start( + ctx, + false, + &newSources, + ) + if err != nil { + return errors.Wrap(err, "Failed to update index with new sources") + } } - slices.Sort(indexedSources) - if !slices.Equal(cfgEnabledSources, indexedSources) { - newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { - return slices.Contains(indexedSources, s) - }) - retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { - return slices.Contains(cfgEnabledSources, s) - }) - if len(newSources) > 0 { - options.Logger.Info("adding new sources", "sources", newSources) - imp := New(cfg, options.Logger.Named("importer"), write) - err := imp.Start( - ctx, - false, - &newSources, - ) + if len(retiredSources) > 0 { + imp.log.Info("removing retired sources", "sources", retiredSources) + for _, s := range retiredSources { + err := write.DeleteBySource(s) if err != nil { - return errors.Wrap(err, "Failed to update index with new sources") - } - } - if len(retiredSources) > 0 { - options.Logger.Info("removing retired sources", "sources", retiredSources) - for _, s := range retiredSources { - err := write.DeleteBySource(s) - if err != nil { - return errors.Wrapf(err, "Failed to remove retired source %s", s) - } + return errors.Wrapf(err, "Failed to remove retired source %s", s) } } } } - SetLastUpdated(read.LastUpdated()) return nil } + +func (imp *Importer) StartUpdateTimer( + ctx context.Context, + localHub *sentry.Hub, +) { + const monitorSlug = "import" + localHub.WithScope(func(scope *sentry.Scope) { + var err errors.E + scope.SetContext("monitor", sentry.Context{"slug": monitorSlug}) + monitorConfig := &sentry.MonitorConfig{ + Schedule: sentry.IntervalSchedule(1, sentry.MonitorScheduleUnitDay), + MaxRuntime: int64(math.Ceil(imp.config.Importer.Timeout.Minutes())), + CheckInMargin: 5, + Timezone: time.Local.String(), + } + + nextRun := nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt) + SetNextRun(nextRun) + for { + imp.log.Debug("scheduling next run", "next-run", nextRun) + select { + case <-ctx.Done(): + imp.log.Debug("stopping scheduler") + + return + case <-time.After(time.Until(nextRun)): + } + imp.log.Info("updating index") + + eventID := localHub.CaptureCheckIn(&sentry.CheckIn{ + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusInProgress, + }, monitorConfig) + MarkIndexingStarted() + + err = imp.Start(ctx, false, nil) + if err != nil { + imp.log.Warn("error updating index", "error", err) + + localHub.CaptureException(err) + localHub.CaptureCheckIn(&sentry.CheckIn{ + ID: *eventID, + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusError, + }, monitorConfig) + } else { + imp.log.Info("update complete") + + localHub.CaptureCheckIn(&sentry.CheckIn{ + ID: *eventID, + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusOK, + }, monitorConfig) + } + nextRun = nextRun.AddDate(0, 0, 1) + MarkIndexingFinished(nextRun) + } + }) +} + +func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time { + now := time.Now() + nextRun := time.Date( + now.Year(), + now.Month(), + now.Day(), + dayTime.Hour, + dayTime.Minute, + dayTime.Second, + 0, + time.UTC, + ) + if nextRun.Before(now) { + return nextRun.AddDate(0, 0, 1) + } + + return nextRun +} diff --git a/internal/importer/main_test.go b/internal/importer/main_test.go index 576d681..84f6adf 100644 --- a/internal/importer/main_test.go +++ b/internal/importer/main_test.go @@ -22,7 +22,15 @@ func BenchmarkImporterLowMemory(b *testing.B) { b.Fatal(err) } - imp := New(&cfg, logger.Named("importer"), write) + imp, err := New(&cfg, &Options{ + Logger: logger.Named("importer"), + LowMemory: true, + WriteIndex: write, + }) + if err != nil { + b.Fatal(err) + } + err = imp.Start( context.Background(), false, diff --git a/web/searchix.go b/web/searchix.go index b410d8f..3197285 100644 --- a/web/searchix.go +++ b/web/searchix.go @@ -1,13 +1,9 @@ package web import ( - "context" - "math" - "sync" "time" "go.alanpearce.eu/searchix/internal/config" - "go.alanpearce.eu/searchix/internal/importer" "go.alanpearce.eu/searchix/internal/index" "go.alanpearce.eu/searchix/internal/server" "go.alanpearce.eu/x/log" @@ -16,36 +12,15 @@ import ( "gitlab.com/tozd/go/errors" ) -func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time { - now := time.Now() - nextRun := time.Date( - now.Year(), - now.Month(), - now.Day(), - dayTime.Hour, - dayTime.Minute, - dayTime.Second, - 0, - time.UTC, - ) - if nextRun.Before(now) { - return nextRun.AddDate(0, 0, 1) - } - - return nextRun -} - type Server struct { - sv *server.Server - wg *sync.WaitGroup - cfg *config.Config - log *log.Logger - sentryHub *sentry.Hub - readIndex *index.ReadIndex - writeIndex *index.WriteIndex + sv *server.Server + cfg *config.Config + log *log.Logger + sentryHub *sentry.Hub + readIndex *index.ReadIndex } -func New(cfg *config.Config, log *log.Logger) (*Server, errors.E) { +func New(cfg *config.Config, log *log.Logger, read *index.ReadIndex) (*Server, errors.E) { err := sentry.Init(sentry.ClientOptions{ EnableTracing: true, TracesSampleRate: 1.0, @@ -60,89 +35,19 @@ func New(cfg *config.Config, log *log.Logger) (*Server, errors.E) { cfg: cfg, log: log, sentryHub: sentry.CurrentHub(), + readIndex: read, }, nil } -func (s *Server) startUpdateTimer( - ctx context.Context, - cfg *config.Config, - localHub *sentry.Hub, -) { - const monitorSlug = "import" - localHub.WithScope(func(scope *sentry.Scope) { - var err errors.E - scope.SetContext("monitor", sentry.Context{"slug": monitorSlug}) - monitorConfig := &sentry.MonitorConfig{ - Schedule: sentry.IntervalSchedule(1, sentry.MonitorScheduleUnitDay), - MaxRuntime: int64(math.Ceil(cfg.Importer.Timeout.Minutes())), - CheckInMargin: 5, - Timezone: time.Local.String(), - } - - s.wg.Add(1) - nextRun := nextUTCOccurrenceOfTime(s.cfg.Importer.UpdateAt) - importer.SetNextRun(nextRun) - for { - s.log.Debug("scheduling next run", "next-run", nextRun) - select { - case <-ctx.Done(): - s.log.Debug("stopping scheduler") - s.wg.Done() - - return - case <-time.After(time.Until(nextRun)): - } - s.wg.Add(1) - s.log.Info("updating index") - - eventID := localHub.CaptureCheckIn(&sentry.CheckIn{ - MonitorSlug: monitorSlug, - Status: sentry.CheckInStatusInProgress, - }, monitorConfig) - importer.MarkIndexingStarted() - - imp := importer.New(s.cfg, s.log.Named("importer"), s.writeIndex) - err = imp.Start(ctx, false, nil) - s.wg.Done() - if err != nil { - s.log.Warn("error updating index", "error", err) - - localHub.CaptureException(err) - localHub.CaptureCheckIn(&sentry.CheckIn{ - ID: *eventID, - MonitorSlug: monitorSlug, - Status: sentry.CheckInStatusError, - }, monitorConfig) - } else { - s.log.Info("update complete") - - localHub.CaptureCheckIn(&sentry.CheckIn{ - ID: *eventID, - MonitorSlug: monitorSlug, - Status: sentry.CheckInStatusOK, - }, monitorConfig) - } - nextRun = nextRun.AddDate(0, 0, 1) - importer.MarkIndexingFinished(nextRun) - } - }) -} - -func (s *Server) Start(ctx context.Context, liveReload bool) errors.E { +func (s *Server) Start(liveReload bool) errors.E { var err errors.E s.sv, err = server.New(s.cfg, s.readIndex, s.log.Named("server"), liveReload) if err != nil { return errors.Wrap(err, "error setting up server") } - s.wg = &sync.WaitGroup{} - go s.startUpdateTimer(ctx, s.cfg, sentry.CurrentHub().Clone()) - - s.wg.Add(1) err = s.sv.Start() if err != nil { - s.wg.Done() - return errors.Wrap(err, "error starting server") } @@ -151,6 +56,5 @@ func (s *Server) Start(ctx context.Context, liveReload bool) errors.E { func (s *Server) Stop() { <-s.sv.Stop() - defer s.wg.Done() s.sentryHub.Flush(2 * time.Second) } |