diff options
Diffstat (limited to 'submitqueue')
-rw-r--r-- | submitqueue/result.go | 55 | ||||
-rw-r--r-- | submitqueue/runner.go | 207 | ||||
-rw-r--r-- | submitqueue/serie.go | 116 | ||||
-rw-r--r-- | submitqueue/series.go | 125 | ||||
-rw-r--r-- | submitqueue/submitqueue.go | 231 |
5 files changed, 175 insertions, 559 deletions
diff --git a/submitqueue/result.go b/submitqueue/result.go deleted file mode 100644 index a15c2968f30d..000000000000 --- a/submitqueue/result.go +++ /dev/null @@ -1,55 +0,0 @@ -package submitqueue - -import ( - "time" - - "github.com/sirupsen/logrus" -) - -// Problem: no inspection during the run -// Problem: record the state - -// Result contains all data necessary to inspect a previous run -// This includes the Series from that run, and all Log Entries collected. -// It also implements the interface required for logrus.Hook. -type Result struct { - LogEntries []*logrus.Entry - Series []Serie - Error error - startTime time.Time - HEAD string -} - -// MakeResult produces a new Result struct, -// and initializes startTime with the current time. -func MakeResult() *Result { - return &Result{ - startTime: time.Now(), - } -} - -// StartTime returns the startTime -func (r Result) StartTime() time.Time { - return r.startTime -} - -// EndTime returns the time of the latest log entry -func (r Result) EndTime() time.Time { - if len(r.LogEntries) == 0 { - return r.startTime - } - return r.LogEntries[len(r.LogEntries)-1].Time -} - -// Fire is called by logrus on each log event, -// we collect all log entries in the struct variable -func (r *Result) Fire(entry *logrus.Entry) error { - r.LogEntries = append(r.LogEntries, entry) - return nil -} - -// Levels is called by logrus to determine whether to Fire the handler. -// As we want to collect all log entries, we return logrus.AllLevels -func (r *Result) Levels() []logrus.Level { - return logrus.AllLevels -} diff --git a/submitqueue/runner.go b/submitqueue/runner.go index 4fd9134a0236..d85467b63d5d 100644 --- a/submitqueue/runner.go +++ b/submitqueue/runner.go @@ -1,60 +1,203 @@ package submitqueue import ( + "fmt" "sync" - "time" + + "github.com/apex/log" + + "github.com/tweag/gerrit-queue/gerrit" ) -// Runner supervises the submit queue and records historical data about it +// Runner is a struct existing across the lifetime of a single run of the submit queue +// it contains a mutex to avoid being run multiple times. +// In fact, it even cancels runs while another one is still in progress. +// It contains a Gerrit object facilitating access, a log object, the configured submit queue tag +// and a `wipSerie` (only populated if waiting for a rebase) type Runner struct { mut sync.Mutex - submitQueue *SubmitQueue - currentlyRunning *time.Time - results []*Result + currentlyRunning bool + wipSerie *gerrit.Serie + logger *log.Logger + gerrit *gerrit.Client + submitQueueTag string // the tag used to submit something to the submit queue } -// NewRunner initializes a new runner object -func NewRunner(sq *SubmitQueue) *Runner { +// NewRunner creates a new Runner struct +func NewRunner(logger *log.Logger, gerrit *gerrit.Client, submitQueueTag string) *Runner { return &Runner{ - submitQueue: sq, - results: []*Result{}, + logger: logger, + gerrit: gerrit, + submitQueueTag: submitQueueTag, } } -// GetState returns a copy of all the state for the frontend -func (r *Runner) GetState() (SubmitQueue, *time.Time, []*Result) { +// isAutoSubmittable determines if something could be autosubmitted, potentially requiring a rebase +// for this, it needs to: +// * have the auto-submit label +// * has +2 review +// * has +1 CI +func (r *Runner) isAutoSubmittable(s *gerrit.Serie) bool { + for _, c := range s.ChangeSets { + if c.Verified != 1 || c.CodeReviewed != 2 || !c.HasTag(r.submitQueueTag) { + return false + } + } + return true +} + +// IsCurrentlyRunning returns true if the runner is currently running +func (r *Runner) IsCurrentlyRunning() bool { + return r.currentlyRunning +} + +// GetWIPSerie returns the current wipSerie, if any, nil otherwiese +// Acquires a lock, so check with IsCurrentlyRunning first +func (r *Runner) GetWIPSerie() *gerrit.Serie { r.mut.Lock() - defer r.mut.Unlock() - return *r.submitQueue, r.currentlyRunning, r.results + defer func() { + r.mut.Unlock() + }() + return r.wipSerie } -// Trigger starts a new batch job -// TODO: make sure only one batch job is started at the same time -// if a batch job is already started, ignore the newest request -// TODO: be more granular in dry-run mode -func (r *Runner) Trigger(fetchOnly bool) { +// Trigger gets triggered periodically +func (r *Runner) Trigger(fetchOnly bool) error { + // TODO: If CI fails, remove the auto-submit labels => rules.pl + // Only one trigger can run at the same time r.mut.Lock() - if r.currentlyRunning != nil { - return + if r.currentlyRunning { + return fmt.Errorf("Already running, skipping") } - now := time.Now() - r.currentlyRunning = &now + r.currentlyRunning = true r.mut.Unlock() - defer func() { r.mut.Lock() - r.currentlyRunning = nil + r.currentlyRunning = false r.mut.Unlock() }() - result := r.submitQueue.Run(fetchOnly) + // isReady means a series is auto submittbale and rebased on HEAD + isReady := func(s *gerrit.Serie) bool { + return r.isAutoSubmittable(s) && r.gerrit.SerieIsRebasedOnHEAD(s) + } - r.mut.Lock() - // drop tail if size > 10 - if len(r.results) > 10 { - r.results = append([]*Result{result}, r.results[:9]...) - } else { - r.results = append([]*Result{result}, r.results...) + isAwaitingCI := func(s *gerrit.Serie) bool { + for _, c := range s.ChangeSets { + if !(c.Verified == 0 && c.CodeReviewed != 2 && c.HasTag(r.submitQueueTag)) { + return false + } + } + return true } - r.mut.Unlock() + + // Prepare the work by creating a local cache of gerrit state + r.gerrit.Refresh() + + // early return if we only want to fetch + if fetchOnly { + return nil + } + + if r.wipSerie != nil { + // refresh wipSerie with how it looks like in gerrit now + wipSerie := r.gerrit.FindSerie(func(s *gerrit.Serie) bool { + // the new wipSerie needs to have the same number of changesets + if len(r.wipSerie.ChangeSets) != len(s.ChangeSets) { + return false + } + // … and the same ChangeIDs. + for idx, c := range s.ChangeSets { + if r.wipSerie.ChangeSets[idx].ChangeID != c.ChangeID { + return false + } + } + return true + }) + if wipSerie == nil { + r.logger.WithField("wipSerie", r.wipSerie).Warn("wipSerie has disappeared") + r.wipSerie = nil + } else { + r.wipSerie = wipSerie + } + } + + for { + // initialize logger + r.logger.Info("Running") + if r.wipSerie != nil { + // if we have a wipSerie + l := r.logger.WithField("wipSerie", r.wipSerie) + l.Info("Checking wipSerie") + + if !r.gerrit.SerieIsRebasedOnHEAD(r.wipSerie) { + // check for chaos monkeys + l.Warnf("HEAD has moved to {} while still waiting for wipSerie, discarding it", r.gerrit.GetHEAD()) + r.wipSerie = nil + } else if isAwaitingCI(r.wipSerie) { + // the changeset is still awaiting for CI feedback + l.Info("keep waiting for wipSerie") + + // break the loop, take a look at it at the next trigger. + break + } else if isReady(r.wipSerie) { + // if the WIP changeset is ready (auto submittable and rebased on HEAD), submit + for _, changeset := range r.wipSerie.ChangeSets { + _, err := r.gerrit.SubmitChangeset(changeset) + if err != nil { + l.WithField("changeset", changeset).Error("error submitting changeset") + r.wipSerie = nil + return err + } + } + r.wipSerie = nil + } else { + // should never be reached?! + } + } + + r.logger.Info("Looking for series ready to submit") + // Find serie, that: + // * has the auto-submit label + // * has +2 review + // * has +1 CI + // * is rebased on master + serie := r.gerrit.FindSerie(isReady) + if serie != nil { + r.logger.WithField("serie", serie).Info("Found serie to submit without necessary rebase") + r.wipSerie = serie + continue + } + + // Find serie, that: + // * has the auto-submit label + // * has +2 review + // * has +1 CI + // * is NOT rebased on master + serie = r.gerrit.FindSerie(r.isAutoSubmittable) + if serie == nil { + r.logger.Info("nothing to do, going back to sleep.") + break + } + + l := r.logger.WithField("serie", serie) + l.Info("found serie, which needs a rebase") + // TODO: move into Client.RebaseSeries function + head := r.gerrit.GetHEAD() + for _, changeset := range serie.ChangeSets { + changeset, err := r.gerrit.RebaseChangeset(changeset, head) + if err != nil { + l.Error(err.Error()) + return err + } + head = changeset.CommitID + } + // it doesn't matter this serie isn't in its rebased state, + // we'll refetch it on the beginning of the next trigger anyways + r.wipSerie = serie + break + } + + r.logger.Info("Run complete") + return nil } diff --git a/submitqueue/serie.go b/submitqueue/serie.go deleted file mode 100644 index d4cd739cdfdd..000000000000 --- a/submitqueue/serie.go +++ /dev/null @@ -1,116 +0,0 @@ -package submitqueue - -import ( - "fmt" - "strings" - - "github.com/tweag/gerrit-queue/gerrit" - - log "github.com/sirupsen/logrus" -) - -// Serie represents a list of successive changesets with an unbroken parent -> child relation, -// starting from the parent. -type Serie struct { - ChangeSets []*gerrit.Changeset -} - -// GetParentCommitIDs returns the parent commit IDs -func (s *Serie) GetParentCommitIDs() ([]string, error) { - if len(s.ChangeSets) == 0 { - return nil, fmt.Errorf("Can't return parent on a serie with zero ChangeSets") - } - return s.ChangeSets[0].ParentCommitIDs, nil -} - -// GetLeafCommitID returns the commit id of the last commit in ChangeSets -func (s *Serie) GetLeafCommitID() (string, error) { - if len(s.ChangeSets) == 0 { - return "", fmt.Errorf("Can't return leaf on a serie with zero ChangeSets") - } - return s.ChangeSets[len(s.ChangeSets)-1].CommitID, nil -} - -// CheckIntegrity checks that the series contains a properly ordered and connected chain of commits -func (s *Serie) CheckIntegrity() error { - logger := log.WithFields(log.Fields{ - "serie": s, - }) - // an empty serie is invalid - if len(s.ChangeSets) == 0 { - return fmt.Errorf("An empty serie is invalid") - } - - previousCommitID := "" - for i, changeset := range s.ChangeSets { - // we can't really check the parent of the first commit - // so skip verifying that one - logger.WithFields(log.Fields{ - "changeset": changeset.String(), - "previousCommitID": fmt.Sprintf("%.7s", previousCommitID), - }).Debug(" - verifying changeset") - - parentCommitIDs := changeset.ParentCommitIDs - if len(parentCommitIDs) == 0 { - return fmt.Errorf("Changesets without any parent are not supported") - } - // we don't check parents of the first changeset in a series - if i != 0 { - if len(parentCommitIDs) != 1 { - return fmt.Errorf("Merge commits in the middle of a series are not supported (only at the beginning)") - } - if parentCommitIDs[0] != previousCommitID { - return fmt.Errorf("changesets parent commit id doesn't match previous commit id") - } - } - // update previous commit id for the next loop iteration - previousCommitID = changeset.CommitID - } - return nil -} - -// FilterAllChangesets applies a filter function on all of the changesets in the series. -// returns true if it returns true for all changesets, false otherwise -func (s *Serie) FilterAllChangesets(f func(c *gerrit.Changeset) bool) bool { - for _, changeset := range s.ChangeSets { - if f(changeset) == false { - return false - } - } - return true -} - -func (s *Serie) String() string { - var sb strings.Builder - sb.WriteString(fmt.Sprintf("Serie[%d]", len(s.ChangeSets))) - if len(s.ChangeSets) == 0 { - sb.WriteString("()\n") - return sb.String() - } - parentCommitIDs, err := s.GetParentCommitIDs() - if err == nil { - if len(parentCommitIDs) == 1 { - sb.WriteString(fmt.Sprintf("(parent: %.7s)", parentCommitIDs[0])) - } else { - sb.WriteString("(merge: ") - - for i, parentCommitID := range parentCommitIDs { - sb.WriteString(fmt.Sprintf("%.7s", parentCommitID)) - if i < len(parentCommitIDs) { - sb.WriteString(", ") - } - } - - sb.WriteString(")") - - } - } - sb.WriteString(fmt.Sprintf("(%.7s..%.7s)", - s.ChangeSets[0].CommitID, - s.ChangeSets[len(s.ChangeSets)-1].CommitID)) - return sb.String() -} - -func shortCommitID(commitID string) string { - return commitID[:6] -} diff --git a/submitqueue/series.go b/submitqueue/series.go deleted file mode 100644 index c607e3ec2135..000000000000 --- a/submitqueue/series.go +++ /dev/null @@ -1,125 +0,0 @@ -package submitqueue - -import ( - "sort" - - "github.com/tweag/gerrit-queue/gerrit" - - "github.com/sirupsen/logrus" -) - -// AssembleSeries consumes a list of `Changeset`, and groups them together to series -// -// As we have no control over the order of the passed changesets, -// we maintain two lookup tables, -// mapLeafToSerie, which allows to lookup a serie by its leaf commit id, -// to append to an existing serie -// and mapParentToSeries, which allows to lookup all series having a certain parent commit id, -// to prepend to any of the existing series -// if we can't find anything, we create a new series -func AssembleSeries(changesets []*gerrit.Changeset, log *logrus.Logger) ([]*Serie, error) { - series := make([]*Serie, 0) - mapLeafToSerie := make(map[string]*Serie, 0) - - for _, changeset := range changesets { - logger := log.WithFields(logrus.Fields{ - "changeset": changeset.String(), - }) - - logger.Debug("creating initial serie") - serie := &Serie{ - ChangeSets: []*gerrit.Changeset{changeset}, - } - series = append(series, serie) - mapLeafToSerie[changeset.CommitID] = serie - } - - // Combine series using a fixpoint approach, with a max iteration count. - log.Debug("glueing together phase") - for i := 1; i < 100; i++ { - didUpdate := false - log.Debugf("at iteration %d", i) - for _, serie := range series { - logger := log.WithField("serie", serie.String()) - parentCommitIDs, err := serie.GetParentCommitIDs() - if err != nil { - return series, err - } - if len(parentCommitIDs) != 1 { - // We can't append merge commits to other series - logger.Infof("No single parent, skipping.") - continue - } - parentCommitID := parentCommitIDs[0] - logger.Debug("Looking for a predecessor.") - // if there's another serie that has this parent as a leaf, glue together - if otherSerie, ok := mapLeafToSerie[parentCommitID]; ok { - if otherSerie == serie { - continue - } - logger := logger.WithField("otherSerie", otherSerie) - - myLeafCommitID, err := serie.GetLeafCommitID() - if err != nil { - return series, err - } - - // append our changesets to the other serie - logger.Debug("Splicing together.") - otherSerie.ChangeSets = append(otherSerie.ChangeSets, serie.ChangeSets...) - - delete(mapLeafToSerie, parentCommitID) - mapLeafToSerie[myLeafCommitID] = otherSerie - - // orphan our serie - serie.ChangeSets = []*gerrit.Changeset{} - // remove the orphaned serie from the lookup table - delete(mapLeafToSerie, myLeafCommitID) - - didUpdate = true - } else { - logger.Debug("Not found.") - } - } - series = removeOrphanedSeries(series) - if !didUpdate { - log.Infof("converged after %d iterations", i) - break - } - } - - // Check integrity, just to be on the safe side. - for _, serie := range series { - logger := log.WithFields(logrus.Fields{ - "serie": serie.String(), - }) - logger.Debugf("checking integrity") - err := serie.CheckIntegrity() - if err != nil { - logger.Errorf("checking integrity failed: %s", err) - } - } - return series, nil -} - -// removeOrphanedSeries removes all empty series (that contain zero changesets) -func removeOrphanedSeries(series []*Serie) []*Serie { - newSeries := []*Serie{} - for _, serie := range series { - if len(serie.ChangeSets) != 0 { - newSeries = append(newSeries, serie) - } - } - return newSeries -} - -// SortSeries sorts a list of series by the number of changesets in each serie, descending -func SortSeries(series []*Serie) []*Serie { - newSeries := make([]*Serie, len(series)) - copy(newSeries, series) - sort.Slice(newSeries, func(i, j int) bool { - // the weight depends on the amount of changesets series changeset size - return len(series[i].ChangeSets) > len(series[j].ChangeSets) - }) - return newSeries -} diff --git a/submitqueue/submitqueue.go b/submitqueue/submitqueue.go deleted file mode 100644 index eb622db6832d..000000000000 --- a/submitqueue/submitqueue.go +++ /dev/null @@ -1,231 +0,0 @@ -package submitqueue - -import ( - "fmt" - - "github.com/tweag/gerrit-queue/gerrit" - - "github.com/sirupsen/logrus" -) - -// SubmitQueue contains a list of series, a gerrit connection, and some project configuration -type SubmitQueue struct { - Series []*Serie - gerrit gerrit.IClient - ProjectName string - BranchName string - HEAD string - SubmitQueueTag string // the tag used to submit something to the submit queue - URL string -} - -// MakeSubmitQueue builds a new submit queue -func MakeSubmitQueue(gerritClient gerrit.IClient, projectName string, branchName string, submitQueueTag string) *SubmitQueue { - return &SubmitQueue{ - Series: make([]*Serie, 0), - gerrit: gerritClient, - ProjectName: projectName, - BranchName: branchName, - SubmitQueueTag: submitQueueTag, - } -} - -// LoadSeries fills .Series by searching changesets, and assembling them to Series. -func (s *SubmitQueue) LoadSeries(log *logrus.Logger) error { - var queryString = fmt.Sprintf("status:open project:%s branch:%s", s.ProjectName, s.BranchName) - log.Debugf("Running query %s", queryString) - - // Download changesets from gerrit - changesets, err := s.gerrit.SearchChangesets(queryString) - if err != nil { - return err - } - - // Assemble to series - series, err := AssembleSeries(changesets, log) - if err != nil { - return err - } - - // Sort by size - s.Series = SortSeries(series) - return nil -} - -// TODO: clear submit queue tag if missing +1/+2? - -// IsAutoSubmittable returns true if a given Serie has all the necessary flags set -// meaning it would be fine to rebase and/or submit it. -// This means, every changeset needs to: -// - have the s.SubmitQueueTag hashtag -// - be verified (+1 by CI) -// - be code reviewed (+2 by a human) -func (s *SubmitQueue) IsAutoSubmittable(serie *Serie) bool { - return serie.FilterAllChangesets(func(c *gerrit.Changeset) bool { - return c.HasTag(s.SubmitQueueTag) && c.IsVerified && c.IsCodeReviewed - }) -} - -// GetChangesetURL returns the URL to view a given changeset -func (s *SubmitQueue) GetChangesetURL(changeset *gerrit.Changeset) string { - return fmt.Sprintf("%s/c/%s/+/%d", s.gerrit.GetBaseURL(), s.ProjectName, changeset.Number) -} - -// DoSubmit submits changes that can be submitted, -// and updates `Series` to contain the remaining ones -// Also updates `HEAD`. -func (s *SubmitQueue) DoSubmit(log *logrus.Logger) error { - var remainingSeries []*Serie - - // TODO: actually log more! - - for _, serie := range s.Series { - serieParentCommitIDs, err := serie.GetParentCommitIDs() - if err != nil { - return err - } - // we can only submit series with a single parent commit (otherwise they're not rebased) - if len(serieParentCommitIDs) != 1 { - return fmt.Errorf("%s has more than one parent commit, skipping", serie.String()) - } - - // if serie is auto-submittable and rebased on top of current master… - if s.IsAutoSubmittable(serie) && serieParentCommitIDs[0] == s.HEAD { - // submit the last changeset of the series, which submits intermediate ones too - _, err := s.gerrit.SubmitChangeset(serie.ChangeSets[len(serie.ChangeSets)-1]) - if err != nil { - // this might fail, for various reasons: - // - developers could have updated the changeset meanwhile, clearing +1/+2 bits - // - master might have advanced, so this changeset isn't rebased on top of master - // TODO: we currently bail out entirely, but should be fine on the - // next loop. We might later want to improve the logic to be a bit more - // smarter (like log and try with the next one) - return err - } - // advance head to the leaf of the current serie for the next iteration - newHead, err := serie.GetLeafCommitID() - if err != nil { - return err - } - s.HEAD = newHead - } else { - remainingSeries = append(remainingSeries, serie) - } - } - - s.Series = remainingSeries - return nil -} - -// DoRebase rebases the next auto-submittable series on top of current HEAD -// they are still ordered by series size -// After a DoRebase, consumers are supposed to fetch state again via LoadSeries, -// as things most likely have changed, and error handling during partially failed rebases -// is really tricky -func (s *SubmitQueue) DoRebase(log *logrus.Logger) error { - if s.HEAD == "" { - return fmt.Errorf("current HEAD is an empty string, bailing out") - } - for _, serie := range s.Series { - logger := log.WithFields(logrus.Fields{ - "serie": serie, - }) - if !s.IsAutoSubmittable(serie) { - logger.Debug("skipping non-auto-submittable series") - continue - } - - logger.Infof("rebasing on top of %s", s.HEAD) - _, err := s.RebaseSerie(serie, s.HEAD) - if err != nil { - // We skip trivial rebase errors instead of bailing out. - // TODO: we might want to remove s.SubmitQueueTag from the changeset, - // but even without doing it, - // we're merly spanning, and won't get stuck in trying to rebase the same - // changeset over and over again, as some other changeset will likely succeed - // with rebasing and will be merged by DoSubmit. - logger.Warnf("failure while rebasing, continuing with next one: %s", err) - continue - } else { - logger.Info("success rebasing on top of %s", s.HEAD) - break - } - } - - return nil -} - -// Run starts the submit and rebase logic. -func (s *SubmitQueue) Run(fetchOnly bool) *Result { - r := MakeResult() - //TODO: log decisions made and add to some ring buffer - var err error - - log := logrus.New() - log.AddHook(r) - - commitID, err := s.gerrit.GetHEAD(s.ProjectName, s.BranchName) - if err != nil { - log.Errorf("Unable to retrieve HEAD of branch %s at project %s: %s", s.BranchName, s.ProjectName, err) - r.Error = err - return r - } - s.HEAD = commitID - r.HEAD = commitID - - err = s.LoadSeries(log) - if err != nil { - r.Error = err - return r - } - - // copy series to result object - for _, serie := range s.Series { - r.Series = append(r.Series, *serie) - } - - if len(s.Series) == 0 { - // Nothing to do! - log.Warn("Nothing to do here") - return r - } - if fetchOnly { - return r - } - err = s.DoSubmit(log) - if err != nil { - r.Error = err - return r - } - err = s.DoRebase(log) - if err != nil { - r.Error = err - return r - } - return r -} - -// RebaseSerie rebases a whole serie on top of a given ref -// TODO: only rebase a single changeset. we don't really want to join disconnected series, by rebasing them on top of each other. -func (s *SubmitQueue) RebaseSerie(serie *Serie, ref string) (*Serie, error) { - newSeries := &Serie{ - ChangeSets: make([]*gerrit.Changeset, len(serie.ChangeSets)), - } - - rebaseOnto := ref - for _, changeset := range serie.ChangeSets { - newChangeset, err := s.gerrit.RebaseChangeset(changeset, rebaseOnto) - - if err != nil { - // uh-oh… - // TODO: think about error handling - // TODO: remove the submit queue tag if the rebase fails (but only then, not on other errors) - return newSeries, err - } - newSeries.ChangeSets = append(newSeries.ChangeSets, newChangeset) - - // the next changeset should be rebased on top of the current commit - rebaseOnto = newChangeset.CommitID - } - return newSeries, nil -} |