diff options
Diffstat (limited to 'tvix/nar-bridge/pkg/importer')
-rw-r--r-- | tvix/nar-bridge/pkg/importer/importer.go | 87 | ||||
-rw-r--r-- | tvix/nar-bridge/pkg/importer/roundtrip_test.go | 4 |
2 files changed, 81 insertions, 10 deletions
diff --git a/tvix/nar-bridge/pkg/importer/importer.go b/tvix/nar-bridge/pkg/importer/importer.go index 679fc9b8ab14..fce6c5f293da 100644 --- a/tvix/nar-bridge/pkg/importer/importer.go +++ b/tvix/nar-bridge/pkg/importer/importer.go @@ -1,6 +1,7 @@ package importer import ( + "bytes" "context" "crypto/sha256" "errors" @@ -11,6 +12,17 @@ import ( castorev1pb "code.tvl.fyi/tvix/castore-go" "github.com/nix-community/go-nix/pkg/nar" + "golang.org/x/sync/errgroup" + "lukechampine.com/blake3" +) + +const ( + // asyncUploadThreshold controls when a file is buffered into memory and uploaded + // asynchronously. Files must be smaller than the threshold to be uploaded asynchronously. + asyncUploadThreshold = 1024 * 1024 // 1 MiB + // maxAsyncUploadBufferBytes is the maximum number of async blob uploads allowed to be + // running concurrently at any given time for a simple import operation. + maxConcurrentAsyncUploads = 128 ) // An item on the directories stack @@ -51,6 +63,11 @@ func Import( var rootFile *castorev1pb.FileNode var stackDirectory *castorev1pb.Directory + // Keep track of all asynch blob uploads so we can make sure they all succeed + // before returning. + var asyncBlobWg errgroup.Group + asyncBlobWg.SetLimit(maxConcurrentAsyncUploads) + var stack = []stackItem{} // popFromStack is used when we transition to a different directory or @@ -124,6 +141,12 @@ func Import( } } + // Wait for any pending blob uploads. + err := asyncBlobWg.Wait() + if err != nil { + return nil, 0, nil, fmt.Errorf("async blob upload: %w", err) + } + // Stack is empty. // Now either root{File,Symlink,Directory} is not nil, // and we can return the root node. @@ -192,19 +215,63 @@ func Import( } if hdr.Type == nar.TypeRegular { - // wrap reader with a reader counting the number of bytes read - blobCountW := &CountingWriter{} - blobReader := io.TeeReader(narReader, blobCountW) + uploadBlob := func(r io.Reader) ([]byte, error) { + // wrap reader with a reader counting the number of bytes read + blobCountW := &CountingWriter{} + blobReader := io.TeeReader(r, blobCountW) - blobDigest, err := blobCb(blobReader) - if err != nil { - return nil, 0, nil, fmt.Errorf("failure from blobCb: %w", err) + blobDigest, err := blobCb(blobReader) + if err != nil { + return nil, fmt.Errorf("failure from blobCb: %w", err) + } + + // ensure blobCb did read all the way to the end. + // If it didn't, the blobCb function is wrong and we should bail out. + if blobCountW.BytesWritten() != uint64(hdr.Size) { + return nil, fmt.Errorf("blobCb did not read all: %d/%d bytes", blobCountW.BytesWritten(), hdr.Size) + } + + return blobDigest, nil } - // ensure blobCb did read all the way to the end. - // If it didn't, the blobCb function is wrong and we should bail out. - if blobCountW.BytesWritten() != uint64(hdr.Size) { - panic("blobCB did not read to end") + h := blake3.New(32, nil) + blobReader := io.TeeReader(narReader, io.MultiWriter(h)) + var blobDigest []byte + + // If this file is small enough, read it off the wire immediately and + // upload to the blob service asynchronously. This helps reduce the + // RTT on blob uploads for NARs with many small files. + doAsync := hdr.Size < asyncUploadThreshold + if doAsync { + blobContents, err := io.ReadAll(blobReader) + if err != nil { + return nil, 0, nil, fmt.Errorf("read blob: %w", err) + } + + blobDigest = h.Sum(nil) + + asyncBlobWg.Go(func() error { + blobDigestFromCb, err := uploadBlob(bytes.NewReader(blobContents)) + if err != nil { + return err + } + + if !bytes.Equal(blobDigest, blobDigestFromCb) { + return fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest) + } + + return nil + }) + } else { + blobDigestFromCb, err := uploadBlob(blobReader) + if err != nil { + return nil, 0, nil, fmt.Errorf("upload blob: %w", err) + } + + blobDigest = h.Sum(nil) + if !bytes.Equal(blobDigest, blobDigestFromCb) { + return nil, 0, nil, fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest) + } } fileNode := &castorev1pb.FileNode{ diff --git a/tvix/nar-bridge/pkg/importer/roundtrip_test.go b/tvix/nar-bridge/pkg/importer/roundtrip_test.go index b16c310522f5..6d6fcb9ee220 100644 --- a/tvix/nar-bridge/pkg/importer/roundtrip_test.go +++ b/tvix/nar-bridge/pkg/importer/roundtrip_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "sync" "testing" castorev1pb "code.tvl.fyi/tvix/castore-go" @@ -27,6 +28,7 @@ func TestRoundtrip(t *testing.T) { narContents, err := io.ReadAll(f) require.NoError(t, err) + var mu sync.Mutex blobsMap := make(map[string][]byte, 0) directoriesMap := make(map[string]*castorev1pb.Directory) @@ -41,7 +43,9 @@ func TestRoundtrip(t *testing.T) { dgst := mustBlobDigest(bytes.NewReader(contents)) // put it in filesMap + mu.Lock() blobsMap[base64.StdEncoding.EncodeToString(dgst)] = contents + mu.Unlock() return dgst, nil }, |