about summary refs log tree commit diff
path: root/tvix/nar-bridge/pkg/server
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-10-03T10·59+0300
committerflokli <flokli@flokli.de>2023-10-05T06·17+0000
commitf92b0ef9336552c46d63a4b497603e691bfbf39b (patch)
tree245334b0c31f4f28251df3827b6729982a54e5f9 /tvix/nar-bridge/pkg/server
parentb1ff1267be5f1dfa4f764648da68bbaec8366ecd (diff)
refactor(tvix/nar-bridge): let callbaks return calculated digests r/6694
This aligns behaviour more with how it should be - it's the
responsibility of the callback functions to return digests of the things
they consume(d). It allows further cleaning up the hasher struct.

Change-Id: I9cbfc87e6abd4ff17fadf39eb6563ec3cb7fcc6f
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9528
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix/nar-bridge/pkg/server')
-rw-r--r--tvix/nar-bridge/pkg/server/blob_upload.go14
-rw-r--r--tvix/nar-bridge/pkg/server/directory_upload.go14
-rw-r--r--tvix/nar-bridge/pkg/server/nar_put.go2
3 files changed, 15 insertions, 15 deletions
diff --git a/tvix/nar-bridge/pkg/server/blob_upload.go b/tvix/nar-bridge/pkg/server/blob_upload.go
index 5531335367df..2ae1448e7e79 100644
--- a/tvix/nar-bridge/pkg/server/blob_upload.go
+++ b/tvix/nar-bridge/pkg/server/blob_upload.go
@@ -17,15 +17,15 @@ const chunkSize = 1024 * 1024
 
 // this produces a callback function that can be used as blobCb for the
 // importer.Import function call.
-func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) error {
-	return func(blobReader io.Reader) error {
+func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) ([]byte, error) {
+	return func(blobReader io.Reader) ([]byte, error) {
 		// Ensure the blobReader is buffered to at least the chunk size.
 		blobReader = bufio.NewReaderSize(blobReader, chunkSize)
 
 		putter, err := blobServiceClient.Put(ctx)
 		if err != nil {
 			// return error to the importer
-			return fmt.Errorf("error from blob service: %w", err)
+			return nil, fmt.Errorf("error from blob service: %w", err)
 		}
 
 		blobSize := 0
@@ -34,7 +34,7 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl
 		for {
 			n, err := blobReader.Read(chunk)
 			if err != nil && !errors.Is(err, io.EOF) {
-				return fmt.Errorf("unable to read from blobreader: %w", err)
+				return nil, fmt.Errorf("unable to read from blobreader: %w", err)
 			}
 
 			if n != 0 {
@@ -45,7 +45,7 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl
 				if err := putter.Send(&castorev1pb.BlobChunk{
 					Data: chunk[:n],
 				}); err != nil {
-					return fmt.Errorf("sending blob chunk: %w", err)
+					return nil, fmt.Errorf("sending blob chunk: %w", err)
 				}
 			}
 
@@ -58,7 +58,7 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl
 
 		resp, err := putter.CloseAndRecv()
 		if err != nil {
-			return fmt.Errorf("close blob putter: %w", err)
+			return nil, fmt.Errorf("close blob putter: %w", err)
 		}
 
 		log.WithFields(log.Fields{
@@ -66,6 +66,6 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl
 			"blob_size":   blobSize,
 		}).Debug("uploaded blob")
 
-		return nil
+		return resp.GetDigest(), nil
 	}
 }
diff --git a/tvix/nar-bridge/pkg/server/directory_upload.go b/tvix/nar-bridge/pkg/server/directory_upload.go
index 27dedc888bc2..e4a98f907577 100644
--- a/tvix/nar-bridge/pkg/server/directory_upload.go
+++ b/tvix/nar-bridge/pkg/server/directory_upload.go
@@ -23,10 +23,10 @@ func NewDirectoriesUploader(ctx context.Context, directoryServiceClient castorev
 	}
 }
 
-func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) error {
-	directoryDgst, err := directory.Digest()
+func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) ([]byte, error) {
+	directoryDigest, err := directory.Digest()
 	if err != nil {
-		return fmt.Errorf("failed calculating directory digest: %w", err)
+		return nil, fmt.Errorf("failed calculating directory digest: %w", err)
 	}
 
 	// Send the directory to the directory service
@@ -34,7 +34,7 @@ func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) error {
 	if du.directoryServicePutStream == nil {
 		directoryServicePutStream, err := du.directoryServiceClient.Put(du.ctx)
 		if err != nil {
-			return fmt.Errorf("unable to initialize directory service put stream: %v", err)
+			return nil, fmt.Errorf("unable to initialize directory service put stream: %v", err)
 		}
 		du.directoryServicePutStream = directoryServicePutStream
 	}
@@ -42,11 +42,11 @@ func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) error {
 	// send the directory out
 	err = du.directoryServicePutStream.Send(directory)
 	if err != nil {
-		return fmt.Errorf("error sending directory: %w", err)
+		return nil, fmt.Errorf("error sending directory: %w", err)
 	}
-	log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDgst)).Debug("uploaded directory")
+	log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("uploaded directory")
 
-	return nil
+	return directoryDigest, nil
 }
 
 // Done is called whenever we're
diff --git a/tvix/nar-bridge/pkg/server/nar_put.go b/tvix/nar-bridge/pkg/server/nar_put.go
index 0cb0190b7c6f..d8c77c146462 100644
--- a/tvix/nar-bridge/pkg/server/nar_put.go
+++ b/tvix/nar-bridge/pkg/server/nar_put.go
@@ -44,7 +44,7 @@ func registerNarPut(s *Server) {
 			// buffer the body by 10MiB
 			bufio.NewReaderSize(r.Body, 10*1024*1024),
 			genBlobServiceWriteCb(ctx, s.blobServiceClient),
-			func(directory *castorev1pb.Directory) error {
+			func(directory *castorev1pb.Directory) ([]byte, error) {
 				return directoriesUploader.Put(directory)
 			},
 		)