Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add ES Auto Index Cleaner #6425

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions pkg/es/client/index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"strconv"
"strings"
"time"

"github.com/jaegertracing/jaeger/pkg/es/config"
"go.uber.org/zap"
)

// Index represents ES index.
Expand Down Expand Up @@ -41,6 +44,32 @@
MasterTimeoutSeconds int
}

// Create the indices only client using config.Configuration
func CreateIndicesClient(c *config.Configuration, logger *zap.Logger) (*IndicesClient, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow why we need this function. The pattern in es.Factory is to use config.NewClient, which takes configuration.

if len(c.Servers) < 1 {
return nil, errors.New("no servers specified")
}

Check warning on line 51 in pkg/es/client/index_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/client/index_client.go#L48-L51

Added lines #L48 - L51 were not covered by tests

httpClient := &http.Client{
Timeout: c.QueryTimeout,
}

transport, err := config.GetHTTPRoundTripper(c, logger)
if err != nil {
return nil, err
}
httpClient.Transport = transport

return &IndicesClient{
Client: Client{
Client: httpClient,
Endpoint: c.Servers[0],
BasicAuth: BasicAuth(c.Authentication.BasicAuthentication.Username, c.Authentication.BasicAuthentication.Password),
},
MasterTimeoutSeconds: int(c.QueryTimeout) / int(time.Second),
}, nil

Check warning on line 70 in pkg/es/client/index_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/client/index_client.go#L53-L70

Added lines #L53 - L70 were not covered by tests
}

// GetJaegerIndices queries all Jaeger indices including the archive and rollover.
// Jaeger daily indices are:
// - jaeger-span-2019-01-01
Expand Down
8 changes: 8 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ type Configuration struct {
Tags TagsAsFields `mapstructure:"tags_as_fields"`
// Enabled, if set to true, enables the namespace for storage pointed to by this configuration.
Enabled bool `mapstructure:"-"`

// IndexCleaner cleans indices older than MaxSpanAge
IndexCleaner IndexCleaner `mapstructure:"index_cleaner"`
}

type IndexCleaner struct {
// Enabled, if set to true, enables the cleaner feature
Enabled bool `mapstructure:"enabled"`
}

// TagsAsFields holds configuration for tag schema.
Expand Down
68 changes: 68 additions & 0 deletions plugin/storage/es/cleaner/cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package cleaner

import (
"time"

"github.com/jaegertracing/jaeger/pkg/es/client"
"github.com/jaegertracing/jaeger/pkg/es/filter"
"go.uber.org/zap"
)

type IndexCleaner struct {
// client is the client used for interacting with es
client *client.IndicesClient
logger *zap.Logger
// indexPrefix is the user-defined index prefix, required for searching indices
indexPrefix string
// timePeriod is the interval at which cleaner would run
// also, cleaning is done for index with creationTime < now - timePeriod
timePeriod time.Duration
ticker *time.Ticker
}

func NewIndexCleaner(client *client.IndicesClient, logger *zap.Logger, indexPrefix string, timePeriod time.Duration) *IndexCleaner {
return &IndexCleaner{
client: client,
logger: logger,
indexPrefix: indexPrefix,
timePeriod: timePeriod,
ticker: time.NewTicker(timePeriod),
}

Check warning on line 30 in plugin/storage/es/cleaner/cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/cleaner/cleaner.go#L23-L30

Added lines #L23 - L30 were not covered by tests
}

func (i *IndexCleaner) Start() {
go func() {
for _ = range i.ticker.C {
err := i.Clean()
if err != nil {
i.logger.Error("Index cleaning failed", zap.Error(err))
}

Check warning on line 39 in plugin/storage/es/cleaner/cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/cleaner/cleaner.go#L33-L39

Added lines #L33 - L39 were not covered by tests
}
}()
}

func (i *IndexCleaner) Clean() error {
indices, err := i.client.GetJaegerIndices(i.indexPrefix)
if err != nil {
return err
}

Check warning on line 48 in plugin/storage/es/cleaner/cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/cleaner/cleaner.go#L44-L48

Added lines #L44 - L48 were not covered by tests

year, month, day := time.Now().UTC().Date()
tomorrowMidnight := time.Date(year, month, day, 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
deleteIndicesBefore := tomorrowMidnight.Add(-1 * i.timePeriod)
i.logger.Info("Indices before this date will be deleted", zap.String("date", deleteIndicesBefore.Format(time.RFC3339)))

indices = filter.ByDate(indices, deleteIndicesBefore)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add check to not delete indices with write alias.

if len(indices) == 0 {
i.logger.Info("No indices to delete")
return nil
}

Check warning on line 60 in plugin/storage/es/cleaner/cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/cleaner/cleaner.go#L50-L60

Added lines #L50 - L60 were not covered by tests

i.logger.Info("Deleting indices", zap.Any("indices", indices))
return i.client.DeleteIndices(indices)

Check warning on line 63 in plugin/storage/es/cleaner/cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/cleaner/cleaner.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}

func (i *IndexCleaner) Stop() {
i.ticker.Stop()

Check warning on line 67 in plugin/storage/es/cleaner/cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/cleaner/cleaner.go#L66-L67

Added lines #L66 - L67 were not covered by tests
}
1 change: 1 addition & 0 deletions plugin/storage/es/cleaner/cleaner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package cleaner
15 changes: 15 additions & 0 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/client"
"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/fswatcher"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/storage/es/cleaner"
esDepStore "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore"
"github.com/jaegertracing/jaeger/plugin/storage/es/mappings"
esSampleStore "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore"
Expand Down Expand Up @@ -67,6 +69,8 @@
archiveClient atomic.Pointer[es.Client]

watchers []*fswatcher.FSWatcher

primaryIndexCleaner *cleaner.IndexCleaner
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -161,6 +165,15 @@
f.watchers = append(f.watchers, primaryWatcher)
}

if f.primaryConfig.IndexCleaner.Enabled {
indexCleanerClient, err := client.CreateIndicesClient(f.primaryConfig, logger)
if err != nil {
return fmt.Errorf("failed to create index cleaner client: %w", err)
}
primaryIndexCleaner := cleaner.NewIndexCleaner(indexCleanerClient, logger, string(f.primaryConfig.Indices.IndexPrefix), f.primaryConfig.MaxSpanAge)
primaryIndexCleaner.Start()

Check warning on line 174 in plugin/storage/es/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/factory.go#L168-L174

Added lines #L168 - L174 were not covered by tests
}

if f.archiveConfig.Enabled {
archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory)
if err != nil {
Expand Down Expand Up @@ -370,6 +383,8 @@
errs = append(errs, client.Close())
}

f.primaryIndexCleaner.Stop()

Check warning on line 387 in plugin/storage/es/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/factory.go#L386-L387

Added lines #L386 - L387 were not covered by tests
return errors.Join(errs...)
}

Expand Down
Loading