about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorAlan Pearce2025-03-24 10:35:21 +0100
committerAlan Pearce2025-03-24 14:22:04 +0100
commit523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38 (patch)
treefbfc0d8323f3e58cbf46f79500244356e779fc7a
parentf23d67df63defd5f6fe6773789851dd63f3ac829 (diff)
downloadsearchix-523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38.tar.lz
searchix-523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38.tar.zst
searchix-523d9dcd4cc2932e5fd4df80cd0cff0d0ca43c38.zip
refactor: split out importer code from searchix/web
-rw-r--r--cmd/searchix-web/main.go51
-rw-r--r--internal/importer/main.go219
-rw-r--r--internal/importer/main_test.go10
-rw-r--r--web/searchix.go112
4 files changed, 190 insertions, 202 deletions
diff --git a/cmd/searchix-web/main.go b/cmd/searchix-web/main.go
index 67b2074..c70b4d3 100644
--- a/cmd/searchix-web/main.go
+++ b/cmd/searchix-web/main.go
@@ -7,11 +7,14 @@ import (
 	"os"
 	"os/signal"
 	"runtime/pprof"
+	"sync"
 
 	"badc0de.net/pkg/flagutil"
+	"github.com/getsentry/sentry-go"
 
 	"go.alanpearce.eu/searchix/internal/config"
 	"go.alanpearce.eu/searchix/internal/importer"
+	"go.alanpearce.eu/searchix/internal/index"
 	"go.alanpearce.eu/searchix/web"
 	"go.alanpearce.eu/x/log"
 )
@@ -70,35 +73,65 @@ func main() {
 	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
 	defer cancel()
 
-	s, err := web.New(cfg, logger)
+	read, write, exists, err := index.OpenOrCreate(
+		cfg.DataPath,
+		*replace,
+		&index.Options{
+			LowMemory: cfg.Importer.LowMemory,
+			Logger:    logger.Named("index"),
+		},
+	)
+	if err != nil {
+		logger.Fatal("Failed to open or create index", "error", err)
+	}
+
+	s, err := web.New(cfg, logger, read)
 	if err != nil {
-		logger.Fatal("Failed to initialise searchix", "error", err)
+		logger.Fatal("Failed to initialise searchix-web", "error", err)
 	}
 
-	err = importer.SetupIndex(ctx, cfg, &importer.Options{
-		Update:    *update,
-		Replace:   *replace,
+	imp, err := importer.New(cfg, &importer.Options{
 		LowMemory: cfg.Importer.LowMemory,
-		Logger:    logger,
+		Logger:    logger.Named("importer"),
 	})
 	if err != nil {
-		logger.Fatal("Failed to setup index", "error", err)
+		logger.Fatal("Failed to create importer", "error", err)
 	}
 
-	if *replace || *update {
+	if !exists || *replace || *update {
+		err := imp.Start(ctx, true, nil)
+		if err != nil {
+			logger.Fatal("Failed to start importer", "error", err)
+		}
+
 		return
 	}
 
+	err = imp.EnsureSourcesIndexed(ctx, read, write)
+	if err != nil {
+		logger.Fatal("Failed to setup index", "error", err)
+	}
+
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
 	go func() {
-		err = s.Start(ctx, *dev)
+		defer wg.Done()
+		err := s.Start(*dev)
 		if err != nil {
 			// Error starting or closing listener:
 			logger.Fatal("error", "error", err)
 		}
 	}()
 
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		imp.StartUpdateTimer(ctx, sentry.CurrentHub().Clone())
+	}()
+
 	<-ctx.Done()
 	logger.Debug("calling stop")
 	s.Stop()
+	wg.Wait()
 	logger.Debug("done")
 }
diff --git a/internal/importer/main.go b/internal/importer/main.go
index 172c504..71606e9 100644
--- a/internal/importer/main.go
+++ b/internal/importer/main.go
@@ -3,11 +3,14 @@ package importer
 import (
 	"context"
 	"fmt"
+	"maps"
+	"math"
 	"os/exec"
 	"slices"
 	"strings"
 	"time"
 
+	"github.com/getsentry/sentry-go"
 	"go.alanpearce.eu/searchix/internal/config"
 	"go.alanpearce.eu/searchix/internal/fetcher"
 	"go.alanpearce.eu/searchix/internal/index"
@@ -18,10 +21,9 @@ import (
 )
 
 type Options struct {
-	Update    bool
-	Replace   bool
-	LowMemory bool
-	Logger    *log.Logger
+	LowMemory  bool
+	Logger     *log.Logger
+	WriteIndex *index.WriteIndex
 }
 
 var Job struct {
@@ -54,7 +56,7 @@ func createSourceImporter(
 	parent context.Context,
 	log *log.Logger,
 	meta *index.Meta,
-	indexer *index.WriteIndex,
+	writeIndex *index.WriteIndex,
 	forceUpdate bool,
 ) func(*config.Source) errors.E {
 	return func(source *config.Source) errors.E {
@@ -157,7 +159,7 @@ func createSourceImporter(
 				return errors.WithMessagef(err, "failed to create processor")
 			}
 
-			hadWarnings, err := process(ctx, indexer, processor, logger)
+			hadWarnings, err := process(ctx, writeIndex, processor, logger)
 			if err != nil {
 				return errors.WithMessagef(err, "failed to process source")
 			}
@@ -182,18 +184,6 @@ type Importer struct {
 	indexer *index.WriteIndex
 }
 
-func New(
-	cfg *config.Config,
-	log *log.Logger,
-	indexer *index.WriteIndex,
-) *Importer {
-	return &Importer{
-		config:  cfg,
-		log:     log,
-		indexer: indexer,
-	}
-}
-
 func (imp *Importer) Start(
 	ctx context.Context,
 	forceUpdate bool,
@@ -234,89 +224,142 @@ func (imp *Importer) Start(
 		return errors.Wrap(err, "failed to save metadata")
 	}
 
+	SetLastUpdated(time.Now())
+
 	return nil
 }
 
-func SetupIndex(ctx context.Context, cfg *config.Config, options *Options) errors.E {
-	var i uint
-	cfgEnabledSources := make([]string, len(cfg.Importer.Sources))
-	for key := range cfg.Importer.Sources {
-		cfgEnabledSources[i] = key
-		i++
-	}
+func New(
+	cfg *config.Config,
+	options *Options,
+) (*Importer, errors.E) {
+
+	return &Importer{
+		config:  cfg,
+		log:     options.Logger,
+		indexer: options.WriteIndex,
+	}, nil
+}
+
+func (imp *Importer) EnsureSourcesIndexed(
+	ctx context.Context,
+	read *index.ReadIndex,
+	write *index.WriteIndex,
+) errors.E {
+	cfgEnabledSources := slices.Collect(maps.Keys(imp.config.Importer.Sources))
 	slices.Sort(cfgEnabledSources)
 
-	read, write, exists, err := index.OpenOrCreate(
-		cfg.DataPath,
-		options.Replace,
-		&index.Options{
-			LowMemory: options.LowMemory,
-			Logger:    options.Logger.Named("index"),
-		},
-	)
+	indexedSources, err := read.GetEnabledSources()
 	if err != nil {
-		return errors.Wrap(err, "Failed to open or create index")
+		return errors.Wrap(err, "Failed to get enabled sources from index")
 	}
-
-	if !exists || options.Replace || options.Update {
-		options.Logger.Info(
-			"Starting build job",
-			"new",
-			!exists,
-			"replace",
-			options.Replace,
-			"update",
-			options.Update,
-		)
-		imp := New(cfg, options.Logger.Named("importer"), write)
-		err = imp.Start(
-			ctx,
-			options.Replace || options.Update,
-			nil,
-		)
-		if err != nil {
-			return errors.Wrap(err, "Failed to build index")
-		}
-		if options.Replace || options.Update {
-			return nil
-		}
-	} else {
-		indexedSources, err := read.GetEnabledSources()
-		if err != nil {
-			return errors.Wrap(err, "Failed to get enabled sources from index")
+	slices.Sort(indexedSources)
+	if !slices.Equal(cfgEnabledSources, indexedSources) {
+		newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool {
+			return slices.Contains(indexedSources, s)
+		})
+		retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool {
+			return slices.Contains(cfgEnabledSources, s)
+		})
+		if len(newSources) > 0 {
+			imp.log.Info("adding new sources", "sources", newSources)
+			err := imp.Start(
+				ctx,
+				false,
+				&newSources,
+			)
+			if err != nil {
+				return errors.Wrap(err, "Failed to update index with new sources")
+			}
 		}
-		slices.Sort(indexedSources)
-		if !slices.Equal(cfgEnabledSources, indexedSources) {
-			newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool {
-				return slices.Contains(indexedSources, s)
-			})
-			retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool {
-				return slices.Contains(cfgEnabledSources, s)
-			})
-			if len(newSources) > 0 {
-				options.Logger.Info("adding new sources", "sources", newSources)
-				imp := New(cfg, options.Logger.Named("importer"), write)
-				err := imp.Start(
-					ctx,
-					false,
-					&newSources,
-				)
+		if len(retiredSources) > 0 {
+			imp.log.Info("removing retired sources", "sources", retiredSources)
+			for _, s := range retiredSources {
+				err := write.DeleteBySource(s)
 				if err != nil {
-					return errors.Wrap(err, "Failed to update index with new sources")
-				}
-			}
-			if len(retiredSources) > 0 {
-				options.Logger.Info("removing retired sources", "sources", retiredSources)
-				for _, s := range retiredSources {
-					err := write.DeleteBySource(s)
-					if err != nil {
-						return errors.Wrapf(err, "Failed to remove retired source %s", s)
-					}
+					return errors.Wrapf(err, "Failed to remove retired source %s", s)
 				}
 			}
 		}
 	}
-	SetLastUpdated(read.LastUpdated())
 
 	return nil
 }
+
+func (imp *Importer) StartUpdateTimer(
+	ctx context.Context,
+	localHub *sentry.Hub,
+) {
+	const monitorSlug = "import"
+	localHub.WithScope(func(scope *sentry.Scope) {
+		var err errors.E
+		scope.SetContext("monitor", sentry.Context{"slug": monitorSlug})
+		monitorConfig := &sentry.MonitorConfig{
+			Schedule:      sentry.IntervalSchedule(1, sentry.MonitorScheduleUnitDay),
+			MaxRuntime:    int64(math.Ceil(imp.config.Importer.Timeout.Minutes())),
+			CheckInMargin: 5,
+			Timezone:      time.Local.String(),
+		}
+
+		nextRun := nextUTCOccurrenceOfTime(imp.config.Importer.UpdateAt)
+		SetNextRun(nextRun)
+		for {
+			imp.log.Debug("scheduling next run", "next-run", nextRun)
+			select {
+			case <-ctx.Done():
+				imp.log.Debug("stopping scheduler")
+
+				return
+			case <-time.After(time.Until(nextRun)):
+			}
+			imp.log.Info("updating index")
+
+			eventID := localHub.CaptureCheckIn(&sentry.CheckIn{
+				MonitorSlug: monitorSlug,
+				Status:      sentry.CheckInStatusInProgress,
+			}, monitorConfig)
+			MarkIndexingStarted()
+
+			err = imp.Start(ctx, false, nil)
+			if err != nil {
+				imp.log.Warn("error updating index", "error", err)
+
+				localHub.CaptureException(err)
+				localHub.CaptureCheckIn(&sentry.CheckIn{
+					ID:          *eventID,
+					MonitorSlug: monitorSlug,
+					Status:      sentry.CheckInStatusError,
+				}, monitorConfig)
+			} else {
+				imp.log.Info("update complete")
+
+				localHub.CaptureCheckIn(&sentry.CheckIn{
+					ID:          *eventID,
+					MonitorSlug: monitorSlug,
+					Status:      sentry.CheckInStatusOK,
+				}, monitorConfig)
+			}
+			nextRun = nextRun.AddDate(0, 0, 1)
+			MarkIndexingFinished(nextRun)
+		}
+	})
+}
+
+func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time {
+	now := time.Now()
+	nextRun := time.Date(
+		now.Year(),
+		now.Month(),
+		now.Day(),
+		dayTime.Hour,
+		dayTime.Minute,
+		dayTime.Second,
+		0,
+		time.UTC,
+	)
+	if nextRun.Before(now) {
+		return nextRun.AddDate(0, 0, 1)
+	}
+
+	return nextRun
+}
diff --git a/internal/importer/main_test.go b/internal/importer/main_test.go
index 576d681..84f6adf 100644
--- a/internal/importer/main_test.go
+++ b/internal/importer/main_test.go
@@ -22,7 +22,15 @@ func BenchmarkImporterLowMemory(b *testing.B) {
 		b.Fatal(err)
 	}
 
-	imp := New(&cfg, logger.Named("importer"), write)
+	imp, err := New(&cfg, &Options{
+		Logger:     logger.Named("importer"),
+		LowMemory:  true,
+		WriteIndex: write,
+	})
+	if err != nil {
+		b.Fatal(err)
+	}
+
 	err = imp.Start(
 		context.Background(),
 		false,
diff --git a/web/searchix.go b/web/searchix.go
index b410d8f..3197285 100644
--- a/web/searchix.go
+++ b/web/searchix.go
@@ -1,13 +1,9 @@
 package web
 
 import (
-	"context"
-	"math"
-	"sync"
 	"time"
 
 	"go.alanpearce.eu/searchix/internal/config"
-	"go.alanpearce.eu/searchix/internal/importer"
 	"go.alanpearce.eu/searchix/internal/index"
 	"go.alanpearce.eu/searchix/internal/server"
 	"go.alanpearce.eu/x/log"
@@ -16,36 +12,15 @@ import (
 	"gitlab.com/tozd/go/errors"
 )
 
-func nextUTCOccurrenceOfTime(dayTime config.LocalTime) time.Time {
-	now := time.Now()
-	nextRun := time.Date(
-		now.Year(),
-		now.Month(),
-		now.Day(),
-		dayTime.Hour,
-		dayTime.Minute,
-		dayTime.Second,
-		0,
-		time.UTC,
-	)
-	if nextRun.Before(now) {
-		return nextRun.AddDate(0, 0, 1)
-	}
-
-	return nextRun
-}
-
 type Server struct {
-	sv         *server.Server
-	wg         *sync.WaitGroup
-	cfg        *config.Config
-	log        *log.Logger
-	sentryHub  *sentry.Hub
-	readIndex  *index.ReadIndex
-	writeIndex *index.WriteIndex
+	sv        *server.Server
+	cfg       *config.Config
+	log       *log.Logger
+	sentryHub *sentry.Hub
+	readIndex *index.ReadIndex
 }
 
-func New(cfg *config.Config, log *log.Logger) (*Server, errors.E) {
+func New(cfg *config.Config, log *log.Logger, read *index.ReadIndex) (*Server, errors.E) {
 	err := sentry.Init(sentry.ClientOptions{
 		EnableTracing:    true,
 		TracesSampleRate: 1.0,
@@ -60,89 +35,19 @@ func New(cfg *config.Config, log *log.Logger) (*Server, errors.E) {
 		cfg:       cfg,
 		log:       log,
 		sentryHub: sentry.CurrentHub(),
+		readIndex: read,
 	}, nil
 }
 
-func (s *Server) startUpdateTimer(
-	ctx context.Context,
-	cfg *config.Config,
-	localHub *sentry.Hub,
-) {
-	const monitorSlug = "import"
-	localHub.WithScope(func(scope *sentry.Scope) {
-		var err errors.E
-		scope.SetContext("monitor", sentry.Context{"slug": monitorSlug})
-		monitorConfig := &sentry.MonitorConfig{
-			Schedule:      sentry.IntervalSchedule(1, sentry.MonitorScheduleUnitDay),
-			MaxRuntime:    int64(math.Ceil(cfg.Importer.Timeout.Minutes())),
-			CheckInMargin: 5,
-			Timezone:      time.Local.String(),
-		}
-
-		s.wg.Add(1)
-		nextRun := nextUTCOccurrenceOfTime(s.cfg.Importer.UpdateAt)
-		importer.SetNextRun(nextRun)
-		for {
-			s.log.Debug("scheduling next run", "next-run", nextRun)
-			select {
-			case <-ctx.Done():
-				s.log.Debug("stopping scheduler")
-				s.wg.Done()
-
-				return
-			case <-time.After(time.Until(nextRun)):
-			}
-			s.wg.Add(1)
-			s.log.Info("updating index")
-
-			eventID := localHub.CaptureCheckIn(&sentry.CheckIn{
-				MonitorSlug: monitorSlug,
-				Status:      sentry.CheckInStatusInProgress,
-			}, monitorConfig)
-			importer.MarkIndexingStarted()
-
-			imp := importer.New(s.cfg, s.log.Named("importer"), s.writeIndex)
-			err = imp.Start(ctx, false, nil)
-			s.wg.Done()
-			if err != nil {
-				s.log.Warn("error updating index", "error", err)
-
-				localHub.CaptureException(err)
-				localHub.CaptureCheckIn(&sentry.CheckIn{
-					ID:          *eventID,
-					MonitorSlug: monitorSlug,
-					Status:      sentry.CheckInStatusError,
-				}, monitorConfig)
-			} else {
-				s.log.Info("update complete")
-
-				localHub.CaptureCheckIn(&sentry.CheckIn{
-					ID:          *eventID,
-					MonitorSlug: monitorSlug,
-					Status:      sentry.CheckInStatusOK,
-				}, monitorConfig)
-			}
-			nextRun = nextRun.AddDate(0, 0, 1)
-			importer.MarkIndexingFinished(nextRun)
-		}
-	})
-}
-
-func (s *Server) Start(ctx context.Context, liveReload bool) errors.E {
+func (s *Server) Start(liveReload bool) errors.E {
 	var err errors.E
 	s.sv, err = server.New(s.cfg, s.readIndex, s.log.Named("server"), liveReload)
 	if err != nil {
 		return errors.Wrap(err, "error setting up server")
 	}
 
-	s.wg = &sync.WaitGroup{}
-	go s.startUpdateTimer(ctx, s.cfg, sentry.CurrentHub().Clone())
-
-	s.wg.Add(1)
 	err = s.sv.Start()
 	if err != nil {
-		s.wg.Done()
-
 		return errors.Wrap(err, "error starting server")
 	}
 
@@ -151,6 +56,5 @@ func (s *Server) Start(ctx context.Context, liveReload bool) errors.E {
 
 func (s *Server) Stop() {
 	<-s.sv.Stop()
-	defer s.wg.Done()
 	s.sentryHub.Flush(2 * time.Second)
 }