diff options
Diffstat (limited to 'tvix/nar-bridge')
-rw-r--r-- | tvix/nar-bridge/default.nix | 2 | ||||
-rw-r--r-- | tvix/nar-bridge/go.mod | 1 | ||||
-rw-r--r-- | tvix/nar-bridge/go.sum | 2 | ||||
-rw-r--r-- | tvix/nar-bridge/pkg/importer/importer.go | 87 | ||||
-rw-r--r-- | tvix/nar-bridge/pkg/importer/roundtrip_test.go | 4 |
5 files changed, 85 insertions, 11 deletions
diff --git a/tvix/nar-bridge/default.nix b/tvix/nar-bridge/default.nix index d28d6f972422..c0247f279f32 100644 --- a/tvix/nar-bridge/default.nix +++ b/tvix/nar-bridge/default.nix @@ -6,5 +6,5 @@ pkgs.buildGoModule { name = "nar-bridge"; src = depot.third_party.gitignoreSource ./.; - vendorHash = "sha256-9tEpICef6xCuRQwyXCg15KugcvknnP53WHotohcbeis="; + vendorHash = "sha256-7jugbC5sEGhppjiZgnoLP5A6kQSaHK9vE6cXVZBG22s="; } diff --git a/tvix/nar-bridge/go.mod b/tvix/nar-bridge/go.mod index 175f7edcfde9..deb6943e23df 100644 --- a/tvix/nar-bridge/go.mod +++ b/tvix/nar-bridge/go.mod @@ -18,6 +18,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 go.opentelemetry.io/otel/sdk v1.22.0 go.opentelemetry.io/otel/sdk/metric v1.22.0 + golang.org/x/sync v0.4.0 google.golang.org/grpc v1.60.1 google.golang.org/protobuf v1.32.0 lukechampine.com/blake3 v1.2.1 diff --git a/tvix/nar-bridge/go.sum b/tvix/nar-bridge/go.sum index 2c1bac78ccf7..39f77b906128 100644 --- a/tvix/nar-bridge/go.sum +++ b/tvix/nar-bridge/go.sum @@ -90,6 +90,8 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= diff --git a/tvix/nar-bridge/pkg/importer/importer.go b/tvix/nar-bridge/pkg/importer/importer.go index 679fc9b8ab14..fce6c5f293da 100644 --- a/tvix/nar-bridge/pkg/importer/importer.go +++ b/tvix/nar-bridge/pkg/importer/importer.go @@ -1,6 +1,7 @@ package importer import ( + "bytes" "context" "crypto/sha256" "errors" @@ -11,6 +12,17 @@ import ( castorev1pb "code.tvl.fyi/tvix/castore-go" "github.com/nix-community/go-nix/pkg/nar" + "golang.org/x/sync/errgroup" + "lukechampine.com/blake3" +) + +const ( + // asyncUploadThreshold controls when a file is buffered into memory and uploaded + // asynchronously. Files must be smaller than the threshold to be uploaded asynchronously. + asyncUploadThreshold = 1024 * 1024 // 1 MiB + // maxAsyncUploadBufferBytes is the maximum number of async blob uploads allowed to be + // running concurrently at any given time for a simple import operation. + maxConcurrentAsyncUploads = 128 ) // An item on the directories stack @@ -51,6 +63,11 @@ func Import( var rootFile *castorev1pb.FileNode var stackDirectory *castorev1pb.Directory + // Keep track of all asynch blob uploads so we can make sure they all succeed + // before returning. + var asyncBlobWg errgroup.Group + asyncBlobWg.SetLimit(maxConcurrentAsyncUploads) + var stack = []stackItem{} // popFromStack is used when we transition to a different directory or @@ -124,6 +141,12 @@ func Import( } } + // Wait for any pending blob uploads. + err := asyncBlobWg.Wait() + if err != nil { + return nil, 0, nil, fmt.Errorf("async blob upload: %w", err) + } + // Stack is empty. // Now either root{File,Symlink,Directory} is not nil, // and we can return the root node. @@ -192,19 +215,63 @@ func Import( } if hdr.Type == nar.TypeRegular { - // wrap reader with a reader counting the number of bytes read - blobCountW := &CountingWriter{} - blobReader := io.TeeReader(narReader, blobCountW) + uploadBlob := func(r io.Reader) ([]byte, error) { + // wrap reader with a reader counting the number of bytes read + blobCountW := &CountingWriter{} + blobReader := io.TeeReader(r, blobCountW) - blobDigest, err := blobCb(blobReader) - if err != nil { - return nil, 0, nil, fmt.Errorf("failure from blobCb: %w", err) + blobDigest, err := blobCb(blobReader) + if err != nil { + return nil, fmt.Errorf("failure from blobCb: %w", err) + } + + // ensure blobCb did read all the way to the end. + // If it didn't, the blobCb function is wrong and we should bail out. + if blobCountW.BytesWritten() != uint64(hdr.Size) { + return nil, fmt.Errorf("blobCb did not read all: %d/%d bytes", blobCountW.BytesWritten(), hdr.Size) + } + + return blobDigest, nil } - // ensure blobCb did read all the way to the end. - // If it didn't, the blobCb function is wrong and we should bail out. - if blobCountW.BytesWritten() != uint64(hdr.Size) { - panic("blobCB did not read to end") + h := blake3.New(32, nil) + blobReader := io.TeeReader(narReader, io.MultiWriter(h)) + var blobDigest []byte + + // If this file is small enough, read it off the wire immediately and + // upload to the blob service asynchronously. This helps reduce the + // RTT on blob uploads for NARs with many small files. + doAsync := hdr.Size < asyncUploadThreshold + if doAsync { + blobContents, err := io.ReadAll(blobReader) + if err != nil { + return nil, 0, nil, fmt.Errorf("read blob: %w", err) + } + + blobDigest = h.Sum(nil) + + asyncBlobWg.Go(func() error { + blobDigestFromCb, err := uploadBlob(bytes.NewReader(blobContents)) + if err != nil { + return err + } + + if !bytes.Equal(blobDigest, blobDigestFromCb) { + return fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest) + } + + return nil + }) + } else { + blobDigestFromCb, err := uploadBlob(blobReader) + if err != nil { + return nil, 0, nil, fmt.Errorf("upload blob: %w", err) + } + + blobDigest = h.Sum(nil) + if !bytes.Equal(blobDigest, blobDigestFromCb) { + return nil, 0, nil, fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest) + } } fileNode := &castorev1pb.FileNode{ diff --git a/tvix/nar-bridge/pkg/importer/roundtrip_test.go b/tvix/nar-bridge/pkg/importer/roundtrip_test.go index b16c310522f5..6d6fcb9ee220 100644 --- a/tvix/nar-bridge/pkg/importer/roundtrip_test.go +++ b/tvix/nar-bridge/pkg/importer/roundtrip_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "sync" "testing" castorev1pb "code.tvl.fyi/tvix/castore-go" @@ -27,6 +28,7 @@ func TestRoundtrip(t *testing.T) { narContents, err := io.ReadAll(f) require.NoError(t, err) + var mu sync.Mutex blobsMap := make(map[string][]byte, 0) directoriesMap := make(map[string]*castorev1pb.Directory) @@ -41,7 +43,9 @@ func TestRoundtrip(t *testing.T) { dgst := mustBlobDigest(bytes.NewReader(contents)) // put it in filesMap + mu.Lock() blobsMap[base64.StdEncoding.EncodeToString(dgst)] = contents + mu.Unlock() return dgst, nil }, |