about summary refs log tree commit diff
path: root/tvix/nar-bridge/pkg/server
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/nar-bridge/pkg/server')
-rw-r--r--tvix/nar-bridge/pkg/server/directory_upload.go70
-rw-r--r--tvix/nar-bridge/pkg/server/nar_put.go2
2 files changed, 1 insertions, 71 deletions
diff --git a/tvix/nar-bridge/pkg/server/directory_upload.go b/tvix/nar-bridge/pkg/server/directory_upload.go
deleted file mode 100644
index ad04d2e389..0000000000
--- a/tvix/nar-bridge/pkg/server/directory_upload.go
+++ /dev/null
@@ -1,70 +0,0 @@
-package server
-
-import (
-	"context"
-	"encoding/base64"
-	"fmt"
-
-	castorev1pb "code.tvl.fyi/tvix/castore/protos"
-	log "github.com/sirupsen/logrus"
-)
-
-// DirectoriesUploader opens a Put stream when it receives the first Put() call,
-// and then uses the opened stream for subsequent Put() calls.
-// When the uploading is finished, a call to Done() will close the stream and
-// return the root digest returned from the directoryServiceClient.
-type DirectoriesUploader struct {
-	ctx                       context.Context
-	directoryServiceClient    castorev1pb.DirectoryServiceClient
-	directoryServicePutStream castorev1pb.DirectoryService_PutClient
-}
-
-func NewDirectoriesUploader(ctx context.Context, directoryServiceClient castorev1pb.DirectoryServiceClient) *DirectoriesUploader {
-	return &DirectoriesUploader{
-		ctx:                       ctx,
-		directoryServiceClient:    directoryServiceClient,
-		directoryServicePutStream: nil,
-	}
-}
-
-func (du *DirectoriesUploader) Put(directory *castorev1pb.Directory) ([]byte, error) {
-	directoryDigest, err := directory.Digest()
-	if err != nil {
-		return nil, fmt.Errorf("failed calculating directory digest: %w", err)
-	}
-
-	// Send the directory to the directory service
-	// If the stream hasn't been initialized yet, do it first
-	if du.directoryServicePutStream == nil {
-		directoryServicePutStream, err := du.directoryServiceClient.Put(du.ctx)
-		if err != nil {
-			return nil, fmt.Errorf("unable to initialize directory service put stream: %v", err)
-		}
-		du.directoryServicePutStream = directoryServicePutStream
-	}
-
-	// send the directory out
-	err = du.directoryServicePutStream.Send(directory)
-	if err != nil {
-		return nil, fmt.Errorf("error sending directory: %w", err)
-	}
-	log.WithField("digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("uploaded directory")
-
-	return directoryDigest, nil
-}
-
-// Done closes the stream and returns the response.
-func (du *DirectoriesUploader) Done() (*castorev1pb.PutDirectoryResponse, error) {
-	// only close once, and only if we opened.
-	if du.directoryServicePutStream == nil {
-		return nil, nil
-	}
-	putDirectoryResponse, err := du.directoryServicePutStream.CloseAndRecv()
-	if err != nil {
-		return nil, fmt.Errorf("unable to close directory service put stream: %v", err)
-	}
-
-	du.directoryServicePutStream = nil
-
-	return putDirectoryResponse, nil
-}
diff --git a/tvix/nar-bridge/pkg/server/nar_put.go b/tvix/nar-bridge/pkg/server/nar_put.go
index 5a56cba881..41cd257f72 100644
--- a/tvix/nar-bridge/pkg/server/nar_put.go
+++ b/tvix/nar-bridge/pkg/server/nar_put.go
@@ -36,7 +36,7 @@ func registerNarPut(s *Server) {
 
 		log := log.WithField("narhash_url", narHashFromUrl.SRIString())
 
-		directoriesUploader := NewDirectoriesUploader(ctx, s.directoryServiceClient)
+		directoriesUploader := importer.NewDirectoriesUploader(ctx, s.directoryServiceClient)
 		defer directoriesUploader.Done() //nolint:errcheck
 
 		pathInfo, err := importer.Import(