about summary refs log tree commit diff
path: root/tvix/nar-bridge/pkg/server/blob_upload.go
diff options
context:
space:
mode:
author Florian Klink <flokli@flokli.de> 2023-09-17T18·47+0300
committer flokli <flokli@flokli.de> 2023-09-18T14·02+0000
commit 0c031461c3732fd925668a22a042aa5f1b2764f3 (patch)
tree da678eb24f15169e5a4510fbb775f461ef808792 /tvix/nar-bridge/pkg/server/blob_upload.go
parent 136b12ddd7d4fa32c25e907a52a7f49a449601ed (diff)
fix(tvix/nar-bridge): chunk blobs r/6610
Instead of creating one big BlobChunk containing all data, and creating
way too large proto messages, chunk blobs up to a reasonable (1MiB)
chunk size, and send them to the server like that.

Change-Id: Ia45a53956a6d7c0599cc59ac516ba37e9fb1b30e
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9357
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Diffstat (limited to '')
-rw-r--r-- tvix/nar-bridge/pkg/server/blob_upload.go | 67
1 file changed, 45 insertions(+), 22 deletions(-)
diff --git a/tvix/nar-bridge/pkg/server/blob_upload.go b/tvix/nar-bridge/pkg/server/blob_upload.go
index fe554f5a5a..82e22864a5 100644
--- a/tvix/nar-bridge/pkg/server/blob_upload.go
+++ b/tvix/nar-bridge/pkg/server/blob_upload.go
@@ -1,47 +1,70 @@
 package server
 
 import (
-	storev1pb "code.tvl.fyi/tvix/store/protos"
+	"bufio"
 	"context"
 	"encoding/base64"
+	"errors"
 	"fmt"
-	log "github.com/sirupsen/logrus"
 	"io"
-)
 
-// this returns a callback function that can be used as fileCb
-// for the reader.Import function call
-func genBlobServiceWriteCb(ctx context.Context, blobServiceClient storev1pb.BlobServiceClient) func(io.Reader) error {
-	return func(fileReader io.Reader) error {
-		// Read from fileReader into a buffer.
-		// We currently buffer all contents and send them to blobServiceClient at once,
-		// but that's about to change.
-		contents, err := io.ReadAll(fileReader)
-		if err != nil {
-			return fmt.Errorf("unable to read all contents from file reader: %w", err)
-		}
+	storev1pb "code.tvl.fyi/tvix/store/protos"
+	log "github.com/sirupsen/logrus"
+)
 
-		log := log.WithField("blob_size", len(contents))
+// the size of individual BlobChunk we send when uploading to BlobService.
+const chunkSize = 1024 * 1024
 
-		log.Infof("about to upload blob")
+// this produces a callback function that can be used as blobCb for the
+// reader.Import function call
+func genBlobServiceWriteCb(ctx context.Context, blobServiceClient storev1pb.BlobServiceClient) func(io.Reader) error {
+	return func(blobReader io.Reader) error {
+		// Ensure the blobReader is buffered to at least the chunk size.
+		blobReader = bufio.NewReaderSize(blobReader, chunkSize)
 
 		putter, err := blobServiceClient.Put(ctx)
 		if err != nil {
 			// return error to the importer
 			return fmt.Errorf("error from blob service: %w", err)
 		}
-		err = putter.Send(&storev1pb.BlobChunk{
-			Data: contents,
-		})
-		if err != nil {
-			return fmt.Errorf("putting blob chunk: %w", err)
+
+		blobSize := 0
+		chunk := make([]byte, chunkSize)
+
+		for {
+			n, err := blobReader.Read(chunk)
+			if err != nil && !errors.Is(err, io.EOF) {
+				return fmt.Errorf("unable to read from blobreader: %w", err)
+			}
+
+			if n != 0 {
+				log.WithField("chunk_size", n).Debug("sending chunk")
+				blobSize += n
+
+				// send the blob chunk to the server. The err is only valid in the inner scope
+				if err := putter.Send(&storev1pb.BlobChunk{
+					Data: chunk[:n],
+				}); err != nil {
+					return fmt.Errorf("sending blob chunk: %w", err)
+				}
+			}
+
+			// if our read from blobReader returned an EOF, we're done reading
+			if errors.Is(err, io.EOF) {
+				break
+			}
+
 		}
+
 		resp, err := putter.CloseAndRecv()
 		if err != nil {
 			return fmt.Errorf("close blob putter: %w", err)
 		}
 
-		log.WithField("digest", base64.StdEncoding.EncodeToString(resp.GetDigest())).Info("uploaded blob")
+		log.WithFields(log.Fields{
+			"blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()),
+			"blob_size":   blobSize,
+		}).Info("uploaded blob")
 
 		return nil
 	}