From f23d67df63defd5f6fe6773789851dd63f3ac829 Mon Sep 17 00:00:00 2001 From: Alan Pearce Date: Mon, 24 Mar 2025 10:01:57 +0100 Subject: refactor: move SetupIndex and indexing progress to importer --- cmd/searchix-web/main.go | 7 +- internal/components/data.go | 28 ----- internal/components/search.go | 33 +++--- internal/importer/main.go | 117 ++++++++++++++++++++ internal/index/indexer.go | 10 +- searchix.go | 252 ------------------------------------------ web/searchix.go | 156 ++++++++++++++++++++++++++ 7 files changed, 299 insertions(+), 304 deletions(-) delete mode 100644 searchix.go create mode 100644 web/searchix.go diff --git a/cmd/searchix-web/main.go b/cmd/searchix-web/main.go index 6d6dffa..67b2074 100644 --- a/cmd/searchix-web/main.go +++ b/cmd/searchix-web/main.go @@ -10,8 +10,9 @@ import ( "badc0de.net/pkg/flagutil" - "go.alanpearce.eu/searchix" "go.alanpearce.eu/searchix/internal/config" + "go.alanpearce.eu/searchix/internal/importer" + "go.alanpearce.eu/searchix/web" "go.alanpearce.eu/x/log" ) @@ -69,12 +70,12 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel() - s, err := searchix.New(cfg, logger) + s, err := web.New(cfg, logger) if err != nil { logger.Fatal("Failed to initialise searchix", "error", err) } - err = s.SetupIndex(ctx, &searchix.IndexOptions{ + err = importer.SetupIndex(ctx, cfg, &importer.Options{ Update: *update, Replace: *replace, LowMemory: cfg.Importer.LowMemory, diff --git a/internal/components/data.go b/internal/components/data.go index 9bc0c5e..977b90e 100644 --- a/internal/components/data.go +++ b/internal/components/data.go @@ -1,21 +1,12 @@ package components import ( - "time" - "go.alanpearce.eu/searchix/frontend" "go.alanpearce.eu/searchix/internal/config" search "go.alanpearce.eu/searchix/internal/index" "go.alanpearce.eu/searchix/internal/nix" ) -var Indexing struct { - InProgress bool - StartedAt time.Time - FinishedAt time.Time - NextRun time.Time -} - type TemplateData struct { Sources []*config.Source Source *config.Source @@ -43,22 +34,3 @@ func convertMatch[I nix.Importable](m nix.Importable) *I { return &i } - -func SetNextRun(nextRun time.Time) { - Indexing.NextRun = nextRun -} - -func SetLastUpdated(last time.Time) { - Indexing.FinishedAt = last -} - -func MarkIndexingStarted() { - Indexing.StartedAt = time.Now() - Indexing.InProgress = true -} - -func MarkIndexingFinished(nextRun time.Time) { - Indexing.FinishedAt = time.Now() - Indexing.InProgress = false - Indexing.NextRun = nextRun -} diff --git a/internal/components/search.go b/internal/components/search.go index 3db1cd4..05cdaa3 100644 --- a/internal/components/search.go +++ b/internal/components/search.go @@ -6,6 +6,7 @@ import ( g "go.alanpearce.eu/gomponents" . "go.alanpearce.eu/gomponents/html" "go.alanpearce.eu/searchix/internal/config" + "go.alanpearce.eu/searchix/internal/importer" ) func SearchForm(tdata TemplateData, r ResultData) g.Node { @@ -41,38 +42,38 @@ func SearchPage(tdata TemplateData, r ResultData, children ...g.Node) g.Node { return A(Href(source.Repo.String()), g.Text(source.Name)) }), ), - g.If(Indexing.InProgress, + g.If(importer.Job.InProgress, P(Class("notice"), - g.Text("Indexing in progress, started "), + g.Text("importer.Indexing in progress, started "), Time( - DateTime(Indexing.StartedAt.Format(time.RFC3339)), - Title(Indexing.StartedAt.Format(time.DateTime)), - g.Text(time.Since(Indexing.StartedAt).Round(time.Second).String()), + DateTime(importer.Job.StartedAt.Format(time.RFC3339)), + Title(importer.Job.StartedAt.Format(time.DateTime)), + g.Text(time.Since(importer.Job.StartedAt).Round(time.Second).String()), ), g.Text(" ago. "), - g.If(!Indexing.FinishedAt.IsZero(), + g.If(!importer.Job.FinishedAt.IsZero(), g.Group([]g.Node{ g.Text("Last run took "), Time( - DateTime(Indexing.FinishedAt.Format(time.RFC3339)), - Title(Indexing.FinishedAt.Format(time.DateTime)), - g.Text(time.Since(Indexing.FinishedAt).Round(time.Minute).String()), + DateTime(importer.Job.FinishedAt.Format(time.RFC3339)), + Title(importer.Job.FinishedAt.Format(time.DateTime)), + g.Text(time.Since(importer.Job.FinishedAt).Round(time.Minute).String()), ), }), ), ), P( - g.Text("Indexing last ran "), + g.Text("importer.Indexing last ran "), Time( - DateTime(Indexing.FinishedAt.Format(time.RFC3339)), - Title(Indexing.FinishedAt.Format(time.DateTime)), - g.Textf("%.0f hours ago", time.Since(Indexing.FinishedAt).Hours()), + DateTime(importer.Job.FinishedAt.Format(time.RFC3339)), + Title(importer.Job.FinishedAt.Format(time.DateTime)), + g.Textf("%.0f hours ago", time.Since(importer.Job.FinishedAt).Hours()), ), g.Text(", will run again in "), Time( - DateTime(Indexing.NextRun.Format(time.RFC3339)), - Title(Indexing.NextRun.Format(time.DateTime)), - g.Textf("%.0f hours", time.Until(Indexing.NextRun).Hours()), + DateTime(importer.Job.NextRun.Format(time.RFC3339)), + Title(importer.Job.NextRun.Format(time.DateTime)), + g.Textf("%.0f hours", time.Until(importer.Job.NextRun).Hours()), ), g.Text("."), ), diff --git a/internal/importer/main.go b/internal/importer/main.go index e2c222c..172c504 100644 --- a/internal/importer/main.go +++ b/internal/importer/main.go @@ -17,6 +17,39 @@ import ( "gitlab.com/tozd/go/errors" ) +type Options struct { + Update bool + Replace bool + LowMemory bool + Logger *log.Logger +} + +var Job struct { + InProgress bool + StartedAt time.Time + FinishedAt time.Time + NextRun time.Time +} + +func SetNextRun(nextRun time.Time) { + Job.NextRun = nextRun +} + +func SetLastUpdated(last time.Time) { + Job.FinishedAt = last +} + +func MarkIndexingStarted() { + Job.StartedAt = time.Now() + Job.InProgress = true +} + +func MarkIndexingFinished(nextRun time.Time) { + Job.FinishedAt = time.Now() + Job.InProgress = false + Job.NextRun = nextRun +} + func createSourceImporter( parent context.Context, log *log.Logger, @@ -203,3 +236,87 @@ func (imp *Importer) Start( return nil } + +func SetupIndex(ctx context.Context, cfg *config.Config, options *Options) errors.E { + var i uint + cfgEnabledSources := make([]string, len(cfg.Importer.Sources)) + for key := range cfg.Importer.Sources { + cfgEnabledSources[i] = key + i++ + } + slices.Sort(cfgEnabledSources) + + read, write, exists, err := index.OpenOrCreate( + cfg.DataPath, + options.Replace, + &index.Options{ + LowMemory: options.LowMemory, + Logger: options.Logger.Named("index"), + }, + ) + if err != nil { + return errors.Wrap(err, "Failed to open or create index") + } + + if !exists || options.Replace || options.Update { + options.Logger.Info( + "Starting build job", + "new", + !exists, + "replace", + options.Replace, + "update", + options.Update, + ) + imp := New(cfg, options.Logger.Named("importer"), write) + err = imp.Start( + ctx, + options.Replace || options.Update, + nil, + ) + if err != nil { + return errors.Wrap(err, "Failed to build index") + } + if options.Replace || options.Update { + return nil + } + } else { + indexedSources, err := read.GetEnabledSources() + if err != nil { + return errors.Wrap(err, "Failed to get enabled sources from index") + } + slices.Sort(indexedSources) + if !slices.Equal(cfgEnabledSources, indexedSources) { + newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { + return slices.Contains(indexedSources, s) + }) + retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { + return slices.Contains(cfgEnabledSources, s) + }) + if len(newSources) > 0 { + options.Logger.Info("adding new sources", "sources", newSources) + imp := New(cfg, options.Logger.Named("importer"), write) + err := imp.Start( + ctx, + false, + &newSources, + ) + if err != nil { + return errors.Wrap(err, "Failed to update index with new sources") + } + } + if len(retiredSources) > 0 { + options.Logger.Info("removing retired sources", "sources", retiredSources) + for _, s := range retiredSources { + err := write.DeleteBySource(s) + if err != nil { + return errors.Wrapf(err, "Failed to remove retired source %s", s) + } + } + } + } + } + SetLastUpdated(read.LastUpdated()) + + return nil +} diff --git a/internal/index/indexer.go b/internal/index/indexer.go index c4032e8..7591aef 100644 --- a/internal/index/indexer.go +++ b/internal/index/indexer.go @@ -32,6 +32,11 @@ import ( "gitlab.com/tozd/go/errors" ) +type Options struct { + LowMemory bool + Logger *log.Logger +} + type WriteIndex struct { index bleve.Index log *log.Logger @@ -204,11 +209,6 @@ func deleteIndex(dataRoot string) errors.E { return nil } -type Options struct { - LowMemory bool - Logger *log.Logger -} - func OpenOrCreate( dataRoot string, force bool, diff --git a/searchix.go b/searchix.go deleted file mode 100644 index 8bd696d..0000000 --- a/searchix.go +++ /dev/null @@ -1,252 +0,0 @@ -package searchix - -import ( - "context" - "math" - "slices" - "sync" - "time" - - "go.alanpearce.eu/searchix/internal/components" - "go.alanpearce.eu/searchix/internal/config" - "go.alanpearce.eu/searchix/internal/importer" - "go.alanpearce.eu/searchix/internal/index" - "go.alanpearce.eu/searchix/internal/server" - "go.alanpearce.eu/x/log" - - "github.com/getsentry/sentry-go" - "gitlab.com/tozd/go/errors" -) - -func nextUTCOccurrenceOfTime(t config.LocalTime) time.Time { - now := time.Now() - dayTime := t - nextRun := time.Date( - now.Year(), - now.Month(), - now.Day(), - dayTime.Hour, - dayTime.Minute, - dayTime.Second, - 0, - time.UTC, - ) - if nextRun.Before(now) { - return nextRun.AddDate(0, 0, 1) - } - - return nextRun -} - -type IndexOptions struct { - Update bool - Replace bool - LowMemory bool - Logger *log.Logger -} - -func (s *Server) SetupIndex(ctx context.Context, options *IndexOptions) errors.E { - var i uint - cfgEnabledSources := make([]string, len(s.cfg.Importer.Sources)) - for key := range s.cfg.Importer.Sources { - cfgEnabledSources[i] = key - i++ - } - slices.Sort(cfgEnabledSources) - - read, write, exists, err := index.OpenOrCreate( - s.cfg.DataPath, - options.Replace, - &index.Options{ - LowMemory: options.LowMemory, - Logger: options.Logger.Named("index"), - }, - ) - if err != nil { - return errors.Wrap(err, "Failed to open or create index") - } - s.readIndex = read - s.writeIndex = write - - if !exists || options.Replace || options.Update { - s.log.Info( - "Starting build job", - "new", - !exists, - "replace", - options.Replace, - "update", - options.Update, - ) - imp := importer.New(s.cfg, s.log.Named("importer"), write) - err = imp.Start( - ctx, - options.Replace || options.Update, - nil, - ) - if err != nil { - return errors.Wrap(err, "Failed to build index") - } - if options.Replace || options.Update { - return nil - } - } else { - indexedSources, err := read.GetEnabledSources() - if err != nil { - return errors.Wrap(err, "Failed to get enabled sources from index") - } - slices.Sort(indexedSources) - if !slices.Equal(cfgEnabledSources, indexedSources) { - newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { - return slices.Contains(indexedSources, s) - }) - retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { - return slices.Contains(cfgEnabledSources, s) - }) - if len(newSources) > 0 { - s.log.Info("adding new sources", "sources", newSources) - imp := importer.New(s.cfg, options.Logger.Named("importer"), write) - err := imp.Start( - ctx, - false, - &newSources, - ) - if err != nil { - return errors.Wrap(err, "Failed to update index with new sources") - } - } - if len(retiredSources) > 0 { - s.log.Info("removing retired sources", "sources", retiredSources) - for _, s := range retiredSources { - err := write.DeleteBySource(s) - if err != nil { - return errors.Wrapf(err, "Failed to remove retired source %s", s) - } - } - } - } - } - components.SetLastUpdated(read.LastUpdated()) - - return nil -} - -type Server struct { - sv *server.Server - wg *sync.WaitGroup - cfg *config.Config - log *log.Logger - sentryHub *sentry.Hub - readIndex *index.ReadIndex - writeIndex *index.WriteIndex -} - -func New(cfg *config.Config, log *log.Logger) (*Server, errors.E) { - err := sentry.Init(sentry.ClientOptions{ - EnableTracing: true, - TracesSampleRate: 1.0, - Dsn: cfg.Web.SentryDSN, - Environment: cfg.Web.Environment, - }) - if err != nil { - log.Warn("could not initialise sentry", "error", err) - } - - return &Server{ - cfg: cfg, - log: log, - sentryHub: sentry.CurrentHub(), - }, nil -} - -func (s *Server) startUpdateTimer( - ctx context.Context, - cfg *config.Config, - localHub *sentry.Hub, -) { - const monitorSlug = "import" - localHub.WithScope(func(scope *sentry.Scope) { - var err errors.E - scope.SetContext("monitor", sentry.Context{"slug": monitorSlug}) - monitorConfig := &sentry.MonitorConfig{ - Schedule: sentry.IntervalSchedule(1, sentry.MonitorScheduleUnitDay), - MaxRuntime: int64(math.Ceil(cfg.Importer.Timeout.Minutes())), - CheckInMargin: 5, - Timezone: time.Local.String(), - } - - s.wg.Add(1) - nextRun := nextUTCOccurrenceOfTime(s.cfg.Importer.UpdateAt) - components.SetNextRun(nextRun) - for { - s.log.Debug("scheduling next run", "next-run", nextRun) - select { - case <-ctx.Done(): - s.log.Debug("stopping scheduler") - s.wg.Done() - - return - case <-time.After(time.Until(nextRun)): - } - s.wg.Add(1) - s.log.Info("updating index") - - eventID := localHub.CaptureCheckIn(&sentry.CheckIn{ - MonitorSlug: monitorSlug, - Status: sentry.CheckInStatusInProgress, - }, monitorConfig) - components.MarkIndexingStarted() - - imp := importer.New(s.cfg, s.log.Named("importer"), s.writeIndex) - err = imp.Start(ctx, false, nil) - s.wg.Done() - if err != nil { - s.log.Warn("error updating index", "error", err) - - localHub.CaptureException(err) - localHub.CaptureCheckIn(&sentry.CheckIn{ - ID: *eventID, - MonitorSlug: monitorSlug, - Status: sentry.CheckInStatusError, - }, monitorConfig) - } else { - s.log.Info("update complete") - - localHub.CaptureCheckIn(&sentry.CheckIn{ - ID: *eventID, - MonitorSlug: monitorSlug, - Status: sentry.CheckInStatusOK, - }, monitorConfig) - } - nextRun = nextRun.AddDate(0, 0, 1) - components.MarkIndexingFinished(nextRun) - } - }) -} - -func (s *Server) Start(ctx context.Context, liveReload bool) errors.E { - var err errors.E - s.sv, err = server.New(s.cfg, s.readIndex, s.log.Named("server"), liveReload) - if err != nil { - return errors.Wrap(err, "error setting up server") - } - - s.wg = &sync.WaitGroup{} - go s.startUpdateTimer(ctx, s.cfg, sentry.CurrentHub().Clone()) - - s.wg.Add(1) - err = s.sv.Start() - if err != nil { - s.wg.Done() - - return errors.Wrap(err, "error starting server") - } - - return nil -} - -func (s *Server) Stop() { - <-s.sv.Stop() - defer s.wg.Done() - s.sentryHub.Flush(2 * time.Second) -} diff --git a/web/searchix.go b/web/searchix.go new file mode 100644 index 0000000..b410d8f --- /dev/null +++ b/web/searchix.go @@ -0,0 +1,156 @@ +package web + +import ( + "context" + "math" + "sync" + "time" + + "go.alanpearce.eu/searchix/internal/config" + "go.alanpearce.eu/searchix/internal/importer" + "go.alanpearce.eu/searchix/internal/index" + "go.alanpearce.eu/searchix/internal/server" + "go.alanpearce.eu/x/log" + + "github.com/getsentry/sentry-go" + "gitlab.com/tozd/go/errors" +) + +func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time { + now := time.Now() + nextRun := time.Date( + now.Year(), + now.Month(), + now.Day(), + dayTime.Hour, + dayTime.Minute, + dayTime.Second, + 0, + time.UTC, + ) + if nextRun.Before(now) { + return nextRun.AddDate(0, 0, 1) + } + + return nextRun +} + +type Server struct { + sv *server.Server + wg *sync.WaitGroup + cfg *config.Config + log *log.Logger + sentryHub *sentry.Hub + readIndex *index.ReadIndex + writeIndex *index.WriteIndex +} + +func New(cfg *config.Config, log *log.Logger) (*Server, errors.E) { + err := sentry.Init(sentry.ClientOptions{ + EnableTracing: true, + TracesSampleRate: 1.0, + Dsn: cfg.Web.SentryDSN, + Environment: cfg.Web.Environment, + }) + if err != nil { + log.Warn("could not initialise sentry", "error", err) + } + + return &Server{ + cfg: cfg, + log: log, + sentryHub: sentry.CurrentHub(), + }, nil +} + +func (s *Server) startUpdateTimer( + ctx context.Context, + cfg *config.Config, + localHub *sentry.Hub, +) { + const monitorSlug = "import" + localHub.WithScope(func(scope *sentry.Scope) { + var err errors.E + scope.SetContext("monitor", sentry.Context{"slug": monitorSlug}) + monitorConfig := &sentry.MonitorConfig{ + Schedule: sentry.IntervalSchedule(1, sentry.MonitorScheduleUnitDay), + MaxRuntime: int64(math.Ceil(cfg.Importer.Timeout.Minutes())), + CheckInMargin: 5, + Timezone: time.Local.String(), + } + + s.wg.Add(1) + nextRun := nextUTCOccurrenceOfTime(s.cfg.Importer.UpdateAt) + importer.SetNextRun(nextRun) + for { + s.log.Debug("scheduling next run", "next-run", nextRun) + select { + case <-ctx.Done(): + s.log.Debug("stopping scheduler") + s.wg.Done() + + return + case <-time.After(time.Until(nextRun)): + } + s.wg.Add(1) + s.log.Info("updating index") + + eventID := localHub.CaptureCheckIn(&sentry.CheckIn{ + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusInProgress, + }, monitorConfig) + importer.MarkIndexingStarted() + + imp := importer.New(s.cfg, s.log.Named("importer"), s.writeIndex) + err = imp.Start(ctx, false, nil) + s.wg.Done() + if err != nil { + s.log.Warn("error updating index", "error", err) + + localHub.CaptureException(err) + localHub.CaptureCheckIn(&sentry.CheckIn{ + ID: *eventID, + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusError, + }, monitorConfig) + } else { + s.log.Info("update complete") + + localHub.CaptureCheckIn(&sentry.CheckIn{ + ID: *eventID, + MonitorSlug: monitorSlug, + Status: sentry.CheckInStatusOK, + }, monitorConfig) + } + nextRun = nextRun.AddDate(0, 0, 1) + importer.MarkIndexingFinished(nextRun) + } + }) +} + +func (s *Server) Start(ctx context.Context, liveReload bool) errors.E { + var err errors.E + s.sv, err = server.New(s.cfg, s.readIndex, s.log.Named("server"), liveReload) + if err != nil { + return errors.Wrap(err, "error setting up server") + } + + s.wg = &sync.WaitGroup{} + go s.startUpdateTimer(ctx, s.cfg, sentry.CurrentHub().Clone()) + + s.wg.Add(1) + err = s.sv.Start() + if err != nil { + s.wg.Done() + + return errors.Wrap(err, "error starting server") + } + + return nil +} + +func (s *Server) Stop() { + <-s.sv.Stop() + defer s.wg.Done() + s.sentryHub.Flush(2 * time.Second) +} -- cgit 1.4.1