diff --git a/cmd/sippy-daemon/main.go b/cmd/sippy-daemon/main.go index 72c03c5bd1..6b11c665f1 100644 --- a/cmd/sippy-daemon/main.go +++ b/cmd/sippy-daemon/main.go @@ -119,9 +119,7 @@ func NewSippyDaemonCommand() *cobra.Command { // 4 potential GitHub calls per comment gives us a safe buffer // get comment data, get existing comments, possible delete existing, and adding the comment // could lower to 3 seconds if we need, most writes likely won't have to delete - processes = append(processes, sippyserver.NewWorkProcessor(dbc, - gcsClient.Bucket(f.GoogleCloudFlags.StorageBucket), - 10, bigQueryClient, 5*time.Minute, 5*time.Second, ghCommenter, f.GithubCommenterFlags.CommentProcessingDryRun)) + processes = append(processes, sippyserver.NewWorkProcessor(dbc, bigQueryClient, gcsClient.Bucket(f.GoogleCloudFlags.StorageBucket), cacheClient, ghCommenter, 10, 5*time.Minute, 5*time.Second, f.GithubCommenterFlags.CommentProcessingDryRun)) } daemonServer := sippyserver.NewDaemonServer(processes) diff --git a/cmd/sippy/load.go b/cmd/sippy/load.go index 0cc759ec14..9f9d7786a7 100644 --- a/cmd/sippy/load.go +++ b/cmd/sippy/load.go @@ -145,9 +145,13 @@ func NewLoadCommand() *cobra.Command { } } + // likewise get a cache client if possible, though some things operate without it. 
cacheClient, cacheErr := f.CacheFlags.GetCacheClient() - releaseConfigs := []sippyv1.Release{} + if cacheErr != nil { + cacheClient = nil // error hygiene, since we pass this down to quite a few functions + } + releaseConfigs := []sippyv1.Release{} // initializing a bigquery client different from the normal one opCtx, ctx := bqcachedclient.OpCtxForCronEnv(ctx, "load") bqc, bigqueryErr := bqcachedclient.New( @@ -337,9 +341,8 @@ func NewLoadCommand() *cobra.Command { elapsed := time.Since(start) log.WithField("elapsed", elapsed).Info("database load complete") - pinnedTime := f.DBFlags.GetPinnedTime() if refreshMatviews && !f.SkipMatviewRefresh { - sippyserver.RefreshData(dbc, pinnedTime, false) + sippyserver.RefreshData(dbc, cacheClient, false) } elapsed = time.Since(start) diff --git a/cmd/sippy/refresh.go b/cmd/sippy/refresh.go index 46e7f623b2..c0e22d7cb9 100644 --- a/cmd/sippy/refresh.go +++ b/cmd/sippy/refresh.go @@ -1,6 +1,9 @@ package main import ( + "fmt" + + "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -10,17 +13,20 @@ import ( type RefreshFlags struct { DBFlags *flags.PostgresFlags + CacheFlags *flags.CacheFlags RefreshOnlyIfEmpty bool } func NewRefreshFlags() *RefreshFlags { return &RefreshFlags{ - DBFlags: flags.NewPostgresDatabaseFlags(), + DBFlags: flags.NewPostgresDatabaseFlags(), + CacheFlags: flags.NewCacheFlags(), } } func (f *RefreshFlags) BindFlags(fs *pflag.FlagSet) { f.DBFlags.BindFlags(fs) + f.CacheFlags.BindFlags(fs) fs.BoolVar(&f.RefreshOnlyIfEmpty, "refresh-only-if-empty", f.RefreshOnlyIfEmpty, "only refresh matviews if they're empty") } @@ -35,8 +41,13 @@ func NewRefreshCommand() *cobra.Command { if err != nil { return err } - pinnedDateTime := f.DBFlags.GetPinnedTime() - sippyserver.RefreshData(dbc, pinnedDateTime, f.RefreshOnlyIfEmpty) + cacheClient, cacheErr := f.CacheFlags.GetCacheClient() + if cacheErr != nil { + return fmt.Errorf("failed to get cache client: %v", cacheErr) + } else if cacheClient 
== nil { + logrus.Warn("no cache provided; refresh will not update cached timestamps, so cached data may not be properly invalidated") + } + sippyserver.RefreshData(dbc, cacheClient, f.RefreshOnlyIfEmpty) return nil }, } diff --git a/cmd/sippy/seed_data.go b/cmd/sippy/seed_data.go index 6b5e3a6329..e6829df337 100644 --- a/cmd/sippy/seed_data.go +++ b/cmd/sippy/seed_data.go @@ -20,6 +20,7 @@ import ( type SeedDataFlags struct { DBFlags *flags.PostgresFlags + CacheFlags *flags.CacheFlags InitDatabase bool Releases []string JobsPerRelease int @@ -30,6 +31,7 @@ type SeedDataFlags struct { func NewSeedDataFlags() *SeedDataFlags { return &SeedDataFlags{ DBFlags: flags.NewPostgresDatabaseFlags(), + CacheFlags: flags.NewCacheFlags(), Releases: []string{"5.0", "4.22", "4.21"}, // Default releases JobsPerRelease: 3, // Default jobs per release TestNames: []string{ @@ -48,6 +50,7 @@ func NewSeedDataFlags() *SeedDataFlags { func (f *SeedDataFlags) BindFlags(fs *pflag.FlagSet) { f.DBFlags.BindFlags(fs) + f.CacheFlags.BindFlags(fs) fs.BoolVar(&f.InitDatabase, "init-database", false, "Initialize the DB schema before seeding data") fs.StringSliceVar(&f.Releases, "release", f.Releases, "Releases to create ProwJobs for (can be specified multiple times)") fs.IntVar(&f.JobsPerRelease, "jobs", f.JobsPerRelease, "Number of ProwJobs to create for each release") @@ -86,6 +89,13 @@ rolled off the 1 week window. log.Info("Database schema initialized successfully") } + cacheClient, cacheErr := f.CacheFlags.GetCacheClient() + if cacheErr != nil { + return fmt.Errorf("failed to get cache client: %v", cacheErr) + } else if cacheClient == nil { + log.Warn("no cache provided; refresh timestamps will not be cached") + } + log.Info("Starting to seed test data...") // Create the test suite @@ -131,7 +141,7 @@ rolled off the 1 week window. 
totalTestResults := totalRuns * len(f.TestNames) log.Info("Refreshing materialized views...") - sippyserver.RefreshData(dbc, nil, false) + sippyserver.RefreshData(dbc, cacheClient, false) log.Infof("Successfully seeded test data! Created %d ProwJobs, %d Tests, %d ProwJobRuns, and %d test results", totalProwJobs, len(f.TestNames), totalRuns, totalTestResults) diff --git a/pkg/api/cache.go b/pkg/api/cache.go index 6dce3502b7..d15ddea69f 100644 --- a/pkg/api/cache.go +++ b/pkg/api/cache.go @@ -158,3 +158,79 @@ func isStructWithNoPublicFields(v interface{}) bool { } return true } + +// GetDataFromCacheOrMatview caches data that is based on a matview and invalidates it when the matview is refreshed +func GetDataFromCacheOrMatview[T any](ctx context.Context, + cacheClient cache.Cache, cacheSpec CacheSpec, + matview string, + cacheDuration time.Duration, + generateFn func(context.Context) (T, []error), + defaultVal T, +) (T, []error) { + if cacheClient == nil { + return generateFn(ctx) + } + + cacheKey, err := cacheSpec.GetCacheKey() + if err != nil { + return defaultVal, []error{err} + } + + // If someone gives us an uncacheable cacheKey, panic so it gets detected in testing + if len(cacheKey) == 0 { + panic(fmt.Sprintf("cache key is empty for %s", reflect.TypeOf(defaultVal))) + } + // If someone gives us an uncacheable value, panic so it gets detected in testing + if isStructWithNoPublicFields(defaultVal) { + panic(fmt.Sprintf("cannot cache type %s that exports no fields", reflect.TypeOf(defaultVal))) + } + + var cacheVal struct { + Val T // the actual value we want to cache + Timestamp time.Time // the time when it was cached (for comparison to matview refresh time) + } + if cached, err := cacheClient.Get(ctx, string(cacheKey), 0); err == nil { + logrus.WithFields(logrus.Fields{ + "key": string(cacheKey), + "type": reflect.TypeOf(defaultVal).String(), + }).Debugf("cache hit") + + if err := json.Unmarshal(cached, &cacheVal); err != nil { + return defaultVal, 
[]error{errors.WithMessagef(err, "failed to unmarshal cached item. cacheKey=%+v", cacheKey)} + } + + // look up when the matview was refreshed to see if the cached value is stale + var lastRefresh time.Time + if lastRefreshBytes, err := cacheClient.Get(ctx, RefreshMatviewKey(matview), 0); err == nil { + if parsed, err := time.Parse(time.RFC3339, string(lastRefreshBytes)); err != nil { + logrus.WithError(err).Warnf("failed to parse matview refresh timestamp %q for %q; cache will not be invalidated", lastRefreshBytes, matview) + } else { + lastRefresh = parsed + } + } + + if !lastRefresh.After(cacheVal.Timestamp) { + // not invalidated by a newer refresh, so use it (if we don't know the last refresh, still use it) + return cacheVal.Val, nil + } + logrus.Debugf("matview %q refreshed at %v, will not use earlier cache entry from %v", matview, lastRefresh, cacheVal.Timestamp) + } else if strings.Contains(err.Error(), "connection refused") { + logrus.WithError(err).Fatalf("redis URL specified but got connection refused; exiting due to cost issues in this configuration") + } else { + logrus.WithFields(logrus.Fields{"key": string(cacheKey)}).Debugf("cache miss") + } + + // Cache missed or refresh invalidated the data, so generate it. 
+ logrus.Debugf("cache duration set to %s or approx %s for key %s", cacheDuration, time.Now().Add(cacheDuration).Format(time.RFC3339), cacheKey) + result, errs := generateFn(ctx) + if len(errs) == 0 { + cacheVal.Val = result + cacheVal.Timestamp = time.Now().UTC() + CacheSet(ctx, cacheClient, cacheVal, cacheKey, cacheDuration) + } + return result, errs +} + +func RefreshMatviewKey(matview string) string { + return "matview_refreshed:" + matview +} diff --git a/pkg/api/cache_test.go b/pkg/api/cache_test.go index f37acbc45e..724b805623 100644 --- a/pkg/api/cache_test.go +++ b/pkg/api/cache_test.go @@ -4,14 +4,22 @@ import ( "context" "encoding/json" "fmt" + "os" "testing" "time" "github.com/openshift/sippy/pkg/apis/cache" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func init() { + if os.Getenv("DEBUG_LOGGING") != "" { + logrus.SetLevel(logrus.DebugLevel) + } +} + // mockCache is a test cache that properly returns errors on miss, // records durations passed to Set, and can simulate errors. 
type mockCache struct { @@ -500,3 +508,240 @@ func TestNewCacheSpec_Prefix(t *testing.T) { func timePtr(t time.Time) *time.Time { return &t } + +const testMatview = "prow_test_report_7d_matview" + +// helper to pre-populate the cache with a matview-style cached value (val + timestamp) +func seedMatviewCache(t *testing.T, mc *mockCache, spec CacheSpec, val testResult, cachedAt time.Time) { + t.Helper() + cacheKey, err := spec.GetCacheKey() + require.NoError(t, err) + entry := struct { + Val testResult + Timestamp time.Time + }{Val: val, Timestamp: cachedAt} + data, err := json.Marshal(entry) + require.NoError(t, err) + mc.store[string(cacheKey)] = data +} + +func seedRefreshTimestamp(mc *mockCache, matview string, refreshedAt time.Time) { + mc.store[RefreshMatviewKey(matview)] = []byte(refreshedAt.UTC().Format(time.RFC3339)) +} + +// TestGetDataFromCacheOrMatview_NilCache verifies that with no cache, generateFn is always called. +func TestGetDataFromCacheOrMatview_NilCache(t *testing.T) { + var generateCalls int + expected := testResult{Value: "generated"} + spec := NewCacheSpec(testCacheKey{Query: "q1"}, "mv~", nil) + + result, errs := GetDataFromCacheOrMatview( + context.Background(), nil, spec, testMatview, time.Hour, + makeGenerateFn(expected, &generateCalls), testResult{}, + ) + + assert.Empty(t, errs) + assert.Equal(t, expected, result) + assert.Equal(t, 1, generateCalls) +} + +// TestGetDataFromCacheOrMatview_CacheMiss verifies that on a miss, generateFn is called and the result is stored. 
+func TestGetDataFromCacheOrMatview_CacheMiss(t *testing.T) { + mc := newMockCache() + var generateCalls int + expected := testResult{Value: "generated"} + spec := NewCacheSpec(testCacheKey{Query: "q1"}, "mv~", nil) + + result, errs := GetDataFromCacheOrMatview( + context.Background(), mc, spec, testMatview, time.Hour, + makeGenerateFn(expected, &generateCalls), testResult{}, + ) + + assert.Empty(t, errs) + assert.Equal(t, expected, result) + assert.Equal(t, 1, generateCalls) + assert.Equal(t, 1, mc.setCalls, "should store result in cache") +} + +// TestGetDataFromCacheOrMatview_CacheHit_NoRefresh verifies that a cached value is returned +// when no matview refresh timestamp exists in the cache. +func TestGetDataFromCacheOrMatview_CacheHit_NoRefresh(t *testing.T) { + mc := newMockCache() + spec := NewCacheSpec(testCacheKey{Query: "q1"}, "mv~", nil) + cached := testResult{Value: "cached"} + seedMatviewCache(t, mc, spec, cached, time.Now().UTC()) + + var generateCalls int + result, errs := GetDataFromCacheOrMatview( + context.Background(), mc, spec, testMatview, time.Hour, + makeGenerateFn(testResult{Value: "fresh"}, &generateCalls), testResult{}, + ) + + assert.Empty(t, errs) + assert.Equal(t, cached, result, "should return cached value when no refresh timestamp exists") + assert.Equal(t, 0, generateCalls, "should not call generateFn") +} + +// TestGetDataFromCacheOrMatview_CacheHit_RefreshBeforeCacheTime verifies that a cached value +// is returned when the matview was refreshed before the data was cached. 
+func TestGetDataFromCacheOrMatview_CacheHit_RefreshBeforeCacheTime(t *testing.T) { + mc := newMockCache() + spec := NewCacheSpec(testCacheKey{Query: "q1"}, "mv~", nil) + cachedAt := time.Now().UTC() + cached := testResult{Value: "cached"} + seedMatviewCache(t, mc, spec, cached, cachedAt) + // Matview was refreshed 10 minutes before the data was cached + seedRefreshTimestamp(mc, testMatview, cachedAt.Add(-10*time.Minute)) + + var generateCalls int + result, errs := GetDataFromCacheOrMatview( + context.Background(), mc, spec, testMatview, time.Hour, + makeGenerateFn(testResult{Value: "fresh"}, &generateCalls), testResult{}, + ) + + assert.Empty(t, errs) + assert.Equal(t, cached, result, "should return cached value when refresh predates cache entry") + assert.Equal(t, 0, generateCalls, "should not call generateFn") +} + +// TestGetDataFromCacheOrMatview_CacheInvalidated_RefreshAfterCacheTime verifies that when the +// matview was refreshed after the data was cached, the cached value is invalidated and +// generateFn is called to produce fresh data. 
+func TestGetDataFromCacheOrMatview_CacheInvalidated_RefreshAfterCacheTime(t *testing.T) { + mc := newMockCache() + spec := NewCacheSpec(testCacheKey{Query: "q1"}, "mv~", nil) + cachedAt := time.Now().UTC().Add(-5 * time.Minute) + seedMatviewCache(t, mc, spec, testResult{Value: "stale"}, cachedAt) + // Matview was refreshed 1 minute ago, after the data was cached + seedRefreshTimestamp(mc, testMatview, time.Now().UTC().Add(-1*time.Minute)) + + var generateCalls int + expected := testResult{Value: "fresh"} + result, errs := GetDataFromCacheOrMatview( + context.Background(), mc, spec, testMatview, time.Hour, + makeGenerateFn(expected, &generateCalls), testResult{}, + ) + + assert.Empty(t, errs) + assert.Equal(t, expected, result, "should regenerate data after matview refresh") + assert.Equal(t, 1, generateCalls, "should call generateFn when cache is invalidated") + assert.GreaterOrEqual(t, mc.setCalls, 1, "should store the fresh result") +} + +// TestGetDataFromCacheOrMatview_CacheValid_RefreshAtExactCacheTime verifies that when +// the refresh timestamp equals the cache timestamp, the cache is still valid (data was +// generated at refresh time so it reflects the refreshed matview). 
+func TestGetDataFromCacheOrMatview_CacheValid_RefreshAtExactCacheTime(t *testing.T) { + mc := newMockCache() + spec := NewCacheSpec(testCacheKey{Query: "q1"}, "mv~", nil) + cachedAt := time.Now().UTC().Add(-5 * time.Minute) + seedMatviewCache(t, mc, spec, testResult{Value: "cached"}, cachedAt) + // Refresh at the exact same time as the cache entry + seedRefreshTimestamp(mc, testMatview, cachedAt) + + var generateCalls int + result, errs := GetDataFromCacheOrMatview( + context.Background(), mc, spec, testMatview, time.Hour, + makeGenerateFn(testResult{Value: "fresh"}, &generateCalls), testResult{}, + ) + + assert.Empty(t, errs) + assert.Equal(t, "cached", result.Value, "should return cached value when refresh equals cache time") + assert.Equal(t, 0, generateCalls, "should not regenerate") +} + +// TestGetDataFromCacheOrMatview_GenerateErrorSkipsCacheWrite verifies that errors from +// generateFn are not cached. +func TestGetDataFromCacheOrMatview_GenerateErrorSkipsCacheWrite(t *testing.T) { + mc := newMockCache() + spec := NewCacheSpec(testCacheKey{Query: "q1"}, "mv~", nil) + + var generateCalls int + result, errs := GetDataFromCacheOrMatview( + context.Background(), mc, spec, testMatview, time.Hour, + makeFailingGenerateFn(&generateCalls), testResult{}, + ) + + assert.Len(t, errs, 1) + assert.Equal(t, testResult{}, result) + assert.Equal(t, 1, generateCalls) + assert.Equal(t, 0, mc.setCalls, "should not cache error results") +} + +// TestGetDataFromCacheOrMatview_InvalidRefreshTimestamp verifies that an unparseable refresh +// timestamp in the cache is treated like no refresh (cache is still used). 
+func TestGetDataFromCacheOrMatview_InvalidRefreshTimestamp(t *testing.T) { + mc := newMockCache() + spec := NewCacheSpec(testCacheKey{Query: "q1"}, "mv~", nil) + cached := testResult{Value: "cached"} + seedMatviewCache(t, mc, spec, cached, time.Now().UTC()) + // Store garbage as the refresh timestamp + mc.store[RefreshMatviewKey(testMatview)] = []byte("not-a-timestamp") + + var generateCalls int + result, errs := GetDataFromCacheOrMatview( + context.Background(), mc, spec, testMatview, time.Hour, + makeGenerateFn(testResult{Value: "fresh"}, &generateCalls), testResult{}, + ) + + assert.Empty(t, errs) + assert.Equal(t, cached, result, "should return cached value when refresh timestamp is unparseable") + assert.Equal(t, 0, generateCalls) +} + +// TestGetDataFromCacheOrMatview_CacheDurationPassedToSet verifies that the specified +// cacheDuration is used when writing to the cache. +func TestGetDataFromCacheOrMatview_CacheDurationPassedToSet(t *testing.T) { + mc := newMockCache() + spec := NewCacheSpec(testCacheKey{Query: "q1"}, "mv~", nil) + expectedDuration := 2 * time.Hour + + var generateCalls int + _, errs := GetDataFromCacheOrMatview( + context.Background(), mc, spec, testMatview, expectedDuration, + makeGenerateFn(testResult{Value: "data"}, &generateCalls), testResult{}, + ) + + assert.Empty(t, errs) + cacheKey, _ := spec.GetCacheKey() + assert.Equal(t, expectedDuration, mc.setDurations[string(cacheKey)], "should use specified cache duration") +} + +// TestGetDataFromCacheOrMatview_DifferentMatviews verifies that cache entries for different +// matviews are invalidated independently. 
+func TestGetDataFromCacheOrMatview_DifferentMatviews(t *testing.T) { + mc := newMockCache() + matview7d := "prow_test_report_7d_matview" + matview2d := "prow_test_report_2d_matview" + spec7d := NewCacheSpec(testCacheKey{Query: "7d"}, "mv~", nil) + spec2d := NewCacheSpec(testCacheKey{Query: "2d"}, "mv~", nil) + + cachedAt := time.Now().UTC().Add(-5 * time.Minute) + seedMatviewCache(t, mc, spec7d, testResult{Value: "cached-7d"}, cachedAt) + seedMatviewCache(t, mc, spec2d, testResult{Value: "cached-2d"}, cachedAt) + + // Only refresh the 7d matview (after caching) + seedRefreshTimestamp(mc, matview7d, time.Now().UTC().Add(-1*time.Minute)) + // 2d matview was refreshed before caching + seedRefreshTimestamp(mc, matview2d, cachedAt.Add(-10*time.Minute)) + + var gen7dCalls, gen2dCalls int + + // 7d should be invalidated + result7d, errs := GetDataFromCacheOrMatview( + context.Background(), mc, spec7d, matview7d, time.Hour, + makeGenerateFn(testResult{Value: "fresh-7d"}, &gen7dCalls), testResult{}, + ) + assert.Empty(t, errs) + assert.Equal(t, "fresh-7d", result7d.Value, "7d cache should be invalidated") + assert.Equal(t, 1, gen7dCalls) + + // 2d should still be cached + result2d, errs := GetDataFromCacheOrMatview( + context.Background(), mc, spec2d, matview2d, time.Hour, + makeGenerateFn(testResult{Value: "fresh-2d"}, &gen2dCalls), testResult{}, + ) + assert.Empty(t, errs) + assert.Equal(t, "cached-2d", result2d.Value, "2d cache should not be invalidated") + assert.Equal(t, 0, gen2dCalls) +} diff --git a/pkg/api/job_runs.go b/pkg/api/job_runs.go index 93fb72928f..b00c446450 100644 --- a/pkg/api/job_runs.go +++ b/pkg/api/job_runs.go @@ -21,6 +21,7 @@ import ( log "github.com/sirupsen/logrus" apitype "github.com/openshift/sippy/pkg/apis/api" + "github.com/openshift/sippy/pkg/apis/cache" "github.com/openshift/sippy/pkg/apis/openshift" sippyprocessingv1 "github.com/openshift/sippy/pkg/apis/sippyprocessing/v1" "github.com/openshift/sippy/pkg/bigquery" @@ -310,7 +311,12 @@ 
func joinSegments(segments []string, start int, separator string) string { // JobRunRiskAnalysis checks the test failures and linked bugs for a job run, and reports back an estimated // risk level for each failed test, and the job run overall. -func JobRunRiskAnalysis(ctx context.Context, dbc *db.DB, bqc *bigquery.Client, jobRun *models.ProwJobRun, logger *log.Entry, compareOtherPRs bool) (apitype.ProwJobRunRiskAnalysis, error) { +func JobRunRiskAnalysis( + ctx context.Context, logger *log.Entry, + dbc *db.DB, bqc *bigquery.Client, cacheClient cache.Cache, + jobRun *models.ProwJobRun, + compareOtherPRs bool, +) (apitype.ProwJobRunRiskAnalysis, error) { logger = logger.WithField("func", "JobRunRiskAnalysis") // If this job is a Presubmit, compare to test results from master, not presubmits, which may perform // worse due to dev code that hasn't merged. We do not presently track presubmits on branches other than @@ -418,7 +424,7 @@ func JobRunRiskAnalysis(ctx context.Context, dbc *db.DB, bqc *bigquery.Client, j } } - return runJobRunAnalysis(ctx, bqc, jobRun, compareRelease, historicalCount, neverStableJob, jobNames, logger, jobNamesTestResultFunc(dbc), variantsTestResultFunc(dbc), compareOtherPRs) + return runJobRunAnalysis(ctx, bqc, jobRun, compareRelease, historicalCount, neverStableJob, jobNames, logger, jobNamesTestResultFunc(dbc), variantsTestResultFunc(dbc, cacheClient), compareOtherPRs) } // testResultsByJobNameFunc is used for injecting db responses in unit tests. 
@@ -448,7 +454,7 @@ func jobNamesTestResultFunc(dbc *db.DB) testResultsByJobNameFunc { } // variantsTestResultFunc looks to match job runs based on variant matches -func variantsTestResultFunc(dbc *db.DB) testResultsByVariantsFunc { +func variantsTestResultFunc(dbc *db.DB, cacheClient cache.Cache) testResultsByVariantsFunc { return func(testName, release, suite string, variants []string, jobNames []string) (*apitype.Test, error) { fil := &filter.Filter{ @@ -462,22 +468,28 @@ func variantsTestResultFunc(dbc *db.DB) testResultsByVariantsFunc { }, LinkOperator: "and", } - testResults, overallTest, err := BuildTestsResults(dbc, release, "default", false, true, - fil) + spec := TestResultsSpec{ + Release: release, + Period: "default", + Collapse: false, + IncludeOverall: true, + Filter: fil, + } + result, err := spec.buildTestsResultsFromPostgres(dbc, cacheClient) if err != nil { return nil, err } - if overallTest != nil { - overallTest.Variants = append(overallTest.Variants, "Overall") + if result.Test != nil { + result.Test.Variants = append(result.Test.Variants, "Overall") } gosort.Strings(variants) - for _, testResult := range testResults { + for _, testResult := range result.TestsAPIResult { // this is a weird way to get the variant we want, but it allows re-use // of the existing code. gosort.Strings(testResult.Variants) if stringSlicesEqual(variants, testResult.Variants) && testResult.SuiteName == suite { - if overallTest.CurrentPassPercentage < testResult.CurrentPassPercentage { - return overallTest, nil + if result.Test.CurrentPassPercentage < testResult.CurrentPassPercentage { + return result.Test, nil } return &testResult, nil } @@ -486,12 +498,12 @@ func variantsTestResultFunc(dbc *db.DB) testResultsByVariantsFunc { // otherwise, what is our best match... 
// do something more expensive and check to see // which testResult contains all the variants we have currently - for _, testResult := range testResults { + for _, testResult := range result.TestsAPIResult { // we didn't find an exact variant match // next best guess is the first variant list that contains all of our known variants if stringSubSlicesEqual(variants, testResult.Variants) && testResult.SuiteName == suite { - if overallTest.CurrentPassPercentage < testResult.CurrentPassPercentage { - return overallTest, nil + if result.Test.CurrentPassPercentage < testResult.CurrentPassPercentage { + return result.Test, nil } return &testResult, nil } diff --git a/pkg/api/tests.go b/pkg/api/tests.go index 4fa8ca56cb..b231c47da3 100644 --- a/pkg/api/tests.go +++ b/pkg/api/tests.go @@ -16,6 +16,7 @@ import ( "google.golang.org/api/iterator" apitype "github.com/openshift/sippy/pkg/apis/api" + "github.com/openshift/sippy/pkg/apis/cache" bq "github.com/openshift/sippy/pkg/bigquery" "github.com/openshift/sippy/pkg/bigquery/bqlabel" "github.com/openshift/sippy/pkg/db" @@ -223,9 +224,9 @@ func GetTestDurationsFromDB(dbc *db.DB, release, test string, filters *filter.Fi return query.TestDurations(dbc, release, test, includedVariants, excludedVariants) } -type testsAPIResult []apitype.Test +type TestsAPIResult []apitype.Test -func (tests testsAPIResult) sort(req *http.Request) testsAPIResult { +func (tests TestsAPIResult) sort(req *http.Request) TestsAPIResult { sortField := param.SafeRead(req, "sortField") sort := param.SafeRead(req, "sort") @@ -247,7 +248,7 @@ func (tests testsAPIResult) sort(req *http.Request) testsAPIResult { return tests } -func (tests testsAPIResult) limit(req *http.Request) testsAPIResult { +func (tests TestsAPIResult) limit(req *http.Request) TestsAPIResult { limit, _ := strconv.Atoi(req.URL.Query().Get("limit")) if limit == 0 || len(tests) < limit { return tests @@ -289,7 +290,7 @@ func (tests testsAPIResultBQ) limit(req *http.Request) testsAPIResultBQ { 
return tests[:limit] } -func PrintTestsJSONFromDB(release string, w http.ResponseWriter, req *http.Request, dbc *db.DB) { +func PrintTestsJSONFromDB(release string, w http.ResponseWriter, req *http.Request, dbc *db.DB, cacheClient cache.Cache) { var fil *filter.Filter // Collapse means to produce an aggregated test result of all variant (NURP+ - network, upgrade, release, platform) @@ -324,15 +325,22 @@ func PrintTestsJSONFromDB(release string, w http.ResponseWriter, req *http.Reque return } - testsResult, overall, err := BuildTestsResults(dbc, release, period, collapse, includeOverall, fil) + spec := TestResultsSpec{ + Release: release, + Period: period, + Collapse: collapse, + IncludeOverall: includeOverall, + Filter: fil, + } + result, err := spec.buildTestsResultsFromPostgres(dbc, cacheClient) if err != nil { RespondWithJSON(http.StatusInternalServerError, w, map[string]interface{}{"code": http.StatusInternalServerError, "message": "Error building job report:" + err.Error()}) return } - testsResult = testsResult.sort(req).limit(req) - if overall != nil { - testsResult = append([]apitype.Test{*overall}, testsResult...) + testsResult := result.TestsAPIResult.sort(req).limit(req) + if result.Test != nil { + testsResult = append([]apitype.Test{*result.Test}, testsResult...) 
} RespondWithJSON(http.StatusOK, w, testsResult) @@ -373,7 +381,7 @@ func PrintTestsJSONFromBigQuery(release string, w http.ResponseWriter, req *http return } - testsResult, overall, err := BuildTestsResultsFromBigQuery(req.Context(), bqc, release, period, collapse, includeOverall, fil) + testsResult, overall, err := buildTestsResultsFromBigQuery(req.Context(), bqc, release, period, collapse, includeOverall, fil) if err != nil { RespondWithJSON(http.StatusInternalServerError, w, map[string]interface{}{"code": http.StatusInternalServerError, "message": "Error building job report:" + err.Error()}) return @@ -387,29 +395,6 @@ func PrintTestsJSONFromBigQuery(release string, w http.ResponseWriter, req *http RespondWithJSON(http.StatusOK, w, testsResult) } -func PrintCanaryTestsFromDB(release string, w http.ResponseWriter, dbc *db.DB) { - f := filter.Filter{ - Items: []filter.FilterItem{ - { - Field: "current_pass_percentage", - Operator: ">=", - Value: "99", - }, - }, - } - - results, _, err := BuildTestsResults(dbc, release, "default", true, false, &f) - if err != nil { - RespondWithJSON(http.StatusInternalServerError, w, map[string]interface{}{"code": http.StatusInternalServerError, "message": "Error building test report:" + err.Error()}) - return - } - - w.Header().Set("Content-Type", "text/plain;charset=UTF-8") - for _, result := range results { - fmt.Fprintf(w, "%q:struct{}{},\n", result.Name) - } -} - func GetJobRunTestsCountByLookback(dbc *db.DB, lookbackDays int) (int64, int64, error) { if lookbackDays < 1 { return -1, -1, errors.New("Lookback Days must be greater than zero") @@ -444,37 +429,64 @@ func GetJobRunTestsCountByLookback(dbc *db.DB, lookbackDays int) (int64, int64, return queryCounts.JobRunsCount, queryCounts.TestIDsCount, nil } -func BuildTestsResults(dbc *db.DB, release, period string, collapse, includeOverall bool, fil *filter.Filter) (testsAPIResult, *apitype.Test, error) { //lint:ignore +type TestResultsSpec struct { + Release, Period string + 
Collapse, IncludeOverall bool + Filter *filter.Filter +} +type testResults struct { + TestsAPIResult + Test *apitype.Test +} + +const testResultsCacheDuration = time.Hour + +func (spec *TestResultsSpec) buildTestsResultsFromPostgres(dbc *db.DB, cacheClient cache.Cache) (testResults, error) { //lint:ignore + matview := testReport7dMatView + if spec.Period == "twoDay" { + matview = testReport2dMatView + } + + generator := func(ctx context.Context) (testResults, []error) { + return spec.buildTestsResults(dbc, matview) + } + result, errs := GetDataFromCacheOrMatview(context.TODO(), cacheClient, + NewCacheSpec(spec, "PostgresTestsResults~", nil), + matview, testResultsCacheDuration, + generator, + testResults{}, + ) + if errs != nil { + return result, fmt.Errorf("error(s) querying test results: %s", errs) + } + return result, nil +} + +func (spec *TestResultsSpec) buildTestsResults(dbc *db.DB, matview string) (result testResults, errs []error) { //lint:ignore now := time.Now() // Test results are generated by using two subqueries, which need to be filtered separately. Once during // pre-processing where we're evaluating summed variant results, and in post-processing after we've // assembled our final temporary table. var rawFilter, processedFilter *filter.Filter - if fil != nil { - rawFilter, processedFilter = fil.Split([]string{"name", "variants"}) - } - - table := testReport7dMatView - if period == "twoDay" { - table = testReport2dMatView + if spec.Filter != nil { + rawFilter, processedFilter = spec.Filter.Split([]string{"name", "variants"}) } rawQuery := dbc.DB. - Table(table). - Where("release = ?", release) + Table(matview). 
+ Where("release = ?", spec.Release) // Collapse groups the test results together -- otherwise we return the test results per-variant combo (NURP+) variantSelect := "" - if collapse { + if spec.Collapse { rawQuery = rawQuery.Select(`suite_name,name,jira_component,jira_component_id,` + query.QueryTestSummer).Group("suite_name,name,jira_component,jira_component_id") } else { - rawQuery = query.TestsByNURPAndStandardDeviation(dbc, release, table) + rawQuery = query.TestsByNURPAndStandardDeviation(dbc, spec.Release, matview) variantSelect = "suite_name, variants," + "delta_from_working_average, working_average, working_standard_deviation, " + "delta_from_passing_average, passing_average, passing_standard_deviation, " + "delta_from_flake_average, flake_average, flake_standard_deviation, " - } if rawFilter != nil { @@ -495,12 +507,14 @@ func BuildTestsResults(dbc *db.DB, release, period string, collapse, includeOver frr := finalResults.Scan(&testReports) if frr.Error != nil { log.WithError(finalResults.Error).Error("error querying test reports") - return []apitype.Test{}, nil, frr.Error + result.TestsAPIResult = []apitype.Test{} + errs = append(errs, frr.Error) + return } // Produce a special "overall" test that has a summary of all the selected tests. var overallTest *apitype.Test - if includeOverall { + if spec.IncludeOverall { finalResults := dbc.DB.Table("(?) as final_results", finalResults) finalResults = finalResults.Select(query.QueryTestSummer) summaryResult := dbc.DB.Table("(?) 
as overall", finalResults).Select(query.QueryTestSummarizer) @@ -516,12 +530,14 @@ func BuildTestsResults(dbc *db.DB, release, period string, collapse, includeOver log.WithFields(log.Fields{ "elapsed": elapsed, "reports": len(testReports), - }).Debug("BuildTestsResults completed") + }).Debug("buildTestsResultsFromPostgres completed") - return testReports, overallTest, nil + result.TestsAPIResult = testReports + result.Test = overallTest + return } -func BuildTestsResultsFromBigQuery(ctx context.Context, bqc *bq.Client, release, period string, collapse, includeOverall bool, fil *filter.Filter) (testsAPIResultBQ, *apitype.TestBQ, error) { //lint:ignore +func buildTestsResultsFromBigQuery(ctx context.Context, bqc *bq.Client, release, period string, collapse, includeOverall bool, fil *filter.Filter) (testsAPIResultBQ, *apitype.TestBQ, error) { //lint:ignore now := time.Now() // Test results are generated by using two subqueries, which need to be filtered separately. Once during @@ -671,7 +687,7 @@ func BuildTestsResultsFromBigQuery(ctx context.Context, bqc *bq.Client, release, log.WithFields(log.Fields{ "elapsed": elapsed, "reports": len(testReports), - }).Debug("BuildTestsResults completed") + }).Debug("buildTestsResultsFromBigQuery completed") return testReports, overallTest, nil } diff --git a/pkg/sippyserver/pr_commenting_processor.go b/pkg/sippyserver/pr_commenting_processor.go index 4e9778153a..c429359352 100644 --- a/pkg/sippyserver/pr_commenting_processor.go +++ b/pkg/sippyserver/pr_commenting_processor.go @@ -21,6 +21,7 @@ import ( jobQueries "github.com/openshift/sippy/pkg/api" "github.com/openshift/sippy/pkg/apis/api" + "github.com/openshift/sippy/pkg/apis/cache" "github.com/openshift/sippy/pkg/apis/prow" "github.com/openshift/sippy/pkg/bigquery" "github.com/openshift/sippy/pkg/dataloader/prowloader/gcs" @@ -62,15 +63,21 @@ var ( // NewWorkProcessor creates a standard work processor from parameters. 
// dbc: our database +// bigQueryClient: client for querying our data warehouse // gcsBucket: handle to our root gcs bucket +// cacheClient: client for our local redis cache +// ghCommenter: the commenting implementation // commentAnalysisWorkers: the number of threads active to process pending comment jobs // commentAnalysisRate: the minimun duration between querying the db for pending jobs // commentUpdaterRate: the minimum duration between adding a comment before we begin work on adding the next -// ghCommenter: the commenting implmentation // dryRunOnly: default is true to prevent unintended commenting when running locally or in a test deployment -func NewWorkProcessor(dbc *db.DB, gcsBucket *storage.BucketHandle, commentAnalysisWorkers int, bigQueryClient *bigquery.Client, commentAnalysisRate, commentUpdaterRate time.Duration, ghCommenter *commenter.GitHubCommenter, dryRunOnly bool) *WorkProcessor { - wp := &WorkProcessor{dbc: dbc, gcsBucket: gcsBucket, ghCommenter: ghCommenter, +func NewWorkProcessor(dbc *db.DB, bigQueryClient *bigquery.Client, gcsBucket *storage.BucketHandle, cacheClient cache.Cache, ghCommenter *commenter.GitHubCommenter, commentAnalysisWorkers int, commentAnalysisRate, commentUpdaterRate time.Duration, dryRunOnly bool) *WorkProcessor { + wp := &WorkProcessor{ + dbc: dbc, bigQueryClient: bigQueryClient, + gcsBucket: gcsBucket, + cacheClient: cacheClient, + ghCommenter: ghCommenter, commentAnalysisRate: commentAnalysisRate, commentUpdaterRate: commentUpdaterRate, commentAnalysisWorkers: commentAnalysisWorkers, @@ -86,6 +93,7 @@ type WorkProcessor struct { commentAnalysisRate time.Duration commentAnalysisWorkers int dbc *db.DB + cacheClient cache.Cache gcsBucket *storage.BucketHandle ghCommenter *commenter.GitHubCommenter bigQueryClient *bigquery.Client @@ -112,6 +120,7 @@ type CommentWorker struct { type AnalysisWorker struct { dbc *db.DB + cacheClient cache.Cache gcsBucket *storage.BucketHandle bigQueryClient *bigquery.Client 
riskAnalysisLocator *regexp.Regexp @@ -170,6 +179,7 @@ func (wp *WorkProcessor) Run(ctx context.Context) { analysisWorker := AnalysisWorker{ riskAnalysisLocator: gcs.GetDefaultRiskAnalysisSummaryFile(), dbc: wp.dbc, + cacheClient: wp.cacheClient, gcsBucket: wp.gcsBucket, bigQueryClient: wp.bigQueryClient, prCommentProspects: prospects, @@ -812,7 +822,7 @@ func (aw *AnalysisWorker) getRiskSummary(ctx context.Context, jobRunID, jobRunID if !errors.Is(err, gorm.ErrRecordNotFound) { logger.WithError(err).Errorf("Error fetching job run for: %s", jobRunIDPath) } - } else if ra, err := jobQueries.JobRunRiskAnalysis(ctx, aw.dbc, aw.bigQueryClient, jobRun, logger, true); err != nil { + } else if ra, err := jobQueries.JobRunRiskAnalysis(ctx, logger, aw.dbc, aw.bigQueryClient, aw.cacheClient, jobRun, true); err != nil { logger.WithError(err).Errorf("Error querying risk analysis for: %s", jobRunIDPath) } else { // query succeeded so use the riskAnalysis we got diff --git a/pkg/sippyserver/server.go b/pkg/sippyserver/server.go index fdcb0776e7..c7d95b8c10 100644 --- a/pkg/sippyserver/server.go +++ b/pkg/sippyserver/server.go @@ -222,7 +222,7 @@ func (s *Server) GetReportEnd() time.Time { // // refreshMatviewOnlyIfEmpty is used on startup to indicate that we want to do an initial refresh *only* if // the views appear to be empty. 
-func refreshMaterializedViews(dbc *db.DB, refreshMatviewOnlyIfEmpty bool) { +func refreshMaterializedViews(dbc *db.DB, cacheClient cache.Cache, refreshMatviewOnlyIfEmpty bool) { var promPusher *push.Pusher if pushgateway := os.Getenv("SIPPY_PROMETHEUS_PUSHGATEWAY"); pushgateway != "" { promPusher = push.New(pushgateway, "sippy-matviews") @@ -269,7 +269,7 @@ func refreshMaterializedViews(dbc *db.DB, refreshMatviewOnlyIfEmpty bool) { // allow concurrent workers for refreshing matviews in parallel for t := 0; t < 2; t++ { wg.Add(1) - go refreshMatview(dbc, refreshMatviewOnlyIfEmpty, ch, &wg) + go refreshMatview(dbc, cacheClient, refreshMatviewOnlyIfEmpty, ch, &wg) } // Sort materialized views so prow_test_report_2d_matview runs last to avoid CPU overload @@ -315,7 +315,7 @@ func refreshMaterializedViews(dbc *db.DB, refreshMatviewOnlyIfEmpty bool) { } } -func refreshMatview(dbc *db.DB, refreshMatviewOnlyIfEmpty bool, ch chan string, wg *sync.WaitGroup) { +func refreshMatview(dbc *db.DB, cacheClient cache.Cache, refreshMatviewOnlyIfEmpty bool, ch chan string, wg *sync.WaitGroup) { for matView := range ch { start := time.Now() @@ -346,21 +346,36 @@ func refreshMatview(dbc *db.DB, refreshMatviewOnlyIfEmpty bool, ch chan string, } else { elapsed := time.Since(start) tmpLog.WithField("elapsed", elapsed).Info("refreshed materialized view") + recordMatviewRefreshTime(cacheClient, matView, tmpLog) matViewRefreshMetric.WithLabelValues(matView).Observe(float64(elapsed.Milliseconds())) } } else { elapsed := time.Since(start) tmpLog.WithField("elapsed", elapsed).Info("refreshed materialized view concurrently") + recordMatviewRefreshTime(cacheClient, matView, tmpLog) matViewRefreshMetric.WithLabelValues(matView).Observe(float64(elapsed.Milliseconds())) } } wg.Done() } -func RefreshData(dbc *db.DB, pinnedDateTime *time.Time, refreshMatviewsOnlyIfEmpty bool) { +func recordMatviewRefreshTime(cacheClient cache.Cache, matView string, tmpLog *log.Entry) { + if cacheClient == nil { + 
return + } + // note that a matview refresh uses the source data that is present at the *start* of the refresh, + // but the matview data updates may not be available to read until it completes; so we invalidate the cache with + // the timestamp *after* the refresh completes. + ts := []byte(time.Now().UTC().Format(time.RFC3339)) + if err := cacheClient.Set(context.Background(), api.RefreshMatviewKey(matView), ts, 24*time.Hour); err != nil { + tmpLog.WithError(err).Warn("failed to record matview refresh timestamp in cache") + } +} + +func RefreshData(dbc *db.DB, cacheClient cache.Cache, refreshMatviewsOnlyIfEmpty bool) { log.Infof("Refreshing data") - refreshMaterializedViews(dbc, refreshMatviewsOnlyIfEmpty) + refreshMaterializedViews(dbc, cacheClient, refreshMatviewsOnlyIfEmpty) log.Info("Refresh complete") } @@ -1105,7 +1120,7 @@ func (s *Server) jsonJobBugsFromDB(w http.ResponseWriter, req *http.Request) { func (s *Server) jsonTestsReportFromDB(w http.ResponseWriter, req *http.Request) { release := s.getParamOrFail(w, req, "release") if release != "" { - api.PrintTestsJSONFromDB(release, w, req, s.db) + api.PrintTestsJSONFromDB(release, w, req, s.db, s.cache) } } @@ -1245,13 +1260,6 @@ func (s *Server) printReportDate(w http.ResponseWriter, req *http.Request) { api.RespondWithJSON(http.StatusOK, w, map[string]interface{}{"pinnedDateTime": reportDate}) } -func (s *Server) printCanaryReportFromDB(w http.ResponseWriter, req *http.Request) { - release := s.getParamOrFail(w, req, "release") - if release != "" { - api.PrintCanaryTestsFromDB(release, w, s.db) - } -} - func (s *Server) jsonVariantsReportFromDB(w http.ResponseWriter, req *http.Request) { release := s.getParamOrFail(w, req, "release") if release != "" { @@ -1467,7 +1475,7 @@ func (s *Server) jsonJobRunRiskAnalysis(w http.ResponseWriter, req *http.Request } logger.Infof("job run = %+v", *jobRun) - result, err := api.JobRunRiskAnalysis(req.Context(), s.db, s.bigQueryClient, jobRun, logger, false) + result, 
err := api.JobRunRiskAnalysis(req.Context(), logger, s.db, s.bigQueryClient, s.cache, jobRun, false) if err != nil { failureResponse(w, http.StatusBadRequest, err.Error()) return @@ -2447,7 +2455,6 @@ func (s *Server) Serve() { EndpointPath: "/api/tests", Description: "Reports on tests", Capabilities: []string{LocalDBCapability}, - CacheTime: 1 * time.Hour, HandlerFunc: s.jsonTestsReportFromDB, }, { @@ -2581,12 +2588,6 @@ func (s *Server) Serve() { Capabilities: []string{LocalDBCapability}, HandlerFunc: s.jsonVariantsReportFromDB, }, - { - EndpointPath: "/api/canary", - Description: "Displays canary report from database", - Capabilities: []string{LocalDBCapability}, - HandlerFunc: s.printCanaryReportFromDB, - }, { EndpointPath: "/api/report_date", Description: "Displays report date", diff --git a/scripts/updatesuites.go b/scripts/updatesuites.go index fefcc18e3d..6bfac0b36f 100644 --- a/scripts/updatesuites.go +++ b/scripts/updatesuites.go @@ -150,6 +150,7 @@ func testNameWithoutSuite(dbc *gorm.DB) error { log.Infof("update complete, total rows updated %d", rowsUpdated) // Refresh materialized views + // NOTE: does not update timestamps to invalidate cached matview data; not clear if the use case for this script requires that. sippyserver.RefreshData(&db.DB{ DB: dbc, }, nil, false)