about summary refs log tree commit diff
path: root/tvix/nar-bridge/pkg/server
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-10-03T11·40+0300
committerflokli <flokli@flokli.de>2023-10-05T06·17+0000
commitb78b8d83385a57cc58145662f2b96d97143eda7f (patch)
tree5b56d7d2e8bf4ddeaa485d44cd3ba4d83070dbb7 /tvix/nar-bridge/pkg/server
parent259269482c9d98e800f44b9124cf33a9d43333db (diff)
refactor(tvix/nar-bridge): move blob cb function to pkg/importer r/6698
This is useful outside a HTTP server scenario.

Change-Id: If35f1ab245855378fd01f16ad7b5774d0cf590ba
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9532
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/nar-bridge/pkg/server')
-rw-r--r--tvix/nar-bridge/pkg/server/blob_upload.go71
-rw-r--r--tvix/nar-bridge/pkg/server/nar_put.go2
2 files changed, 1 insertions, 72 deletions
diff --git a/tvix/nar-bridge/pkg/server/blob_upload.go b/tvix/nar-bridge/pkg/server/blob_upload.go
deleted file mode 100644
index 2ae1448e7e79..000000000000
--- a/tvix/nar-bridge/pkg/server/blob_upload.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package server
-
-import (
-	"bufio"
-	"context"
-	"encoding/base64"
-	"errors"
-	"fmt"
-	"io"
-
-	castorev1pb "code.tvl.fyi/tvix/castore/protos"
-	log "github.com/sirupsen/logrus"
-)
-
-// the size of individual BlobChunk we send when uploading to BlobService.
-const chunkSize = 1024 * 1024
-
-// this produces a callback function that can be used as blobCb for the
-// importer.Import function call.
-func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) ([]byte, error) {
-	return func(blobReader io.Reader) ([]byte, error) {
-		// Ensure the blobReader is buffered to at least the chunk size.
-		blobReader = bufio.NewReaderSize(blobReader, chunkSize)
-
-		putter, err := blobServiceClient.Put(ctx)
-		if err != nil {
-			// return error to the importer
-			return nil, fmt.Errorf("error from blob service: %w", err)
-		}
-
-		blobSize := 0
-		chunk := make([]byte, chunkSize)
-
-		for {
-			n, err := blobReader.Read(chunk)
-			if err != nil && !errors.Is(err, io.EOF) {
-				return nil, fmt.Errorf("unable to read from blobreader: %w", err)
-			}
-
-			if n != 0 {
-				log.WithField("chunk_size", n).Debug("sending chunk")
-				blobSize += n
-
-				// send the blob chunk to the server. The err is only valid in the inner scope
-				if err := putter.Send(&castorev1pb.BlobChunk{
-					Data: chunk[:n],
-				}); err != nil {
-					return nil, fmt.Errorf("sending blob chunk: %w", err)
-				}
-			}
-
-			// if our read from blobReader returned an EOF, we're done reading
-			if errors.Is(err, io.EOF) {
-				break
-			}
-
-		}
-
-		resp, err := putter.CloseAndRecv()
-		if err != nil {
-			return nil, fmt.Errorf("close blob putter: %w", err)
-		}
-
-		log.WithFields(log.Fields{
-			"blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()),
-			"blob_size":   blobSize,
-		}).Debug("uploaded blob")
-
-		return resp.GetDigest(), nil
-	}
-}
diff --git a/tvix/nar-bridge/pkg/server/nar_put.go b/tvix/nar-bridge/pkg/server/nar_put.go
index d8c77c146462..5a56cba8810c 100644
--- a/tvix/nar-bridge/pkg/server/nar_put.go
+++ b/tvix/nar-bridge/pkg/server/nar_put.go
@@ -43,7 +43,7 @@ func registerNarPut(s *Server) {
 			ctx,
 			// buffer the body by 10MiB
 			bufio.NewReaderSize(r.Body, 10*1024*1024),
-			genBlobServiceWriteCb(ctx, s.blobServiceClient),
+			importer.GenBlobUploaderCb(ctx, s.blobServiceClient),
 			func(directory *castorev1pb.Directory) ([]byte, error) {
 				return directoriesUploader.Put(directory)
 			},