about summary refs log tree commit diff
path: root/tvix/nar-bridge/pkg/importer/importer.go
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-05-14T10·35+0200
committerclbot <clbot@tvl.fyi>2024-05-15T21·31+0000
commit1392913e981ae4edbec6ef39a4d3de44749ad81c (patch)
tree899672eac93c185a11a125e2f8d1c41367edbf17 /tvix/nar-bridge/pkg/importer/importer.go
parentce1aa10b694662a3bb4061184312de7a422cfe42 (diff)
chore(tvix/nar-bridge): move to nar-bridge-go r/8147
Make some space for the rust implementation.

Change-Id: I924dc1657be10abe5a11951c3b9de50bae06db19
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11662
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: yuka <yuka@yuka.dev>
Diffstat (limited to 'tvix/nar-bridge/pkg/importer/importer.go')
-rw-r--r--tvix/nar-bridge/pkg/importer/importer.go303
1 files changed, 0 insertions, 303 deletions
diff --git a/tvix/nar-bridge/pkg/importer/importer.go b/tvix/nar-bridge/pkg/importer/importer.go
deleted file mode 100644
index fce6c5f293da..000000000000
--- a/tvix/nar-bridge/pkg/importer/importer.go
+++ /dev/null
@@ -1,303 +0,0 @@
-package importer
-
-import (
-	"bytes"
-	"context"
-	"crypto/sha256"
-	"errors"
-	"fmt"
-	"io"
-	"path"
-	"strings"
-
-	castorev1pb "code.tvl.fyi/tvix/castore-go"
-	"github.com/nix-community/go-nix/pkg/nar"
-	"golang.org/x/sync/errgroup"
-	"lukechampine.com/blake3"
-)
-
-const (
-	// asyncUploadThreshold controls when a file is buffered into memory and uploaded
-	// asynchronously. Files must be smaller than the threshold to be uploaded asynchronously.
-	asyncUploadThreshold = 1024 * 1024 // 1 MiB
-	// maxAsyncUploadBufferBytes is the maximum number of async blob uploads allowed to be
-	// running concurrently at any given time for a simple import operation.
-	maxConcurrentAsyncUploads = 128
-)
-
-// An item on the directories stack
-type stackItem struct {
-	path      string
-	directory *castorev1pb.Directory
-}
-
-// Import reads a NAR from a reader, and returns a the root node,
-// NAR size and NAR sha256 digest.
-func Import(
-	// a context, to support cancellation
-	ctx context.Context,
-	// The reader the data is read from
-	r io.Reader,
-	// callback function called with each regular file content
-	blobCb func(fileReader io.Reader) ([]byte, error),
-	// callback function called with each finalized directory node
-	directoryCb func(directory *castorev1pb.Directory) ([]byte, error),
-) (*castorev1pb.Node, uint64, []byte, error) {
-	// We need to wrap the underlying reader a bit.
-	// - we want to keep track of the number of bytes read in total
-	// - we calculate the sha256 digest over all data read
-	// Express these two things in a MultiWriter, and give the NAR reader a
-	// TeeReader that writes to it.
-	narCountW := &CountingWriter{}
-	sha256W := sha256.New()
-	multiW := io.MultiWriter(narCountW, sha256W)
-	narReader, err := nar.NewReader(io.TeeReader(r, multiW))
-	if err != nil {
-		return nil, 0, nil, fmt.Errorf("failed to instantiate nar reader: %w", err)
-	}
-	defer narReader.Close()
-
-	// If we store a symlink or regular file at the root, these are not nil.
-	// If they are nil, we instead have a stackDirectory.
-	var rootSymlink *castorev1pb.SymlinkNode
-	var rootFile *castorev1pb.FileNode
-	var stackDirectory *castorev1pb.Directory
-
-	// Keep track of all asynch blob uploads so we can make sure they all succeed
-	// before returning.
-	var asyncBlobWg errgroup.Group
-	asyncBlobWg.SetLimit(maxConcurrentAsyncUploads)
-
-	var stack = []stackItem{}
-
-	// popFromStack is used when we transition to a different directory or
-	// drain the stack when we reach the end of the NAR.
-	// It adds the popped element to the element underneath if any,
-	// and passes it to the directoryCb callback.
-	// This function may only be called if the stack is not already empty.
-	popFromStack := func() error {
-		// Keep the top item, and "resize" the stack slice.
-		// This will only make the last element unaccessible, but chances are high
-		// we're re-using that space anyways.
-		toPop := stack[len(stack)-1]
-		stack = stack[:len(stack)-1]
-
-		// call the directoryCb
-		directoryDigest, err := directoryCb(toPop.directory)
-		if err != nil {
-			return fmt.Errorf("failed calling directoryCb: %w", err)
-		}
-
-		// if there's still a parent left on the stack, refer to it from there.
-		if len(stack) > 0 {
-			topOfStack := stack[len(stack)-1].directory
-			topOfStack.Directories = append(topOfStack.Directories, &castorev1pb.DirectoryNode{
-				Name:   []byte(path.Base(toPop.path)),
-				Digest: directoryDigest,
-				Size:   toPop.directory.Size(),
-			})
-		}
-		// Keep track that we have encounter at least one directory
-		stackDirectory = toPop.directory
-		return nil
-	}
-
-	getBasename := func(p string) string {
-		// extract the basename. In case of "/", replace with empty string.
-		basename := path.Base(p)
-		if basename == "/" {
-			basename = ""
-		}
-		return basename
-	}
-
-	for {
-		select {
-		case <-ctx.Done():
-			return nil, 0, nil, ctx.Err()
-		default:
-			// call narReader.Next() to get the next element
-			hdr, err := narReader.Next()
-
-			// If this returns an error, it's either EOF (when we're done reading from the NAR),
-			// or another error.
-			if err != nil {
-				// if this returns no EOF, bail out
-				if !errors.Is(err, io.EOF) {
-					return nil, 0, nil, fmt.Errorf("failed getting next nar element: %w", err)
-				}
-
-				// The NAR has been read all the way to the end…
-				// Make sure we close the nar reader, which might read some final trailers.
-				if err := narReader.Close(); err != nil {
-					return nil, 0, nil, fmt.Errorf("unable to close nar reader: %w", err)
-				}
-
-				// Check the stack. While it's not empty, we need to pop things off the stack.
-				for len(stack) > 0 {
-					err := popFromStack()
-					if err != nil {
-						return nil, 0, nil, fmt.Errorf("unable to pop from stack: %w", err)
-					}
-				}
-
-				// Wait for any pending blob uploads.
-				err := asyncBlobWg.Wait()
-				if err != nil {
-					return nil, 0, nil, fmt.Errorf("async blob upload: %w", err)
-				}
-
-				// Stack is empty.
-				// Now either root{File,Symlink,Directory} is not nil,
-				// and we can return the root node.
-				narSize := narCountW.BytesWritten()
-				narSha256 := sha256W.Sum(nil)
-
-				if rootFile != nil {
-					return &castorev1pb.Node{
-						Node: &castorev1pb.Node_File{
-							File: rootFile,
-						},
-					}, narSize, narSha256, nil
-				} else if rootSymlink != nil {
-					return &castorev1pb.Node{
-						Node: &castorev1pb.Node_Symlink{
-							Symlink: rootSymlink,
-						},
-					}, narSize, narSha256, nil
-				} else if stackDirectory != nil {
-					// calculate directory digest (i.e. after we received all its contents)
-					dgst, err := stackDirectory.Digest()
-					if err != nil {
-						return nil, 0, nil, fmt.Errorf("unable to calculate root directory digest: %w", err)
-					}
-
-					return &castorev1pb.Node{
-						Node: &castorev1pb.Node_Directory{
-							Directory: &castorev1pb.DirectoryNode{
-								Name:   []byte{},
-								Digest: dgst,
-								Size:   stackDirectory.Size(),
-							},
-						},
-					}, narSize, narSha256, nil
-				} else {
-					return nil, 0, nil, fmt.Errorf("no root set")
-				}
-			}
-
-			// Check for valid path transitions, pop from stack if needed
-			// The nar reader already gives us some guarantees about ordering and illegal transitions,
-			// So we really only need to check if the top-of-stack path is a prefix of the path,
-			// and if it's not, pop from the stack. We do this repeatedly until the top of the stack is
-			// the subdirectory the new entry is in, or we hit the root directory.
-
-			// We don't need to worry about the root node case, because we can only finish the root "/"
-			// If we're at the end of the NAR reader (covered by the EOF check)
-			for len(stack) > 1 && !strings.HasPrefix(hdr.Path, stack[len(stack)-1].path+"/") {
-				err := popFromStack()
-				if err != nil {
-					return nil, 0, nil, fmt.Errorf("unable to pop from stack: %w", err)
-				}
-			}
-
-			if hdr.Type == nar.TypeSymlink {
-				symlinkNode := &castorev1pb.SymlinkNode{
-					Name:   []byte(getBasename(hdr.Path)),
-					Target: []byte(hdr.LinkTarget),
-				}
-				if len(stack) > 0 {
-					topOfStack := stack[len(stack)-1].directory
-					topOfStack.Symlinks = append(topOfStack.Symlinks, symlinkNode)
-				} else {
-					rootSymlink = symlinkNode
-				}
-
-			}
-			if hdr.Type == nar.TypeRegular {
-				uploadBlob := func(r io.Reader) ([]byte, error) {
-					// wrap reader with a reader counting the number of bytes read
-					blobCountW := &CountingWriter{}
-					blobReader := io.TeeReader(r, blobCountW)
-
-					blobDigest, err := blobCb(blobReader)
-					if err != nil {
-						return nil, fmt.Errorf("failure from blobCb: %w", err)
-					}
-
-					// ensure blobCb did read all the way to the end.
-					// If it didn't, the blobCb function is wrong and we should bail out.
-					if blobCountW.BytesWritten() != uint64(hdr.Size) {
-						return nil, fmt.Errorf("blobCb did not read all: %d/%d bytes", blobCountW.BytesWritten(), hdr.Size)
-					}
-
-					return blobDigest, nil
-				}
-
-				h := blake3.New(32, nil)
-				blobReader := io.TeeReader(narReader, io.MultiWriter(h))
-				var blobDigest []byte
-
-				// If this file is small enough, read it off the wire immediately and
-				// upload to the blob service asynchronously. This helps reduce the
-				// RTT on blob uploads for NARs with many small files.
-				doAsync := hdr.Size < asyncUploadThreshold
-				if doAsync {
-					blobContents, err := io.ReadAll(blobReader)
-					if err != nil {
-						return nil, 0, nil, fmt.Errorf("read blob: %w", err)
-					}
-
-					blobDigest = h.Sum(nil)
-
-					asyncBlobWg.Go(func() error {
-						blobDigestFromCb, err := uploadBlob(bytes.NewReader(blobContents))
-						if err != nil {
-							return err
-						}
-
-						if !bytes.Equal(blobDigest, blobDigestFromCb) {
-							return fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest)
-						}
-
-						return nil
-					})
-				} else {
-					blobDigestFromCb, err := uploadBlob(blobReader)
-					if err != nil {
-						return nil, 0, nil, fmt.Errorf("upload blob: %w", err)
-					}
-
-					blobDigest = h.Sum(nil)
-					if !bytes.Equal(blobDigest, blobDigestFromCb) {
-						return nil, 0, nil, fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest)
-					}
-				}
-
-				fileNode := &castorev1pb.FileNode{
-					Name:       []byte(getBasename(hdr.Path)),
-					Digest:     blobDigest,
-					Size:       uint64(hdr.Size),
-					Executable: hdr.Executable,
-				}
-				if len(stack) > 0 {
-					topOfStack := stack[len(stack)-1].directory
-					topOfStack.Files = append(topOfStack.Files, fileNode)
-				} else {
-					rootFile = fileNode
-				}
-			}
-			if hdr.Type == nar.TypeDirectory {
-				directory := &castorev1pb.Directory{
-					Directories: []*castorev1pb.DirectoryNode{},
-					Files:       []*castorev1pb.FileNode{},
-					Symlinks:    []*castorev1pb.SymlinkNode{},
-				}
-				stack = append(stack, stackItem{
-					directory: directory,
-					path:      hdr.Path,
-				})
-			}
-		}
-	}
-}