Diffstat (limited to 'tvix/nar-bridge/pkg/importer')
-rw-r--r--  tvix/nar-bridge/pkg/importer/importer.go       | 87
-rw-r--r--  tvix/nar-bridge/pkg/importer/roundtrip_test.go |  4
2 files changed, 81 insertions(+), 10 deletions(-)
diff --git a/tvix/nar-bridge/pkg/importer/importer.go b/tvix/nar-bridge/pkg/importer/importer.go
index 679fc9b8ab14..fce6c5f293da 100644
--- a/tvix/nar-bridge/pkg/importer/importer.go
+++ b/tvix/nar-bridge/pkg/importer/importer.go
@@ -1,6 +1,7 @@
 package importer
 
 import (
+	"bytes"
 	"context"
 	"crypto/sha256"
 	"errors"
@@ -11,6 +12,17 @@ import (
 
 	castorev1pb "code.tvl.fyi/tvix/castore-go"
 	"github.com/nix-community/go-nix/pkg/nar"
+	"golang.org/x/sync/errgroup"
+	"lukechampine.com/blake3"
+)
+
+const (
+	// asyncUploadThreshold controls when a file is buffered into memory and uploaded
+	// asynchronously. Only files smaller than this threshold are uploaded asynchronously.
+	asyncUploadThreshold = 1024 * 1024 // 1 MiB
+	// maxConcurrentAsyncUploads is the maximum number of async blob uploads allowed to
+	// run concurrently at any given time for a single import operation.
+	maxConcurrentAsyncUploads = 128
 )
 
 // An item on the directories stack
@@ -51,6 +63,11 @@ func Import(
 	var rootFile *castorev1pb.FileNode
 	var stackDirectory *castorev1pb.Directory
 
+	// Keep track of all async blob uploads so we can make sure they all succeed
+	// before returning.
+	var asyncBlobWg errgroup.Group
+	asyncBlobWg.SetLimit(maxConcurrentAsyncUploads)
+
 	var stack = []stackItem{}
 
 	// popFromStack is used when we transition to a different directory or
@@ -124,6 +141,12 @@ func Import(
 					}
 				}
 
+				// Wait for any pending blob uploads.
+				err := asyncBlobWg.Wait()
+				if err != nil {
+					return nil, 0, nil, fmt.Errorf("async blob upload: %w", err)
+				}
+
 				// Stack is empty.
 				// Now either root{File,Symlink,Directory} is not nil,
 				// and we can return the root node.
@@ -192,19 +215,63 @@ func Import(
 
 			}
 			if hdr.Type == nar.TypeRegular {
-				// wrap reader with a reader counting the number of bytes read
-				blobCountW := &CountingWriter{}
-				blobReader := io.TeeReader(narReader, blobCountW)
+				uploadBlob := func(r io.Reader) ([]byte, error) {
+					// wrap reader with a reader counting the number of bytes read
+					blobCountW := &CountingWriter{}
+					blobReader := io.TeeReader(r, blobCountW)
 
-				blobDigest, err := blobCb(blobReader)
-				if err != nil {
-					return nil, 0, nil, fmt.Errorf("failure from blobCb: %w", err)
+					blobDigest, err := blobCb(blobReader)
+					if err != nil {
+						return nil, fmt.Errorf("failure from blobCb: %w", err)
+					}
+
+					// ensure blobCb did read all the way to the end.
+					// If it didn't, the blobCb function is wrong and we should bail out.
+					if blobCountW.BytesWritten() != uint64(hdr.Size) {
+						return nil, fmt.Errorf("blobCb did not read all: %d/%d bytes", blobCountW.BytesWritten(), hdr.Size)
+					}
+
+					return blobDigest, nil
 				}
 
-				// ensure blobCb did read all the way to the end.
-				// If it didn't, the blobCb function is wrong and we should bail out.
-				if blobCountW.BytesWritten() != uint64(hdr.Size) {
-					panic("blobCB did not read to end")
+				h := blake3.New(32, nil)
+				blobReader := io.TeeReader(narReader, h)
+				var blobDigest []byte
+
+				// If this file is small enough, read it off the wire immediately and
+				// upload it to the blob service asynchronously. This hides the per-upload
+				// round-trip latency for NARs containing many small files.
+				doAsync := hdr.Size < asyncUploadThreshold
+				if doAsync {
+					blobContents, err := io.ReadAll(blobReader)
+					if err != nil {
+						return nil, 0, nil, fmt.Errorf("read blob: %w", err)
+					}
+
+					blobDigest = h.Sum(nil)
+
+					asyncBlobWg.Go(func() error {
+						blobDigestFromCb, err := uploadBlob(bytes.NewReader(blobContents))
+						if err != nil {
+							return err
+						}
+
+						if !bytes.Equal(blobDigest, blobDigestFromCb) {
+							return fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest)
+						}
+
+						return nil
+					})
+				} else {
+					blobDigestFromCb, err := uploadBlob(blobReader)
+					if err != nil {
+						return nil, 0, nil, fmt.Errorf("upload blob: %w", err)
+					}
+
+					blobDigest = h.Sum(nil)
+					if !bytes.Equal(blobDigest, blobDigestFromCb) {
+						return nil, 0, nil, fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest)
+					}
 				}
 
 				fileNode := &castorev1pb.FileNode{
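
The importer.go changes above bound the number of in-flight uploads with golang.org/x/sync/errgroup's SetLimit and cross-check the digest returned by the callback against a locally computed BLAKE3 hash. The following standalone sketch illustrates that same pattern; the uploadSmallBlob helper and the fake blobCb are illustrative stand-ins and are not part of nar-bridge:

package main

import (
	"bytes"
	"fmt"
	"io"

	"golang.org/x/sync/errgroup"
	"lukechampine.com/blake3"
)

// uploadSmallBlob buffers a small blob into memory, hashes it locally with
// BLAKE3, and schedules the actual upload on the errgroup so the caller can
// keep reading the stream. blobCb stands in for nar-bridge's blob callback.
func uploadSmallBlob(g *errgroup.Group, r io.Reader, blobCb func(io.Reader) ([]byte, error)) ([]byte, error) {
	h := blake3.New(32, nil)
	contents, err := io.ReadAll(io.TeeReader(r, h))
	if err != nil {
		return nil, fmt.Errorf("read blob: %w", err)
	}
	localDigest := h.Sum(nil)

	// Go blocks once the group's concurrency limit is reached, which
	// backpressures the caller instead of queueing unbounded buffers.
	g.Go(func() error {
		cbDigest, err := blobCb(bytes.NewReader(contents))
		if err != nil {
			return err
		}
		if !bytes.Equal(localDigest, cbDigest) {
			return fmt.Errorf("unexpected digest (got %x, expected %x)", cbDigest, localDigest)
		}
		return nil
	})

	return localDigest, nil
}

func main() {
	var g errgroup.Group
	g.SetLimit(128) // cap in-flight uploads, mirroring maxConcurrentAsyncUploads

	// Fake callback that simply re-hashes the blob, standing in for a real blob service.
	blobCb := func(r io.Reader) ([]byte, error) {
		h := blake3.New(32, nil)
		if _, err := io.Copy(h, r); err != nil {
			return nil, err
		}
		return h.Sum(nil), nil
	}

	digest, err := uploadSmallBlob(&g, bytes.NewReader([]byte("hello nar")), blobCb)
	if err != nil {
		panic(err)
	}

	// As in Import, wait for all pending uploads before relying on the result.
	if err := g.Wait(); err != nil {
		panic(err)
	}
	fmt.Printf("blob digest: %x\n", digest)
}
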
diff --git a/tvix/nar-bridge/pkg/importer/roundtrip_test.go b/tvix/nar-bridge/pkg/importer/roundtrip_test.go
index b16c310522f5..6d6fcb9ee220 100644
--- a/tvix/nar-bridge/pkg/importer/roundtrip_test.go
+++ b/tvix/nar-bridge/pkg/importer/roundtrip_test.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"sync"
 	"testing"
 
 	castorev1pb "code.tvl.fyi/tvix/castore-go"
@@ -27,6 +28,7 @@ func TestRoundtrip(t *testing.T) {
 	narContents, err := io.ReadAll(f)
 	require.NoError(t, err)
 
+	var mu sync.Mutex
 	blobsMap := make(map[string][]byte, 0)
 	directoriesMap := make(map[string]*castorev1pb.Directory)
 
@@ -41,7 +43,9 @@ func TestRoundtrip(t *testing.T) {
 			dgst := mustBlobDigest(bytes.NewReader(contents))
 
 			// put it in filesMap
+			mu.Lock()
 			blobsMap[base64.StdEncoding.EncodeToString(dgst)] = contents
+			mu.Unlock()
 
 			return dgst, nil
 		},