// Copyright 2022 The TVL Contributors
// SPDX-License-Identifier: Apache-2.0
// Google Cloud Storage backend for Nixery.
package storage
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"time"
"cloud.google.com/go/storage"
log "github.com/sirupsen/logrus"
"golang.org/x/oauth2/google"
)
// client is the package-level HTTP client used for direct calls to APIs that
// are not part of the Cloud Storage SDK.
var client = new(http.Client)
// gcsScope is the OAuth2 scope requested when looking up credentials for the
// direct HTTP call that renames (rewrites) objects in GCS.
const gcsScope = "https://www.googleapis.com/auth/devstorage.read_write"
// GCSBackend is a storage backend that keeps its objects in a single Google
// Cloud Storage bucket.
type GCSBackend struct {
// bucket is the name of the bucket, taken from GCS_BUCKET.
bucket string
// handle is the SDK handle through which objects in the bucket are accessed.
handle *storage.BucketHandle
// signing holds the URL signing options; it is nil when signing is not
// configured.
signing *storage.SignedURLOptions
}
// NewGCSBackend constructs a GCS bucket backend from the environment.
//
// The bucket name is read from GCS_BUCKET, and URL signing is configured
// through signingOptsFromEnv. An error is returned if the bucket is not
// configured, the Cloud Storage client cannot be created, the bucket
// attributes cannot be fetched, or signing cannot be configured.
func NewGCSBackend() (*GCSBackend, error) {
	bucket := os.Getenv("GCS_BUCKET")
	if bucket == "" {
		return nil, fmt.Errorf("GCS_BUCKET must be configured for GCS usage")
	}

	ctx := context.Background()

	// Named gcs rather than client so that the package-level HTTP client is
	// not shadowed inside this function.
	gcs, err := storage.NewClient(ctx)
	if err != nil {
		// Report the failure to the caller like every other setup error,
		// instead of terminating the process from inside a constructor.
		log.WithError(err).Error("failed to set up Cloud Storage client")
		return nil, err
	}

	// Fetching the attributes verifies up front that the bucket is reachable
	// with the current credentials.
	handle := gcs.Bucket(bucket)
	if _, err := handle.Attrs(ctx); err != nil {
		log.WithError(err).WithField("bucket", bucket).Error("could not access configured bucket")
		return nil, err
	}

	signing, err := signingOptsFromEnv()
	if err != nil {
		log.WithError(err).Error("failed to configure GCS bucket signing")
		return nil, err
	}

	return &GCSBackend{
		bucket:  bucket,
		handle:  handle,
		signing: signing,
	}, nil
}
// Name returns a human-readable description of the backend, consisting of the
// service name followed by the bucket name in parentheses.
func (b *GCSBackend) Name() string {
	service := "Google Cloud Storage"
	return service + " (" + b.bucket + ")"
}
// Persist writes an object to the given path in the bucket.
//
// The Persister f is handed the object writer and produces the content; the
// hash and size it reports are returned to the caller on every path. If
// contentType is non-empty it is stored on the object after the upload has
// completed. Failures of f, of finishing the upload, or of updating the
// object attributes are logged and returned.
func (b *GCSBackend) Persist(ctx context.Context, path, contentType string, f Persister) (string, int64, error) {
	// Cancelling the writer's context is how an upload is abandoned.
	// Deferring the cancel releases the in-flight upload on the error paths,
	// where the writer is never closed. Once Close has succeeded the upload
	// is finished, so the cancel no longer affects it.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	obj := b.handle.Object(path)
	w := obj.NewWriter(ctx)

	hash, size, err := f(w)
	if err != nil {
		log.WithError(err).WithField("path", path).Error("failed to write to GCS")
		return hash, size, err
	}

	// Close completes the upload; an error here means the object was not
	// stored.
	if err = w.Close(); err != nil {
		log.WithError(err).WithField("path", path).Error("failed to complete GCS upload")
		return hash, size, err
	}

	// GCS natively supports content types for objects, which will be
	// used when serving them back.
	if contentType != "" {
		_, err = obj.Update(ctx, storage.ObjectAttrsToUpdate{
			ContentType: contentType,
		})
		if err != nil {
			log.WithError(err).WithField("path", path).Error("failed to update object attrs")
			return hash, size, err
		}
	}

	return hash, size, nil
}
// Fetch opens the object stored at path in the bucket for reading.
//
// The object's attributes are requested first, so that a failure to look the
// object up is returned before any reader is created.
func (b *GCSBackend) Fetch(ctx context.Context, path string) (io.ReadCloser, error) {
	object := b.handle.Object(path)

	// Probe for the object before trying to read it.
	if _, err := object.Attrs(ctx); err != nil {
		return nil, err
	}

	return object.NewReader(ctx)
}
// Move renames an object within the bucket.
//
// The object is first copied from old to new through the rewrite endpoint of
// the Cloud Storage HTTP API, after which old is deleted. An error is
// returned if the copy cannot be requested or is rejected by the API. A
// failure to delete the old object is only logged.
func (b *GCSBackend) Move(ctx context.Context, old, new string) error {
	if err := b.rewrite(ctx, old, new); err != nil {
		// The source object is left untouched when the copy did not succeed.
		return err
	}

	// It seems that 'rewriteTo' copies objects instead of
	// renaming/moving them, hence a deletion call afterwards is
	// required.
	if err := b.handle.Object(old).Delete(ctx); err != nil {
		log.WithError(err).WithFields(log.Fields{
			"new": new,
			"old": old,
		}).Warn("failed to delete renamed object")
		// this error should not break renaming and is not returned
	}

	return nil
}

// rewrite copies the object old to new inside the bucket by calling the
// rewrite endpoint of the Cloud Storage HTTP API with the default
// application credentials.
//
// NOTE(review): the response body is not inspected, which assumes that the
// rewrite completes within a single call - confirm for large objects.
func (b *GCSBackend) rewrite(ctx context.Context, old, new string) error {
	creds, err := google.FindDefaultCredentials(ctx, gcsScope)
	if err != nil {
		return err
	}

	token, err := creds.TokenSource.Token()
	if err != nil {
		return err
	}

	// as per https://cloud.google.com/storage/docs/renaming-copying-moving-objects#rename
	endpoint := fmt.Sprintf(
		"https://www.googleapis.com/storage/v1/b/%s/o/%s/rewriteTo/b/%s/o/%s",
		url.PathEscape(b.bucket), url.PathEscape(old),
		url.PathEscape(b.bucket), url.PathEscape(new),
	)

	// Binding the request to ctx lets the caller cancel or time out the call.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, nil)
	if err != nil {
		return err
	}
	req.Header.Add("Authorization", "Bearer "+token.AccessToken)

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Do only fails on transport-level problems; a request rejected by the
	// API still yields a response, so the status has to be checked
	// explicitly.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("rewriting %q to %q in bucket %q: unexpected status %s",
			old, new, b.bucket, resp.Status)
	}

	return nil
}
// Serve answers a blob request by redirecting the client to the layer's
// location in the bucket.
//
// The target URL is obtained from constructLayerUrl. If that fails, the error
// is logged and returned without a response being written; otherwise a 303
// (See Other) response carrying the URL in its Location header is written.
func (b *GCSBackend) Serve(digest string, r *http.Request, w http.ResponseWriter) error {
	target, err := b.constructLayerUrl(digest)
	if err != nil {
		fields := log.Fields{
			"digest": digest,
			"bucket": b.bucket,
		}
		log.WithError(err).WithFields(fields).Error("failed to sign GCS URL")
		return err
	}

	log.WithField("digest", digest).Info("redirecting blob request to GCS bucket")

	w.Header().Set("Location", target)
	w.WriteHeader(http.StatusSeeOther)

	return nil
}
// signingOptsFromEnv configures GCS URL signing from a service account key.
//
// Signing is toggled by GOOGLE_APPLICATION_CREDENTIALS: when the variable is
// unset, nil options and a nil error are returned and URLs are not signed.
// Otherwise the variable names the key file to load. Failures to read or
// parse that file are returned wrapped, so the underlying cause stays
// available to errors.Is and errors.As.
func signingOptsFromEnv() (*storage.SignedURLOptions, error) {
	path := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
	if path == "" {
		// No credentials configured -> no URL signing
		return nil, nil
	}

	key, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read service account key: %w", err)
	}

	conf, err := google.JWTConfigFromJSON(key)
	if err != nil {
		return nil, fmt.Errorf("failed to parse service account key: %w", err)
	}

	log.WithField("account", conf.Email).Info("GCS URL signing enabled")

	// Expires is left unset here; it is filled in per URL at signing time.
	return &storage.SignedURLOptions{
		Scheme:         storage.SigningSchemeV4,
		GoogleAccessID: conf.Email,
		PrivateKey:     conf.PrivateKey,
		Method:         "GET",
	}, nil
}
// constructLayerUrl returns the URL under which the layer with the given
// digest can be fetched from the Cloud Storage bucket.
//
// When signing is configured, the URL is signed with an expiry five minutes
// from now, which allows unauthenticated clients to retrieve the object.
//
// When signing is not configured, a plain storage.googleapis.com URL is
// returned, which means the underlying bucket objects need to be publicly
// accessible.
//
// The Docker client is known to follow redirects, but this might not be true
// for all other registry clients.
func (b *GCSBackend) constructLayerUrl(digest string) (string, error) {
	log.WithField("layer", digest).Info("redirecting layer request to bucket")

	name := "layers/" + digest

	// Without signing options only the public URL can be handed out.
	if b.signing == nil {
		return "https://storage.googleapis.com/" + b.bucket + "/" + name, nil
	}

	// Sign with a copy, so that the expiry is set per URL and the stored
	// options stay unmodified.
	signOpts := *b.signing
	signOpts.Expires = time.Now().Add(5 * time.Minute)

	return storage.SignedURL(b.bucket, name, &signOpts)
}