author     Vincent Ambo <tazjin@google.com>    2019-10-27T15·49+0100
committer  Vincent Ambo <github@tazj.in>       2019-10-28T21·31+0100
commit     f7d16c5d454ea2aee65e7180e19a9bb891178bbb (patch)
tree       88b2e8aa87f2b7499cd71edc7f0ac5a355b39706 /tools/nixery/server/builder
parent     ffe58d6cb510d0274ea1afed3e0e2b44c69e32c2 (diff)
refactor(server): Introduce pluggable interface for storage backends
This abstracts over the functionality of Google Cloud Storage and
other potential underlying storage backends to make it possible to
replace these in Nixery.

The GCS backend is not yet reimplemented.
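
The new storage package itself is outside the scope of this diff, but its shape can be reconstructed from the call sites below (Name, Persist, Fetch, Move). The following is only a sketch implied by that usage; the exact signatures in the committed package — context handling, the Fetch return type, the name of the callback type — are assumptions, not the real code:

    // Sketch of the Backend interface implied by the call sites in this diff.
    package storage

    import "io"

    // Persister is the callback handed to Persist: it writes the object's
    // content to the supplied writer and reports a content hash and size
    // back to the backend. (Name and signature are assumed.)
    type Persister = func(io.Writer) (string, int64, error)

    type Backend interface {
    	// Name returns a human-readable backend name, used in log fields.
    	Name() string

    	// Persist streams an object to the given path via the callback and
    	// passes through the hash and size the callback reported.
    	Persist(path string, f Persister) (string, int64, error)

    	// Fetch retrieves a stored object for reading.
    	Fetch(path string) (io.ReadCloser, error)

    	// Move renames an object, e.g. from "staging/" to "layers/".
    	Move(old, new string) error
    }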
Diffstat (limited to 'tools/nixery/server/builder')
-rw-r--r--  tools/nixery/server/builder/builder.go  101
-rw-r--r--  tools/nixery/server/builder/cache.go      94
2 files changed, 63 insertions, 132 deletions
diff --git a/tools/nixery/server/builder/builder.go b/tools/nixery/server/builder/builder.go
index 2a3aa182ac14..e2982b993dac 100644
--- a/tools/nixery/server/builder/builder.go
+++ b/tools/nixery/server/builder/builder.go
@@ -28,18 +28,16 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
-	"net/url"
 	"os"
 	"os/exec"
 	"sort"
 	"strings"
 
-	"cloud.google.com/go/storage"
 	"github.com/google/nixery/server/config"
 	"github.com/google/nixery/server/layers"
 	"github.com/google/nixery/server/manifest"
+	"github.com/google/nixery/server/storage"
 	log "github.com/sirupsen/logrus"
-	"golang.org/x/oauth2/google"
 )
 
 // The maximum number of layers in an image is 125. To allow for
@@ -47,19 +45,16 @@ import (
 // use up is set at a lower point.
 const LayerBudget int = 94
 
-// API scope needed for renaming objects in GCS
-const gcsScope = "https://www.googleapis.com/auth/devstorage.read_write"
-
 // HTTP client to use for direct calls to APIs that are not part of the SDK
 var client = &http.Client{}
 
 // State holds the runtime state that is carried around in Nixery and
 // passed to builder functions.
 type State struct {
-	Bucket *storage.BucketHandle
-	Cache  *LocalCache
-	Cfg    config.Config
-	Pop    layers.Popularity
+	Storage storage.Backend
+	Cache   *LocalCache
+	Cfg     config.Config
+	Pop     layers.Popularity
 }
 
 // Image represents the information necessary for building a container image.
@@ -349,53 +344,6 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 	return entries, nil
 }
 
-// renameObject renames an object in the specified Cloud Storage
-// bucket.
-//
-// The Go API for Cloud Storage does not support renaming objects, but
-// the HTTP API does. The code below makes the relevant call manually.
-func renameObject(ctx context.Context, s *State, old, new string) error {
-	bucket := s.Cfg.Bucket
-
-	creds, err := google.FindDefaultCredentials(ctx, gcsScope)
-	if err != nil {
-		return err
-	}
-
-	token, err := creds.TokenSource.Token()
-	if err != nil {
-		return err
-	}
-
-	// as per https://cloud.google.com/storage/docs/renaming-copying-moving-objects#rename
-	url := fmt.Sprintf(
-		"https://www.googleapis.com/storage/v1/b/%s/o/%s/rewriteTo/b/%s/o/%s",
-		url.PathEscape(bucket), url.PathEscape(old),
-		url.PathEscape(bucket), url.PathEscape(new),
-	)
-
-	req, err := http.NewRequest("POST", url, nil)
-	req.Header.Add("Authorization", "Bearer "+token.AccessToken)
-	_, err = client.Do(req)
-	if err != nil {
-		return err
-	}
-
-	// It seems that 'rewriteTo' copies objects instead of
-	// renaming/moving them, hence a deletion call afterwards is
-	// required.
-	if err = s.Bucket.Object(old).Delete(ctx); err != nil {
-		log.WithError(err).WithFields(log.Fields{
-			"new": new,
-			"old": old,
-		}).Warn("failed to delete renamed object")
-
-		// this error should not break renaming and is not returned
-	}
-
-	return nil
-}
-
 // layerWriter is the type for functions that can write a layer to the
 // multiwriter used for uploading & hashing.
 //
@@ -430,33 +378,32 @@ func (b *byteCounter) Write(p []byte) (n int, err error) {
 // The return value is the layer's SHA256 hash, which is used in the
 // image manifest.
 func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) (*manifest.Entry, error) {
-	staging := s.Bucket.Object("staging/" + key)
-
-	// Sets up a "multiwriter" that simultaneously runs both hash
-	// algorithms and uploads to the bucket
-	sw := staging.NewWriter(ctx)
-	shasum := sha256.New()
-	counter := &byteCounter{}
-	multi := io.MultiWriter(sw, shasum, counter)
+	path := "staging/" + key
+	sha256sum, size, err := s.Storage.Persist(path, func(sw io.Writer) (string, int64, error) {
+		// Sets up a "multiwriter" that simultaneously runs both hash
+		// algorithms and uploads to the storage backend.
+		shasum := sha256.New()
+		counter := &byteCounter{}
+		multi := io.MultiWriter(sw, shasum, counter)
+
+		err := lw(multi)
+		sha256sum := fmt.Sprintf("%x", shasum.Sum([]byte{}))
+
+		return sha256sum, counter.count, err
+	})
 
-	err := lw(multi)
 	if err != nil {
-		log.WithError(err).WithField("layer", key).
-			Error("failed to create and upload layer")
+		log.WithError(err).WithFields(log.Fields{
+			"layer":   key,
+			"backend": s.Storage.Name(),
+		}).Error("failed to create and store layer")
 
 		return nil, err
 	}
 
-	if err = sw.Close(); err != nil {
-		log.WithError(err).WithField("layer", key).
-			Error("failed to upload layer to staging")
-	}
-
-	sha256sum := fmt.Sprintf("%x", shasum.Sum([]byte{}))
-
 	// Hashes are now known and the object is in the bucket, what
 	// remains is to move it to the correct location and cache it.
-	err = renameObject(ctx, s, "staging/"+key, "layers/"+sha256sum)
+	err = s.Storage.Move("staging/"+key, "layers/"+sha256sum)
 	if err != nil {
 		log.WithError(err).WithField("layer", key).
 			Error("failed to move layer from staging")
@@ -464,8 +411,6 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter)
 		return nil, err
 	}
 
-	size := counter.count
-
 	log.WithFields(log.Fields{
 		"layer":  key,
 		"sha256": sha256sum,
diff --git a/tools/nixery/server/builder/cache.go b/tools/nixery/server/builder/cache.go
index 88bf30de4812..2af214cd9188 100644
--- a/tools/nixery/server/builder/cache.go
+++ b/tools/nixery/server/builder/cache.go
@@ -114,24 +114,18 @@ func (c *LocalCache) localCacheLayer(key string, e manifest.Entry) {
 }
 
 // Retrieve a manifest from the cache(s). First the local cache is
-// checked, then the GCS-bucket cache.
+// checked, then the storage backend.
 func manifestFromCache(ctx context.Context, s *State, key string) (json.RawMessage, bool) {
 	if m, cached := s.Cache.manifestFromLocalCache(key); cached {
 		return m, true
 	}
 
-	obj := s.Bucket.Object("manifests/" + key)
-
-	// Probe whether the file exists before trying to fetch it.
-	_, err := obj.Attrs(ctx)
+	r, err := s.Storage.Fetch("manifests/" + key)
 	if err != nil {
-		return nil, false
-	}
-
-	r, err := obj.NewReader(ctx)
-	if err != nil {
-		log.WithError(err).WithField("manifest", key).
-			Error("failed to retrieve manifest from bucket cache")
+		log.WithError(err).WithFields(log.Fields{
+			"manifest": key,
+			"backend":  s.Storage.Name(),
+		}).Error("failed to retrieve manifest from storage backend")
 
 		return nil, false
 	}
@@ -139,8 +133,10 @@ func manifestFromCache(ctx context.Context, s *State, key string) (json.RawMessa
 
 	m, err := ioutil.ReadAll(r)
 	if err != nil {
-		log.WithError(err).WithField("manifest", key).
-			Error("failed to read cached manifest from bucket")
+		log.WithError(err).WithFields(log.Fields{
+			"manifest": key,
+			"backend":  s.Storage.Name(),
+		}).Error("failed to read cached manifest from storage backend")
 
 		return nil, false
 	}
@@ -155,21 +151,17 @@ func manifestFromCache(ctx context.Context, s *State, key string) (json.RawMessa
 func cacheManifest(ctx context.Context, s *State, key string, m json.RawMessage) {
 	go s.Cache.localCacheManifest(key, m)
 
-	obj := s.Bucket.Object("manifests/" + key)
-	w := obj.NewWriter(ctx)
-	r := bytes.NewReader([]byte(m))
+	path := "manifests/" + key
+	_, size, err := s.Storage.Persist(path, func(w io.Writer) (string, int64, error) {
+		size, err := io.Copy(w, bytes.NewReader([]byte(m)))
+		return "", size, err
+	})
 
-	size, err := io.Copy(w, r)
 	if err != nil {
-		log.WithError(err).WithField("manifest", key).
-			Error("failed to cache manifest to GCS")
-
-		return
-	}
-
-	if err = w.Close(); err != nil {
-		log.WithError(err).WithField("manifest", key).
-			Error("failed to cache manifest to GCS")
+		log.WithError(err).WithFields(log.Fields{
+			"manifest": key,
+			"backend":  s.Storage.Name(),
+		}).Error("failed to cache manifest to storage backend")
 
 		return
 	}
@@ -177,7 +169,8 @@ func cacheManifest(ctx context.Context, s *State, key string, m json.RawMessage)
 	log.WithFields(log.Fields{
 		"manifest": key,
 		"size":     size,
-	}).Info("cached manifest to GCS")
+		"backend":  s.Storage.Name(),
+	}).Info("cached manifest to storage backend")
 }
 
 // Retrieve a layer build from the cache, first checking the local
@@ -187,16 +180,12 @@ func layerFromCache(ctx context.Context, s *State, key string) (*manifest.Entry,
 		return entry, true
 	}
 
-	obj := s.Bucket.Object("builds/" + key)
-	_, err := obj.Attrs(ctx)
-	if err != nil {
-		return nil, false
-	}
-
-	r, err := obj.NewReader(ctx)
+	r, err := s.Storage.Fetch("builds/" + key)
 	if err != nil {
-		log.WithError(err).WithField("layer", key).
-			Error("failed to retrieve cached layer from GCS")
+		log.WithError(err).WithFields(log.Fields{
+			"layer":   key,
+			"backend": s.Storage.Name(),
+		}).Warn("failed to retrieve cached layer from storage backend")
 
 		return nil, false
 	}
@@ -205,8 +194,10 @@ func layerFromCache(ctx context.Context, s *State, key string) (*manifest.Entry,
 	jb := bytes.NewBuffer([]byte{})
 	_, err = io.Copy(jb, r)
 	if err != nil {
-		log.WithError(err).WithField("layer", key).
-			Error("failed to read cached layer from GCS")
+		log.WithError(err).WithFields(log.Fields{
+			"layer":   key,
+			"backend": s.Storage.Name(),
+		}).Error("failed to read cached layer from storage backend")
 
 		return nil, false
 	}
@@ -227,24 +218,19 @@ func layerFromCache(ctx context.Context, s *State, key string) (*manifest.Entry,
 func cacheLayer(ctx context.Context, s *State, key string, entry manifest.Entry) {
 	s.Cache.localCacheLayer(key, entry)
 
-	obj := s.Bucket.Object("builds/" + key)
-
 	j, _ := json.Marshal(&entry)
+	path := "builds/" + key
+	_, _, err := s.Storage.Persist(path, func(w io.Writer) (string, int64, error) {
+		size, err := io.Copy(w, bytes.NewReader(j))
+		return "", size, err
+	})
 
-	w := obj.NewWriter(ctx)
-
-	_, err := io.Copy(w, bytes.NewReader(j))
 	if err != nil {
-		log.WithError(err).WithField("layer", key).
-			Error("failed to cache layer")
-
-		return
+		log.WithError(err).WithFields(log.Fields{
+			"layer":   key,
+			"backend": s.Storage.Name(),
+		}).Error("failed to cache layer")
 	}
 
-	if err = w.Close(); err != nil {
-		log.WithError(err).WithField("layer", key).
-			Error("failed to cache layer")
-
-		return
-	}
+	return
 }
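
With both the layer uploads and the cache reads/writes routed through Fetch, Persist and Move, swapping out GCS means implementing those few methods and touching nothing in builder.go or cache.go. Purely as an illustration (no such backend is part of this commit), a local-filesystem implementation of the interface sketched above could look like this:

    // Hypothetical filesystem backend, shown only to illustrate what
    // "pluggable" buys. Not part of this commit.
    package storage

    import (
    	"io"
    	"os"
    	"path/filepath"
    )

    type FSBackend struct {
    	// Root directory under which all objects are stored.
    	Path string
    }

    func (b *FSBackend) Name() string { return "Filesystem (" + b.Path + ")" }

    func (b *FSBackend) Persist(key string, f func(io.Writer) (string, int64, error)) (string, int64, error) {
    	full := filepath.Join(b.Path, key)
    	if err := os.MkdirAll(filepath.Dir(full), 0755); err != nil {
    		return "", 0, err
    	}

    	file, err := os.Create(full)
    	if err != nil {
    		return "", 0, err
    	}
    	defer file.Close()

    	// The callback streams the object into the file and reports its
    	// hash and size, exactly as with the GCS writer before.
    	return f(file)
    }

    func (b *FSBackend) Fetch(key string) (io.ReadCloser, error) {
    	return os.Open(filepath.Join(b.Path, key))
    }

    func (b *FSBackend) Move(old, new string) error {
    	newPath := filepath.Join(b.Path, new)
    	if err := os.MkdirAll(filepath.Dir(newPath), 0755); err != nil {
    		return err
    	}
    	return os.Rename(filepath.Join(b.Path, old), newPath)
    }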