diff options
Diffstat (limited to 'tvix/nar-bridge/pkg')
-rw-r--r-- | tvix/nar-bridge/pkg/reader/reader.go | 6 | ||||
-rw-r--r-- | tvix/nar-bridge/pkg/server/blob_upload.go | 67 |
2 files changed, 48 insertions, 25 deletions
diff --git a/tvix/nar-bridge/pkg/reader/reader.go b/tvix/nar-bridge/pkg/reader/reader.go index 04e1e6f2ae69..76e0b8b63e86 100644 --- a/tvix/nar-bridge/pkg/reader/reader.go +++ b/tvix/nar-bridge/pkg/reader/reader.go @@ -41,7 +41,7 @@ func New(r io.Reader) *Reader { func (r *Reader) Import( ctx context.Context, // callback function called with each regular file content - fileCb func(fileReader io.Reader) error, + blobCb func(fileReader io.Reader) error, // callback function called with each finalized directory node directoryCb func(directory *storev1pb.Directory) error, ) (*storev1pb.PathInfo, error) { @@ -219,9 +219,9 @@ func (r *Reader) Import( // wrap reader with a reader calculating the blake3 hash fileReader := NewHasher(narReader, blake3.New(32, nil)) - err := fileCb(fileReader) + err := blobCb(fileReader) if err != nil { - return nil, fmt.Errorf("failure from fileCb: %w", err) + return nil, fmt.Errorf("failure from blobCb: %w", err) } // drive the file reader to the end, in case the CB function doesn't read diff --git a/tvix/nar-bridge/pkg/server/blob_upload.go b/tvix/nar-bridge/pkg/server/blob_upload.go index fe554f5a5a2b..82e22864a552 100644 --- a/tvix/nar-bridge/pkg/server/blob_upload.go +++ b/tvix/nar-bridge/pkg/server/blob_upload.go @@ -1,47 +1,70 @@ package server import ( - storev1pb "code.tvl.fyi/tvix/store/protos" + "bufio" "context" "encoding/base64" + "errors" "fmt" - log "github.com/sirupsen/logrus" "io" -) -// this returns a callback function that can be used as fileCb -// for the reader.Import function call -func genBlobServiceWriteCb(ctx context.Context, blobServiceClient storev1pb.BlobServiceClient) func(io.Reader) error { - return func(fileReader io.Reader) error { - // Read from fileReader into a buffer. - // We currently buffer all contents and send them to blobServiceClient at once, - // but that's about to change. - contents, err := io.ReadAll(fileReader) - if err != nil { - return fmt.Errorf("unable to read all contents from file reader: %w", err) - } + storev1pb "code.tvl.fyi/tvix/store/protos" + log "github.com/sirupsen/logrus" +) - log := log.WithField("blob_size", len(contents)) +// the size of individual BlobChunk we send when uploading to BlobService. +const chunkSize = 1024 * 1024 - log.Infof("about to upload blob") +// this produces a callback function that can be used as blobCb for the +// reader.Import function call +func genBlobServiceWriteCb(ctx context.Context, blobServiceClient storev1pb.BlobServiceClient) func(io.Reader) error { + return func(blobReader io.Reader) error { + // Ensure the blobReader is buffered to at least the chunk size. + blobReader = bufio.NewReaderSize(blobReader, chunkSize) putter, err := blobServiceClient.Put(ctx) if err != nil { // return error to the importer return fmt.Errorf("error from blob service: %w", err) } - err = putter.Send(&storev1pb.BlobChunk{ - Data: contents, - }) - if err != nil { - return fmt.Errorf("putting blob chunk: %w", err) + + blobSize := 0 + chunk := make([]byte, chunkSize) + + for { + n, err := blobReader.Read(chunk) + if err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("unable to read from blobreader: %w", err) + } + + if n != 0 { + log.WithField("chunk_size", n).Debug("sending chunk") + blobSize += n + + // send the blob chunk to the server. The err is only valid in the inner scope + if err := putter.Send(&storev1pb.BlobChunk{ + Data: chunk[:n], + }); err != nil { + return fmt.Errorf("sending blob chunk: %w", err) + } + } + + // if our read from blobReader returned an EOF, we're done reading + if errors.Is(err, io.EOF) { + break + } + } + resp, err := putter.CloseAndRecv() if err != nil { return fmt.Errorf("close blob putter: %w", err) } - log.WithField("digest", base64.StdEncoding.EncodeToString(resp.GetDigest())).Info("uploaded blob") + log.WithFields(log.Fields{ + "blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()), + "blob_size": blobSize, + }).Info("uploaded blob") return nil } |