about summary refs log tree commit diff
path: root/tvix/nar-bridge/pkg/importer
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-10-03T11·40+0300
committerflokli <flokli@flokli.de>2023-10-05T06·17+0000
commitb78b8d83385a57cc58145662f2b96d97143eda7f (patch)
tree5b56d7d2e8bf4ddeaa485d44cd3ba4d83070dbb7 /tvix/nar-bridge/pkg/importer
parent259269482c9d98e800f44b9124cf33a9d43333db (diff)
refactor(tvix/nar-bridge): move blob cb function to pkg/importer r/6698
This is useful outside a HTTP server scenario.

Change-Id: If35f1ab245855378fd01f16ad7b5774d0cf590ba
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9532
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/nar-bridge/pkg/importer')
-rw-r--r--tvix/nar-bridge/pkg/importer/blob_upload.go71
1 files changed, 71 insertions, 0 deletions
diff --git a/tvix/nar-bridge/pkg/importer/blob_upload.go b/tvix/nar-bridge/pkg/importer/blob_upload.go
new file mode 100644
index 000000000000..24aa1cad1df7
--- /dev/null
+++ b/tvix/nar-bridge/pkg/importer/blob_upload.go
@@ -0,0 +1,71 @@
+package importer
+
+import (
+	"bufio"
+	"context"
+	"encoding/base64"
+	"errors"
+	"fmt"
+	"io"
+
+	castorev1pb "code.tvl.fyi/tvix/castore/protos"
+	log "github.com/sirupsen/logrus"
+)
+
+// the size of individual BlobChunk we send when uploading to BlobService.
+const chunkSize = 1024 * 1024
+
+// this produces a callback function that can be used as blobCb for the
+// importer.Import function call.
+func GenBlobUploaderCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) ([]byte, error) {
+	return func(blobReader io.Reader) ([]byte, error) {
+		// Ensure the blobReader is buffered to at least the chunk size.
+		blobReader = bufio.NewReaderSize(blobReader, chunkSize)
+
+		putter, err := blobServiceClient.Put(ctx)
+		if err != nil {
+			// return error to the importer
+			return nil, fmt.Errorf("error from blob service: %w", err)
+		}
+
+		blobSize := 0
+		chunk := make([]byte, chunkSize)
+
+		for {
+			n, err := blobReader.Read(chunk)
+			if err != nil && !errors.Is(err, io.EOF) {
+				return nil, fmt.Errorf("unable to read from blobreader: %w", err)
+			}
+
+			if n != 0 {
+				log.WithField("chunk_size", n).Debug("sending chunk")
+				blobSize += n
+
+				// send the blob chunk to the server. The err is only valid in the inner scope
+				if err := putter.Send(&castorev1pb.BlobChunk{
+					Data: chunk[:n],
+				}); err != nil {
+					return nil, fmt.Errorf("sending blob chunk: %w", err)
+				}
+			}
+
+			// if our read from blobReader returned an EOF, we're done reading
+			if errors.Is(err, io.EOF) {
+				break
+			}
+
+		}
+
+		resp, err := putter.CloseAndRecv()
+		if err != nil {
+			return nil, fmt.Errorf("close blob putter: %w", err)
+		}
+
+		log.WithFields(log.Fields{
+			"blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()),
+			"blob_size":   blobSize,
+		}).Debug("uploaded blob")
+
+		return resp.GetDigest(), nil
+	}
+}