From 72d199f319da51f52537655bfc80f1e194a6b18e Mon Sep 17 00:00:00 2001
From: Luke Meyer
Date: Tue, 13 Jan 2026 11:15:35 -0500
Subject: [PATCH 1/2] prowloader: minor refactoring of prow.go

Prepare for expanding the loader to handle job labels.

Break methods createOrUpdateProwJob and processGCSBucketJobRun out of
the too-long method prowJobToJobRun. Remove GetClusterData, which has
no evident caller in sippy or cloud functions. Other minor
simplifications.
---
 pkg/dataloader/prowloader/prow.go | 151 ++++++++++++++----------------
 1 file changed, 72 insertions(+), 79 deletions(-)

diff --git a/pkg/dataloader/prowloader/prow.go b/pkg/dataloader/prowloader/prow.go
index aa4d490f2e..f2cd3cf939 100644
--- a/pkg/dataloader/prowloader/prow.go
+++ b/pkg/dataloader/prowloader/prow.go
@@ -592,17 +592,15 @@ func (pl *ProwLoader) syncPRStatus() error {
 			// if we see that any sha has merged for this pr then we should clear out any risk analysis pending comment records
 			// if we don't get them here we will catch them before writing the risk analysis comment
 			// but, we should clean up here if possible
-			if recentMergedAt != nil {
-				pendingComments, err := pl.ghCommenter.QueryPRPendingComments(pr.Org, pr.Repo, pr.Number, models.CommentTypeRiskAnalysis)
+			pendingComments, err := pl.ghCommenter.QueryPRPendingComments(pr.Org, pr.Repo, pr.Number, models.CommentTypeRiskAnalysis)
 
-				if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
-					logger.WithError(err).Error("Unable to fetch pending comments ")
-				}
+			if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
+				logger.WithError(err).Error("Unable to fetch pending comments ")
+			}
 
-				for _, pc := range pendingComments {
-					pcp := pc
-					pl.ghCommenter.ClearPendingRecord(pcp.Org, pcp.Repo, pcp.PullNumber, pcp.SHA, models.CommentTypeRiskAnalysis, &pcp)
-				}
+			for _, pc := range pendingComments {
+				pcp := pc
+				pl.ghCommenter.ClearPendingRecord(pcp.Org, pcp.Repo, pcp.PullNumber, pcp.SHA, models.CommentTypeRiskAnalysis, &pcp)
 			}
 		}
 	}
@@ -668,25 +666,6 @@ func GetClusterDataBytes(ctx context.Context, bkt *storage.BucketHandle, path st
 	return bytes, nil
 }
 
-func GetClusterData(ctx context.Context, bkt *storage.BucketHandle, path string, matches []string) models.ClusterData {
-	cd := models.ClusterData{}
-	bytes, err := GetClusterDataBytes(ctx, bkt, path, matches)
-	if err != nil {
-		log.WithError(err).Error("failed to get prow job variant data, returning empty cluster data and proceeding")
-		return cd
-	} else if bytes == nil {
-		log.Warnf("empty job variant data file, returning empty cluster data and proceeding")
-		return cd
-	}
-	err = json.Unmarshal(bytes, &cd)
-	if err != nil {
-		log.WithError(err).Error("failed to unmarshal cluster-data bytes, returning empty cluster data")
-		return cd
-	}
-
-	return cd
-}
-
 func ParseVariantDataFile(bytes []byte) (map[string]string, error) {
 	rawJSONMap := make(map[string]interface{})
 	err := json.Unmarshal(bytes, &rawJSONMap)
@@ -839,6 +818,29 @@ func (pl *ProwLoader) prowJobToJobRun(ctx context.Context, pj *prow.ProwJob, rel
 	// Lock the whole prow job block to avoid trying to create the pj multiple times concurrently\
 	// (resulting in a DB error)
 	pl.prowJobCacheLock.Lock()
+	dbProwJob, err := pl.createOrUpdateProwJob(ctx, pj, release, pjLog)
+	pl.prowJobCacheLock.Unlock()
+	if err != nil {
+		return err
+	}
+
+	pl.prowJobRunCacheLock.RLock()
+	_, ok := pl.prowJobRunCache[uint(id)]
+	pl.prowJobRunCacheLock.RUnlock()
+	if ok {
+		pjLog.Infof("processing complete; job run was already processed")
+		return nil
+	}
+
+	pjLog.Info("processing 
GCS bucket") + if err := pl.processGCSBucketJobRun(ctx, pj, id, path, junitMatches, dbProwJob); err != nil { + return err + } + pjLog.Infof("processing complete") + return nil +} + +func (pl *ProwLoader) createOrUpdateProwJob(ctx context.Context, pj *prow.ProwJob, release string, pjLog *log.Entry) (*models.ProwJob, error) { dbProwJob, foundProwJob := pl.prowJobCache[pj.Spec.Job] if !foundProwJob { pjLog.Info("creating new ProwJob") @@ -851,7 +853,7 @@ func (pl *ProwLoader) prowJobToJobRun(ctx context.Context, pj *prow.ProwJob, rel } err := pl.dbc.DB.WithContext(ctx).Clauses(clause.OnConflict{UpdateAll: true}).Create(dbProwJob).Error if err != nil { - return errors.Wrapf(err, "error loading prow job into db: %s", pj.Spec.Job) + return nil, errors.Wrapf(err, "error loading prow job into db: %s", pj.Spec.Job) } pl.prowJobCache[pj.Spec.Job] = dbProwJob } else { @@ -870,63 +872,54 @@ func (pl *ProwLoader) prowJobToJobRun(ctx context.Context, pj *prow.ProwJob, rel } if saveDB { if res := pl.dbc.DB.WithContext(ctx).Save(&dbProwJob); res.Error != nil { - return res.Error + return nil, res.Error } } } - pl.prowJobCacheLock.Unlock() - - pl.prowJobRunCacheLock.RLock() - _, ok := pl.prowJobRunCache[uint(id)] - pl.prowJobRunCacheLock.RUnlock() - if ok { - pjLog.Infof("job run was already processed") - } else { - pjLog.Info("processing GCS bucket") - - tests, failures, overallResult, err := pl.prowJobRunTestsFromGCS(ctx, pj, uint(id), path, junitMatches) - if err != nil { - return err - } - - pulls := pl.findOrAddPullRequests(pj.Spec.Refs, path) - - var duration time.Duration - if pj.Status.CompletionTime != nil { - duration = pj.Status.CompletionTime.Sub(pj.Status.StartTime) - } + return dbProwJob, nil +} - err = pl.dbc.DB.WithContext(ctx).Create(&models.ProwJobRun{ - Model: gorm.Model{ - ID: uint(id), - }, - Cluster: pj.Spec.Cluster, - Duration: duration, - ProwJob: *dbProwJob, - ProwJobID: dbProwJob.ID, - URL: pj.Status.URL, - GCSBucket: pj.Spec.DecorationConfig.GCSConfiguration.Bucket, - Timestamp: pj.Status.StartTime, - OverallResult: overallResult, - PullRequests: pulls, - TestFailures: failures, - Succeeded: overallResult == sippyprocessingv1.JobSucceeded, - }).Error - if err != nil { - return err - } - // Looks like sometimes, we might be getting duplicate entries from bigquery: - pl.prowJobRunCacheLock.Lock() - pl.prowJobRunCache[uint(id)] = true - pl.prowJobRunCacheLock.Unlock() +func (pl *ProwLoader) processGCSBucketJobRun(ctx context.Context, pj *prow.ProwJob, id uint64, path string, junitMatches []string, dbProwJob *models.ProwJob) error { + tests, failures, overallResult, err := pl.prowJobRunTestsFromGCS(ctx, pj, uint(id), path, junitMatches) + if err != nil { + return err + } - err = pl.dbc.DB.WithContext(ctx).Debug().CreateInBatches(tests, 1000).Error - if err != nil { - return err - } + pulls := pl.findOrAddPullRequests(pj.Spec.Refs, path) + + var duration time.Duration + if pj.Status.CompletionTime != nil { + duration = pj.Status.CompletionTime.Sub(pj.Status.StartTime) + } + + err = pl.dbc.DB.WithContext(ctx).Create(&models.ProwJobRun{ + Model: gorm.Model{ + ID: uint(id), + }, + Cluster: pj.Spec.Cluster, + Duration: duration, + ProwJob: *dbProwJob, + ProwJobID: dbProwJob.ID, + URL: pj.Status.URL, + GCSBucket: pj.Spec.DecorationConfig.GCSConfiguration.Bucket, + Timestamp: pj.Status.StartTime, + OverallResult: overallResult, + PullRequests: pulls, + TestFailures: failures, + Succeeded: overallResult == sippyprocessingv1.JobSucceeded, + }).Error + if err != nil { + return err } + // Looks 
like sometimes, we might be getting duplicate entries from bigquery:
+	pl.prowJobRunCacheLock.Lock()
+	pl.prowJobRunCache[uint(id)] = true
+	pl.prowJobRunCacheLock.Unlock()
 
-	pjLog.Infof("processing complete")
+	err = pl.dbc.DB.WithContext(ctx).Debug().CreateInBatches(tests, 1000).Error
+	if err != nil {
+		return err
+	}
 	return nil
 }
 
From c17f2089004e49e61107373ea272176faded165d Mon Sep 17 00:00:00 2001
From: Luke Meyer
Date: Tue, 13 Jan 2026 11:54:58 -0500
Subject: 🤖 prowloader: record job_labels from BQ in job runs
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

🤖 Assisted by Claude Code
---
 pkg/dataloader/prowloader/prow.go | 57 +++++++++++++++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git a/pkg/dataloader/prowloader/prow.go b/pkg/dataloader/prowloader/prow.go
index f2cd3cf939..dd5d43127c 100644
--- a/pkg/dataloader/prowloader/prow.go
+++ b/pkg/dataloader/prowloader/prow.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"os"
 	"reflect"
 	"regexp"
 	"strconv"
@@ -19,6 +20,7 @@ import (
 	"cloud.google.com/go/civil"
 	"cloud.google.com/go/storage"
 	"github.com/jackc/pgtype"
+	"github.com/lib/pq"
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 	"google.golang.org/api/iterator"
@@ -887,6 +889,11 @@ func (pl *ProwLoader) processGCSBucketJobRun(ctx context.Context, pj *prow.ProwJ
 
 	pulls := pl.findOrAddPullRequests(pj.Spec.Refs, path)
 
+	labels, err := GatherLabelsFromBQ(ctx, pl.bigQueryClient, pj.Status.BuildID, pj.Status.StartTime)
+	if err != nil {
+		return err
+	}
+
 	var duration time.Duration
 	if pj.Status.CompletionTime != nil {
 		duration = pj.Status.CompletionTime.Sub(pj.Status.StartTime)
@@ -907,6 +914,7 @@ func (pl *ProwLoader) processGCSBucketJobRun(ctx context.Context, pj *prow.ProwJ
 		PullRequests:  pulls,
 		TestFailures:  failures,
 		Succeeded:     overallResult == sippyprocessingv1.JobSucceeded,
+		Labels:        labels,
 	}).Error
 	if err != nil {
 		return err
@@ -1029,6 +1037,55 @@ func (pl *ProwLoader) findOrAddPullRequests(refs *prow.Refs, pjPath string) []mo
 	return pulls
 }
 
+const LabelsDatasetEnv = "JOB_LABELS_DATASET"
+const LabelsTableName = "job_labels"
+
+// GatherLabelsFromBQ queries BigQuery for labels associated with this job run.
+// Labels are stored in the job_labels table and indexed by prowjob_build_id. 
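+// The dataset holding the job_labels table is taken from the JOB_LABELS_DATASET
+// env var when set, otherwise from the BigQuery client's default dataset.
+// Filtering on DATE(prowjob_start) in addition to the build ID narrows the
+// query to the run's start date, presumably so a date-partitioned table scans
+// only that day's partition. Returns nil labels when no BigQuery client is
+// configured or no labels are recorded for the run.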
+func GatherLabelsFromBQ(ctx context.Context, bqClient *bqcachedclient.Client, buildID string, startTime time.Time) (pq.StringArray, error) { + if bqClient == nil { + return nil, nil + } + logger := log.WithField("buildID", buildID) + + dataset := os.Getenv(LabelsDatasetEnv) + if dataset == "" { + dataset = bqClient.Dataset + } + table := fmt.Sprintf("`%s.%s`", dataset, LabelsTableName) + q := bqClient.BQ.Query(` + SELECT ARRAY_AGG(DISTINCT label ORDER BY label ASC) AS labels + FROM ` + table + ` + WHERE prowjob_build_id = @BuildID + AND DATE(prowjob_start) = DATE(@StartTime) + `) + q.Parameters = []bigquery.QueryParameter{ + { + Name: "BuildID", + Value: buildID, + }, + { + Name: "StartTime", + Value: startTime, + }, + } + + var result struct { + Labels []string `bigquery:"labels"` + } + it, err := q.Read(ctx) + if err != nil { + logger.WithError(err).Warning("error querying labels from bigquery") + return nil, err + } + if err = it.Next(&result); err != nil && err != iterator.Done { + logger.WithError(err).Warning("error parsing labels from bigquery") + return nil, err + } + + return result.Labels, nil +} + func (pl *ProwLoader) findOrAddTest(name string) (uint, error) { pl.prowJobRunTestCacheLock.RLock() if id, ok := pl.prowJobRunTestCache[name]; ok {