about summary refs log tree commit diff
path: root/tools/nixery/builder
diff options
context:
space:
mode:
Diffstat (limited to 'tools/nixery/builder')
-rw-r--r--tools/nixery/builder/builder.go92
1 files changed, 50 insertions, 42 deletions
diff --git a/tools/nixery/builder/builder.go b/tools/nixery/builder/builder.go
index 901373b57e99..7f0bd7fffdb9 100644
--- a/tools/nixery/builder/builder.go
+++ b/tools/nixery/builder/builder.go
@@ -26,6 +26,7 @@ import (
 	"github.com/google/nixery/layers"
 	"github.com/google/nixery/manifest"
 	"github.com/google/nixery/storage"
+	"github.com/im7mortal/kmutex"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -37,10 +38,11 @@ const LayerBudget int = 94
 // State holds the runtime state that is carried around in Nixery and
 // passed to builder functions.
 type State struct {
-	Storage storage.Backend
-	Cache   *LocalCache
-	Cfg     config.Config
-	Pop     layers.Popularity
+	Storage     storage.Backend
+	Cache       *LocalCache
+	Cfg         config.Config
+	Pop         layers.Popularity
+	UploadMutex *kmutex.Kmutex
 }
 
 // Architecture represents the possible CPU architectures for which
@@ -292,30 +294,19 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 	// Missing layers are built and uploaded to the storage
 	// bucket.
 	for _, l := range grouped {
-		if entry, cached := layerFromCache(ctx, s, l.Hash()); cached {
-			entries = append(entries, *entry)
-		} else {
-			lh := l.Hash()
-
-			// While packing store paths, the SHA sum of
-			// the uncompressed layer is computed and
-			// written to `tarhash`.
-			//
-			// TODO(tazjin): Refactor this to make the
-			// flow of data cleaner.
-			var tarhash string
-			lw := func(w io.Writer) error {
-				var err error
-				tarhash, err = packStorePaths(&l, w)
-				return err
-			}
-
-			entry, err := uploadHashLayer(ctx, s, lh, lw)
+		lh := l.Hash()
+
+		// While packing store paths, the SHA sum of
+		// the uncompressed layer is computed and
+		// written to `tarhash`.
+		//
+		// TODO(tazjin): Refactor this to make the
+		// flow of data cleaner.
+		lw := func(w io.Writer) (string, error) {
+			tarhash, err := packStorePaths(&l, w)
 			if err != nil {
-				return nil, err
+				return "", err
 			}
-			entry.MergeRating = l.MergeRating
-			entry.TarHash = tarhash
 
 			var pkgs []string
 			for _, p := range l.Contents {
@@ -328,15 +319,21 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 				"tarhash":  tarhash,
 			}).Info("created image layer")
 
-			go cacheLayer(ctx, s, l.Hash(), *entry)
-			entries = append(entries, *entry)
+			return tarhash, err
+		}
+
+		entry, err := uploadHashLayer(ctx, s, lh, l.MergeRating, lw)
+		if err != nil {
+			return nil, err
 		}
+
+		entries = append(entries, *entry)
 	}
 
 	// Symlink layer (built in the first Nix build) needs to be
 	// included here manually:
 	slkey := result.SymlinkLayer.TarHash
-	entry, err := uploadHashLayer(ctx, s, slkey, func(w io.Writer) error {
+	entry, err := uploadHashLayer(ctx, s, slkey, 0, func(w io.Writer) (string, error) {
 		f, err := os.Open(result.SymlinkLayer.Path)
 		if err != nil {
 			log.WithError(err).WithFields(log.Fields{
@@ -345,7 +342,7 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 				"layer": slkey,
 			}).Error("failed to open symlink layer")
 
-			return err
+			return "", err
 		}
 		defer f.Close()
 
@@ -358,18 +355,16 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 				"layer": slkey,
 			}).Error("failed to upload symlink layer")
 
-			return err
+			return "", err
 		}
 
-		return gz.Close()
+		return "sha256:" + slkey, gz.Close()
 	})
 
 	if err != nil {
 		return nil, err
 	}
 
-	entry.TarHash = "sha256:" + result.SymlinkLayer.TarHash
-	go cacheLayer(ctx, s, slkey, *entry)
 	entries = append(entries, *entry)
 
 	return entries, nil
@@ -380,7 +375,7 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 //
 // This type exists to avoid duplication between the handling of
 // symlink layers and store path layers.
-type layerWriter func(w io.Writer) error
+type layerWriter func(w io.Writer) (string, error)
 
 // byteCounter is a special io.Writer that counts all bytes written to
 // it and does nothing else.
@@ -408,8 +403,16 @@ func (b *byteCounter) Write(p []byte) (n int, err error) {
 //
 // The return value is the layer's SHA256 hash, which is used in the
 // image manifest.
-func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) (*manifest.Entry, error) {
+func uploadHashLayer(ctx context.Context, s *State, key string, mrating uint64, lw layerWriter) (*manifest.Entry, error) {
+	s.UploadMutex.Lock(key)
+	defer s.UploadMutex.Unlock(key)
+
+	if entry, cached := layerFromCache(ctx, s, key); cached {
+		return entry, nil
+	}
+
 	path := "staging/" + key
+	var tarhash string
 	sha256sum, size, err := s.Storage.Persist(ctx, path, manifest.LayerType, func(sw io.Writer) (string, int64, error) {
 		// Sets up a "multiwriter" that simultaneously runs both hash
 		// algorithms and uploads to the storage backend.
@@ -417,7 +420,8 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter)
 		counter := &byteCounter{}
 		multi := io.MultiWriter(sw, shasum, counter)
 
-		err := lw(multi)
+		var err error
+		tarhash, err = lw(multi)
 		sha256sum := fmt.Sprintf("%x", shasum.Sum([]byte{}))
 
 		return sha256sum, counter.count, err
@@ -449,10 +453,14 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter)
 	}).Info("created and persisted layer")
 
 	entry := manifest.Entry{
-		Digest: "sha256:" + sha256sum,
-		Size:   size,
+		Digest:      "sha256:" + sha256sum,
+		Size:        size,
+		TarHash:     tarhash,
+		MergeRating: mrating,
 	}
 
+	cacheLayer(ctx, s, key, entry)
+
 	return &entry, nil
 }
 
@@ -493,13 +501,13 @@ func BuildImage(ctx context.Context, s *State, image *Image) (*BuildResult, erro
 	}
 	m, c := manifest.Manifest(image.Arch.imageArch, layers, cmd)
 
-	lw := func(w io.Writer) error {
+	lw := func(w io.Writer) (string, error) {
 		r := bytes.NewReader(c.Config)
 		_, err := io.Copy(w, r)
-		return err
+		return "", err
 	}
 
-	if _, err = uploadHashLayer(ctx, s, c.SHA256, lw); err != nil {
+	if _, err = uploadHashLayer(ctx, s, c.SHA256, 0, lw); err != nil {
 		log.WithError(err).WithFields(log.Fields{
 			"image": image.Name,
 			"tag":   image.Tag,