From f92b0ef9336552c46d63a4b497603e691bfbf39b Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 3 Oct 2023 13:59:13 +0300 Subject: refactor(tvix/nar-bridge): let callbaks return calculated digests This aligns behaviour more with how it should be - it's the responsibility of the callback functions to return digests of the things they consume(d). It allows further cleaning up the hasher struct. Change-Id: I9cbfc87e6abd4ff17fadf39eb6563ec3cb7fcc6f Reviewed-on: https://cl.tvl.fyi/c/depot/+/9528 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: Connor Brewster --- tvix/nar-bridge/pkg/server/blob_upload.go | 14 +++++++------- tvix/nar-bridge/pkg/server/directory_upload.go | 14 +++++++------- tvix/nar-bridge/pkg/server/nar_put.go | 2 +- 3 files changed, 15 insertions(+), 15 deletions(-) (limited to 'tvix/nar-bridge/pkg/server') diff --git a/tvix/nar-bridge/pkg/server/blob_upload.go b/tvix/nar-bridge/pkg/server/blob_upload.go index 5531335367df..2ae1448e7e79 100644 --- a/tvix/nar-bridge/pkg/server/blob_upload.go +++ b/tvix/nar-bridge/pkg/server/blob_upload.go @@ -17,15 +17,15 @@ const chunkSize = 1024 * 1024 // this produces a callback function that can be used as blobCb for the // importer.Import function call. -func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) error { - return func(blobReader io.Reader) error { +func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.BlobServiceClient) func(io.Reader) ([]byte, error) { + return func(blobReader io.Reader) ([]byte, error) { // Ensure the blobReader is buffered to at least the chunk size. blobReader = bufio.NewReaderSize(blobReader, chunkSize) putter, err := blobServiceClient.Put(ctx) if err != nil { // return error to the importer - return fmt.Errorf("error from blob service: %w", err) + return nil, fmt.Errorf("error from blob service: %w", err) } blobSize := 0 @@ -34,7 +34,7 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl for { n, err := blobReader.Read(chunk) if err != nil && !errors.Is(err, io.EOF) { - return fmt.Errorf("unable to read from blobreader: %w", err) + return nil, fmt.Errorf("unable to read from blobreader: %w", err) } if n != 0 { @@ -45,7 +45,7 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl if err := putter.Send(&castorev1pb.BlobChunk{ Data: chunk[:n], }); err != nil { - return fmt.Errorf("sending blob chunk: %w", err) + return nil, fmt.Errorf("sending blob chunk: %w", err) } } @@ -58,7 +58,7 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl resp, err := putter.CloseAndRecv() if err != nil { - return fmt.Errorf("close blob putter: %w", err) + return nil, fmt.Errorf("close blob putter: %w", err) } log.WithFields(log.Fields{ @@ -66,6 +66,6 @@ func genBlobServiceWriteCb(ctx context.Context, blobServiceClient castorev1pb.Bl "blob_size": blobSize, }).Debug("uploaded blob") - return nil + return resp.GetDigest(), nil } } diff --git a/tvix/nar-bridge/pkg/server/directory_upload.go b/tvix/nar-bridge/pkg/server/directory_upload.go index 27dedc888bc2..e4a98f907577 100644 --- a/tvix/nar-bridge/pkg/server/directory_upload.go +++ b/tvix/nar-bridge/pkg/server/directory_upload.go @@ -23,10 +23,10 @@ func NewDirectoriesUploader(ctx context.Context, directoryServiceClient castorev } } -func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) error { - directoryDgst, err := directory.Digest() +func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) ([]byte, error) { + directoryDigest, err := directory.Digest() if err != nil { - return fmt.Errorf("failed calculating directory digest: %w", err) + return nil, fmt.Errorf("failed calculating directory digest: %w", err) } // Send the directory to the directory service @@ -34,7 +34,7 @@ func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) error { if du.directoryServicePutStream == nil { directoryServicePutStream, err := du.directoryServiceClient.Put(du.ctx) if err != nil { - return fmt.Errorf("unable to initialize directory service put stream: %v", err) + return nil, fmt.Errorf("unable to initialize directory service put stream: %v", err) } du.directoryServicePutStream = directoryServicePutStream } @@ -42,11 +42,11 @@ func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) error { // send the directory out err = du.directoryServicePutStream.Send(directory) if err != nil { - return fmt.Errorf("error sending directory: %w", err) + return nil, fmt.Errorf("error sending directory: %w", err) } - log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDgst)).Debug("uploaded directory") + log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("uploaded directory") - return nil + return directoryDigest, nil } // Done is called whenever we're diff --git a/tvix/nar-bridge/pkg/server/nar_put.go b/tvix/nar-bridge/pkg/server/nar_put.go index 0cb0190b7c6f..d8c77c146462 100644 --- a/tvix/nar-bridge/pkg/server/nar_put.go +++ b/tvix/nar-bridge/pkg/server/nar_put.go @@ -44,7 +44,7 @@ func registerNarPut(s *Server) { // buffer the body by 10MiB bufio.NewReaderSize(r.Body, 10*1024*1024), genBlobServiceWriteCb(ctx, s.blobServiceClient), - func(directory *castorev1pb.Directory) error { + func(directory *castorev1pb.Directory) ([]byte, error) { return directoriesUploader.Put(directory) }, ) -- cgit 1.4.1