about summary refs log tree commit diff
path: root/fun/clbot/gerrit
diff options
context:
space:
mode:
Diffstat (limited to 'fun/clbot/gerrit')
-rw-r--r--fun/clbot/gerrit/default.nix18
-rw-r--r--fun/clbot/gerrit/gerritevents/default.nix10
-rw-r--r--fun/clbot/gerrit/gerritevents/events.go321
-rw-r--r--fun/clbot/gerrit/gerritevents/time.go38
-rw-r--r--fun/clbot/gerrit/gerritevents/types.go221
-rw-r--r--fun/clbot/gerrit/watcher.go252
-rw-r--r--fun/clbot/gerrit/watcher_test.go190
7 files changed, 1050 insertions, 0 deletions
diff --git a/fun/clbot/gerrit/default.nix b/fun/clbot/gerrit/default.nix
new file mode 100644
index 000000000000..3b6ce0a7392f
--- /dev/null
+++ b/fun/clbot/gerrit/default.nix
@@ -0,0 +1,18 @@
+{ depot, ... }:
+
+let
+  inherit (depot.fun) clbot;
+  inherit (depot.third_party) gopkgs;
+in
+depot.nix.buildGo.package {
+  name = "code.tvl.fyi/fun/clbot/gerrit";
+  srcs = [
+    ./watcher.go
+  ];
+  deps = [
+    clbot.gerrit.gerritevents
+    clbot.backoffutil
+    gopkgs."github.com".golang.glog.gopkg
+    gopkgs."golang.org".x.crypto.ssh.gopkg
+  ];
+}
diff --git a/fun/clbot/gerrit/gerritevents/default.nix b/fun/clbot/gerrit/gerritevents/default.nix
new file mode 100644
index 000000000000..024451858bbc
--- /dev/null
+++ b/fun/clbot/gerrit/gerritevents/default.nix
@@ -0,0 +1,10 @@
+{ depot, ... }:
+
+depot.nix.buildGo.package {
+  name = "code.tvl.fyi/fun/clbot/gerrit/gerritevents";
+  srcs = [
+    ./time.go
+    ./types.go
+    ./events.go
+  ];
+}
diff --git a/fun/clbot/gerrit/gerritevents/events.go b/fun/clbot/gerrit/gerritevents/events.go
new file mode 100644
index 000000000000..c02b30f76ea8
--- /dev/null
+++ b/fun/clbot/gerrit/gerritevents/events.go
@@ -0,0 +1,321 @@
+package gerritevents
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+var events = map[string]func() Event{}
+
+func registerEvent(e func() Event) {
+	t := e().EventType()
+	if _, ok := events[t]; ok {
+		panic(fmt.Sprintf("%s already registered", t))
+	}
+	events[t] = e
+}
+
+// These events are taken from https://cl.tvl.fyi/Documentation/cmd-stream-events.html.
+
+// Event is implemented by Gerrit event structs.
+type Event interface {
+	EventType() string
+}
+
+type simpleEvent struct {
+	Type string `json:"type"`
+}
+
+// Parse parses a Gerrit event from JSON.
+func Parse(bs []byte) (Event, error) {
+	var s simpleEvent
+	if err := json.Unmarshal(bs, &s); err != nil {
+		return nil, fmt.Errorf("unmarshalling %q as Gerrit Event: %v", string(bs), err)
+	}
+	ef, ok := events[s.Type]
+	if !ok {
+		return nil, fmt.Errorf("unknown event type %q", s.Type)
+	}
+	e := ef()
+	if err := json.Unmarshal(bs, e); err != nil {
+		return nil, fmt.Errorf("unmarshalling %q as Gerrit Event %q: %v", string(bs), e.EventType(), err)
+	}
+	return e, nil
+}
+
+// AssigneeChanged indicates that a change's assignee has been changed.
+type AssigneeChanged struct {
+	Type           string  `json:"type"`
+	Change         Change  `json:"change"`
+	Changer        Account `json:"changer"`
+	OldAssignee    Account `json:"oldAssignee"`
+	EventCreatedOn Time    `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (AssigneeChanged) EventType() string { return "assignee-changed" }
+
+func init() {
+	registerEvent(func() Event { return &AssigneeChanged{} })
+}
+
+// ChangeAbandoned indicates that a change has been abandoned.
+type ChangeAbandoned struct {
+	Type           string   `json:"type"`
+	Change         Change   `json:"change"`
+	PatchSet       PatchSet `json:"patchSet"`
+	Abandoner      Account  `json:"abandoner"`
+	Reason         string   `json:"reason"`
+	EventCreatedOn Time     `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (ChangeAbandoned) EventType() string { return "change-abandoned" }
+
+func init() {
+	registerEvent(func() Event { return &ChangeAbandoned{} })
+}
+
+// ChangeDeleted indicates that a change has been deleted.
+type ChangeDeleted struct {
+	Type    string  `json:"type"`
+	Change  Change  `json:"change"`
+	Deleter Account `json:"deleter"`
+}
+
+// EventType implements Event.
+func (ChangeDeleted) EventType() string { return "change-deleted" }
+
+func init() {
+	registerEvent(func() Event { return &ChangeDeleted{} })
+}
+
+// ChangeMerged indicates that a change has been merged into the target branch.
+type ChangeMerged struct {
+	Type           string   `json:"type"`
+	Change         Change   `json:"change"`
+	PatchSet       PatchSet `json:"patchSet"`
+	Submitter      Account  `json:"submitter"`
+	NewRev         string   `json:"newRev"`
+	EventCreatedOn Time     `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (ChangeMerged) EventType() string { return "change-merged" }
+
+func init() {
+	registerEvent(func() Event { return &ChangeMerged{} })
+}
+
+// ChangeRestored indicates a change has been restored (i.e. un-abandoned).
+type ChangeRestored struct {
+	Type           string   `json:"type"`
+	Change         Change   `json:"change"`
+	PatchSet       PatchSet `json:"patchSet"`
+	Restorer       Account  `json:"restorer"`
+	Reason         string   `json:"reason"`
+	EventCreatedOn Time     `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (ChangeRestored) EventType() string { return "change-restored" }
+
+func init() {
+	registerEvent(func() Event { return &ChangeRestored{} })
+}
+
+// CommentAdded indicates someone has commented on a patchset.
+type CommentAdded struct {
+	Type           string     `json:"type"`
+	Change         Change     `json:"change"`
+	PatchSet       PatchSet   `json:"patchSet"`
+	Author         Account    `json:"author"`
+	Approvals      []Approval `json:"approvals"`
+	Comment        string     `json:"comment"`
+	EventCreatedOn Time       `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (CommentAdded) EventType() string { return "comment-added" }
+
+func init() {
+	registerEvent(func() Event { return &CommentAdded{} })
+}
+
+// DroppedOutput indicates that some events may be missing from the stream.
+type DroppedOutput struct {
+	Type string `json:"type"`
+}
+
+// EventType implements Event.
+func (DroppedOutput) EventType() string { return "dropped-output" }
+
+func init() {
+	registerEvent(func() Event { return &DroppedOutput{} })
+}
+
+// HashtagsChanged indicates that someone has added or removed hashtags from a change.
+type HashtagsChanged struct {
+	Type           string   `json:"type"`
+	Change         Change   `json:"change"`
+	Editor         Account  `json:"editor"`
+	Added          []string `json:"added"`
+	Removed        []string `json:"removed"`
+	Hashtags       []string `json:"hashtags"`
+	EventCreatedOn Time     `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (HashtagsChanged) EventType() string { return "hashtags-changed" }
+
+func init() {
+	registerEvent(func() Event { return &HashtagsChanged{} })
+}
+
+// ProjectCreated indicates that a new project has been created.
+type ProjectCreated struct {
+	Type           string `json:"type"`
+	ProjectName    string `json:"projectName"`
+	ProjectHead    string `json:"projectHead"`
+	EventCreatedOn Time   `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (ProjectCreated) EventType() string { return "project-created" }
+
+func init() {
+	registerEvent(func() Event { return &ProjectCreated{} })
+}
+
+// PatchSetCreated indicates that a new patchset has been added to a change.
+type PatchSetCreated struct {
+	Type           string   `json:"type"`
+	Change         Change   `json:"change"`
+	PatchSet       PatchSet `json:"patchSet"`
+	Uploader       Account  `json:"uploader"`
+	EventCreatedOn Time     `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (PatchSetCreated) EventType() string { return "patchset-created" }
+
+func init() {
+	registerEvent(func() Event { return &PatchSetCreated{} })
+}
+
+// RefUpdated indicates that a ref has been updated.
+type RefUpdated struct {
+	Type           string    `json:"type"`
+	Submitter      Account   `json:"submitter"`
+	RefUpdate      RefUpdate `json:"refUpdate"`
+	EventCreatedOn Time      `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (RefUpdated) EventType() string { return "ref-updated" }
+
+func init() {
+	registerEvent(func() Event { return &RefUpdated{} })
+}
+
+// ReviewerAdded indicates that a reviewer has been added to a change.
+type ReviewerAdded struct {
+	Type           string   `json:"type"`
+	Change         Change   `json:"change"`
+	PatchSet       PatchSet `json:"patchSet"`
+	Reviewer       Account  `json:"reviewer"`
+	Adder          Account  `json:"adder"`
+	EventCreatedOn Time     `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (ReviewerAdded) EventType() string { return "reviewer-added" }
+
+func init() {
+	registerEvent(func() Event { return &ReviewerAdded{} })
+}
+
+// ReviewerDeleted indicates that a reviewer has been removed from a change, possibly removing one or more approvals.
+type ReviewerDeleted struct {
+	Type           string     `json:"type"`
+	Change         Change     `json:"change"`
+	PatchSet       PatchSet   `json:"patchSet"`
+	Reviewer       Account    `json:"reviewer"`
+	Remover        Account    `json:"remover"`
+	Approvals      []Approval `json:"approvals"`
+	Comment        string     `json:"comment"`
+	EventCreatedOn Time       `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (ReviewerDeleted) EventType() string { return "reviewer-deleted" }
+
+func init() {
+	registerEvent(func() Event { return &ReviewerDeleted{} })
+}
+
+// TopicChanged indicates that the topic attached to a change has been changed.
+type TopicChanged struct {
+	Type           string  `json:"type"`
+	Change         Change  `json:"change"`
+	Changer        Account `json:"changer"`
+	OldTopic       string  `json:"oldTopic"`
+	EventCreatedOn Time    `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (TopicChanged) EventType() string { return "topic-changed" }
+
+func init() {
+	registerEvent(func() Event { return &TopicChanged{} })
+}
+
+// WIPStateChanged indicates that the work-in-progress state of a change has changed.
+type WIPStateChanged struct {
+	Type           string   `json:"type"`
+	Change         Change   `json:"change"`
+	PatchSet       PatchSet `json:"patchSet"`
+	Changer        Account  `json:"changer"`
+	EventCreatedOn Time     `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (WIPStateChanged) EventType() string { return "wip-state-changed" }
+
+func init() {
+	registerEvent(func() Event { return &WIPStateChanged{} })
+}
+
+// PrivateStateChanged indicates that the private state of a change has changed.
+type PrivateStateChanged struct {
+	Type           string   `json:"type"`
+	Change         Change   `json:"change"`
+	PatchSet       PatchSet `json:"patchSet"`
+	Changer        Account  `json:"changer"`
+	EventCreatedOn Time     `json:"eventCreatedOn"`
+}
+
+// EventType implements Event.
+func (PrivateStateChanged) EventType() string { return "private-state-changed" }
+
+func init() {
+	registerEvent(func() Event { return &PrivateStateChanged{} })
+}
+
+// VoteDeleted indicates that an approval vote has been deleted from a change.
+type VoteDeleted struct {
+	Type      string     `json:"type"`
+	Change    Change     `json:"change"`
+	PatchSet  PatchSet   `json:"patchSet"`
+	Reviewer  Account    `json:"reviewer"`
+	Remover   Account    `json:"remover"`
+	Approvals []Approval `json:"approvals"`
+	Comment   string     `json:"comment"`
+}
+
+// EventType implements Event.
+func (VoteDeleted) EventType() string { return "vote-deleted" }
+
+func init() {
+	registerEvent(func() Event { return &VoteDeleted{} })
+}
diff --git a/fun/clbot/gerrit/gerritevents/time.go b/fun/clbot/gerrit/gerritevents/time.go
new file mode 100644
index 000000000000..7fbfaa3f5c47
--- /dev/null
+++ b/fun/clbot/gerrit/gerritevents/time.go
@@ -0,0 +1,38 @@
+package gerritevents
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+)
+
+// Time is a time.Time that is formatted as a Unix timestamp in JSON.
+type Time struct {
+	time.Time
+}
+
+// UnmarshalJSON unmarshals a Unix timestamp into a Time.
+func (t *Time) UnmarshalJSON(bs []byte) error {
+	if string(bs) == "null" {
+		return nil
+	}
+	u, err := strconv.ParseInt(string(bs), 10, 64)
+	if err != nil {
+		return err
+	}
+	t.Time = time.Unix(u, 0)
+	return nil
+}
+
+// MarshalJSON marshals a Time into a Unix timestamp.
+func (t *Time) MarshalJSON() ([]byte, error) {
+	if t.IsZero() {
+		return []byte("null"), nil
+	}
+	return []byte(fmt.Sprintf("%d", t.Unix())), nil
+}
+
+// IsSet returns true if the time.Time is non-zero.
+func (t *Time) IsSet() bool {
+	return !t.IsZero()
+}
diff --git a/fun/clbot/gerrit/gerritevents/types.go b/fun/clbot/gerrit/gerritevents/types.go
new file mode 100644
index 000000000000..75987a2da601
--- /dev/null
+++ b/fun/clbot/gerrit/gerritevents/types.go
@@ -0,0 +1,221 @@
+package gerritevents
+
+// These types are taken from https://cl.tvl.fyi/Documentation/json.html.
+
+// Account is a Gerrit account (or just a Git name+email pair).
+type Account struct {
+	Name     string `json:"name"`
+	Email    string `json:"email"`
+	Username string `json:"username"`
+}
+
+// ChangeStatus represents the states a change can be in.
+type ChangeStatus string
+
+const (
+	// ChangeStatusNew is the state a change is in during review.
+	ChangeStatusNew ChangeStatus = "NEW"
+
+	// ChangeStatusMerged indicates a change was merged to the target branch.
+	ChangeStatusMerged ChangeStatus = "MERGED"
+
+	// ChangeStatusAbandoned indicates a change was marked as abandoned.
+	ChangeStatusAbandoned ChangeStatus = "ABANDONED"
+)
+
+// Message is a message left by a reviewer.
+type Message struct {
+	Timestamp Time    `json:"timestamp"`
+	Reviewer  Account `json:"reviewer"`
+	Message   string  `json:"message"`
+}
+
+// TrackingID allows storing identifiers from external systems, i.e. bug trackers.
+type TrackingID struct {
+	System string `json:"system"`
+	ID     string `json:"id"`
+}
+
+// ChangeKind indicates the different changes that can be made to a change.
+type ChangeKind string
+
+const (
+	// ChangeKindRework indicates a non-trivial content change.
+	ChangeKindRework ChangeKind = "REWORK"
+
+	// ChangeKindTrivialRebase indicates a conflict-free merge between the new parent and the prior patch set.
+	ChangeKindTrivialRebase ChangeKind = "TRIVIAL_REBASE"
+
+	// ChangeKindMergeFirstParentUpdate indicates a conflict-free change of the first parent of a merge commit.
+	ChangeKindMergeFirstParentUpdate ChangeKind = "MERGE_FIRST_PARENT_UPDATE"
+
+	// ChangeKindNoCodeChange indicates no code change (the tree and parent trees are unchanged) - commit message probably changed.
+	ChangeKindNoCodeChange ChangeKind = "NO_CODE_CHANGE"
+
+	// ChangeKindNoChange indicates nothing changes: the commit message, tree, and parent tree are unchanged.
+	ChangeKindNoChange ChangeKind = "NO_CHANGE"
+)
+
+// Approval represents the current and past state of an approval label.
+type Approval struct {
+	Type        string   `json:"type"`
+	Description string   `json:"description"`
+	Value       string   `json:"value"`
+	OldValue    *string  `json:"oldValue"`
+	GrantedOn   *Time    `json:"grantedOn"`
+	By          *Account `json:"by"`
+}
+
+// PatchSetComment is a single comment left on a patchset.
+type PatchSetComment struct {
+	File     string  `json:"file"`
+	Line     int     `json:"line"`
+	Reviewer Account `json:"reviewer"`
+	Message  string  `json:"message"`
+}
+
+// FilePatchType represents the different modifications that can be made to a file by a patchset.
+type FilePatchType string
+
+const (
+	// FilePatchTypeAdded indicates the file did not exist, and this patchset adds it to the tree.
+	FilePatchTypeAdded FilePatchType = "ADDED"
+
+	// FilePatchTypeModified indicates the file exists before and after this patchset.
+	FilePatchTypeModified FilePatchType = "MODIFIED"
+
+	// FilePatchTypeDeleted indicates the file is removed by this patchset.
+	FilePatchTypeDeleted FilePatchType = "DELETED"
+
+	// FilePatchTypeRenamed indicates the file has a different name before this patchset than after.
+	FilePatchTypeRenamed FilePatchType = "RENAMED"
+
+	// FilePatchTypeCopied indicates the file was copied from a different file.
+	FilePatchTypeCopied FilePatchType = "COPIED"
+
+	// FilePatchTypeRewrite indicates the file had a significant quantity of content changed.
+	FilePatchTypeRewrite FilePatchType = "REWRITE"
+)
+
+// File represents a file in a patchset as well as how it is being modified.
+type File struct {
+	File    string        `json:"file"`
+	FileOld string        `json:"fileOld"`
+	Type    FilePatchType `json:"type"`
+}
+
+// PatchSet represents a single patchset within a change.
+type PatchSet struct {
+	Number         int               `json:"number"`
+	Revision       string            `json:"revision"`
+	Parents        []string          `json:"parents"`
+	Ref            string            `json:"ref"`
+	Uploader       Account           `json:"uploader"`
+	Author         Account           `json:"author"`
+	CreatedOn      Time              `json:"createdOn"`
+	Kind           ChangeKind        `json:"kind"`
+	Approvals      []Approval        `json:"approvals"`
+	Comments       []PatchSetComment `json:"comments"`
+	Files          []File            `json:"file"`
+	SizeInsertions int               `json:"sizeInsertions"`
+	SizeDeletions  int               `json:"sizeDeletions"`
+}
+
+// Dependency represents a change on which this change is dependent.
+type Dependency struct {
+	ID                string `json:"id"`
+	Number            int    `json:"number"`
+	Revision          string `json:"revision"`
+	Ref               string `json:"ref"`
+	IsCurrentPatchSet bool   `json:"isCurrentPatchSet"`
+}
+
+// SubmitStatus indicates whether this change has met the submit conditions and is ready to submit.
+type SubmitStatus string
+
+const (
+	// SubmitStatusOK indicates this change is ready to submit - all submit requirements are met.
+	SubmitStatusOK SubmitStatus = "OK"
+
+	// SubmitStatusNotReady indicates this change cannot yet be submitted.
+	SubmitStatusNotReady SubmitStatus = "NOT_READY"
+
+	// SubmitStatusRuleError indicates the submit rules could not be evaluted. Administrator intervention is required.
+	SubmitStatusRuleError SubmitStatus = "RULE_ERROR"
+)
+
+// LabelStatus indicates whether this label permits submission and if the label can be granted by anyone.
+type LabelStatus string
+
+const (
+	// LabelStatusOK indicates that this label provides what is necessary for submission (e.g. CR+2).
+	LabelStatusOK LabelStatus = "OK"
+
+	// LabelStatusReject indicates this label prevents submission (e.g. CR-2).
+	LabelStatusReject LabelStatus = "REJECT"
+
+	// LabelStatusNeed indicates this label is required for submission, but has not been satisfied (e.g. CR0).
+	LabelStatusNeed LabelStatus = "NEED"
+
+	// LabelStatusMay indicates this label is not required for submission. It may or may not be set.
+	LabelStatusMay LabelStatus = "MAY"
+
+	// LabelStatusImpossible indicates this label is required for submission, but cannot be satisfied. The ACLs on this label may be set incorrectly.
+	LabelStatusImpossible LabelStatus = "IMPOSSIBLE"
+)
+
+// Label represents the status of a particular label.
+type Label struct {
+	Label  string      `json:"label"`
+	Status LabelStatus `json:"status"`
+	By     Account     `json:"by"`
+}
+
+// Requirement represents a submit requirement.
+type Requirement struct {
+	FallbackText string `json:"fallbackText"`
+	Type         string `json:"type"`
+	// TODO(lukegb): data
+}
+
+// SubmitRecord represents the current submission state of a change.
+type SubmitRecord struct {
+	Status       SubmitStatus  `json:"status"`
+	Labels       []Label       `json:"labels"`
+	Requirements []Requirement `json:"requirements"`
+}
+
+// Change represents a Gerrit CL.
+type Change struct {
+	Project         string         `json:"project"`
+	Branch          string         `json:"branch"`
+	Topic           string         `json:"topic"`
+	ID              string         `json:"id"`
+	Number          int            `json:"number"`
+	Subject         string         `json:"subject"`
+	Owner           Account        `json:"owner"`
+	URL             string         `json:"url"`
+	CommitMessage   string         `json:"commitMessage"`
+	CreatedOn       Time           `json:"createdOn"`
+	LastUpdated     *Time          `json:"lastUpdated"`
+	Open            bool           `json:"open"`
+	Status          ChangeStatus   `json:"status"`
+	Private         bool           `json:"private"`
+	WIP             bool           `json:"wip"`
+	Comments        []Message      `json:"comments"`
+	TrackingIDs     []TrackingID   `json:"trackingIds"`
+	CurrentPatchSet *PatchSet      `json:"currentPatchSet"`
+	PatchSets       []PatchSet     `json:"patchSets"`
+	DependsOn       []Dependency   `json:"dependsOn"`
+	NeededBy        []Dependency   `json:"neededBy"`
+	SubmitRecords   []SubmitRecord `json:"submitRecord"`
+	AllReviewers    []Account      `json:"allReviewers"`
+}
+
+// RefUpdate represents a change in a ref.
+type RefUpdate struct {
+	OldRev  string `json:"oldRev"`
+	NewRev  string `json:"newRev"`
+	RefName string `json:"refName"`
+	Project string `json:"project"`
+}
diff --git a/fun/clbot/gerrit/watcher.go b/fun/clbot/gerrit/watcher.go
new file mode 100644
index 000000000000..d45876129b3a
--- /dev/null
+++ b/fun/clbot/gerrit/watcher.go
@@ -0,0 +1,252 @@
+// Package gerrit implements a watcher for Gerrit events.
+package gerrit
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"strings"
+	"time"
+
+	"code.tvl.fyi/fun/clbot/backoffutil"
+	"code.tvl.fyi/fun/clbot/gerrit/gerritevents"
+	log "github.com/golang/glog"
+	"golang.org/x/crypto/ssh"
+)
+
+// closer provides an embeddable implementation of Close which awaits a main loop acknowledging it has stopped.
+type closer struct {
+	stop    chan struct{}
+	stopped chan struct{}
+}
+
+// newCloser returns a closer with the channels initialised.
+func newCloser() closer {
+	return closer{
+		stop:    make(chan struct{}),
+		stopped: make(chan struct{}),
+	}
+}
+
+// Close stops the main loop, waiting for the main loop to stop until it stops or the context is cancelled, whichever happens first.
+func (c *closer) Close(ctx context.Context) error {
+	select {
+	case <-c.stopped:
+		return nil
+	case <-c.stop:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+	}
+	close(c.stop)
+	select {
+	case <-c.stopped:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+// lineWriter is an io.Writer which splits on \n and outputs each line (with no trailing newline) to its output channel.
+type lineWriter struct {
+	buf string
+	out chan string
+}
+
+// Write accepts a slice of bytes containing zero or more new lines.
+// If the contained channel is non-buffering or is full, this will block.
+func (w *lineWriter) Write(p []byte) (n int, err error) {
+	w.buf += string(p)
+	pieces := strings.Split(w.buf, "\n")
+	w.buf = pieces[len(pieces)-1]
+	for n := 0; n < len(pieces)-1; n++ {
+		w.out <- pieces[n]
+	}
+	return len(p), nil
+}
+
+// restartingClient is a simple SSH client that repeatedly connects to an SSH server, runs a command, and outputs the lines output by it on stdout onto a channel.
+type restartingClient struct {
+	closer
+
+	network string
+	addr    string
+	cfg     *ssh.ClientConfig
+
+	exec     string
+	output   chan string
+	shutdown func()
+}
+
+var (
+	errStopConnect = errors.New("gerrit: told to stop reconnecting by remote server")
+)
+
+func (c *restartingClient) runOnce() error {
+	netConn, err := net.Dial(c.network, c.addr)
+	if err != nil {
+		return fmt.Errorf("connecting to %v/%v: %w", c.network, c.addr, err)
+	}
+	defer netConn.Close()
+
+	sshConn, newCh, newReq, err := ssh.NewClientConn(netConn, c.addr, c.cfg)
+	if err != nil {
+		return fmt.Errorf("creating SSH connection to %v/%v: %w", c.network, c.addr, err)
+	}
+	defer sshConn.Close()
+
+	goAway := false
+	passedThroughReqs := make(chan *ssh.Request)
+	go func() {
+		defer close(passedThroughReqs)
+		for req := range newReq {
+			if req.Type == "goaway" {
+				goAway = true
+				log.Warningf("remote end %v/%v told me to go away!", c.network, c.addr)
+				sshConn.Close()
+				netConn.Close()
+			}
+			passedThroughReqs <- req
+		}
+	}()
+
+	cl := ssh.NewClient(sshConn, newCh, passedThroughReqs)
+
+	sess, err := cl.NewSession()
+	if err != nil {
+		return fmt.Errorf("NewSession on %v/%v: %w", c.network, c.addr, err)
+	}
+	defer sess.Close()
+
+	sess.Stdout = &lineWriter{out: c.output}
+
+	if err := sess.Start(c.exec); err != nil {
+		return fmt.Errorf("Start(%q) on %v/%v: %w", c.exec, c.network, c.addr, err)
+	}
+
+	log.Infof("connected to %v/%v", c.network, c.addr)
+
+	done := make(chan struct{})
+	go func() {
+		sess.Wait()
+		close(done)
+	}()
+	go func() {
+		select {
+		case <-c.stop:
+			sess.Close()
+		case <-done:
+		}
+		return
+	}()
+	<-done
+
+	if goAway {
+		return errStopConnect
+	}
+	return nil
+}
+
+func (c *restartingClient) run() {
+	defer close(c.stopped)
+	bo := backoffutil.NewDefaultBackOff()
+	for {
+		timer := time.NewTimer(bo.NextBackOff())
+		select {
+		case <-c.stop:
+			timer.Stop()
+			return
+		case <-timer.C:
+			break
+		}
+		if err := c.runOnce(); err == errStopConnect {
+			if c.shutdown != nil {
+				c.shutdown()
+				return
+			}
+		} else if err != nil {
+			log.Errorf("SSH: %v", err)
+		} else {
+			bo.Reset()
+		}
+	}
+}
+
+// Output returns the channel on which each newline-delimited string output by the executed command's stdout can be received.
+func (c *restartingClient) Output() <-chan string {
+	return c.output
+}
+
+// dialRestartingClient creates a new restartingClient.
+func dialRestartingClient(network, addr string, config *ssh.ClientConfig, exec string, shutdown func()) (*restartingClient, error) {
+	c := &restartingClient{
+		closer:   newCloser(),
+		network:  network,
+		addr:     addr,
+		cfg:      config,
+		exec:     exec,
+		output:   make(chan string),
+		shutdown: shutdown,
+	}
+	go c.run()
+	return c, nil
+}
+
+// Watcher watches
+type Watcher struct {
+	closer
+	c *restartingClient
+
+	output chan gerritevents.Event
+}
+
+// Close shuts down the SSH client connection, if any, and closes the output channel.
+// It blocks until shutdown is complete or until the context is cancelled, whichever comes first.
+func (w *Watcher) Close(ctx context.Context) {
+	w.c.Close(ctx)
+	w.closer.Close(ctx)
+}
+
+func (w *Watcher) run() {
+	defer close(w.stopped)
+	defer close(w.output)
+	for {
+		select {
+		case <-w.stop:
+			return
+		case o := <-w.c.Output():
+			ev, err := gerritevents.Parse([]byte(o))
+			if err != nil {
+				log.Errorf("failed to parse event %v: %v", o, err)
+				continue
+			}
+			w.output <- ev
+		}
+	}
+}
+
+// Events returns the channel upon which parsed Gerrit events can be received.
+func (w *Watcher) Events() <-chan gerritevents.Event {
+	return w.output
+}
+
+// New returns a running Watcher from which events can be read.
+// It will begin connecting to the provided address immediately.
+func New(ctx context.Context, network, addr string, cfg *ssh.ClientConfig) (*Watcher, error) {
+	wc := newCloser()
+	rc, err := dialRestartingClient(network, addr, cfg, "gerrit stream-events", func() {
+		wc.Close(context.Background())
+	})
+	if err != nil {
+		return nil, fmt.Errorf("dialRestartingClient: %w", err)
+	}
+	w := &Watcher{
+		closer: wc,
+		c:      rc,
+		output: make(chan gerritevents.Event),
+	}
+	go w.run()
+	return w, nil
+}
diff --git a/fun/clbot/gerrit/watcher_test.go b/fun/clbot/gerrit/watcher_test.go
new file mode 100644
index 000000000000..ae69b2fc4c18
--- /dev/null
+++ b/fun/clbot/gerrit/watcher_test.go
@@ -0,0 +1,190 @@
+package gerrit
+
+import (
+	"context"
+	"crypto/ecdsa"
+	"crypto/elliptic"
+	"crypto/rand"
+	"crypto/subtle"
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	"code.tvl.fyi/fun/clbot/gerrit/gerritevents"
+	log "github.com/golang/glog"
+	"github.com/google/go-cmp/cmp"
+	"golang.org/x/crypto/ssh"
+)
+
+var (
+	sshServerSigner, sshServerPublicKey = mustNewKey()
+	sshClientSigner, sshClientPublicKey = mustNewKey()
+)
+
+func mustNewKey() (ssh.Signer, ssh.PublicKey) {
+	key, err := ecdsa.GenerateKey(elliptic.P384(), rand.Reader)
+	if err != nil {
+		panic(err)
+	}
+	signer, err := ssh.NewSignerFromKey(key)
+	if err != nil {
+		panic(err)
+	}
+	publicKey, err := ssh.NewPublicKey(key.Public())
+	if err != nil {
+		panic(err)
+	}
+	return signer, publicKey
+}
+
+func newSSHServer(lines string) (addr string, cleanup func(), err error) {
+	config := &ssh.ServerConfig{
+		PublicKeyCallback: func(c ssh.ConnMetadata, pubKey ssh.PublicKey) (*ssh.Permissions, error) {
+			pkBytes := pubKey.Marshal()
+			wantPKBytes := sshClientPublicKey.Marshal()
+			if subtle.ConstantTimeCompare(pkBytes, wantPKBytes) == 0 {
+				return nil, fmt.Errorf("unauthorized")
+			}
+			return &ssh.Permissions{}, nil
+		},
+	}
+	config.AddHostKey(sshServerSigner)
+
+	ln, err := net.Listen("tcp", ":0")
+	if err != nil {
+		log.Fatalf("Listen on tcp/:0: %v", err)
+	}
+	handle := func(conn net.Conn) {
+		defer conn.Close()
+
+		sc, newchch, newreqch, err := ssh.NewServerConn(conn, config)
+		if err != nil {
+			log.Fatalf("NewServerConn: %v", err)
+		}
+		go ssh.DiscardRequests(newreqch)
+		for newCh := range newchch {
+			if newCh.ChannelType() != "session" {
+				newCh.Reject(ssh.UnknownChannelType, "unknown channel type")
+				continue
+			}
+
+			channel, reqs, err := newCh.Accept()
+			if err != nil {
+				log.Fatalf("Could not accept channel: %v", err)
+			}
+			go func(in <-chan *ssh.Request) {
+				for req := range in {
+					req.Reply(req.Type == "exec", nil)
+				}
+			}(reqs)
+			channel.Write([]byte(lines))
+			sc.SendRequest("goaway", false, nil)
+		}
+	}
+	go func() {
+		for {
+			conn, err := ln.Accept()
+			if err != nil {
+				return
+			}
+			go handle(conn)
+		}
+	}()
+
+	cleanup = func() {
+		ln.Close()
+	}
+	return ln.Addr().String(), cleanup, err
+}
+
+func ts(s string) gerritevents.Time {
+	t, err := time.Parse("2006-01-02 15:04:05 -0700 MST", s)
+	if err != nil {
+		panic(err)
+	}
+	return gerritevents.Time{t}
+}
+
+func optStr(s string) *string { return &s }
+
+func TestWatcher(t *testing.T) {
+	tcs := []struct {
+		name  string
+		lines string
+		want  []gerritevents.Event
+	}{{
+		name: "no events",
+	}, {
+		name: "single test event",
+		lines: `{"author":{"name":"tazjin","email":"mail@tazj.in","username":"tazjin"},"approvals":[{"type":"Code-Review","description":"Code-Review","value":"2","oldValue":"0"}],"comment":"Patch Set 3: Code-Review+2","patchSet":{"number":3,"revision":"6fe272d3f82c6efdfe1167fab98bf918efc03fe5","parents":["d984b6018cf68c7e8b7169b475d90134fbcee767"],"ref":"refs/changes/44/244/3","uploader":{"name":"tazjin","email":"mail@tazj.in","username":"tazjin"},"createdOn":1592081910,"author":{"name":"tazjin","email":"mail@tazj.in","username":"tazjin"},"kind":"REWORK","sizeInsertions":83,"sizeDeletions":-156},"change":{"project":"depot","branch":"master","id":"I546c701145fa204b7ba7518a8a56a783588629e0","number":244,"subject":"refactor(ops/nixos): Move my NixOS configurations to //users/tazjin","owner":{"name":"tazjin","email":"mail@tazj.in","username":"tazjin"},"url":"https://cl.tvl.fyi/c/depot/+/244","commitMessage":"refactor(ops/nixos): Move my NixOS configurations to //users/tazjin\n\nNixOS modules move one level up because it\u0027s unlikely that //ops/nixos\nwill contain actual systems at this point (they\u0027re user-specific).\n\nThis is the first users folder, so it is also added to the root\nreadTree invocation for the repository.\n\nChange-Id: I546c701145fa204b7ba7518a8a56a783588629e0\n","createdOn":1592081577,"status":"NEW"},"project":"depot","refName":"refs/heads/master","changeKey":{"id":"I546c701145fa204b7ba7518a8a56a783588629e0"},"type":"comment-added","eventCreatedOn":1592081929}
+`,
+		want: []gerritevents.Event{
+			&gerritevents.CommentAdded{
+				Type: "comment-added",
+				Change: gerritevents.Change{
+					Project:       "depot",
+					Branch:        "master",
+					ID:            "I546c701145fa204b7ba7518a8a56a783588629e0",
+					Number:        244,
+					Subject:       "refactor(ops/nixos): Move my NixOS configurations to //users/tazjin",
+					Owner:         gerritevents.Account{Name: "tazjin", Email: "mail@tazj.in", Username: "tazjin"},
+					URL:           "https://cl.tvl.fyi/c/depot/+/244",
+					CommitMessage: "refactor(ops/nixos): Move my NixOS configurations to //users/tazjin\n\nNixOS modules move one level up because it's unlikely that //ops/nixos\nwill contain actual systems at this point (they're user-specific).\n\nThis is the first users folder, so it is also added to the root\nreadTree invocation for the repository.\n\nChange-Id: I546c701145fa204b7ba7518a8a56a783588629e0\n",
+					CreatedOn:     ts("2020-06-13 21:52:57 +0100 BST"),
+					Status:        "NEW",
+				},
+				PatchSet: gerritevents.PatchSet{
+					Number:         3,
+					Revision:       "6fe272d3f82c6efdfe1167fab98bf918efc03fe5",
+					Parents:        []string{"d984b6018cf68c7e8b7169b475d90134fbcee767"},
+					Ref:            "refs/changes/44/244/3",
+					Uploader:       gerritevents.Account{Name: "tazjin", Email: "mail@tazj.in", Username: "tazjin"},
+					Author:         gerritevents.Account{Name: "tazjin", Email: "mail@tazj.in", Username: "tazjin"},
+					CreatedOn:      ts("2020-06-13 21:58:30 +0100 BST"),
+					Kind:           "REWORK",
+					SizeInsertions: 83,
+					SizeDeletions:  -156,
+				},
+				Author:         gerritevents.Account{Name: "tazjin", Email: "mail@tazj.in", Username: "tazjin"},
+				Approvals:      []gerritevents.Approval{{Type: "Code-Review", Description: "Code-Review", Value: "2", OldValue: optStr("0")}},
+				Comment:        "Patch Set 3: Code-Review+2",
+				EventCreatedOn: ts("2020-06-13 21:58:49 +0100 BST"),
+			},
+		},
+	}}
+	for _, tc := range tcs {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			serverAddr, cleanup, err := newSSHServer(tc.lines)
+			if err != nil {
+				t.Fatalf("newSSHServer: %v", err)
+			}
+			t.Cleanup(cleanup)
+
+			config := &ssh.ClientConfig{
+				User:            "bert",
+				Auth:            []ssh.AuthMethod{ssh.PublicKeys(sshClientSigner)},
+				HostKeyCallback: ssh.FixedHostKey(sshServerPublicKey),
+				Timeout:         10 * time.Millisecond,
+			}
+			w, err := New(ctx, "tcp", serverAddr, config)
+			if err != nil {
+				t.Fatalf("New: %v", err)
+			}
+
+			var gotEvents []gerritevents.Event
+			for ev := range w.Events() {
+				gotEvents = append(gotEvents, ev)
+			}
+			if diff := cmp.Diff(gotEvents, tc.want); diff != "" {
+				t.Errorf("got events != want events: diff:\n%v", diff)
+			}
+		})
+	}
+}