diff options
Diffstat (limited to 'fun/clbot/gerrit')
-rw-r--r-- | fun/clbot/gerrit/default.nix | 18 | ||||
-rw-r--r-- | fun/clbot/gerrit/gerritevents/default.nix | 10 | ||||
-rw-r--r-- | fun/clbot/gerrit/gerritevents/events.go | 321 | ||||
-rw-r--r-- | fun/clbot/gerrit/gerritevents/time.go | 38 | ||||
-rw-r--r-- | fun/clbot/gerrit/gerritevents/types.go | 221 | ||||
-rw-r--r-- | fun/clbot/gerrit/watcher.go | 252 | ||||
-rw-r--r-- | fun/clbot/gerrit/watcher_test.go | 190 |
7 files changed, 1050 insertions, 0 deletions
diff --git a/fun/clbot/gerrit/default.nix b/fun/clbot/gerrit/default.nix new file mode 100644 index 000000000000..3b6ce0a7392f --- /dev/null +++ b/fun/clbot/gerrit/default.nix @@ -0,0 +1,18 @@ +{ depot, ... }: + +let + inherit (depot.fun) clbot; + inherit (depot.third_party) gopkgs; +in +depot.nix.buildGo.package { + name = "code.tvl.fyi/fun/clbot/gerrit"; + srcs = [ + ./watcher.go + ]; + deps = [ + clbot.gerrit.gerritevents + clbot.backoffutil + gopkgs."github.com".golang.glog.gopkg + gopkgs."golang.org".x.crypto.ssh.gopkg + ]; +} diff --git a/fun/clbot/gerrit/gerritevents/default.nix b/fun/clbot/gerrit/gerritevents/default.nix new file mode 100644 index 000000000000..024451858bbc --- /dev/null +++ b/fun/clbot/gerrit/gerritevents/default.nix @@ -0,0 +1,10 @@ +{ depot, ... }: + +depot.nix.buildGo.package { + name = "code.tvl.fyi/fun/clbot/gerrit/gerritevents"; + srcs = [ + ./time.go + ./types.go + ./events.go + ]; +} diff --git a/fun/clbot/gerrit/gerritevents/events.go b/fun/clbot/gerrit/gerritevents/events.go new file mode 100644 index 000000000000..c02b30f76ea8 --- /dev/null +++ b/fun/clbot/gerrit/gerritevents/events.go @@ -0,0 +1,321 @@ +package gerritevents + +import ( + "encoding/json" + "fmt" +) + +var events = map[string]func() Event{} + +func registerEvent(e func() Event) { + t := e().EventType() + if _, ok := events[t]; ok { + panic(fmt.Sprintf("%s already registered", t)) + } + events[t] = e +} + +// These events are taken from https://cl.tvl.fyi/Documentation/cmd-stream-events.html. + +// Event is implemented by Gerrit event structs. +type Event interface { + EventType() string +} + +type simpleEvent struct { + Type string `json:"type"` +} + +// Parse parses a Gerrit event from JSON. +func Parse(bs []byte) (Event, error) { + var s simpleEvent + if err := json.Unmarshal(bs, &s); err != nil { + return nil, fmt.Errorf("unmarshalling %q as Gerrit Event: %v", string(bs), err) + } + ef, ok := events[s.Type] + if !ok { + return nil, fmt.Errorf("unknown event type %q", s.Type) + } + e := ef() + if err := json.Unmarshal(bs, e); err != nil { + return nil, fmt.Errorf("unmarshalling %q as Gerrit Event %q: %v", string(bs), e.EventType(), err) + } + return e, nil +} + +// AssigneeChanged indicates that a change's assignee has been changed. +type AssigneeChanged struct { + Type string `json:"type"` + Change Change `json:"change"` + Changer Account `json:"changer"` + OldAssignee Account `json:"oldAssignee"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (AssigneeChanged) EventType() string { return "assignee-changed" } + +func init() { + registerEvent(func() Event { return &AssigneeChanged{} }) +} + +// ChangeAbandoned indicates that a change has been abandoned. +type ChangeAbandoned struct { + Type string `json:"type"` + Change Change `json:"change"` + PatchSet PatchSet `json:"patchSet"` + Abandoner Account `json:"abandoner"` + Reason string `json:"reason"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (ChangeAbandoned) EventType() string { return "change-abandoned" } + +func init() { + registerEvent(func() Event { return &ChangeAbandoned{} }) +} + +// ChangeDeleted indicates that a change has been deleted. +type ChangeDeleted struct { + Type string `json:"type"` + Change Change `json:"change"` + Deleter Account `json:"deleter"` +} + +// EventType implements Event. +func (ChangeDeleted) EventType() string { return "change-deleted" } + +func init() { + registerEvent(func() Event { return &ChangeDeleted{} }) +} + +// ChangeMerged indicates that a change has been merged into the target branch. +type ChangeMerged struct { + Type string `json:"type"` + Change Change `json:"change"` + PatchSet PatchSet `json:"patchSet"` + Submitter Account `json:"submitter"` + NewRev string `json:"newRev"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (ChangeMerged) EventType() string { return "change-merged" } + +func init() { + registerEvent(func() Event { return &ChangeMerged{} }) +} + +// ChangeRestored indicates a change has been restored (i.e. un-abandoned). +type ChangeRestored struct { + Type string `json:"type"` + Change Change `json:"change"` + PatchSet PatchSet `json:"patchSet"` + Restorer Account `json:"restorer"` + Reason string `json:"reason"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (ChangeRestored) EventType() string { return "change-restored" } + +func init() { + registerEvent(func() Event { return &ChangeRestored{} }) +} + +// CommentAdded indicates someone has commented on a patchset. +type CommentAdded struct { + Type string `json:"type"` + Change Change `json:"change"` + PatchSet PatchSet `json:"patchSet"` + Author Account `json:"author"` + Approvals []Approval `json:"approvals"` + Comment string `json:"comment"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (CommentAdded) EventType() string { return "comment-added" } + +func init() { + registerEvent(func() Event { return &CommentAdded{} }) +} + +// DroppedOutput indicates that some events may be missing from the stream. +type DroppedOutput struct { + Type string `json:"type"` +} + +// EventType implements Event. +func (DroppedOutput) EventType() string { return "dropped-output" } + +func init() { + registerEvent(func() Event { return &DroppedOutput{} }) +} + +// HashtagsChanged indicates that someone has added or removed hashtags from a change. +type HashtagsChanged struct { + Type string `json:"type"` + Change Change `json:"change"` + Editor Account `json:"editor"` + Added []string `json:"added"` + Removed []string `json:"removed"` + Hashtags []string `json:"hashtags"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (HashtagsChanged) EventType() string { return "hashtags-changed" } + +func init() { + registerEvent(func() Event { return &HashtagsChanged{} }) +} + +// ProjectCreated indicates that a new project has been created. +type ProjectCreated struct { + Type string `json:"type"` + ProjectName string `json:"projectName"` + ProjectHead string `json:"projectHead"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (ProjectCreated) EventType() string { return "project-created" } + +func init() { + registerEvent(func() Event { return &ProjectCreated{} }) +} + +// PatchSetCreated indicates that a new patchset has been added to a change. +type PatchSetCreated struct { + Type string `json:"type"` + Change Change `json:"change"` + PatchSet PatchSet `json:"patchSet"` + Uploader Account `json:"uploader"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (PatchSetCreated) EventType() string { return "patchset-created" } + +func init() { + registerEvent(func() Event { return &PatchSetCreated{} }) +} + +// RefUpdated indicates that a ref has been updated. +type RefUpdated struct { + Type string `json:"type"` + Submitter Account `json:"submitter"` + RefUpdate RefUpdate `json:"refUpdate"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (RefUpdated) EventType() string { return "ref-updated" } + +func init() { + registerEvent(func() Event { return &RefUpdated{} }) +} + +// ReviewerAdded indicates that a reviewer has been added to a change. +type ReviewerAdded struct { + Type string `json:"type"` + Change Change `json:"change"` + PatchSet PatchSet `json:"patchSet"` + Reviewer Account `json:"reviewer"` + Adder Account `json:"adder"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (ReviewerAdded) EventType() string { return "reviewer-added" } + +func init() { + registerEvent(func() Event { return &ReviewerAdded{} }) +} + +// ReviewerDeleted indicates that a reviewer has been removed from a change, possibly removing one or more approvals. +type ReviewerDeleted struct { + Type string `json:"type"` + Change Change `json:"change"` + PatchSet PatchSet `json:"patchSet"` + Reviewer Account `json:"reviewer"` + Remover Account `json:"remover"` + Approvals []Approval `json:"approvals"` + Comment string `json:"comment"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (ReviewerDeleted) EventType() string { return "reviewer-deleted" } + +func init() { + registerEvent(func() Event { return &ReviewerDeleted{} }) +} + +// TopicChanged indicates that the topic attached to a change has been changed. +type TopicChanged struct { + Type string `json:"type"` + Change Change `json:"change"` + Changer Account `json:"changer"` + OldTopic string `json:"oldTopic"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (TopicChanged) EventType() string { return "topic-changed" } + +func init() { + registerEvent(func() Event { return &TopicChanged{} }) +} + +// WIPStateChanged indicates that the work-in-progress state of a change has changed. +type WIPStateChanged struct { + Type string `json:"type"` + Change Change `json:"change"` + PatchSet PatchSet `json:"patchSet"` + Changer Account `json:"changer"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (WIPStateChanged) EventType() string { return "wip-state-changed" } + +func init() { + registerEvent(func() Event { return &WIPStateChanged{} }) +} + +// PrivateStateChanged indicates that the private state of a change has changed. +type PrivateStateChanged struct { + Type string `json:"type"` + Change Change `json:"change"` + PatchSet PatchSet `json:"patchSet"` + Changer Account `json:"changer"` + EventCreatedOn Time `json:"eventCreatedOn"` +} + +// EventType implements Event. +func (PrivateStateChanged) EventType() string { return "private-state-changed" } + +func init() { + registerEvent(func() Event { return &PrivateStateChanged{} }) +} + +// VoteDeleted indicates that an approval vote has been deleted from a change. +type VoteDeleted struct { + Type string `json:"type"` + Change Change `json:"change"` + PatchSet PatchSet `json:"patchSet"` + Reviewer Account `json:"reviewer"` + Remover Account `json:"remover"` + Approvals []Approval `json:"approvals"` + Comment string `json:"comment"` +} + +// EventType implements Event. +func (VoteDeleted) EventType() string { return "vote-deleted" } + +func init() { + registerEvent(func() Event { return &VoteDeleted{} }) +} diff --git a/fun/clbot/gerrit/gerritevents/time.go b/fun/clbot/gerrit/gerritevents/time.go new file mode 100644 index 000000000000..7fbfaa3f5c47 --- /dev/null +++ b/fun/clbot/gerrit/gerritevents/time.go @@ -0,0 +1,38 @@ +package gerritevents + +import ( + "fmt" + "strconv" + "time" +) + +// Time is a time.Time that is formatted as a Unix timestamp in JSON. +type Time struct { + time.Time +} + +// UnmarshalJSON unmarshals a Unix timestamp into a Time. +func (t *Time) UnmarshalJSON(bs []byte) error { + if string(bs) == "null" { + return nil + } + u, err := strconv.ParseInt(string(bs), 10, 64) + if err != nil { + return err + } + t.Time = time.Unix(u, 0) + return nil +} + +// MarshalJSON marshals a Time into a Unix timestamp. +func (t *Time) MarshalJSON() ([]byte, error) { + if t.IsZero() { + return []byte("null"), nil + } + return []byte(fmt.Sprintf("%d", t.Unix())), nil +} + +// IsSet returns true if the time.Time is non-zero. +func (t *Time) IsSet() bool { + return !t.IsZero() +} diff --git a/fun/clbot/gerrit/gerritevents/types.go b/fun/clbot/gerrit/gerritevents/types.go new file mode 100644 index 000000000000..75987a2da601 --- /dev/null +++ b/fun/clbot/gerrit/gerritevents/types.go @@ -0,0 +1,221 @@ +package gerritevents + +// These types are taken from https://cl.tvl.fyi/Documentation/json.html. + +// Account is a Gerrit account (or just a Git name+email pair). +type Account struct { + Name string `json:"name"` + Email string `json:"email"` + Username string `json:"username"` +} + +// ChangeStatus represents the states a change can be in. +type ChangeStatus string + +const ( + // ChangeStatusNew is the state a change is in during review. + ChangeStatusNew ChangeStatus = "NEW" + + // ChangeStatusMerged indicates a change was merged to the target branch. + ChangeStatusMerged ChangeStatus = "MERGED" + + // ChangeStatusAbandoned indicates a change was marked as abandoned. + ChangeStatusAbandoned ChangeStatus = "ABANDONED" +) + +// Message is a message left by a reviewer. +type Message struct { + Timestamp Time `json:"timestamp"` + Reviewer Account `json:"reviewer"` + Message string `json:"message"` +} + +// TrackingID allows storing identifiers from external systems, i.e. bug trackers. +type TrackingID struct { + System string `json:"system"` + ID string `json:"id"` +} + +// ChangeKind indicates the different changes that can be made to a change. +type ChangeKind string + +const ( + // ChangeKindRework indicates a non-trivial content change. + ChangeKindRework ChangeKind = "REWORK" + + // ChangeKindTrivialRebase indicates a conflict-free merge between the new parent and the prior patch set. + ChangeKindTrivialRebase ChangeKind = "TRIVIAL_REBASE" + + // ChangeKindMergeFirstParentUpdate indicates a conflict-free change of the first parent of a merge commit. + ChangeKindMergeFirstParentUpdate ChangeKind = "MERGE_FIRST_PARENT_UPDATE" + + // ChangeKindNoCodeChange indicates no code change (the tree and parent trees are unchanged) - commit message probably changed. + ChangeKindNoCodeChange ChangeKind = "NO_CODE_CHANGE" + + // ChangeKindNoChange indicates nothing changes: the commit message, tree, and parent tree are unchanged. + ChangeKindNoChange ChangeKind = "NO_CHANGE" +) + +// Approval represents the current and past state of an approval label. +type Approval struct { + Type string `json:"type"` + Description string `json:"description"` + Value string `json:"value"` + OldValue *string `json:"oldValue"` + GrantedOn *Time `json:"grantedOn"` + By *Account `json:"by"` +} + +// PatchSetComment is a single comment left on a patchset. +type PatchSetComment struct { + File string `json:"file"` + Line int `json:"line"` + Reviewer Account `json:"reviewer"` + Message string `json:"message"` +} + +// FilePatchType represents the different modifications that can be made to a file by a patchset. +type FilePatchType string + +const ( + // FilePatchTypeAdded indicates the file did not exist, and this patchset adds it to the tree. + FilePatchTypeAdded FilePatchType = "ADDED" + + // FilePatchTypeModified indicates the file exists before and after this patchset. + FilePatchTypeModified FilePatchType = "MODIFIED" + + // FilePatchTypeDeleted indicates the file is removed by this patchset. + FilePatchTypeDeleted FilePatchType = "DELETED" + + // FilePatchTypeRenamed indicates the file has a different name before this patchset than after. + FilePatchTypeRenamed FilePatchType = "RENAMED" + + // FilePatchTypeCopied indicates the file was copied from a different file. + FilePatchTypeCopied FilePatchType = "COPIED" + + // FilePatchTypeRewrite indicates the file had a significant quantity of content changed. + FilePatchTypeRewrite FilePatchType = "REWRITE" +) + +// File represents a file in a patchset as well as how it is being modified. +type File struct { + File string `json:"file"` + FileOld string `json:"fileOld"` + Type FilePatchType `json:"type"` +} + +// PatchSet represents a single patchset within a change. +type PatchSet struct { + Number int `json:"number"` + Revision string `json:"revision"` + Parents []string `json:"parents"` + Ref string `json:"ref"` + Uploader Account `json:"uploader"` + Author Account `json:"author"` + CreatedOn Time `json:"createdOn"` + Kind ChangeKind `json:"kind"` + Approvals []Approval `json:"approvals"` + Comments []PatchSetComment `json:"comments"` + Files []File `json:"file"` + SizeInsertions int `json:"sizeInsertions"` + SizeDeletions int `json:"sizeDeletions"` +} + +// Dependency represents a change on which this change is dependent. +type Dependency struct { + ID string `json:"id"` + Number int `json:"number"` + Revision string `json:"revision"` + Ref string `json:"ref"` + IsCurrentPatchSet bool `json:"isCurrentPatchSet"` +} + +// SubmitStatus indicates whether this change has met the submit conditions and is ready to submit. +type SubmitStatus string + +const ( + // SubmitStatusOK indicates this change is ready to submit - all submit requirements are met. + SubmitStatusOK SubmitStatus = "OK" + + // SubmitStatusNotReady indicates this change cannot yet be submitted. + SubmitStatusNotReady SubmitStatus = "NOT_READY" + + // SubmitStatusRuleError indicates the submit rules could not be evaluted. Administrator intervention is required. + SubmitStatusRuleError SubmitStatus = "RULE_ERROR" +) + +// LabelStatus indicates whether this label permits submission and if the label can be granted by anyone. +type LabelStatus string + +const ( + // LabelStatusOK indicates that this label provides what is necessary for submission (e.g. CR+2). + LabelStatusOK LabelStatus = "OK" + + // LabelStatusReject indicates this label prevents submission (e.g. CR-2). + LabelStatusReject LabelStatus = "REJECT" + + // LabelStatusNeed indicates this label is required for submission, but has not been satisfied (e.g. CR0). + LabelStatusNeed LabelStatus = "NEED" + + // LabelStatusMay indicates this label is not required for submission. It may or may not be set. + LabelStatusMay LabelStatus = "MAY" + + // LabelStatusImpossible indicates this label is required for submission, but cannot be satisfied. The ACLs on this label may be set incorrectly. + LabelStatusImpossible LabelStatus = "IMPOSSIBLE" +) + +// Label represents the status of a particular label. +type Label struct { + Label string `json:"label"` + Status LabelStatus `json:"status"` + By Account `json:"by"` +} + +// Requirement represents a submit requirement. +type Requirement struct { + FallbackText string `json:"fallbackText"` + Type string `json:"type"` + // TODO(lukegb): data +} + +// SubmitRecord represents the current submission state of a change. +type SubmitRecord struct { + Status SubmitStatus `json:"status"` + Labels []Label `json:"labels"` + Requirements []Requirement `json:"requirements"` +} + +// Change represents a Gerrit CL. +type Change struct { + Project string `json:"project"` + Branch string `json:"branch"` + Topic string `json:"topic"` + ID string `json:"id"` + Number int `json:"number"` + Subject string `json:"subject"` + Owner Account `json:"owner"` + URL string `json:"url"` + CommitMessage string `json:"commitMessage"` + CreatedOn Time `json:"createdOn"` + LastUpdated *Time `json:"lastUpdated"` + Open bool `json:"open"` + Status ChangeStatus `json:"status"` + Private bool `json:"private"` + WIP bool `json:"wip"` + Comments []Message `json:"comments"` + TrackingIDs []TrackingID `json:"trackingIds"` + CurrentPatchSet *PatchSet `json:"currentPatchSet"` + PatchSets []PatchSet `json:"patchSets"` + DependsOn []Dependency `json:"dependsOn"` + NeededBy []Dependency `json:"neededBy"` + SubmitRecords []SubmitRecord `json:"submitRecord"` + AllReviewers []Account `json:"allReviewers"` +} + +// RefUpdate represents a change in a ref. +type RefUpdate struct { + OldRev string `json:"oldRev"` + NewRev string `json:"newRev"` + RefName string `json:"refName"` + Project string `json:"project"` +} diff --git a/fun/clbot/gerrit/watcher.go b/fun/clbot/gerrit/watcher.go new file mode 100644 index 000000000000..d45876129b3a --- /dev/null +++ b/fun/clbot/gerrit/watcher.go @@ -0,0 +1,252 @@ +// Package gerrit implements a watcher for Gerrit events. +package gerrit + +import ( + "context" + "errors" + "fmt" + "net" + "strings" + "time" + + "code.tvl.fyi/fun/clbot/backoffutil" + "code.tvl.fyi/fun/clbot/gerrit/gerritevents" + log "github.com/golang/glog" + "golang.org/x/crypto/ssh" +) + +// closer provides an embeddable implementation of Close which awaits a main loop acknowledging it has stopped. +type closer struct { + stop chan struct{} + stopped chan struct{} +} + +// newCloser returns a closer with the channels initialised. +func newCloser() closer { + return closer{ + stop: make(chan struct{}), + stopped: make(chan struct{}), + } +} + +// Close stops the main loop, waiting for the main loop to stop until it stops or the context is cancelled, whichever happens first. +func (c *closer) Close(ctx context.Context) error { + select { + case <-c.stopped: + return nil + case <-c.stop: + return nil + case <-ctx.Done(): + return ctx.Err() + default: + } + close(c.stop) + select { + case <-c.stopped: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// lineWriter is an io.Writer which splits on \n and outputs each line (with no trailing newline) to its output channel. +type lineWriter struct { + buf string + out chan string +} + +// Write accepts a slice of bytes containing zero or more new lines. +// If the contained channel is non-buffering or is full, this will block. +func (w *lineWriter) Write(p []byte) (n int, err error) { + w.buf += string(p) + pieces := strings.Split(w.buf, "\n") + w.buf = pieces[len(pieces)-1] + for n := 0; n < len(pieces)-1; n++ { + w.out <- pieces[n] + } + return len(p), nil +} + +// restartingClient is a simple SSH client that repeatedly connects to an SSH server, runs a command, and outputs the lines output by it on stdout onto a channel. +type restartingClient struct { + closer + + network string + addr string + cfg *ssh.ClientConfig + + exec string + output chan string + shutdown func() +} + +var ( + errStopConnect = errors.New("gerrit: told to stop reconnecting by remote server") +) + +func (c *restartingClient) runOnce() error { + netConn, err := net.Dial(c.network, c.addr) + if err != nil { + return fmt.Errorf("connecting to %v/%v: %w", c.network, c.addr, err) + } + defer netConn.Close() + + sshConn, newCh, newReq, err := ssh.NewClientConn(netConn, c.addr, c.cfg) + if err != nil { + return fmt.Errorf("creating SSH connection to %v/%v: %w", c.network, c.addr, err) + } + defer sshConn.Close() + + goAway := false + passedThroughReqs := make(chan *ssh.Request) + go func() { + defer close(passedThroughReqs) + for req := range newReq { + if req.Type == "goaway" { + goAway = true + log.Warningf("remote end %v/%v told me to go away!", c.network, c.addr) + sshConn.Close() + netConn.Close() + } + passedThroughReqs <- req + } + }() + + cl := ssh.NewClient(sshConn, newCh, passedThroughReqs) + + sess, err := cl.NewSession() + if err != nil { + return fmt.Errorf("NewSession on %v/%v: %w", c.network, c.addr, err) + } + defer sess.Close() + + sess.Stdout = &lineWriter{out: c.output} + + if err := sess.Start(c.exec); err != nil { + return fmt.Errorf("Start(%q) on %v/%v: %w", c.exec, c.network, c.addr, err) + } + + log.Infof("connected to %v/%v", c.network, c.addr) + + done := make(chan struct{}) + go func() { + sess.Wait() + close(done) + }() + go func() { + select { + case <-c.stop: + sess.Close() + case <-done: + } + return + }() + <-done + + if goAway { + return errStopConnect + } + return nil +} + +func (c *restartingClient) run() { + defer close(c.stopped) + bo := backoffutil.NewDefaultBackOff() + for { + timer := time.NewTimer(bo.NextBackOff()) + select { + case <-c.stop: + timer.Stop() + return + case <-timer.C: + break + } + if err := c.runOnce(); err == errStopConnect { + if c.shutdown != nil { + c.shutdown() + return + } + } else if err != nil { + log.Errorf("SSH: %v", err) + } else { + bo.Reset() + } + } +} + +// Output returns the channel on which each newline-delimited string output by the executed command's stdout can be received. +func (c *restartingClient) Output() <-chan string { + return c.output +} + +// dialRestartingClient creates a new restartingClient. +func dialRestartingClient(network, addr string, config *ssh.ClientConfig, exec string, shutdown func()) (*restartingClient, error) { + c := &restartingClient{ + closer: newCloser(), + network: network, + addr: addr, + cfg: config, + exec: exec, + output: make(chan string), + shutdown: shutdown, + } + go c.run() + return c, nil +} + +// Watcher watches +type Watcher struct { + closer + c *restartingClient + + output chan gerritevents.Event +} + +// Close shuts down the SSH client connection, if any, and closes the output channel. +// It blocks until shutdown is complete or until the context is cancelled, whichever comes first. +func (w *Watcher) Close(ctx context.Context) { + w.c.Close(ctx) + w.closer.Close(ctx) +} + +func (w *Watcher) run() { + defer close(w.stopped) + defer close(w.output) + for { + select { + case <-w.stop: + return + case o := <-w.c.Output(): + ev, err := gerritevents.Parse([]byte(o)) + if err != nil { + log.Errorf("failed to parse event %v: %v", o, err) + continue + } + w.output <- ev + } + } +} + +// Events returns the channel upon which parsed Gerrit events can be received. +func (w *Watcher) Events() <-chan gerritevents.Event { + return w.output +} + +// New returns a running Watcher from which events can be read. +// It will begin connecting to the provided address immediately. +func New(ctx context.Context, network, addr string, cfg *ssh.ClientConfig) (*Watcher, error) { + wc := newCloser() + rc, err := dialRestartingClient(network, addr, cfg, "gerrit stream-events", func() { + wc.Close(context.Background()) + }) + if err != nil { + return nil, fmt.Errorf("dialRestartingClient: %w", err) + } + w := &Watcher{ + closer: wc, + c: rc, + output: make(chan gerritevents.Event), + } + go w.run() + return w, nil +} diff --git a/fun/clbot/gerrit/watcher_test.go b/fun/clbot/gerrit/watcher_test.go new file mode 100644 index 000000000000..ae69b2fc4c18 --- /dev/null +++ b/fun/clbot/gerrit/watcher_test.go @@ -0,0 +1,190 @@ +package gerrit + +import ( + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/subtle" + "fmt" + "net" + "testing" + "time" + + "code.tvl.fyi/fun/clbot/gerrit/gerritevents" + log "github.com/golang/glog" + "github.com/google/go-cmp/cmp" + "golang.org/x/crypto/ssh" +) + +var ( + sshServerSigner, sshServerPublicKey = mustNewKey() + sshClientSigner, sshClientPublicKey = mustNewKey() +) + +func mustNewKey() (ssh.Signer, ssh.PublicKey) { + key, err := ecdsa.GenerateKey(elliptic.P384(), rand.Reader) + if err != nil { + panic(err) + } + signer, err := ssh.NewSignerFromKey(key) + if err != nil { + panic(err) + } + publicKey, err := ssh.NewPublicKey(key.Public()) + if err != nil { + panic(err) + } + return signer, publicKey +} + +func newSSHServer(lines string) (addr string, cleanup func(), err error) { + config := &ssh.ServerConfig{ + PublicKeyCallback: func(c ssh.ConnMetadata, pubKey ssh.PublicKey) (*ssh.Permissions, error) { + pkBytes := pubKey.Marshal() + wantPKBytes := sshClientPublicKey.Marshal() + if subtle.ConstantTimeCompare(pkBytes, wantPKBytes) == 0 { + return nil, fmt.Errorf("unauthorized") + } + return &ssh.Permissions{}, nil + }, + } + config.AddHostKey(sshServerSigner) + + ln, err := net.Listen("tcp", ":0") + if err != nil { + log.Fatalf("Listen on tcp/:0: %v", err) + } + handle := func(conn net.Conn) { + defer conn.Close() + + sc, newchch, newreqch, err := ssh.NewServerConn(conn, config) + if err != nil { + log.Fatalf("NewServerConn: %v", err) + } + go ssh.DiscardRequests(newreqch) + for newCh := range newchch { + if newCh.ChannelType() != "session" { + newCh.Reject(ssh.UnknownChannelType, "unknown channel type") + continue + } + + channel, reqs, err := newCh.Accept() + if err != nil { + log.Fatalf("Could not accept channel: %v", err) + } + go func(in <-chan *ssh.Request) { + for req := range in { + req.Reply(req.Type == "exec", nil) + } + }(reqs) + channel.Write([]byte(lines)) + sc.SendRequest("goaway", false, nil) + } + } + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + go handle(conn) + } + }() + + cleanup = func() { + ln.Close() + } + return ln.Addr().String(), cleanup, err +} + +func ts(s string) gerritevents.Time { + t, err := time.Parse("2006-01-02 15:04:05 -0700 MST", s) + if err != nil { + panic(err) + } + return gerritevents.Time{t} +} + +func optStr(s string) *string { return &s } + +func TestWatcher(t *testing.T) { + tcs := []struct { + name string + lines string + want []gerritevents.Event + }{{ + name: "no events", + }, { + name: "single test event", + lines: `{"author":{"name":"tazjin","email":"mail@tazj.in","username":"tazjin"},"approvals":[{"type":"Code-Review","description":"Code-Review","value":"2","oldValue":"0"}],"comment":"Patch Set 3: Code-Review+2","patchSet":{"number":3,"revision":"6fe272d3f82c6efdfe1167fab98bf918efc03fe5","parents":["d984b6018cf68c7e8b7169b475d90134fbcee767"],"ref":"refs/changes/44/244/3","uploader":{"name":"tazjin","email":"mail@tazj.in","username":"tazjin"},"createdOn":1592081910,"author":{"name":"tazjin","email":"mail@tazj.in","username":"tazjin"},"kind":"REWORK","sizeInsertions":83,"sizeDeletions":-156},"change":{"project":"depot","branch":"master","id":"I546c701145fa204b7ba7518a8a56a783588629e0","number":244,"subject":"refactor(ops/nixos): Move my NixOS configurations to //users/tazjin","owner":{"name":"tazjin","email":"mail@tazj.in","username":"tazjin"},"url":"https://cl.tvl.fyi/c/depot/+/244","commitMessage":"refactor(ops/nixos): Move my NixOS configurations to //users/tazjin\n\nNixOS modules move one level up because it\u0027s unlikely that //ops/nixos\nwill contain actual systems at this point (they\u0027re user-specific).\n\nThis is the first users folder, so it is also added to the root\nreadTree invocation for the repository.\n\nChange-Id: I546c701145fa204b7ba7518a8a56a783588629e0\n","createdOn":1592081577,"status":"NEW"},"project":"depot","refName":"refs/heads/master","changeKey":{"id":"I546c701145fa204b7ba7518a8a56a783588629e0"},"type":"comment-added","eventCreatedOn":1592081929} +`, + want: []gerritevents.Event{ + &gerritevents.CommentAdded{ + Type: "comment-added", + Change: gerritevents.Change{ + Project: "depot", + Branch: "master", + ID: "I546c701145fa204b7ba7518a8a56a783588629e0", + Number: 244, + Subject: "refactor(ops/nixos): Move my NixOS configurations to //users/tazjin", + Owner: gerritevents.Account{Name: "tazjin", Email: "mail@tazj.in", Username: "tazjin"}, + URL: "https://cl.tvl.fyi/c/depot/+/244", + CommitMessage: "refactor(ops/nixos): Move my NixOS configurations to //users/tazjin\n\nNixOS modules move one level up because it's unlikely that //ops/nixos\nwill contain actual systems at this point (they're user-specific).\n\nThis is the first users folder, so it is also added to the root\nreadTree invocation for the repository.\n\nChange-Id: I546c701145fa204b7ba7518a8a56a783588629e0\n", + CreatedOn: ts("2020-06-13 21:52:57 +0100 BST"), + Status: "NEW", + }, + PatchSet: gerritevents.PatchSet{ + Number: 3, + Revision: "6fe272d3f82c6efdfe1167fab98bf918efc03fe5", + Parents: []string{"d984b6018cf68c7e8b7169b475d90134fbcee767"}, + Ref: "refs/changes/44/244/3", + Uploader: gerritevents.Account{Name: "tazjin", Email: "mail@tazj.in", Username: "tazjin"}, + Author: gerritevents.Account{Name: "tazjin", Email: "mail@tazj.in", Username: "tazjin"}, + CreatedOn: ts("2020-06-13 21:58:30 +0100 BST"), + Kind: "REWORK", + SizeInsertions: 83, + SizeDeletions: -156, + }, + Author: gerritevents.Account{Name: "tazjin", Email: "mail@tazj.in", Username: "tazjin"}, + Approvals: []gerritevents.Approval{{Type: "Code-Review", Description: "Code-Review", Value: "2", OldValue: optStr("0")}}, + Comment: "Patch Set 3: Code-Review+2", + EventCreatedOn: ts("2020-06-13 21:58:49 +0100 BST"), + }, + }, + }} + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + serverAddr, cleanup, err := newSSHServer(tc.lines) + if err != nil { + t.Fatalf("newSSHServer: %v", err) + } + t.Cleanup(cleanup) + + config := &ssh.ClientConfig{ + User: "bert", + Auth: []ssh.AuthMethod{ssh.PublicKeys(sshClientSigner)}, + HostKeyCallback: ssh.FixedHostKey(sshServerPublicKey), + Timeout: 10 * time.Millisecond, + } + w, err := New(ctx, "tcp", serverAddr, config) + if err != nil { + t.Fatalf("New: %v", err) + } + + var gotEvents []gerritevents.Event + for ev := range w.Events() { + gotEvents = append(gotEvents, ev) + } + if diff := cmp.Diff(gotEvents, tc.want); diff != "" { + t.Errorf("got events != want events: diff:\n%v", diff) + } + }) + } +} |