diff options
Diffstat (limited to 'tools/nixery/storage')
-rw-r--r-- | tools/nixery/storage/filesystem.go | 110 | ||||
-rw-r--r-- | tools/nixery/storage/gcs.go | 242 | ||||
-rw-r--r-- | tools/nixery/storage/storage.go | 51 |
3 files changed, 403 insertions, 0 deletions
diff --git a/tools/nixery/storage/filesystem.go b/tools/nixery/storage/filesystem.go new file mode 100644 index 000000000000..2be3457f324a --- /dev/null +++ b/tools/nixery/storage/filesystem.go @@ -0,0 +1,110 @@ +// Copyright 2019 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +// Filesystem storage backend for Nixery. +package storage + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "path" + + "github.com/pkg/xattr" + log "github.com/sirupsen/logrus" +) + +type FSBackend struct { + path string +} + +func NewFSBackend() (*FSBackend, error) { + p := os.Getenv("STORAGE_PATH") + if p == "" { + return nil, fmt.Errorf("STORAGE_PATH must be set for filesystem storage") + } + + p = path.Clean(p) + err := os.MkdirAll(p, 0755) + if err != nil { + return nil, fmt.Errorf("failed to create storage dir: %s", err) + } + + return &FSBackend{p}, nil +} + +func (b *FSBackend) Name() string { + return fmt.Sprintf("Filesystem (%s)", b.path) +} + +func (b *FSBackend) Persist(ctx context.Context, key, contentType string, f Persister) (string, int64, error) { + full := path.Join(b.path, key) + dir := path.Dir(full) + err := os.MkdirAll(dir, 0755) + if err != nil { + log.WithError(err).WithField("path", dir).Error("failed to create storage directory") + return "", 0, err + } + + file, err := os.OpenFile(full, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + log.WithError(err).WithField("file", full).Error("failed to write file") + return "", 0, err + } + defer file.Close() + + err = xattr.Set(full, "user.mime_type", []byte(contentType)) + if err != nil { + log.WithError(err).WithField("file", full).Error("failed to store file type in xattrs") + return "", 0, err + } + + return f(file) +} + +func (b *FSBackend) Fetch(ctx context.Context, key string) (io.ReadCloser, error) { + full := path.Join(b.path, key) + return os.Open(full) +} + +func (b *FSBackend) Move(ctx context.Context, old, new string) error { + newpath := path.Join(b.path, new) + err := os.MkdirAll(path.Dir(newpath), 0755) + if err != nil { + return err + } + + return os.Rename(path.Join(b.path, old), newpath) +} + +func (b *FSBackend) Serve(digest string, r *http.Request, w http.ResponseWriter) error { + p := path.Join(b.path, "layers", digest) + + log.WithFields(log.Fields{ + "digest": digest, + "path": p, + }).Info("serving blob from filesystem") + + contentType, err := xattr.Get(p, "user.mime_type") + if err != nil { + log.WithError(err).WithField("file", p).Error("failed to read file type from xattrs") + return err + } + w.Header().Add("Content-Type", string(contentType)) + + http.ServeFile(w, r, p) + return nil +} diff --git a/tools/nixery/storage/gcs.go b/tools/nixery/storage/gcs.go new file mode 100644 index 000000000000..a4bb4ba31f67 --- /dev/null +++ b/tools/nixery/storage/gcs.go @@ -0,0 +1,242 @@ +// Copyright 2019 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +// Google Cloud Storage backend for Nixery. +package storage + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + "time" + + "cloud.google.com/go/storage" + log "github.com/sirupsen/logrus" + "golang.org/x/oauth2/google" +) + +// HTTP client to use for direct calls to APIs that are not part of the SDK +var client = &http.Client{} + +// API scope needed for renaming objects in GCS +const gcsScope = "https://www.googleapis.com/auth/devstorage.read_write" + +type GCSBackend struct { + bucket string + handle *storage.BucketHandle + signing *storage.SignedURLOptions +} + +// Constructs a new GCS bucket backend based on the configured +// environment variables. +func NewGCSBackend() (*GCSBackend, error) { + bucket := os.Getenv("GCS_BUCKET") + if bucket == "" { + return nil, fmt.Errorf("GCS_BUCKET must be configured for GCS usage") + } + + ctx := context.Background() + client, err := storage.NewClient(ctx) + if err != nil { + log.WithError(err).Fatal("failed to set up Cloud Storage client") + } + + handle := client.Bucket(bucket) + + if _, err := handle.Attrs(ctx); err != nil { + log.WithError(err).WithField("bucket", bucket).Error("could not access configured bucket") + return nil, err + } + + signing, err := signingOptsFromEnv() + if err != nil { + log.WithError(err).Error("failed to configure GCS bucket signing") + return nil, err + } + + return &GCSBackend{ + bucket: bucket, + handle: handle, + signing: signing, + }, nil +} + +func (b *GCSBackend) Name() string { + return "Google Cloud Storage (" + b.bucket + ")" +} + +func (b *GCSBackend) Persist(ctx context.Context, path, contentType string, f Persister) (string, int64, error) { + obj := b.handle.Object(path) + w := obj.NewWriter(ctx) + + hash, size, err := f(w) + if err != nil { + log.WithError(err).WithField("path", path).Error("failed to write to GCS") + return hash, size, err + } + + err = w.Close() + if err != nil { + log.WithError(err).WithField("path", path).Error("failed to complete GCS upload") + return hash, size, err + } + + // GCS natively supports content types for objects, which will be + // used when serving them back. + if contentType != "" { + _, err = obj.Update(ctx, storage.ObjectAttrsToUpdate{ + ContentType: contentType, + }) + + if err != nil { + log.WithError(err).WithField("path", path).Error("failed to update object attrs") + return hash, size, err + } + } + + return hash, size, nil +} + +func (b *GCSBackend) Fetch(ctx context.Context, path string) (io.ReadCloser, error) { + obj := b.handle.Object(path) + + // Probe whether the file exists before trying to fetch it + _, err := obj.Attrs(ctx) + if err != nil { + return nil, err + } + + return obj.NewReader(ctx) +} + +// renameObject renames an object in the specified Cloud Storage +// bucket. +// +// The Go API for Cloud Storage does not support renaming objects, but +// the HTTP API does. The code below makes the relevant call manually. +func (b *GCSBackend) Move(ctx context.Context, old, new string) error { + creds, err := google.FindDefaultCredentials(ctx, gcsScope) + if err != nil { + return err + } + + token, err := creds.TokenSource.Token() + if err != nil { + return err + } + + // as per https://cloud.google.com/storage/docs/renaming-copying-moving-objects#rename + url := fmt.Sprintf( + "https://www.googleapis.com/storage/v1/b/%s/o/%s/rewriteTo/b/%s/o/%s", + url.PathEscape(b.bucket), url.PathEscape(old), + url.PathEscape(b.bucket), url.PathEscape(new), + ) + + req, err := http.NewRequest("POST", url, nil) + req.Header.Add("Authorization", "Bearer "+token.AccessToken) + _, err = client.Do(req) + if err != nil { + return err + } + + // It seems that 'rewriteTo' copies objects instead of + // renaming/moving them, hence a deletion call afterwards is + // required. + if err = b.handle.Object(old).Delete(ctx); err != nil { + log.WithError(err).WithFields(log.Fields{ + "new": new, + "old": old, + }).Warn("failed to delete renamed object") + + // this error should not break renaming and is not returned + } + + return nil +} + +func (b *GCSBackend) Serve(digest string, r *http.Request, w http.ResponseWriter) error { + url, err := b.constructLayerUrl(digest) + if err != nil { + log.WithError(err).WithFields(log.Fields{ + "digest": digest, + "bucket": b.bucket, + }).Error("failed to sign GCS URL") + + return err + } + + log.WithField("digest", digest).Info("redirecting blob request to GCS bucket") + + w.Header().Set("Location", url) + w.WriteHeader(303) + return nil +} + +// Configure GCS URL signing in the presence of a service account key +// (toggled if the user has set GOOGLE_APPLICATION_CREDENTIALS). +func signingOptsFromEnv() (*storage.SignedURLOptions, error) { + path := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") + if path == "" { + // No credentials configured -> no URL signing + return nil, nil + } + + key, err := ioutil.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read service account key: %s", err) + } + + conf, err := google.JWTConfigFromJSON(key) + if err != nil { + return nil, fmt.Errorf("failed to parse service account key: %s", err) + } + + log.WithField("account", conf.Email).Info("GCS URL signing enabled") + + return &storage.SignedURLOptions{ + Scheme: storage.SigningSchemeV4, + GoogleAccessID: conf.Email, + PrivateKey: conf.PrivateKey, + Method: "GET", + }, nil +} + +// layerRedirect constructs the public URL of the layer object in the Cloud +// Storage bucket, signs it and redirects the user there. +// +// Signing the URL allows unauthenticated clients to retrieve objects from the +// bucket. +// +// In case signing is not configured, a redirect to storage.googleapis.com is +// issued, which means the underlying bucket objects need to be publicly +// accessible. +// +// The Docker client is known to follow redirects, but this might not be true +// for all other registry clients. +func (b *GCSBackend) constructLayerUrl(digest string) (string, error) { + log.WithField("layer", digest).Info("redirecting layer request to bucket") + object := "layers/" + digest + + if b.signing != nil { + opts := *b.signing + opts.Expires = time.Now().Add(5 * time.Minute) + return storage.SignedURL(b.bucket, object, &opts) + } else { + return ("https://storage.googleapis.com/" + b.bucket + "/" + object), nil + } +} diff --git a/tools/nixery/storage/storage.go b/tools/nixery/storage/storage.go new file mode 100644 index 000000000000..fd496f440ae3 --- /dev/null +++ b/tools/nixery/storage/storage.go @@ -0,0 +1,51 @@ +// Copyright 2019-2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +// Package storage implements an interface that can be implemented by +// storage backends, such as Google Cloud Storage or the local +// filesystem. +package storage + +import ( + "context" + "io" + "net/http" +) + +type Persister = func(io.Writer) (string, int64, error) + +type Backend interface { + // Name returns the name of the storage backend, for use in + // log messages and such. + Name() string + + // Persist provides a user-supplied function with a writer + // that stores data in the storage backend. + // + // It needs to return the SHA256 hash of the data written as + // well as the total number of bytes, as those are required + // for the image manifest. + Persist(ctx context.Context, path, contentType string, f Persister) (string, int64, error) + + // Fetch retrieves data from the storage backend. + Fetch(ctx context.Context, path string) (io.ReadCloser, error) + + // Move renames a path inside the storage backend. This is + // used for staging uploads while calculating their hashes. + Move(ctx context.Context, old, new string) error + + // Serve provides a handler function to serve HTTP requests + // for objects in the storage backend. + Serve(digest string, r *http.Request, w http.ResponseWriter) error +} |