about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- internal/importer/main.go 18
-rw-r--r-- internal/index/indexer.go 35
-rw-r--r-- internal/index/search.go 19
-rw-r--r-- searchix.go 43
4 files changed, 111 insertions, 4 deletions
diff --git a/internal/importer/main.go b/internal/importer/main.go
index 7776482..747d813 100644
--- a/internal/importer/main.go
+++ b/internal/importer/main.go
@@ -9,12 +9,18 @@ import (
 	"searchix/internal/config"
 	"searchix/internal/fetcher"
 	"searchix/internal/index"
+	"slices"
 	"strings"
 
 	"github.com/pkg/errors"
 )
 
-func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error {
+func Start(
+	cfg *config.Config,
+	indexer *index.WriteIndex,
+	forceUpdate bool,
+	onlyUpdateSources *[]string,
+) error {
 	if len(cfg.Importer.Sources) == 0 {
 		slog.Info("No sources enabled")
 
@@ -24,7 +30,15 @@ func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error {
 	ctx, cancel := context.WithTimeout(context.Background(), cfg.Importer.Timeout.Duration)
 	defer cancel()
 
+	forceUpdate = forceUpdate || (onlyUpdateSources != nil && len(*onlyUpdateSources) > 0)
+
 	for name, source := range cfg.Importer.Sources {
+		// onlyUpdateSources is nil when callers want every source processed
+		// (main passes nil for full builds and scheduled updates), so it must
+		// be checked before it is dereferenced.
+		if onlyUpdateSources != nil && len(*onlyUpdateSources) > 0 &&
+			!slices.Contains(*onlyUpdateSources, name) {
+			continue
+		}
+
 		logger := slog.With("name", name, "fetcher", source.Fetcher.String())
 		logger.Debug("starting fetcher")
 
@@ -54,7 +68,7 @@ func Start(cfg *config.Config, indexer *index.WriteIndex, replace bool) error {
 		}
 		logger.Info("importer fetch succeeded", "updated", updated)
 
-		if updated || replace {
+		if updated || forceUpdate {
 			err = setRepoRevision(files.Revision, source)
 			if err != nil {
 				logger.Warn("could not set source repo revision", "error", err)
diff --git a/internal/index/indexer.go b/internal/index/indexer.go
index a661b61..4a6a9d8 100644
--- a/internal/index/indexer.go
+++ b/internal/index/indexer.go
@@ -7,6 +7,7 @@ import (
 	"io/fs"
 	"log"
 	"log/slog"
+	"math"
 	"os"
 	"path"
 	"searchix/internal/file"
@@ -326,3 +327,37 @@ func (i *WriteIndex) Close() error {
 
 	return nil
 }
+
+// DeleteBySource removes every document whose Source field equals source.
+//
+// The matching document IDs are collected with a single search and queued
+// for deletion in a batch, which is flushed after every batchSize deletions
+// and once more at the end. It returns an error if the search or any flush
+// fails.
+func (i *WriteIndex) DeleteBySource(source string) error {
+	query := bleve.NewTermQuery(source)
+	// Restrict the match to the Source field, as GetSource does. Without an
+	// explicit field the term is matched against the index's default search
+	// field, which could select documents belonging to other sources.
+	query.SetField("Source")
+	search := bleve.NewSearchRequest(query)
+	search.Size = math.MaxInt
+	search.Fields = []string{"_id"}
+
+	results, err := i.index.Search(search)
+	if err != nil {
+		return errors.WithMessagef(err, "failed to query documents of retired index %s", source)
+	}
+
+	batch := i.index.NewBatch()
+	var k uint
+	for _, hit := range results.Hits {
+		batch.Delete(hit.ID)
+		if k++; k%batchSize == 0 {
+			if err := i.Flush(batch); err != nil {
+				return err
+			}
+		}
+	}
+	// Flush whatever is left over from the last partial batch.
+	if err := i.Flush(batch); err != nil {
+		return err
+	}
+
+	// If the index reported more matches than one request can return, run
+	// again to pick up the remainder.
+	if uint64(search.Size) < results.Total {
+		return i.DeleteBySource(source) // unlikely :^)
+	}
+
+	return nil
+}
diff --git a/internal/index/search.go b/internal/index/search.go
index 5c18edb..0b20063 100644
--- a/internal/index/search.go
+++ b/internal/index/search.go
@@ -29,6 +29,25 @@ type ReadIndex struct {
 	meta  *Meta
 }
 
+// GetEnabledSources lists the distinct values of the Source field present in
+// the index, read from a term facet attached to a match-all search.
+func (index *ReadIndex) GetEnabledSources() ([]string, error) {
+	request := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
+	// The facet is requested with a size of 100 terms.
+	request.AddFacet("Source", bleve.NewFacetRequest("Source", 100))
+
+	response, err := index.index.Search(request)
+	if err != nil {
+		return nil, errors.WithMessage(err, "could not get list of enabled sources from index")
+	}
+
+	terms := response.Facets["Source"].Terms
+	names := make([]string, 0, terms.Len())
+	for _, entry := range terms.Terms() {
+		names = append(names, entry.Term)
+	}
+
+	return names, nil
+}
+
 func (index *ReadIndex) GetSource(ctx context.Context, name string) (*bleve.SearchResult, error) {
 	query := bleve.NewTermQuery(name)
 	query.SetField("Source")
diff --git a/searchix.go b/searchix.go
index a369f73..6a984ee 100644
--- a/searchix.go
+++ b/searchix.go
@@ -7,6 +7,7 @@ import (
 	"log/slog"
 	"os"
 	"os/signal"
+	"slices"
 	"sync"
 	"time"
 
@@ -76,6 +77,14 @@ func main() {
 		slog.Warn("could not initialise sentry", "error", err)
 	}
 
+	var i uint
+	cfgEnabledSources := make([]string, len(cfg.Importer.Sources))
+	for key := range cfg.Importer.Sources {
+		cfgEnabledSources[i] = key
+		i++
+	}
+	slices.Sort(cfgEnabledSources)
+
 	read, write, exists, err := index.OpenOrCreate(cfg.DataPath, *replace)
 	if err != nil {
 		log.Fatalf("Failed to open or create index: %v", err)
@@ -83,13 +92,43 @@ func main() {
 
 	if !exists || *replace {
 		slog.Info("Index doesn't exist. Starting build job...")
-		err = importer.Start(cfg, write, *replace)
+		err = importer.Start(cfg, write, *replace, nil)
 		if err != nil {
 			log.Fatalf("Failed to build index: %v", err)
 		}
 		if *replace {
 			return
 		}
+	} else {
+		indexedSources, err := read.GetEnabledSources()
+		if err != nil {
+			// Report the underlying error, as the other fatal paths here do.
+			log.Fatalf("failed to get enabled sources from index: %v", err)
+		}
+		slices.Sort(indexedSources)
+		if !slices.Equal(cfgEnabledSources, indexedSources) {
+			newSources := slices.DeleteFunc(slices.Clone(cfgEnabledSources), func(s string) bool {
+				return slices.Contains(indexedSources, s)
+			})
+			retiredSources := slices.DeleteFunc(slices.Clone(indexedSources), func(s string) bool {
+				return slices.Contains(cfgEnabledSources, s)
+			})
+			if len(newSources) > 0 {
+				slog.Info("adding new sources", "sources", newSources)
+				err := importer.Start(cfg, write, false, &newSources)
+				if err != nil {
+					log.Fatalf("failed to update index with new sources: %v", err)
+				}
+			}
+			if len(retiredSources) > 0 {
+				slog.Info("removing retired sources", "sources", retiredSources)
+				for _, s := range retiredSources {
+					err := write.DeleteBySource(s)
+					if err != nil {
+						log.Fatalf("failed to remove retired source %s from index: %v", s, err)
+					}
+				}
+			}
+		}
 	}
 
 	c := make(chan os.Signal, 2)
@@ -132,7 +171,7 @@ func main() {
 					Status:      sentry.CheckInStatusInProgress,
 				}, monitorConfig)
 
-				err = importer.Start(cfg, write, false)
+				err = importer.Start(cfg, write, false, nil)
 				wg.Done()
 				if err != nil {
 					slog.Warn("error updating index", "error", err)