204 changes: 127 additions & 77 deletions pkg/dataloader/prowloader/prow.go
@@ -7,6 +7,7 @@ import (
"io"
"net/http"
"net/url"
"os"
"reflect"
"regexp"
"strconv"
@@ -19,6 +20,7 @@ import (
"cloud.google.com/go/civil"
"cloud.google.com/go/storage"
"github.com/jackc/pgtype"
"github.com/lib/pq"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/api/iterator"
@@ -592,17 +594,15 @@ func (pl *ProwLoader) syncPRStatus() error {
// if we see that any sha has merged for this pr then we should clear out any risk analysis pending comment records
// if we don't get them here we will catch them before writing the risk analysis comment
// but, we should clean up here if possible
if recentMergedAt != nil {
pendingComments, err := pl.ghCommenter.QueryPRPendingComments(pr.Org, pr.Repo, pr.Number, models.CommentTypeRiskAnalysis)
pendingComments, err := pl.ghCommenter.QueryPRPendingComments(pr.Org, pr.Repo, pr.Number, models.CommentTypeRiskAnalysis)

if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
logger.WithError(err).Error("Unable to fetch pending comments ")
}
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
logger.WithError(err).Error("Unable to fetch pending comments ")
}

for _, pc := range pendingComments {
pcp := pc
pl.ghCommenter.ClearPendingRecord(pcp.Org, pcp.Repo, pcp.PullNumber, pcp.SHA, models.CommentTypeRiskAnalysis, &pcp)
}
for _, pc := range pendingComments {
pcp := pc
pl.ghCommenter.ClearPendingRecord(pcp.Org, pcp.Repo, pcp.PullNumber, pcp.SHA, models.CommentTypeRiskAnalysis, &pcp)
}
}
}
@@ -668,25 +668,6 @@ func GetClusterDataBytes(ctx context.Context, bkt *storage.BucketHandle, path st
return bytes, nil
}

func GetClusterData(ctx context.Context, bkt *storage.BucketHandle, path string, matches []string) models.ClusterData {
cd := models.ClusterData{}
bytes, err := GetClusterDataBytes(ctx, bkt, path, matches)
if err != nil {
log.WithError(err).Error("failed to get prow job variant data, returning empty cluster data and proceeding")
return cd
} else if bytes == nil {
log.Warnf("empty job variant data file, returning empty cluster data and proceeding")
return cd
}
err = json.Unmarshal(bytes, &cd)
if err != nil {
log.WithError(err).Error("failed to unmarshal cluster-data bytes, returning empty cluster data")
return cd
}

return cd
}

func ParseVariantDataFile(bytes []byte) (map[string]string, error) {
rawJSONMap := make(map[string]interface{})
err := json.Unmarshal(bytes, &rawJSONMap)
@@ -839,6 +820,29 @@ func (pl *ProwLoader) prowJobToJobRun(ctx context.Context, pj *prow.ProwJob, rel
// Lock the whole prow job block to avoid trying to create the pj multiple times concurrently
// (resulting in a DB error)
pl.prowJobCacheLock.Lock()
dbProwJob, err := pl.createOrUpdateProwJob(ctx, pj, release, pjLog)
pl.prowJobCacheLock.Unlock()
if err != nil {
return err
}

pl.prowJobRunCacheLock.RLock()
_, ok := pl.prowJobRunCache[uint(id)]
pl.prowJobRunCacheLock.RUnlock()
if ok {
pjLog.Infof("processing complete; job run was already processed")
return nil
}

pjLog.Info("processing GCS bucket")
if err := pl.processGCSBucketJobRun(ctx, pj, id, path, junitMatches, dbProwJob); err != nil {
return err
}
pjLog.Infof("processing complete")
return nil
}

func (pl *ProwLoader) createOrUpdateProwJob(ctx context.Context, pj *prow.ProwJob, release string, pjLog *log.Entry) (*models.ProwJob, error) {
dbProwJob, foundProwJob := pl.prowJobCache[pj.Spec.Job]
if !foundProwJob {
pjLog.Info("creating new ProwJob")
@@ -851,7 +855,7 @@ func (pl *ProwLoader) prowJobToJobRun(ctx context.Context, pj *prow.ProwJob, rel
}
err := pl.dbc.DB.WithContext(ctx).Clauses(clause.OnConflict{UpdateAll: true}).Create(dbProwJob).Error
if err != nil {
return errors.Wrapf(err, "error loading prow job into db: %s", pj.Spec.Job)
return nil, errors.Wrapf(err, "error loading prow job into db: %s", pj.Spec.Job)
}
pl.prowJobCache[pj.Spec.Job] = dbProwJob
} else {
@@ -870,63 +874,60 @@
}
if saveDB {
if res := pl.dbc.DB.WithContext(ctx).Save(&dbProwJob); res.Error != nil {
return res.Error
return nil, res.Error
}
}
}
pl.prowJobCacheLock.Unlock()

pl.prowJobRunCacheLock.RLock()
_, ok := pl.prowJobRunCache[uint(id)]
pl.prowJobRunCacheLock.RUnlock()
if ok {
pjLog.Infof("job run was already processed")
} else {
pjLog.Info("processing GCS bucket")
return dbProwJob, nil
}

tests, failures, overallResult, err := pl.prowJobRunTestsFromGCS(ctx, pj, uint(id), path, junitMatches)
if err != nil {
return err
}
func (pl *ProwLoader) processGCSBucketJobRun(ctx context.Context, pj *prow.ProwJob, id uint64, path string, junitMatches []string, dbProwJob *models.ProwJob) error {
tests, failures, overallResult, err := pl.prowJobRunTestsFromGCS(ctx, pj, uint(id), path, junitMatches)
if err != nil {
return err
}

pulls := pl.findOrAddPullRequests(pj.Spec.Refs, path)
pulls := pl.findOrAddPullRequests(pj.Spec.Refs, path)

var duration time.Duration
if pj.Status.CompletionTime != nil {
duration = pj.Status.CompletionTime.Sub(pj.Status.StartTime)
}

err = pl.dbc.DB.WithContext(ctx).Create(&models.ProwJobRun{
Model: gorm.Model{
ID: uint(id),
},
Cluster: pj.Spec.Cluster,
Duration: duration,
ProwJob: *dbProwJob,
ProwJobID: dbProwJob.ID,
URL: pj.Status.URL,
GCSBucket: pj.Spec.DecorationConfig.GCSConfiguration.Bucket,
Timestamp: pj.Status.StartTime,
OverallResult: overallResult,
PullRequests: pulls,
TestFailures: failures,
Succeeded: overallResult == sippyprocessingv1.JobSucceeded,
}).Error
if err != nil {
return err
}
// Looks like sometimes, we might be getting duplicate entries from bigquery:
pl.prowJobRunCacheLock.Lock()
pl.prowJobRunCache[uint(id)] = true
pl.prowJobRunCacheLock.Unlock()
labels, err := GatherLabelsFromBQ(ctx, pl.bigQueryClient, pj.Status.BuildID, pj.Status.StartTime)
if err != nil {
return err
}

err = pl.dbc.DB.WithContext(ctx).Debug().CreateInBatches(tests, 1000).Error
if err != nil {
return err
}
var duration time.Duration
if pj.Status.CompletionTime != nil {
duration = pj.Status.CompletionTime.Sub(pj.Status.StartTime)
}

err = pl.dbc.DB.WithContext(ctx).Create(&models.ProwJobRun{
Model: gorm.Model{
ID: uint(id),
},
Cluster: pj.Spec.Cluster,
Duration: duration,
ProwJob: *dbProwJob,
ProwJobID: dbProwJob.ID,
URL: pj.Status.URL,
GCSBucket: pj.Spec.DecorationConfig.GCSConfiguration.Bucket,
Timestamp: pj.Status.StartTime,
OverallResult: overallResult,
PullRequests: pulls,
TestFailures: failures,
Succeeded: overallResult == sippyprocessingv1.JobSucceeded,
Labels: labels,
}).Error
if err != nil {
return err
}
// Looks like sometimes, we might be getting duplicate entries from bigquery:
pl.prowJobRunCacheLock.Lock()
pl.prowJobRunCache[uint(id)] = true
pl.prowJobRunCacheLock.Unlock()

pjLog.Infof("processing complete")
err = pl.dbc.DB.WithContext(ctx).Debug().CreateInBatches(tests, 1000).Error
if err != nil {
return err
}
return nil
}
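The new Labels value saved on the job run above implies a matching Labels column on models.ProwJobRun, which is defined outside this file. A minimal sketch of what that field plausibly looks like, assuming a Postgres text[] column handled by lib/pq; the field name and tags are assumptions, not taken from this PR:

package models

import (
	"github.com/lib/pq"
	"gorm.io/gorm"
)

// Sketch only: the actual ProwJobRun model lives elsewhere in the repo.
// This illustrates the assumed shape of the new Labels column.
type ProwJobRun struct {
	gorm.Model
	// ... existing fields omitted ...
	Labels pq.StringArray `gorm:"type:text[]"`
}

Using pq.StringArray here matches the return type of GatherLabelsFromBQ below, so the query result can be assigned to the field directly.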

@@ -1036,6 +1037,55 @@ func (pl *ProwLoader) findOrAddPullRequests(refs *prow.Refs, pjPath string) []mo
return pulls
}

const LabelsDatasetEnv = "JOB_LABELS_DATASET"
const LabelsTableName = "job_labels"

// GatherLabelsFromBQ queries BigQuery for labels associated with this job run.
// Labels are stored in the job_labels table and indexed by prowjob_build_id.
func GatherLabelsFromBQ(ctx context.Context, bqClient *bqcachedclient.Client, buildID string, startTime time.Time) (pq.StringArray, error) {
if bqClient == nil {
return nil, nil
}
logger := log.WithField("buildID", buildID)

dataset := os.Getenv(LabelsDatasetEnv)
if dataset == "" {
dataset = bqClient.Dataset
}
table := fmt.Sprintf("`%s.%s`", dataset, LabelsTableName)
q := bqClient.BQ.Query(`
SELECT ARRAY_AGG(DISTINCT label ORDER BY label ASC) AS labels
FROM ` + table + `
WHERE prowjob_build_id = @BuildID
AND DATE(prowjob_start) = DATE(@StartTime)
`)
q.Parameters = []bigquery.QueryParameter{
{
Name: "BuildID",
Value: buildID,
},
{
Name: "StartTime",
Value: startTime,
},
}

var result struct {
Labels []string `bigquery:"labels"`
}
it, err := q.Read(ctx)
if err != nil {
logger.WithError(err).Warning("error querying labels from bigquery")
return nil, err
}
if err = it.Next(&result); err != nil && err != iterator.Done {
logger.WithError(err).Warning("error parsing labels from bigquery")
return nil, err
}

return result.Labels, nil
}
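GatherLabelsFromBQ assumes a job_labels table keyed by prowjob_build_id and prowjob_start. A rough sketch of the row shape implied by the query, written as a Go struct with bigquery tags; the column names come from the query above, while the column types are assumptions since the table definition is not part of this PR:

package prowloader

import "time"

// Assumed shape of a row in the job_labels table, inferred from the query in
// GatherLabelsFromBQ; the real table schema is defined outside this PR.
type jobLabelRow struct {
	ProwJobBuildID string    `bigquery:"prowjob_build_id"`
	ProwJobStart   time.Time `bigquery:"prowjob_start"`
	Label          string    `bigquery:"label"`
}

Setting JOB_LABELS_DATASET redirects the lookup to another dataset; otherwise the client's default dataset is used.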

func (pl *ProwLoader) findOrAddTest(name string) (uint, error) {
pl.prowJobRunTestCacheLock.RLock()
if id, ok := pl.prowJobRunTestCache[name]; ok {