diff options
Diffstat (limited to 'tools/nixery/builder/builder.go')
-rw-r--r-- | tools/nixery/builder/builder.go | 92 |
1 files changed, 50 insertions, 42 deletions
diff --git a/tools/nixery/builder/builder.go b/tools/nixery/builder/builder.go index 901373b57e99..7f0bd7fffdb9 100644 --- a/tools/nixery/builder/builder.go +++ b/tools/nixery/builder/builder.go @@ -26,6 +26,7 @@ import ( "github.com/google/nixery/layers" "github.com/google/nixery/manifest" "github.com/google/nixery/storage" + "github.com/im7mortal/kmutex" log "github.com/sirupsen/logrus" ) @@ -37,10 +38,11 @@ const LayerBudget int = 94 // State holds the runtime state that is carried around in Nixery and // passed to builder functions. type State struct { - Storage storage.Backend - Cache *LocalCache - Cfg config.Config - Pop layers.Popularity + Storage storage.Backend + Cache *LocalCache + Cfg config.Config + Pop layers.Popularity + UploadMutex *kmutex.Kmutex } // Architecture represents the possible CPU architectures for which @@ -292,30 +294,19 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes // Missing layers are built and uploaded to the storage // bucket. for _, l := range grouped { - if entry, cached := layerFromCache(ctx, s, l.Hash()); cached { - entries = append(entries, *entry) - } else { - lh := l.Hash() - - // While packing store paths, the SHA sum of - // the uncompressed layer is computed and - // written to `tarhash`. - // - // TODO(tazjin): Refactor this to make the - // flow of data cleaner. - var tarhash string - lw := func(w io.Writer) error { - var err error - tarhash, err = packStorePaths(&l, w) - return err - } - - entry, err := uploadHashLayer(ctx, s, lh, lw) + lh := l.Hash() + + // While packing store paths, the SHA sum of + // the uncompressed layer is computed and + // written to `tarhash`. + // + // TODO(tazjin): Refactor this to make the + // flow of data cleaner. + lw := func(w io.Writer) (string, error) { + tarhash, err := packStorePaths(&l, w) if err != nil { - return nil, err + return "", err } - entry.MergeRating = l.MergeRating - entry.TarHash = tarhash var pkgs []string for _, p := range l.Contents { @@ -328,15 +319,21 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes "tarhash": tarhash, }).Info("created image layer") - go cacheLayer(ctx, s, l.Hash(), *entry) - entries = append(entries, *entry) + return tarhash, err + } + + entry, err := uploadHashLayer(ctx, s, lh, l.MergeRating, lw) + if err != nil { + return nil, err } + + entries = append(entries, *entry) } // Symlink layer (built in the first Nix build) needs to be // included here manually: slkey := result.SymlinkLayer.TarHash - entry, err := uploadHashLayer(ctx, s, slkey, func(w io.Writer) error { + entry, err := uploadHashLayer(ctx, s, slkey, 0, func(w io.Writer) (string, error) { f, err := os.Open(result.SymlinkLayer.Path) if err != nil { log.WithError(err).WithFields(log.Fields{ @@ -345,7 +342,7 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes "layer": slkey, }).Error("failed to open symlink layer") - return err + return "", err } defer f.Close() @@ -358,18 +355,16 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes "layer": slkey, }).Error("failed to upload symlink layer") - return err + return "", err } - return gz.Close() + return "sha256:" + slkey, gz.Close() }) if err != nil { return nil, err } - entry.TarHash = "sha256:" + result.SymlinkLayer.TarHash - go cacheLayer(ctx, s, slkey, *entry) entries = append(entries, *entry) return entries, nil @@ -380,7 +375,7 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes // // This type exists to avoid duplication between the handling of // symlink layers and store path layers. -type layerWriter func(w io.Writer) error +type layerWriter func(w io.Writer) (string, error) // byteCounter is a special io.Writer that counts all bytes written to // it and does nothing else. @@ -408,8 +403,16 @@ func (b *byteCounter) Write(p []byte) (n int, err error) { // // The return value is the layer's SHA256 hash, which is used in the // image manifest. -func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) (*manifest.Entry, error) { +func uploadHashLayer(ctx context.Context, s *State, key string, mrating uint64, lw layerWriter) (*manifest.Entry, error) { + s.UploadMutex.Lock(key) + defer s.UploadMutex.Unlock(key) + + if entry, cached := layerFromCache(ctx, s, key); cached { + return entry, nil + } + path := "staging/" + key + var tarhash string sha256sum, size, err := s.Storage.Persist(ctx, path, manifest.LayerType, func(sw io.Writer) (string, int64, error) { // Sets up a "multiwriter" that simultaneously runs both hash // algorithms and uploads to the storage backend. @@ -417,7 +420,8 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) counter := &byteCounter{} multi := io.MultiWriter(sw, shasum, counter) - err := lw(multi) + var err error + tarhash, err = lw(multi) sha256sum := fmt.Sprintf("%x", shasum.Sum([]byte{})) return sha256sum, counter.count, err @@ -449,10 +453,14 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) }).Info("created and persisted layer") entry := manifest.Entry{ - Digest: "sha256:" + sha256sum, - Size: size, + Digest: "sha256:" + sha256sum, + Size: size, + TarHash: tarhash, + MergeRating: mrating, } + cacheLayer(ctx, s, key, entry) + return &entry, nil } @@ -493,13 +501,13 @@ func BuildImage(ctx context.Context, s *State, image *Image) (*BuildResult, erro } m, c := manifest.Manifest(image.Arch.imageArch, layers, cmd) - lw := func(w io.Writer) error { + lw := func(w io.Writer) (string, error) { r := bytes.NewReader(c.Config) _, err := io.Copy(w, r) - return err + return "", err } - if _, err = uploadHashLayer(ctx, s, c.SHA256, lw); err != nil { + if _, err = uploadHashLayer(ctx, s, c.SHA256, 0, lw); err != nil { log.WithError(err).WithFields(log.Fields{ "image": image.Name, "tag": image.Tag, |