package importer
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"path"
"strings"
castorev1pb "code.tvl.fyi/tvix/castore-go"
"github.com/nix-community/go-nix/pkg/nar"
"golang.org/x/sync/errgroup"
"lukechampine.com/blake3"
)
const (
// asyncUploadThreshold controls when a file is buffered into memory and uploaded
// asynchronously. Files must be smaller than the threshold to be uploaded asynchronously.
asyncUploadThreshold = 1024 * 1024 // 1 MiB
// maxAsyncUploadBufferBytes is the maximum number of async blob uploads allowed to be
// running concurrently at any given time for a simple import operation.
maxConcurrentAsyncUploads = 128
)
// An item on the directories stack
type stackItem struct {
path string
directory *castorev1pb.Directory
}
// Import reads a NAR from a reader, and returns a the root node,
// NAR size and NAR sha256 digest.
func Import(
// a context, to support cancellation
ctx context.Context,
// The reader the data is read from
r io.Reader,
// callback function called with each regular file content
blobCb func(fileReader io.Reader) ([]byte, error),
// callback function called with each finalized directory node
directoryCb func(directory *castorev1pb.Directory) ([]byte, error),
) (*castorev1pb.Node, uint64, []byte, error) {
// We need to wrap the underlying reader a bit.
// - we want to keep track of the number of bytes read in total
// - we calculate the sha256 digest over all data read
// Express these two things in a MultiWriter, and give the NAR reader a
// TeeReader that writes to it.
narCountW := &CountingWriter{}
sha256W := sha256.New()
multiW := io.MultiWriter(narCountW, sha256W)
narReader, err := nar.NewReader(io.TeeReader(r, multiW))
if err != nil {
return nil, 0, nil, fmt.Errorf("failed to instantiate nar reader: %w", err)
}
defer narReader.Close()
// If we store a symlink or regular file at the root, these are not nil.
// If they are nil, we instead have a stackDirectory.
var rootSymlink *castorev1pb.SymlinkNode
var rootFile *castorev1pb.FileNode
var stackDirectory *castorev1pb.Directory
// Keep track of all asynch blob uploads so we can make sure they all succeed
// before returning.
var asyncBlobWg errgroup.Group
asyncBlobWg.SetLimit(maxConcurrentAsyncUploads)
var stack = []stackItem{}
// popFromStack is used when we transition to a different directory or
// drain the stack when we reach the end of the NAR.
// It adds the popped element to the element underneath if any,
// and passes it to the directoryCb callback.
// This function may only be called if the stack is not already empty.
popFromStack := func() error {
// Keep the top item, and "resize" the stack slice.
// This will only make the last element unaccessible, but chances are high
// we're re-using that space anyways.
toPop := stack[len(stack)-1]
stack = stack[:len(stack)-1]
// call the directoryCb
directoryDigest, err := directoryCb(toPop.directory)
if err != nil {
return fmt.Errorf("failed calling directoryCb: %w", err)
}
// if there's still a parent left on the stack, refer to it from there.
if len(stack) > 0 {
topOfStack := stack[len(stack)-1].directory
topOfStack.Directories = append(topOfStack.Directories, &castorev1pb.DirectoryNode{
Name: []byte(path.Base(toPop.path)),
Digest: directoryDigest,
Size: toPop.directory.Size(),
})
}
// Keep track that we have encounter at least one directory
stackDirectory = toPop.directory
return nil
}
getBasename := func(p string) string {
// extract the basename. In case of "/", replace with empty string.
basename := path.Base(p)
if basename == "/" {
basename = ""
}
return basename
}
for {
select {
case <-ctx.Done():
return nil, 0, nil, ctx.Err()
default:
// call narReader.Next() to get the next element
hdr, err := narReader.Next()
// If this returns an error, it's either EOF (when we're done reading from the NAR),
// or another error.
if err != nil {
// if this returns no EOF, bail out
if !errors.Is(err, io.EOF) {
return nil, 0, nil, fmt.Errorf("failed getting next nar element: %w", err)
}
// The NAR has been read all the way to the end…
// Make sure we close the nar reader, which might read some final trailers.
if err := narReader.Close(); err != nil {
return nil, 0, nil, fmt.Errorf("unable to close nar reader: %w", err)
}
// Check the stack. While it's not empty, we need to pop things off the stack.
for len(stack) > 0 {
err := popFromStack()
if err != nil {
return nil, 0, nil, fmt.Errorf("unable to pop from stack: %w", err)
}
}
// Wait for any pending blob uploads.
err := asyncBlobWg.Wait()
if err != nil {
return nil, 0, nil, fmt.Errorf("async blob upload: %w", err)
}
// Stack is empty.
// Now either root{File,Symlink,Directory} is not nil,
// and we can return the root node.
narSize := narCountW.BytesWritten()
narSha256 := sha256W.Sum(nil)
if rootFile != nil {
return &castorev1pb.Node{
Node: &castorev1pb.Node_File{
File: rootFile,
},
}, narSize, narSha256, nil
} else if rootSymlink != nil {
return &castorev1pb.Node{
Node: &castorev1pb.Node_Symlink{
Symlink: rootSymlink,
},
}, narSize, narSha256, nil
} else if stackDirectory != nil {
// calculate directory digest (i.e. after we received all its contents)
dgst, err := stackDirectory.Digest()
if err != nil {
return nil, 0, nil, fmt.Errorf("unable to calculate root directory digest: %w", err)
}
return &castorev1pb.Node{
Node: &castorev1pb.Node_Directory{
Directory: &castorev1pb.DirectoryNode{
Name: []byte{},
Digest: dgst,
Size: stackDirectory.Size(),
},
},
}, narSize, narSha256, nil
} else {
return nil, 0, nil, fmt.Errorf("no root set")
}
}
// Check for valid path transitions, pop from stack if needed
// The nar reader already gives us some guarantees about ordering and illegal transitions,
// So we really only need to check if the top-of-stack path is a prefix of the path,
// and if it's not, pop from the stack. We do this repeatedly until the top of the stack is
// the subdirectory the new entry is in, or we hit the root directory.
// We don't need to worry about the root node case, because we can only finish the root "/"
// If we're at the end of the NAR reader (covered by the EOF check)
for len(stack) > 1 && !strings.HasPrefix(hdr.Path, stack[len(stack)-1].path+"/") {
err := popFromStack()
if err != nil {
return nil, 0, nil, fmt.Errorf("unable to pop from stack: %w", err)
}
}
if hdr.Type == nar.TypeSymlink {
symlinkNode := &castorev1pb.SymlinkNode{
Name: []byte(getBasename(hdr.Path)),
Target: []byte(hdr.LinkTarget),
}
if len(stack) > 0 {
topOfStack := stack[len(stack)-1].directory
topOfStack.Symlinks = append(topOfStack.Symlinks, symlinkNode)
} else {
rootSymlink = symlinkNode
}
}
if hdr.Type == nar.TypeRegular {
uploadBlob := func(r io.Reader) ([]byte, error) {
// wrap reader with a reader counting the number of bytes read
blobCountW := &CountingWriter{}
blobReader := io.TeeReader(r, blobCountW)
blobDigest, err := blobCb(blobReader)
if err != nil {
return nil, fmt.Errorf("failure from blobCb: %w", err)
}
// ensure blobCb did read all the way to the end.
// If it didn't, the blobCb function is wrong and we should bail out.
if blobCountW.BytesWritten() != uint64(hdr.Size) {
return nil, fmt.Errorf("blobCb did not read all: %d/%d bytes", blobCountW.BytesWritten(), hdr.Size)
}
return blobDigest, nil
}
h := blake3.New(32, nil)
blobReader := io.TeeReader(narReader, io.MultiWriter(h))
var blobDigest []byte
// If this file is small enough, read it off the wire immediately and
// upload to the blob service asynchronously. This helps reduce the
// RTT on blob uploads for NARs with many small files.
doAsync := hdr.Size < asyncUploadThreshold
if doAsync {
blobContents, err := io.ReadAll(blobReader)
if err != nil {
return nil, 0, nil, fmt.Errorf("read blob: %w", err)
}
blobDigest = h.Sum(nil)
asyncBlobWg.Go(func() error {
blobDigestFromCb, err := uploadBlob(bytes.NewReader(blobContents))
if err != nil {
return err
}
if !bytes.Equal(blobDigest, blobDigestFromCb) {
return fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest)
}
return nil
})
} else {
blobDigestFromCb, err := uploadBlob(blobReader)
if err != nil {
return nil, 0, nil, fmt.Errorf("upload blob: %w", err)
}
blobDigest = h.Sum(nil)
if !bytes.Equal(blobDigest, blobDigestFromCb) {
return nil, 0, nil, fmt.Errorf("unexpected digest (got %x, expected %x)", blobDigestFromCb, blobDigest)
}
}
fileNode := &castorev1pb.FileNode{
Name: []byte(getBasename(hdr.Path)),
Digest: blobDigest,
Size: uint64(hdr.Size),
Executable: hdr.Executable,
}
if len(stack) > 0 {
topOfStack := stack[len(stack)-1].directory
topOfStack.Files = append(topOfStack.Files, fileNode)
} else {
rootFile = fileNode
}
}
if hdr.Type == nar.TypeDirectory {
directory := &castorev1pb.Directory{
Directories: []*castorev1pb.DirectoryNode{},
Files: []*castorev1pb.FileNode{},
Symlinks: []*castorev1pb.SymlinkNode{},
}
stack = append(stack, stackItem{
directory: directory,
path: hdr.Path,
})
}
}
}
}