about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/nar-bridge/default.nix2
-rw-r--r--tvix/nar-bridge/go.mod1
-rw-r--r--tvix/nar-bridge/go.sum2
-rw-r--r--tvix/nar-bridge/pkg/importer/importer.go87
-rw-r--r--tvix/nar-bridge/pkg/importer/roundtrip_test.go4
5 files changed, 85 insertions, 11 deletions
diff --git a/tvix/nar-bridge/default.nix b/tvix/nar-bridge/default.nix
index d28d6f9724..c0247f279f 100644
--- a/tvix/nar-bridge/default.nix
+++ b/tvix/nar-bridge/default.nix
@@ -6,5 +6,5 @@ pkgs.buildGoModule {
   name = "nar-bridge";
   src = depot.third_party.gitignoreSource ./.;
 
-  vendorHash = "sha256-9tEpICef6xCuRQwyXCg15KugcvknnP53WHotohcbeis=";
+  vendorHash = "sha256-7jugbC5sEGhppjiZgnoLP5A6kQSaHK9vE6cXVZBG22s=";
 }
diff --git a/tvix/nar-bridge/go.mod b/tvix/nar-bridge/go.mod
index 175f7edcfd..deb6943e23 100644
--- a/tvix/nar-bridge/go.mod
+++ b/tvix/nar-bridge/go.mod
@@ -18,6 +18,7 @@ require (
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0
 	go.opentelemetry.io/otel/sdk v1.22.0
 	go.opentelemetry.io/otel/sdk/metric v1.22.0
+	golang.org/x/sync v0.4.0
 	google.golang.org/grpc v1.60.1
 	google.golang.org/protobuf v1.32.0
 	lukechampine.com/blake3 v1.2.1
diff --git a/tvix/nar-bridge/go.sum b/tvix/nar-bridge/go.sum
index 2c1bac78cc..39f77b9061 100644
--- a/tvix/nar-bridge/go.sum
+++ b/tvix/nar-bridge/go.sum
@@ -90,6 +90,8 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m
 golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
 golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
 golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY=
+golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
+golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
diff --git a/tvix/nar-bridge/pkg/importer/importer.go b/tvix/nar-bridge/pkg/importer/importer.go
index 679fc9b8ab..fce6c5f293 100644
--- a/tvix/nar-bridge/pkg/importer/importer.go
+++ b/tvix/nar-bridge/pkg/importer/importer.go
@@ -1,6 +1,7 @@
 package importer
 
 import (
+	"bytes"
 	"context"
 	"crypto/sha256"
 	"errors"
@@ -11,6 +12,17 @@ import (
 
 	castorev1pb "code.tvl.fyi/tvix/castore-go"
 	"github.com/nix-community/go-nix/pkg/nar"
+	"golang.org/x/sync/errgroup"
+	"lukechampine.com/blake3"
+)
+
+const (
+	// asyncUploadThreshold controls when a file is buffered into memory and uploaded
+	// asynchronously. Files must be smaller than the threshold to be uploaded asynchronously.
+	asyncUploadThreshold = 1024 * 1024 // 1 MiB
+	// maxAsyncUploadBufferBytes is the maximum number of async blob uploads allowed to be
+	// running concurrently at any given time for a simple import operation.
+	maxConcurrentAsyncUploads = 128
 )
 
 // An item on the directories stack
@@ -51,6 +63,11 @@ func Import(
 	var rootFile *castorev1pb.FileNode
 	var stackDirectory *castorev1pb.Directory
 
+	// Keep track of all asynch blob uploads so we can make sure they all succeed
+	// before returning.
+	var asyncBlobWg errgroup.Group
+	asyncBlobWg.SetLimit(maxConcurrentAsyncUploads)
+
 	var stack = []stackItem{}
 
 	// popFromStack is used when we transition to a different directory or
@@ -124,6 +141,12 @@ func Import(
 					}
 				}
 
+				// Wait for any pending blob uploads.
+				err := asyncBlobWg.Wait()
+				if err != nil {
+					return nil, 0, nil, fmt.Errorf("async blob upload: %w", err)
+				}
+
 				// Stack is empty.
 				// Now either root{File,Symlink,Directory} is not nil,
 				// and we can return the root node.
@@ -192,19 +215,63 @@ func Import(
 
 			}
 			if hdr.Type == nar.TypeRegular {
-				// wrap reader with a reader counting the number of bytes read
-				blobCountW := &CountingWriter{}
-				blobReader := io.TeeReader(narReader, blobCountW)
+				uploadBlob := func(r io.Reader) ([]byte, error) {
+					// wrap reader with a reader counting the number of bytes read
+					blobCountW := &CountingWriter{}
+					blobReader := io.TeeReader(r, blobCountW)
 
-				blobDigest, err := blobCb(blobReader)
-				if err != nil {
-					return nil, 0, nil, fmt.Errorf("failure from blobCb: %w", err)
+					blobDigest, err := blobCb(blobReader)
+					if err != nil {
+						return nil, fmt.Errorf("failure from blobCb: %w", err)
+					}
+
+					// ensure blobCb did read all the way to the end.
+					// If it didn't, the blobCb function is wrong and we should bail out.
+					if blobCountW.BytesWritten() != uint64(hdr.Size) {
+						return nil, fmt.Errorf("blobCb did not read all: %d/%d bytes", blobCountW.BytesWritten(), hdr.Size)
+					}
+
+					return blobDigest, nil
 				}
 
-				// ensure blobCb did read all the way to the end.
-				// If it didn't, the blobCb function is wrong and we should bail out.
-				if blobCountW.BytesWritten() != uint64(hdr.Size) {
-					panic("blobCB did not read to end")
+				h := blake3.New(32, nil)
+				blobReader := io.TeeReader(narReader, io.MultiWriter(h))
+				var blobDigest []byte
+
+				// If this file is small enough, read it off the wire immediately and
+				// upload to the blob service asynchronously. This helps reduce the
+				// RTT on blob uploads for NARs with many small files.
+				doAsync := hdr.Size < asyncUploadThreshold
+				if doAsync {
+					blobContents, err := io.ReadAll(blobReader)
+					if err != nil {
+						return nil, 0, nil, fmt.Errorf("read blob: %w", err)
+					}
+
+					blobDigest = h.Sum(nil)
+
+					asyncBlobWg.Go(func() error {
+						blobDigestFromCb, err := uploadBlob(bytes.NewReader(blobContents))
+						if err != nil {
+							return err
+						}
+
+						if !bytes.Equal(blobDigest, blobDigestFromCb) {
+							return fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest)
+						}
+
+						return nil
+					})
+				} else {
+					blobDigestFromCb, err := uploadBlob(blobReader)
+					if err != nil {
+						return nil, 0, nil, fmt.Errorf("upload blob: %w", err)
+					}
+
+					blobDigest = h.Sum(nil)
+					if !bytes.Equal(blobDigest, blobDigestFromCb) {
+						return nil, 0, nil, fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest)
+					}
 				}
 
 				fileNode := &castorev1pb.FileNode{
diff --git a/tvix/nar-bridge/pkg/importer/roundtrip_test.go b/tvix/nar-bridge/pkg/importer/roundtrip_test.go
index b16c310522..6d6fcb9ee2 100644
--- a/tvix/nar-bridge/pkg/importer/roundtrip_test.go
+++ b/tvix/nar-bridge/pkg/importer/roundtrip_test.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"sync"
 	"testing"
 
 	castorev1pb "code.tvl.fyi/tvix/castore-go"
@@ -27,6 +28,7 @@ func TestRoundtrip(t *testing.T) {
 	narContents, err := io.ReadAll(f)
 	require.NoError(t, err)
 
+	var mu sync.Mutex
 	blobsMap := make(map[string][]byte, 0)
 	directoriesMap := make(map[string]*castorev1pb.Directory)
 
@@ -41,7 +43,9 @@ func TestRoundtrip(t *testing.T) {
 			dgst := mustBlobDigest(bytes.NewReader(contents))
 
 			// put it in filesMap
+			mu.Lock()
 			blobsMap[base64.StdEncoding.EncodeToString(dgst)] = contents
+			mu.Unlock()
 
 			return dgst, nil
 		},