From b78b8d83385a57cc58145662f2b96d97143eda7f Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 3 Oct 2023 14:40:58 +0300 Subject: refactor(tvix/nar-bridge): move blob cb function to pkg/importer This is useful outside a HTTP server scenario. Change-Id: If35f1ab245855378fd01f16ad7b5774d0cf590ba Reviewed-on: https://cl.tvl.fyi/c/depot/+/9532 Tested-by: BuildkiteCI Reviewed-by: Connor Brewster Autosubmit: flokli --- tvix/nar-bridge/pkg/importer/blob_upload.go | 71 +++++++++++++++++++++++++++++ tvix/nar-bridge/pkg/server/blob_upload.go | 71 ----------------------------- tvix/nar-bridge/pkg/server/nar_put.go | 2 +- 3 files changed, 72 insertions(+), 72 deletions(-) create mode 100644 tvix/nar-bridge/pkg/importer/blob_upload.go delete mode 100644 tvix/nar-bridge/pkg/server/blob_upload.go diff --git a/tvix/nar-bridge/pkg/importer/blob_upload.go b/tvix/nar-bridge/pkg/importer/blob_upload.go new file mode 100644 index 0000000000..24aa1cad1d --- /dev/null +++ b/tvix/nar-bridge/pkg/importer/blob_upload.go @@ -0,0 +1,71 @@ +package importer + +import ( + "bufio" + "context" + "encoding/base64" + "errors" + "fmt" + "io" + + castorev1pb "code.tvl.fyi/tvix/castore/protos" + log "github.com/sirupsen/logrus" +) + +// the size of individual BlobChunk we send when uploading to BlobService. +const chunkSize = 1024 * 1024 + +// this produces a callback function that can be used as blobCb for the +// importer.Import function call. +func GenBlobUploaderCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) ([]byte, error) { + return func(blobReader io.Reader) ([]byte, error) { + // Ensure the blobReader is buffered to at least the chunk size. + blobReader = bufio.NewReaderSize(blobReader, chunkSize) + + putter, err := blobServiceClient.Put(ctx) + if err != nil { + // return error to the importer + return nil, fmt.Errorf("error from blob service: %w", err) + } + + blobSize := 0 + chunk := make([]byte, chunkSize) + + for { + n, err := blobReader.Read(chunk) + if err != nil && !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("unable to read from blobreader: %w", err) + } + + if n != 0 { + log.WithField("chunk_size", n).Debug("sending chunk") + blobSize += n + + // send the blob chunk to the server. The err is only valid in the inner scope + if err := putter.Send(&castorev1pb.BlobChunk{ + Data: chunk[:n], + }); err != nil { + return nil, fmt.Errorf("sending blob chunk: %w", err) + } + } + + // if our read from blobReader returned an EOF, we're done reading + if errors.Is(err, io.EOF) { + break + } + + } + + resp, err := putter.CloseAndRecv() + if err != nil { + return nil, fmt.Errorf("close blob putter: %w", err) + } + + log.WithFields(log.Fields{ + "blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()), + "blob_size": blobSize, + }).Debug("uploaded blob") + + return resp.GetDigest(), nil + } +} diff --git a/tvix/nar-bridge/pkg/server/blob_upload.go b/tvix/nar-bridge/pkg/server/blob_upload.go deleted file mode 100644 index 2ae1448e7e..0000000000 --- a/tvix/nar-bridge/pkg/server/blob_upload.go +++ /dev/null @@ -1,71 +0,0 @@ -package server - -import ( - "bufio" - "context" - "encoding/base64" - "errors" - "fmt" - "io" - - castorev1pb "code.tvl.fyi/tvix/castore/protos" - log "github.com/sirupsen/logrus" -) - -// the size of individual BlobChunk we send when uploading to BlobService. -const chunkSize = 1024 * 1024 - -// this produces a callback function that can be used as blobCb for the -// importer.Import function call. -func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) ([]byte, error) { - return func(blobReader io.Reader) ([]byte, error) { - // Ensure the blobReader is buffered to at least the chunk size. - blobReader = bufio.NewReaderSize(blobReader, chunkSize) - - putter, err := blobServiceClient.Put(ctx) - if err != nil { - // return error to the importer - return nil, fmt.Errorf("error from blob service: %w", err) - } - - blobSize := 0 - chunk := make([]byte, chunkSize) - - for { - n, err := blobReader.Read(chunk) - if err != nil && !errors.Is(err, io.EOF) { - return nil, fmt.Errorf("unable to read from blobreader: %w", err) - } - - if n != 0 { - log.WithField("chunk_size", n).Debug("sending chunk") - blobSize += n - - // send the blob chunk to the server. The err is only valid in the inner scope - if err := putter.Send(&castorev1pb.BlobChunk{ - Data: chunk[:n], - }); err != nil { - return nil, fmt.Errorf("sending blob chunk: %w", err) - } - } - - // if our read from blobReader returned an EOF, we're done reading - if errors.Is(err, io.EOF) { - break - } - - } - - resp, err := putter.CloseAndRecv() - if err != nil { - return nil, fmt.Errorf("close blob putter: %w", err) - } - - log.WithFields(log.Fields{ - "blob_digest": base64.StdEncoding.EncodeToString(resp.GetDigest()), - "blob_size": blobSize, - }).Debug("uploaded blob") - - return resp.GetDigest(), nil - } -} diff --git a/tvix/nar-bridge/pkg/server/nar_put.go b/tvix/nar-bridge/pkg/server/nar_put.go index d8c77c1464..5a56cba881 100644 --- a/tvix/nar-bridge/pkg/server/nar_put.go +++ b/tvix/nar-bridge/pkg/server/nar_put.go @@ -43,7 +43,7 @@ func registerNarPut(s *Server) { ctx, // buffer the body by 10MiB bufio.NewReaderSize(r.Body, 10*1024*1024), - genBlobServiceWriteCb(ctx, s.blobServiceClient), + importer.GenBlobUploaderCb(ctx, s.blobServiceClient), func(directory *castorev1pb.Directory) ([]byte, error) { return directoriesUploader.Put(directory) }, -- cgit 1.4.1