diff options
Diffstat (limited to 'tvix/nar-bridge/pkg/importer/directory_upload.go')
-rw-r--r-- | tvix/nar-bridge/pkg/importer/directory_upload.go | 88 |
1 files changed, 0 insertions, 88 deletions
diff --git a/tvix/nar-bridge/pkg/importer/directory_upload.go b/tvix/nar-bridge/pkg/importer/directory_upload.go deleted file mode 100644 index 117f442fa54f..000000000000 --- a/tvix/nar-bridge/pkg/importer/directory_upload.go +++ /dev/null @@ -1,88 +0,0 @@ -package importer - -import ( - "bytes" - "context" - "encoding/base64" - "fmt" - - castorev1pb "code.tvl.fyi/tvix/castore-go" - log "github.com/sirupsen/logrus" -) - -// DirectoriesUploader opens a Put stream when it receives the first Put() call, -// and then uses the opened stream for subsequent Put() calls. -// When the uploading is finished, a call to Done() will close the stream and -// return the root digest returned from the directoryServiceClient. -type DirectoriesUploader struct { - ctx context.Context - directoryServiceClient castorev1pb.DirectoryServiceClient - directoryServicePutStream castorev1pb.DirectoryService_PutClient - lastDirectoryDigest []byte -} - -func NewDirectoriesUploader(ctx context.Context, directoryServiceClient castorev1pb.DirectoryServiceClient) *DirectoriesUploader { - return &DirectoriesUploader{ - ctx: ctx, - directoryServiceClient: directoryServiceClient, - directoryServicePutStream: nil, - } -} - -func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) ([]byte, error) { - directoryDigest, err := directory.Digest() - if err != nil { - return nil, fmt.Errorf("failed calculating directory digest: %w", err) - } - - // Send the directory to the directory service - // If the stream hasn't been initialized yet, do it first - if du.directoryServicePutStream == nil { - directoryServicePutStream, err := du.directoryServiceClient.Put(du.ctx) - if err != nil { - return nil, fmt.Errorf("unable to initialize directory service put stream: %v", err) - } - du.directoryServicePutStream = directoryServicePutStream - } - - // send the directory out - err = du.directoryServicePutStream.Send(directory) - if err != nil { - return nil, fmt.Errorf("error sending directory: %w", err) - } - log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("uploaded directory") - - // update lastDirectoryDigest - du.lastDirectoryDigest = directoryDigest - - return directoryDigest, nil -} - -// Done closes the stream and returns the response. -// It returns null if closed for a second time. -func (du *DirectoriesUploader) Done() (*castorev1pb.PutDirectoryResponse, error) { - // only close once, and only if we opened. - if du.directoryServicePutStream == nil { - return nil, nil - } - - putDirectoryResponse, err := du.directoryServicePutStream.CloseAndRecv() - if err != nil { - return nil, fmt.Errorf("unable to close directory service put stream: %v", err) - } - - // ensure the response contains the same digest as the one we have in lastDirectoryDigest. - // Otherwise, the backend came up with another digest than we, in which we return an error. - if !bytes.Equal(du.lastDirectoryDigest, putDirectoryResponse.RootDigest) { - return nil, fmt.Errorf( - "backend calculated different root digest as we, expected %s, actual %s", - base64.StdEncoding.EncodeToString(du.lastDirectoryDigest), - base64.StdEncoding.EncodeToString(putDirectoryResponse.RootDigest), - ) - } - - // clear directoryServicePutStream. - du.directoryServicePutStream = nil - - return putDirectoryResponse, nil -} |