From 0353108e99c6c2b7f15ea0c99a8ca4fd899d241a Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 3 Oct 2023 14:42:59 +0300 Subject: refactor(tvix/nar-bridge): move DirectoriesUploader to pkg/importer This is useful outside a HTTP server scenario. Change-Id: I0042a6e773906a15a254d850520e6f841035bf20 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9533 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: Connor Brewster --- tvix/nar-bridge/pkg/importer/directory_upload.go | 70 ++++++++++++++++++++++++ tvix/nar-bridge/pkg/server/directory_upload.go | 70 ------------------------ tvix/nar-bridge/pkg/server/nar_put.go | 2 +- 3 files changed, 71 insertions(+), 71 deletions(-) create mode 100644 tvix/nar-bridge/pkg/importer/directory_upload.go delete mode 100644 tvix/nar-bridge/pkg/server/directory_upload.go (limited to 'tvix/nar-bridge') diff --git a/tvix/nar-bridge/pkg/importer/directory_upload.go b/tvix/nar-bridge/pkg/importer/directory_upload.go new file mode 100644 index 000000000000..5d5ca1f3899d --- /dev/null +++ b/tvix/nar-bridge/pkg/importer/directory_upload.go @@ -0,0 +1,70 @@ +package importer + +import ( + "context" + "encoding/base64" + "fmt" + + castorev1pb "code.tvl.fyi/tvix/castore/protos" + log "github.com/sirupsen/logrus" +) + +// DirectoriesUploader opens a Put stream when it receives the first Put() call, +// and then uses the opened stream for subsequent Put() calls. +// When the uploading is finished, a call to Done() will close the stream and +// return the root digest returned from the directoryServiceClient. +type DirectoriesUploader struct { + ctx context.Context + directoryServiceClient castorev1pb.DirectoryServiceClient + directoryServicePutStream castorev1pb.DirectoryService_PutClient +} + +func NewDirectoriesUploader(ctx context.Context, directoryServiceClient castorev1pb.DirectoryServiceClient) *DirectoriesUploader { + return &DirectoriesUploader{ + ctx: ctx, + directoryServiceClient: directoryServiceClient, + directoryServicePutStream: nil, + } +} + +func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) ([]byte, error) { + directoryDigest, err := directory.Digest() + if err != nil { + return nil, fmt.Errorf("failed calculating directory digest: %w", err) + } + + // Send the directory to the directory service + // If the stream hasn't been initialized yet, do it first + if du.directoryServicePutStream == nil { + directoryServicePutStream, err := du.directoryServiceClient.Put(du.ctx) + if err != nil { + return nil, fmt.Errorf("unable to initialize directory service put stream: %v", err) + } + du.directoryServicePutStream = directoryServicePutStream + } + + // send the directory out + err = du.directoryServicePutStream.Send(directory) + if err != nil { + return nil, fmt.Errorf("error sending directory: %w", err) + } + log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("uploaded directory") + + return directoryDigest, nil +} + +// Done closes the stream and returns the response. +func (du *DirectoriesUploader) Done() (*castorev1pb.PutDirectoryResponse, error) { + // only close once, and only if we opened. + if du.directoryServicePutStream == nil { + return nil, nil + } + putDirectoryResponse, err := du.directoryServicePutStream.CloseAndRecv() + if err != nil { + return nil, fmt.Errorf("unable to close directory service put stream: %v", err) + } + + du.directoryServicePutStream = nil + + return putDirectoryResponse, nil +} diff --git a/tvix/nar-bridge/pkg/server/directory_upload.go b/tvix/nar-bridge/pkg/server/directory_upload.go deleted file mode 100644 index ad04d2e389bb..000000000000 --- a/tvix/nar-bridge/pkg/server/directory_upload.go +++ /dev/null @@ -1,70 +0,0 @@ -package server - -import ( - "context" - "encoding/base64" - "fmt" - - castorev1pb "code.tvl.fyi/tvix/castore/protos" - log "github.com/sirupsen/logrus" -) - -// DirectoriesUploader opens a Put stream when it receives the first Put() call, -// and then uses the opened stream for subsequent Put() calls. -// When the uploading is finished, a call to Done() will close the stream and -// return the root digest returned from the directoryServiceClient. -type DirectoriesUploader struct { - ctx context.Context - directoryServiceClient castorev1pb.DirectoryServiceClient - directoryServicePutStream castorev1pb.DirectoryService_PutClient -} - -func NewDirectoriesUploader(ctx context.Context, directoryServiceClient castorev1pb.DirectoryServiceClient) *DirectoriesUploader { - return &DirectoriesUploader{ - ctx: ctx, - directoryServiceClient: directoryServiceClient, - directoryServicePutStream: nil, - } -} - -func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) ([]byte, error) { - directoryDigest, err := directory.Digest() - if err != nil { - return nil, fmt.Errorf("failed calculating directory digest: %w", err) - } - - // Send the directory to the directory service - // If the stream hasn't been initialized yet, do it first - if du.directoryServicePutStream == nil { - directoryServicePutStream, err := du.directoryServiceClient.Put(du.ctx) - if err != nil { - return nil, fmt.Errorf("unable to initialize directory service put stream: %v", err) - } - du.directoryServicePutStream = directoryServicePutStream - } - - // send the directory out - err = du.directoryServicePutStream.Send(directory) - if err != nil { - return nil, fmt.Errorf("error sending directory: %w", err) - } - log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("uploaded directory") - - return directoryDigest, nil -} - -// Done closes the stream and returns the response. -func (du *DirectoriesUploader) Done() (*castorev1pb.PutDirectoryResponse, error) { - // only close once, and only if we opened. - if du.directoryServicePutStream == nil { - return nil, nil - } - putDirectoryResponse, err := du.directoryServicePutStream.CloseAndRecv() - if err != nil { - return nil, fmt.Errorf("unable to close directory service put stream: %v", err) - } - - du.directoryServicePutStream = nil - - return putDirectoryResponse, nil -} diff --git a/tvix/nar-bridge/pkg/server/nar_put.go b/tvix/nar-bridge/pkg/server/nar_put.go index 5a56cba8810c..41cd257f722a 100644 --- a/tvix/nar-bridge/pkg/server/nar_put.go +++ b/tvix/nar-bridge/pkg/server/nar_put.go @@ -36,7 +36,7 @@ func registerNarPut(s *Server) { log := log.WithField("narhash_url", narHashFromUrl.SRIString()) - directoriesUploader := NewDirectoriesUploader(ctx, s.directoryServiceClient) + directoriesUploader := importer.NewDirectoriesUploader(ctx, s.directoryServiceClient) defer directoriesUploader.Done() //nolint:errcheck pathInfo, err := importer.Import( -- cgit 1.4.1