Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cmd/sippy-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,7 @@ func NewSippyDaemonCommand() *cobra.Command {
// 4 potential GitHub calls per comment gives us a safe buffer
// get comment data, get existing comments, possible delete existing, and adding the comment
// could lower to 3 seconds if we need, most writes likely won't have to delete
processes = append(processes, sippyserver.NewWorkProcessor(dbc,
gcsClient.Bucket(f.GoogleCloudFlags.StorageBucket),
10, bigQueryClient, 5*time.Minute, 5*time.Second, ghCommenter, f.GithubCommenterFlags.CommentProcessingDryRun))
processes = append(processes, sippyserver.NewWorkProcessor(dbc, bigQueryClient, gcsClient.Bucket(f.GoogleCloudFlags.StorageBucket), cacheClient, ghCommenter, 10, 5*time.Minute, 5*time.Second, f.GithubCommenterFlags.CommentProcessingDryRun))
}

daemonServer := sippyserver.NewDaemonServer(processes)
Expand Down
9 changes: 6 additions & 3 deletions cmd/sippy/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,13 @@ func NewLoadCommand() *cobra.Command {
}
}

// likewise get a cache client if possible, though some things operate without it.
cacheClient, cacheErr := f.CacheFlags.GetCacheClient()
releaseConfigs := []sippyv1.Release{}
if cacheErr != nil {
cacheClient = nil // error hygiene, since we pass this down to quite a few functions
}

releaseConfigs := []sippyv1.Release{}
// initializing a bigquery client different from the normal one
opCtx, ctx := bqcachedclient.OpCtxForCronEnv(ctx, "load")
bqc, bigqueryErr := bqcachedclient.New(
Expand Down Expand Up @@ -337,9 +341,8 @@ func NewLoadCommand() *cobra.Command {
elapsed := time.Since(start)
log.WithField("elapsed", elapsed).Info("database load complete")

pinnedTime := f.DBFlags.GetPinnedTime()
if refreshMatviews && !f.SkipMatviewRefresh {
sippyserver.RefreshData(dbc, pinnedTime, false)
sippyserver.RefreshData(dbc, cacheClient, false)
}

elapsed = time.Since(start)
Expand Down
17 changes: 14 additions & 3 deletions cmd/sippy/refresh.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package main

import (
"fmt"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

Expand All @@ -10,17 +13,20 @@ import (

type RefreshFlags struct {
DBFlags *flags.PostgresFlags
CacheFlags *flags.CacheFlags
RefreshOnlyIfEmpty bool
}

func NewRefreshFlags() *RefreshFlags {
return &RefreshFlags{
DBFlags: flags.NewPostgresDatabaseFlags(),
DBFlags: flags.NewPostgresDatabaseFlags(),
CacheFlags: flags.NewCacheFlags(),
}
}

func (f *RefreshFlags) BindFlags(fs *pflag.FlagSet) {
f.DBFlags.BindFlags(fs)
f.CacheFlags.BindFlags(fs)
fs.BoolVar(&f.RefreshOnlyIfEmpty, "refresh-only-if-empty", f.RefreshOnlyIfEmpty, "only refresh matviews if they're empty")
}

Expand All @@ -35,8 +41,13 @@ func NewRefreshCommand() *cobra.Command {
if err != nil {
return err
}
pinnedDateTime := f.DBFlags.GetPinnedTime()
sippyserver.RefreshData(dbc, pinnedDateTime, f.RefreshOnlyIfEmpty)
cacheClient, cacheErr := f.CacheFlags.GetCacheClient()
if cacheErr != nil {
return fmt.Errorf("failed to get cache client: %v", cacheErr)
} else if cacheClient == nil {
logrus.Warn("no cache provided; refresh will not update cached timestamps, so cached data may not be properly invalidated")
}
sippyserver.RefreshData(dbc, cacheClient, f.RefreshOnlyIfEmpty)
return nil
},
}
Expand Down
12 changes: 11 additions & 1 deletion cmd/sippy/seed_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

type SeedDataFlags struct {
DBFlags *flags.PostgresFlags
CacheFlags *flags.CacheFlags
InitDatabase bool
Releases []string
JobsPerRelease int
Expand All @@ -30,6 +31,7 @@ type SeedDataFlags struct {
func NewSeedDataFlags() *SeedDataFlags {
return &SeedDataFlags{
DBFlags: flags.NewPostgresDatabaseFlags(),
CacheFlags: flags.NewCacheFlags(),
Releases: []string{"5.0", "4.22", "4.21"}, // Default releases
JobsPerRelease: 3, // Default jobs per release
TestNames: []string{
Expand All @@ -48,6 +50,7 @@ func NewSeedDataFlags() *SeedDataFlags {

func (f *SeedDataFlags) BindFlags(fs *pflag.FlagSet) {
f.DBFlags.BindFlags(fs)
f.CacheFlags.BindFlags(fs)
fs.BoolVar(&f.InitDatabase, "init-database", false, "Initialize the DB schema before seeding data")
fs.StringSliceVar(&f.Releases, "release", f.Releases, "Releases to create ProwJobs for (can be specified multiple times)")
fs.IntVar(&f.JobsPerRelease, "jobs", f.JobsPerRelease, "Number of ProwJobs to create for each release")
Expand Down Expand Up @@ -86,6 +89,13 @@ rolled off the 1 week window.
log.Info("Database schema initialized successfully")
}

cacheClient, cacheErr := f.CacheFlags.GetCacheClient()
if cacheErr != nil {
return fmt.Errorf("failed to get cache client: %v", cacheErr)
} else if cacheClient == nil {
log.Warn("no cache provided; refresh timestamps will not be cached")
}

log.Info("Starting to seed test data...")

// Create the test suite
Expand Down Expand Up @@ -131,7 +141,7 @@ rolled off the 1 week window.
totalTestResults := totalRuns * len(f.TestNames)

log.Info("Refreshing materialized views...")
sippyserver.RefreshData(dbc, nil, false)
sippyserver.RefreshData(dbc, cacheClient, false)

log.Infof("Successfully seeded test data! Created %d ProwJobs, %d Tests, %d ProwJobRuns, and %d test results",
totalProwJobs, len(f.TestNames), totalRuns, totalTestResults)
Expand Down
76 changes: 76 additions & 0 deletions pkg/api/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,79 @@ func isStructWithNoPublicFields(v interface{}) bool {
}
return true
}

// GetDataFromCacheOrMatview caches data that is based on a matview and invalidates it when the matview is refreshed
func GetDataFromCacheOrMatview[T any](ctx context.Context,
cacheClient cache.Cache, cacheSpec CacheSpec,
matview string,
cacheDuration time.Duration,
generateFn func(context.Context) (T, []error),
defaultVal T,
) (T, []error) {
if cacheClient == nil {
return generateFn(ctx)
}

cacheKey, err := cacheSpec.GetCacheKey()
if err != nil {
return defaultVal, []error{err}
}

// If someone gives us an uncacheable cacheKey, panic so it gets detected in testing
if len(cacheKey) == 0 {
panic(fmt.Sprintf("cache key is empty for %s", reflect.TypeOf(defaultVal)))
}
// If someone gives us an uncacheable value, panic so it gets detected in testing
if isStructWithNoPublicFields(defaultVal) {
panic(fmt.Sprintf("cannot cache type %s that exports no fields", reflect.TypeOf(defaultVal)))
}

var cacheVal struct {
Val T // the actual value we want to cache
Timestamp time.Time // the time when it was cached (for comparison to matview refresh time)
}
if cached, err := cacheClient.Get(ctx, string(cacheKey), 0); err == nil {
logrus.WithFields(logrus.Fields{
"key": string(cacheKey),
"type": reflect.TypeOf(defaultVal).String(),
}).Debugf("cache hit")

if err := json.Unmarshal(cached, &cacheVal); err != nil {
return defaultVal, []error{errors.WithMessagef(err, "failed to unmarshal cached item. cacheKey=%+v", cacheKey)}
}

// look up when the matview was refreshed to see if the cached value is stale
var lastRefresh time.Time
if lastRefreshBytes, err := cacheClient.Get(ctx, RefreshMatviewKey(matview), 0); err == nil {
if parsed, err := time.Parse(time.RFC3339, string(lastRefreshBytes)); err != nil {
logrus.WithError(err).Warnf("failed to parse matview refresh timestamp %q for %q; cache will not be invalidated", lastRefreshBytes, matview)
} else {
lastRefresh = parsed
}
}

if lastRefresh.Before(cacheVal.Timestamp) {
// not invalidated by a newer refresh, so use it (if we don't know the last refresh, still use it)
return cacheVal.Val, nil
}
logrus.Debugf("matview %q refreshed at %v, will not use earlier cache entry from %v", matview, lastRefresh, cacheVal.Timestamp)
} else if strings.Contains(err.Error(), "connection refused") {
logrus.WithError(err).Fatalf("redis URL specified but got connection refused; exiting due to cost issues in this configuration")
} else {
logrus.WithFields(logrus.Fields{"key": string(cacheKey)}).Debugf("cache miss")
}

// Cache missed or refresh invalidated the data, so generate it.
logrus.Debugf("cache duration set to %s or approx %s for key %s", cacheDuration, time.Now().Add(cacheDuration).Format(time.RFC3339), cacheKey)
result, errs := generateFn(ctx)
if len(errs) == 0 {
cacheVal.Val = result
cacheVal.Timestamp = time.Now().UTC()
CacheSet(ctx, cacheClient, cacheVal, cacheKey, cacheDuration)
}
return result, errs
}

func RefreshMatviewKey(matview string) string {
return "matview_refreshed:" + matview
}
Loading