From 9f241bff227608dd53a250d012116077dce6dab6 Mon Sep 17 00:00:00 2001 From: Alan Pearce Date: Fri, 17 May 2024 15:51:27 +0200 Subject: feat: automatically add/remove sources to/from index --- internal/importer/main.go | 18 ++++++++++++++++-- internal/index/indexer.go | 35 +++++++++++++++++++++++++++++++++++ internal/index/search.go | 19 +++++++++++++++++++ searchix.go | 43 +++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 111 insertions(+), 4 deletions(-) diff --git a/internal/importer/main.go b/internal/importer/main.go index 7776482..747d813 100644 --- a/internal/importer/main.go +++ b/internal/importer/main.go @@ -9,12 +9,18 @@ import ( "searchix/internal/config" "searchix/internal/fetcher" "searchix/internal/index" + "slices" "strings" "github.com/pkg/errors" ) -func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error { +func Start( + cfg *config.Config, + indexer *index.WriteIndex, + forceUpdate bool, + onlyUpdateSources *[]string, +) error { if len(cfg.Importer.Sources) == 0 { slog.Info("No sources enabled") @@ -24,7 +30,15 @@ func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error { ctx, cancel := context.WithTimeout(context.Background(), cfg.Importer.Timeout.Duration) defer cancel() + forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0) + for name, source := range cfg.Importer.Sources { + if len(*onlyUpdateSources) > 0 { + if !slices.Contains(*onlyUpdateSources, name) { + continue + } + } + logger := slog.With("name", name, "fetcher", source.Fetcher.String()) logger.Debug("starting fetcher") @@ -54,7 +68,7 @@ func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error { } logger.Info("importer fetch succeeded", "updated", updated) - if updated || replace { + if updated || forceUpdate { err = setRepoRevision(files.Revision, source) if err != nil { logger.Warn("could not set source repo revision", "error", err) diff --git a/internal/index/indexer.go b/internal/index/indexer.go index a661b61..4a6a9d8 100644 --- a/internal/index/indexer.go +++ b/internal/index/indexer.go @@ -7,6 +7,7 @@ import ( "io/fs" "log" "log/slog" + "math" "os" "path" "searchix/internal/file" @@ -326,3 +327,37 @@ func (i *WriteIndex) Close() error { return nil } + +func (i *WriteIndex) DeleteBySource(source string) error { + query := bleve.NewTermQuery(source) + search := bleve.NewSearchRequest(query) + search.Size = math.MaxInt + search.Fields = []string{"_id"} + + results, err := i.index.Search(search) + if err != nil { + return errors.WithMessagef(err, "failed to query documents of retired index %s", source) + } + + batch := i.index.NewBatch() + var k uint + for _, hit := range results.Hits { + batch.Delete(hit.ID) + if k++; k%batchSize == 0 { + err := i.Flush(batch) + if err != nil { + return err + } + } + } + err = i.Flush(batch) + if err != nil { + return err + } + + if uint64(search.Size) < results.Total { + return i.DeleteBySource(source) // unlikely :^) + } + + return nil +} diff --git a/internal/index/search.go b/internal/index/search.go index 5c18edb..0b20063 100644 --- a/internal/index/search.go +++ b/internal/index/search.go @@ -29,6 +29,25 @@ type ReadIndex struct { meta *Meta } +func (index *ReadIndex) GetEnabledSources() ([]string, error) { + facet := bleve.NewFacetRequest("Source", 100) + query := bleve.NewMatchAllQuery() + search := bleve.NewSearchRequest(query) + search.AddFacet("Source", facet) + + results, err := index.index.Search(search) + if err != nil { + return nil, errors.WithMessage(err, "could not get list of enabled sources from index") + } + + enabledSources := make([]string, results.Facets["Source"].Terms.Len()) + for i, term := range results.Facets["Source"].Terms.Terms() { + enabledSources[i] = term.Term + } + + return enabledSources, nil +} + func (index *ReadIndex) GetSource(ctx context.Context, name string) (*bleve.SearchResult, error) { query := bleve.NewTermQuery(name) query.SetField("Source") diff --git a/searchix.go b/searchix.go index a369f73..6a984ee 100644 --- a/searchix.go +++ b/searchix.go @@ -7,6 +7,7 @@ import ( "log/slog" "os" "os/signal" + "slices" "sync" "time" @@ -76,6 +77,14 @@ func main() { slog.Warn("could not initialise sentry", "error", err) } + var i uint + cfgEnabledSources := make([]string, len(cfg.Importer.Sources)) + for key := range cfg.Importer.Sources { + cfgEnabledSources[i] = key + i++ + } + slices.Sort(cfgEnabledSources) + read, write, exists, err := index.OpenOrCreate(cfg.DataPath, *replace) if err != nil { log.Fatalf("Failed to open or create index: %v", err) @@ -83,13 +92,43 @@ func main() { if !exists || *replace { slog.Info("Index doesn't exist. Starting build job...") - err = importer.Start(cfg, write, *replace) + err = importer.Start(cfg, write, *replace, nil) if err != nil { log.Fatalf("Failed to build index: %v", err) } if *replace { return } + } else { + indexedSources, err := read.GetEnabledSources() + if err != nil { + log.Fatalln("failed to get enabled sources from index") + } + slices.Sort(indexedSources) + if !slices.Equal(cfgEnabledSources, indexedSources) { + newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool { + return slices.Contains(indexedSources, s) + }) + retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool { + return slices.Contains(cfgEnabledSources, s) + }) + if len(newSources) > 0 { + slog.Info("adding new sources", "sources", newSources) + err := importer.Start(cfg, write, false, &newSources) + if err != nil { + log.Fatalf("failed to update index with new sources: %v", err) + } + } + if len(retiredSources) > 0 { + slog.Info("removing retired sources", "sources", retiredSources) + for _, s := range retiredSources { + err := write.DeleteBySource(s) + if err != nil { + log.Fatalf("failed to remove retired source %s from index: %v", s, err) + } + } + } + } } c := make(chan os.Signal, 2) @@ -132,7 +171,7 @@ func main() { Status: sentry.CheckInStatusInProgress, }, monitorConfig) - err = importer.Start(cfg, write, false) + err = importer.Start(cfg, write, false, nil) wg.Done() if err != nil { slog.Warn("error updating index", "error", err) -- cgit 1.4.1