about summary refs log tree commit diff
path: root/submitqueue/runner.go
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2019-12-02T09·00+0100
committerFlorian Klink <flokli@flokli.de>2019-12-02T09·00+0100
commit04a24a0c601c28d01e1110fb82f57c7a7c3f5d74 (patch)
treeabe8872d565a82db38167b07b0a4526883031983 /submitqueue/runner.go
parent7bafef7a848cc80c79551441630210e947b2481b (diff)
Use Runner
This revamps code quite a bit. Series handling has been moved into the
gerrit client, it also handles caching.

The Runner logic itself has been greatly simplified.

The runner logic has been moved into the runner.go, submitqueue.go is
gone.

The "per-run result object" concept has been dropped - we instead just
use annotated logs.

Also, we switched to apex/log
Diffstat (limited to 'submitqueue/runner.go')
-rw-r--r--submitqueue/runner.go207
1 files changed, 175 insertions, 32 deletions
diff --git a/submitqueue/runner.go b/submitqueue/runner.go
index 4fd9134a02..d85467b63d 100644
--- a/submitqueue/runner.go
+++ b/submitqueue/runner.go
@@ -1,60 +1,203 @@
 package submitqueue
 
 import (
+	"fmt"
 	"sync"
-	"time"
+
+	"github.com/apex/log"
+
+	"github.com/tweag/gerrit-queue/gerrit"
 )
 
-// Runner supervises the submit queue and records historical data about it
+// Runner is a struct existing across the lifetime of a single run of the submit queue
+// it contains a mutex to avoid being run multiple times.
+// In fact, it even cancels runs while another one is still in progress.
+// It contains a Gerrit object facilitating access, a log object, the configured submit queue tag
+// and a `wipSerie` (only populated if waiting for a rebase)
 type Runner struct {
 	mut              sync.Mutex
-	submitQueue      *SubmitQueue
-	currentlyRunning *time.Time
-	results          []*Result
+	currentlyRunning bool
+	wipSerie         *gerrit.Serie
+	logger           *log.Logger
+	gerrit           *gerrit.Client
+	submitQueueTag   string // the tag used to submit something to the submit queue
 }
 
-// NewRunner initializes a new runner object
-func NewRunner(sq *SubmitQueue) *Runner {
+// NewRunner creates a new Runner struct
+func NewRunner(logger *log.Logger, gerrit *gerrit.Client, submitQueueTag string) *Runner {
 	return &Runner{
-		submitQueue: sq,
-		results:     []*Result{},
+		logger:         logger,
+		gerrit:         gerrit,
+		submitQueueTag: submitQueueTag,
 	}
 }
 
-// GetState returns a copy of all the state for the frontend
-func (r *Runner) GetState() (SubmitQueue, *time.Time, []*Result) {
+// isAutoSubmittable determines if something could be autosubmitted, potentially requiring a rebase
+// for this, it needs to:
+//  * have the auto-submit label
+//  * has +2 review
+//  * has +1 CI
+func (r *Runner) isAutoSubmittable(s *gerrit.Serie) bool {
+	for _, c := range s.ChangeSets {
+		if c.Verified != 1 || c.CodeReviewed != 2 || !c.HasTag(r.submitQueueTag) {
+			return false
+		}
+	}
+	return true
+}
+
+// IsCurrentlyRunning returns true if the runner is currently running
+func (r *Runner) IsCurrentlyRunning() bool {
+	return r.currentlyRunning
+}
+
+// GetWIPSerie returns the current wipSerie, if any, nil otherwiese
+// Acquires a lock, so check with IsCurrentlyRunning first
+func (r *Runner) GetWIPSerie() *gerrit.Serie {
 	r.mut.Lock()
-	defer r.mut.Unlock()
-	return *r.submitQueue, r.currentlyRunning, r.results
+	defer func() {
+		r.mut.Unlock()
+	}()
+	return r.wipSerie
 }
 
-// Trigger starts a new batch job
-// TODO: make sure only one batch job is started at the same time
-// if a batch job is already started, ignore the newest request
-// TODO: be more granular in dry-run mode
-func (r *Runner) Trigger(fetchOnly bool) {
+// Trigger gets triggered periodically
+func (r *Runner) Trigger(fetchOnly bool) error {
+	// TODO: If CI fails, remove the auto-submit labels => rules.pl
+	// Only one trigger can run at the same time
 	r.mut.Lock()
-	if r.currentlyRunning != nil {
-		return
+	if r.currentlyRunning {
+		return fmt.Errorf("Already running, skipping")
 	}
-	now := time.Now()
-	r.currentlyRunning = &now
+	r.currentlyRunning = true
 	r.mut.Unlock()
-
 	defer func() {
 		r.mut.Lock()
-		r.currentlyRunning = nil
+		r.currentlyRunning = false
 		r.mut.Unlock()
 	}()
 
-	result := r.submitQueue.Run(fetchOnly)
+	// isReady means a series is auto submittbale and rebased on HEAD
+	isReady := func(s *gerrit.Serie) bool {
+		return r.isAutoSubmittable(s) && r.gerrit.SerieIsRebasedOnHEAD(s)
+	}
 
-	r.mut.Lock()
-	// drop tail if size > 10
-	if len(r.results) > 10 {
-		r.results = append([]*Result{result}, r.results[:9]...)
-	} else {
-		r.results = append([]*Result{result}, r.results...)
+	isAwaitingCI := func(s *gerrit.Serie) bool {
+		for _, c := range s.ChangeSets {
+			if !(c.Verified == 0 && c.CodeReviewed != 2 && c.HasTag(r.submitQueueTag)) {
+				return false
+			}
+		}
+		return true
 	}
-	r.mut.Unlock()
+
+	// Prepare the work by creating a local cache of gerrit state
+	r.gerrit.Refresh()
+
+	// early return if we only want to fetch
+	if fetchOnly {
+		return nil
+	}
+
+	if r.wipSerie != nil {
+		// refresh wipSerie with how it looks like in gerrit now
+		wipSerie := r.gerrit.FindSerie(func(s *gerrit.Serie) bool {
+			// the new wipSerie needs to have the same number of changesets
+			if len(r.wipSerie.ChangeSets) != len(s.ChangeSets) {
+				return false
+			}
+			// … and the same ChangeIDs.
+			for idx, c := range s.ChangeSets {
+				if r.wipSerie.ChangeSets[idx].ChangeID != c.ChangeID {
+					return false
+				}
+			}
+			return true
+		})
+		if wipSerie == nil {
+			r.logger.WithField("wipSerie", r.wipSerie).Warn("wipSerie has disappeared")
+			r.wipSerie = nil
+		} else {
+			r.wipSerie = wipSerie
+		}
+	}
+
+	for {
+		// initialize logger
+		r.logger.Info("Running")
+		if r.wipSerie != nil {
+			// if we have a wipSerie
+			l := r.logger.WithField("wipSerie", r.wipSerie)
+			l.Info("Checking wipSerie")
+
+			if !r.gerrit.SerieIsRebasedOnHEAD(r.wipSerie) {
+				// check for chaos monkeys
+				l.Warnf("HEAD has moved to {} while still waiting for wipSerie, discarding it", r.gerrit.GetHEAD())
+				r.wipSerie = nil
+			} else if isAwaitingCI(r.wipSerie) {
+				// the changeset is still awaiting for CI feedback
+				l.Info("keep waiting for wipSerie")
+
+				// break the loop, take a look at it at the next trigger.
+				break
+			} else if isReady(r.wipSerie) {
+				// if the WIP changeset is ready (auto submittable and rebased on HEAD), submit
+				for _, changeset := range r.wipSerie.ChangeSets {
+					_, err := r.gerrit.SubmitChangeset(changeset)
+					if err != nil {
+						l.WithField("changeset", changeset).Error("error submitting changeset")
+						r.wipSerie = nil
+						return err
+					}
+				}
+				r.wipSerie = nil
+			} else {
+				// should never be reached?!
+			}
+		}
+
+		r.logger.Info("Looking for series ready to submit")
+		// Find serie, that:
+		//  * has the auto-submit label
+		//  * has +2 review
+		//  * has +1 CI
+		//  * is rebased on master
+		serie := r.gerrit.FindSerie(isReady)
+		if serie != nil {
+			r.logger.WithField("serie", serie).Info("Found serie to submit without necessary rebase")
+			r.wipSerie = serie
+			continue
+		}
+
+		// Find serie, that:
+		//  * has the auto-submit label
+		//  * has +2 review
+		//  * has +1 CI
+		//  * is NOT rebased on master
+		serie = r.gerrit.FindSerie(r.isAutoSubmittable)
+		if serie == nil {
+			r.logger.Info("nothing to do, going back to sleep.")
+			break
+		}
+
+		l := r.logger.WithField("serie", serie)
+		l.Info("found serie, which needs a rebase")
+		// TODO: move into Client.RebaseSeries function
+		head := r.gerrit.GetHEAD()
+		for _, changeset := range serie.ChangeSets {
+			changeset, err := r.gerrit.RebaseChangeset(changeset, head)
+			if err != nil {
+				l.Error(err.Error())
+				return err
+			}
+			head = changeset.CommitID
+		}
+		// it doesn't matter this serie isn't in its rebased state,
+		// we'll refetch it on the beginning of the next trigger anyways
+		r.wipSerie = serie
+		break
+	}
+
+	r.logger.Info("Run complete")
+	return nil
 }