about summary refs log tree commit diff
path: root/tvix/nar-bridge/pkg/server
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/nar-bridge/pkg/server')
-rw-r--r--tvix/nar-bridge/pkg/server/blob_upload.go67
1 files changed, 45 insertions, 22 deletions
diff --git a/tvix/nar-bridge/pkg/server/blob_upload.go b/tvix/nar-bridge/pkg/server/blob_upload.go
index fe554f5a5a2b..82e22864a552 100644
--- a/tvix/nar-bridge/pkg/server/blob_upload.go
+++ b/tvix/nar-bridge/pkg/server/blob_upload.go
@@ -1,47 +1,70 @@
 package server
 
 import (
-	storev1pb "code.tvl.fyi/tvix/store/protos"
+	"bufio"
 	"context"
 	"encoding/base64"
+	"errors"
 	"fmt"
-	log "github.com/sirupsen/logrus"
 	"io"
-)
 
-// this returns a callback function that can be used as fileCb
-// for the reader.Import function call
-func genBlobServiceWriteCb(ctx context.Context, blobServiceClient storev1pb.BlobServiceClient) func(io.Reader) error {
-	return func(fileReader io.Reader) error {
-		// Read from fileReader into a buffer.
-		// We currently buffer all contents and send them to blobServiceClient at once,
-		// but that's about to change.
-		contents, err := io.ReadAll(fileReader)
-		if err != nil {
-			return fmt.Errorf("unable to read all contents from file reader: %w", err)
-		}
+	storev1pb "code.tvl.fyi/tvix/store/protos"
+	log "github.com/sirupsen/logrus"
+)
 
-		log := log.WithField("blob_size", len(contents))
+// the size of individual BlobChunk we send when uploading to BlobService.
+const chunkSize = 1024 * 1024
 
-		log.Infof("about to upload blob")
+// this produces a callback function that can be used as blobCb for the
+// reader.Import function call
+func genBlobServiceWriteCb(ctx context.Context, blobServiceClient storev1pb.BlobServiceClient) func(io.Reader) error {
+	return func(blobReader io.Reader) error {
+		// Ensure the blobReader is buffered to at least the chunk size.
+		blobReader = bufio.NewReaderSize(blobReader, chunkSize)
 
 		putter, err := blobServiceClient.Put(ctx)
 		if err != nil {
 			// return error to the importer
 			return fmt.Errorf("error from blob service: %w", err)
 		}
-		err = putter.Send(&storev1pb.BlobChunk{
-			Data: contents,
-		})
-		if err != nil {
-			return fmt.Errorf("putting blob chunk: %w", err)
+
+		blobSize := 0
+		chunk := make([]byte, chunkSize)
+
+		for {
+			n, err := blobReader.Read(chunk)
+			if err != nil && !errors.Is(err, io.EOF) {
+				return fmt.Errorf("unable to read from blobreader: %w", err)
+			}
+
+			if n != 0 {
+				log.WithField("chunk_size", n).Debug("sending chunk")
+				blobSize += n
+
+				// send the blob chunk to the server. The err is only valid in the inner scope
+				if err := putter.Send(&storev1pb.BlobChunk{
+					Data: chunk[:n],
+				}); err != nil {
+					return fmt.Errorf("sending blob chunk: %w", err)
+				}
+			}
+
+			// if our read from blobReader returned an EOF, we're done reading
+			if errors.Is(err, io.EOF) {
+				break
+			}
+
 		}
+
 		resp, err := putter.CloseAndRecv()
 		if err != nil {
 			return fmt.Errorf("close blob putter: %w", err)
 		}
 
-		log.WithField("digest", base64.StdEncoding.EncodeToString(resp.GetDigest())).Info("uploaded blob")
+		log.WithFields(log.Fields{
+			"blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()),
+			"blob_size":   blobSize,
+		}).Info("uploaded blob")
 
 		return nil
 	}