about summary refs log tree commit diff
path: root/tvix/nar-bridge
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/nar-bridge')
-rw-r--r--tvix/nar-bridge/.gitignore1
-rw-r--r--tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go117
-rw-r--r--tvix/nar-bridge/default.nix2
-rw-r--r--tvix/nar-bridge/go.mod1
-rw-r--r--tvix/nar-bridge/go.sum3
-rw-r--r--tvix/nar-bridge/pkg/pathinfosvc/server.go300
6 files changed, 423 insertions, 1 deletions
diff --git a/tvix/nar-bridge/.gitignore b/tvix/nar-bridge/.gitignore
index 867be7a1273f..d70e1f8120cc 100644
--- a/tvix/nar-bridge/.gitignore
+++ b/tvix/nar-bridge/.gitignore
@@ -1 +1,2 @@
 /nar-bridge-http
+/nar-bridge-pathinfo
diff --git a/tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go b/tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go
new file mode 100644
index 000000000000..76d0fafa4cd6
--- /dev/null
+++ b/tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go
@@ -0,0 +1,117 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"os/signal"
+	"strings"
+	"time"
+
+	"github.com/alecthomas/kong"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/reflection"
+
+	castorev1pb "code.tvl.fyi/tvix/castore/protos"
+	"code.tvl.fyi/tvix/nar-bridge/pkg/pathinfosvc"
+	storev1pb "code.tvl.fyi/tvix/store/protos"
+	"github.com/sirupsen/logrus"
+	log "github.com/sirupsen/logrus"
+)
+
+// cli holds the command-line configuration of this binary. It is filled in by
+// kong.Parse in main, driven by the struct tags on each field.
+//
+// The program description below is only kept as a comment; it is not attached
+// to the struct as a kong tag:
+// `help:"Provide a tvix-store gRPC PathInfoService for a HTTP Nix Binary Cache"`
+var cli struct {
+	LogLevel             string   `enum:"trace,debug,info,warn,error,fatal,panic" help:"The log level to log with" default:"info"`
+	ListenAddr           string   `name:"listen-addr" help:"The address this service listens on" type:"string" default:"[::]:8001"` //nolint:lll
+	BlobServiceAddr      string   `name:"blob-service-addr" env:"BLOB_SERVICE_ADDR" default:"grpc+http://[::1]:8000"`
+	DirectoryServiceAddr string   `name:"directory-service-addr" env:"DIRECTORY_SERVICE_ADDR" default:"grpc+http://[::1]:8000"`
+	HTTPBinaryCacheURL   *url.URL `name:"http-binary-cache-url" env:"HTTP_BINARY_CACHE_URL" help:"The URL containing the Nix HTTP Binary cache" default:"https://cache.nixos.org"`
+}
+
+// connectService opens a plaintext (insecure credentials) gRPC client
+// connection to serviceAddr.
+//
+// serviceAddr must start with the "grpc+http://" scheme; the scheme is
+// stripped and the remainder is used as the dial target.
+//
+// Both an unsupported scheme and a dial failure are returned as errors, so the
+// caller decides how to report them (and its deferred cleanups still run).
+func connectService(ctx context.Context, serviceAddr string) (*grpc.ClientConn, error) {
+	const scheme = "grpc+http://"
+
+	if !strings.HasPrefix(serviceAddr, scheme) {
+		return nil, fmt.Errorf("invalid serviceAddr: %s", serviceAddr)
+	}
+	addr := strings.TrimPrefix(serviceAddr, scheme)
+
+	conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		// Hand the error back instead of exiting here: the callers already
+		// add context and terminate on failure.
+		return nil, fmt.Errorf("dialing %s: %w", addr, err)
+	}
+	return conn, nil
+}
+
+// main parses the command line, connects to the blob and directory services,
+// and serves a gRPC PathInfoService (with reflection enabled) on the
+// configured listen address.
+//
+// On the first interrupt signal it starts a graceful shutdown; clients that
+// are still connected after 30 seconds are dropped. A second interrupt falls
+// back to the default signal behaviour.
+func main() {
+	_ = kong.Parse(&cli)
+
+	logLevel, err := logrus.ParseLevel(cli.LogLevel)
+	if err != nil {
+		log.Fatal("invalid log level")
+	}
+	logrus.SetLevel(logLevel)
+
+	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
+	defer stop()
+
+	// connect to the two stores
+	connBlobSvc, err := connectService(ctx, cli.BlobServiceAddr)
+	if err != nil {
+		log.Fatalf("unable to connect to blob service: %v", err)
+	}
+	defer connBlobSvc.Close()
+
+	connDirectorySvc, err := connectService(ctx, cli.DirectoryServiceAddr)
+	if err != nil {
+		log.Fatalf("unable to connect to directory service: %v", err)
+	}
+	defer connDirectorySvc.Close()
+
+	// set up pathinfoservice
+	var opts []grpc.ServerOption
+	s := grpc.NewServer(opts...)
+	reflection.Register(s)
+
+	storev1pb.RegisterPathInfoServiceServer(s,
+		pathinfosvc.New(
+			cli.HTTPBinaryCacheURL,
+			&http.Client{},
+			castorev1pb.NewDirectoryServiceClient(connDirectorySvc),
+			castorev1pb.NewBlobServiceClient(connBlobSvc),
+		),
+	)
+
+	log.Printf("Starting nar-bridge-pathinfosvc at %v", cli.ListenAddr)
+	lis, err := net.Listen("tcp", cli.ListenAddr)
+	if err != nil {
+		log.Fatalf("failed to listen: %v", err)
+	}
+
+	// Serve only returns nil once Stop or GracefulStop has been called. Any
+	// other return means the server is no longer accepting connections, so
+	// exit instead of idling until a signal arrives.
+	go func() {
+		if err := s.Serve(lis); err != nil {
+			log.Fatalf("failed to serve: %v", err)
+		}
+	}()
+
+	// listen for the interrupt signal.
+	<-ctx.Done()
+
+	// Restore default behaviour on the interrupt signal
+	stop()
+	log.Info("Received Signal, shutting down, press Ctl+C again to force.")
+
+	// stopped is closed once GracefulStop has returned.
+	stopped := make(chan struct{})
+	go func() {
+		s.GracefulStop()
+		close(stopped)
+	}()
+
+	t := time.NewTimer(30 * time.Second)
+	select {
+	case <-t.C:
+		log.Info("timeout, kicking remaining clients")
+		s.Stop()
+	case <-stopped:
+		log.Info("all clients left during grace period")
+		t.Stop()
+	}
+}
diff --git a/tvix/nar-bridge/default.nix b/tvix/nar-bridge/default.nix
index 06c43bb961c1..fa99eba2f3b8 100644
--- a/tvix/nar-bridge/default.nix
+++ b/tvix/nar-bridge/default.nix
@@ -6,5 +6,5 @@ pkgs.buildGoModule {
   name = "nar-bridge";
   src = depot.third_party.gitignoreSource ./.;
 
-  vendorHash = "sha256-sxyoVyHq/FJbNzVa6Xlnv0+/vxLxf0SAY4ZbvRySoFQ=";
+  vendorHash = "sha256-wEd3CBK7r28U77LpWc0UtbMlihkI7dEdy+ZWtJOBTSs=";
 }
diff --git a/tvix/nar-bridge/go.mod b/tvix/nar-bridge/go.mod
index 6a987adaccf5..670be0b02d0c 100644
--- a/tvix/nar-bridge/go.mod
+++ b/tvix/nar-bridge/go.mod
@@ -25,6 +25,7 @@ require (
 	github.com/multiformats/go-varint v0.0.6 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
+	github.com/ulikunitz/xz v0.5.11 // indirect
 	golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
 	golang.org/x/net v0.7.0 // indirect
 	golang.org/x/sys v0.5.0 // indirect
diff --git a/tvix/nar-bridge/go.sum b/tvix/nar-bridge/go.sum
index f8b9a9559c53..f148e8318b88 100644
--- a/tvix/nar-bridge/go.sum
+++ b/tvix/nar-bridge/go.sum
@@ -53,6 +53,7 @@ github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2
 github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
 github.com/nix-community/go-nix v0.0.0-20230825195510-c72199eca18e h1:15CPg2PQMyBl+TTEKuonrQqS9uOJyi7JcuU0FpvV088=
 github.com/nix-community/go-nix v0.0.0-20230825195510-c72199eca18e/go.mod h1:y3eASc0gMh26jjoP9Xz+qqMKjTnqJgG1RG8xvKvFR8s=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -68,6 +69,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8=
+github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
 golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
diff --git a/tvix/nar-bridge/pkg/pathinfosvc/server.go b/tvix/nar-bridge/pkg/pathinfosvc/server.go
new file mode 100644
index 000000000000..0b573d2131e0
--- /dev/null
+++ b/tvix/nar-bridge/pkg/pathinfosvc/server.go
@@ -0,0 +1,300 @@
+package pathinfosvc
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"encoding/base64"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+
+	castorev1pb "code.tvl.fyi/tvix/castore/protos"
+	"code.tvl.fyi/tvix/nar-bridge/pkg/importer"
+	storev1pb "code.tvl.fyi/tvix/store/protos"
+	mh "github.com/multiformats/go-multihash/core"
+	"github.com/nix-community/go-nix/pkg/narinfo"
+	"github.com/nix-community/go-nix/pkg/nixbase32"
+	"github.com/nix-community/go-nix/pkg/nixpath"
+	"github.com/sirupsen/logrus"
+	"github.com/ulikunitz/xz"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// Compile-time assertion that *PathInfoServiceServer satisfies the generated
+// gRPC service interface.
+var _ storev1pb.PathInfoServiceServer = (*PathInfoServiceServer)(nil)
+
+// PathInfoServiceServer serves storev1pb.PathInfoService requests on top of a
+// Nix HTTP Binary Cache.
+type PathInfoServiceServer struct {
+	storev1pb.UnimplementedPathInfoServiceServer
+	// httpEndpoint is the base URL that NARInfo and NAR request paths are
+	// joined onto; httpClient performs those requests.
+	httpEndpoint *url.URL
+	httpClient   *http.Client
+	// TODO: signatures
+
+	// Clients for the services that receive the directories and blobs
+	// produced while importing a NAR.
+	directoryServiceClient castorev1pb.DirectoryServiceClient
+	blobServiceClient      castorev1pb.BlobServiceClient
+}
+
+// New returns a PathInfoServiceServer that queries the binary cache at
+// httpEndpoint through httpClient, and uses the given directory and blob
+// service clients when importing NAR files.
+func New(httpEndpoint *url.URL, httpClient *http.Client, directoryServiceClient castorev1pb.DirectoryServiceClient, blobServiceClient castorev1pb.BlobServiceClient) *PathInfoServiceServer {
+	srv := new(PathInfoServiceServer)
+
+	srv.httpEndpoint = httpEndpoint
+	srv.httpClient = httpClient
+	srv.directoryServiceClient = directoryServiceClient
+	srv.blobServiceClient = blobServiceClient
+
+	return srv
+}
+
+// CalculateNAR implements storev1.PathInfoServiceServer.
+// This server never calculates NAR hashes on behalf of clients - they are
+// expected to do that themselves - so every call is answered with
+// PermissionDenied.
+func (*PathInfoServiceServer) CalculateNAR(context.Context, *castorev1pb.Node) (*storev1pb.CalculateNARResponse, error) {
+	err := status.Error(codes.PermissionDenied, "do it yourself please")
+
+	return nil, err
+}
+
+// Get implements storev1.PathInfoServiceServer.
+//
+// It only supports lookup by output hash. The lookup is translated to a
+// corresponding GET $outhash.narinfo request against the binary cache; the NAR
+// file referenced by the NARInfo is then fetched and ingested, populating the
+// blob and directory service, and the resulting PathInfo is returned,
+// annotated with references, signatures and the root node name taken from the
+// NARInfo.
+//
+// Subsequent requests will traverse the NAR file again, so make sure to
+// compose this with another PathInfoService as caching layer.
+//
+// All errors are gRPC status errors: Unimplemented for lookups other than by
+// output hash, NotFound if the cache has no NARInfo (HTTP 404 or 403), and
+// Internal for every other failure.
+func (p *PathInfoServiceServer) Get(ctx context.Context, getPathInfoRequest *storev1pb.GetPathInfoRequest) (*storev1pb.PathInfo, error) {
+	outputHash := getPathInfoRequest.GetByOutputHash()
+	if outputHash == nil {
+		return nil, status.Error(codes.Unimplemented, "only by output hash supported")
+	}
+
+	// construct NARInfo URL
+	narinfoURL := p.httpEndpoint.JoinPath(fmt.Sprintf("%v.narinfo", nixbase32.EncodeToString(outputHash)))
+
+	log := logrus.WithField("output_hash", base64.StdEncoding.EncodeToString(outputHash))
+
+	// We start right with a GET request, rather than doing a HEAD request.
+	// If a request to the PathInfoService reaches us, an upper layer *wants* it
+	// from us.
+	// Doing a HEAD first wouldn't give us anything, we can still react on the Not
+	// Found situation when doing the GET request.
+	niRq, err := http.NewRequestWithContext(ctx, "GET", narinfoURL.String(), nil)
+	if err != nil {
+		log.WithError(err).Error("unable to construct NARInfo request")
+		return nil, status.Errorf(codes.Internal, "unable to construct NARInfo request")
+	}
+
+	// Do the actual request; this follows redirects.
+	niResp, err := p.httpClient.Do(niRq)
+	if err != nil {
+		log.WithError(err).Error("unable to do NARInfo request")
+		return nil, status.Errorf(codes.Internal, "unable to do NARInfo request")
+	}
+	defer niResp.Body.Close()
+
+	// In the case of a 404, return a NotFound.
+	// We also return a NotFound in case of a 403 - this is to match the behaviour as Nix,
+	// when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
+	if niResp.StatusCode == http.StatusNotFound || niResp.StatusCode == http.StatusForbidden {
+		log.Warn("no NARInfo found")
+		return nil, status.Error(codes.NotFound, "no NARInfo found")
+	}
+
+	if niResp.StatusCode < 200 || niResp.StatusCode >= 300 {
+		log.WithField("status_code", niResp.StatusCode).Warn("Got non-success when trying to request NARInfo")
+		return nil, status.Errorf(codes.Internal, "got status code %v trying to request NARInfo", niResp.StatusCode)
+	}
+
+	// parse the NARInfo file.
+	narInfo, err := narinfo.Parse(niResp.Body)
+	if err != nil {
+		log.WithError(err).Warn("Unable to parse NARInfo")
+		return nil, status.Errorf(codes.Internal, "unable to parse NARInfo")
+	}
+
+	// close niResp.Body, we're not gonna read from there anymore.
+	_ = niResp.Body.Close()
+
+	// validate the NARInfo file. This ensures strings we need to parse actually parse,
+	// so we can just plain panic further down.
+	if err := narInfo.Check(); err != nil {
+		log.WithError(err).Warn("unable to validate NARInfo")
+		return nil, status.Errorf(codes.Internal, "unable to validate NARInfo: %s", err)
+	}
+
+	// only allow sha256 here. Is anything else even supported by Nix?
+	if narInfo.NarHash.HashType != mh.SHA2_256 {
+		log.Error("unsupported hash type")
+		return nil, status.Errorf(codes.Internal, "unsupported hash type in NarHash: %s", narInfo.NarHash.SRIString())
+	}
+
+	// TODO: calculate fingerprint, check with trusted pubkeys, decide what to do on mismatch
+
+	log = log.WithField("narinfo_narhash", narInfo.NarHash.SRIString())
+	log = log.WithField("nar_url", narInfo.URL)
+
+	// prepare the GET request for the NAR file.
+	narRq, err := http.NewRequestWithContext(ctx, "GET", p.httpEndpoint.JoinPath(narInfo.URL).String(), nil)
+	if err != nil {
+		log.WithError(err).Error("unable to construct NAR request")
+		return nil, status.Errorf(codes.Internal, "unable to construct NAR request")
+	}
+
+	log.Info("requesting NAR")
+	narResp, err := p.httpClient.Do(narRq)
+	if err != nil {
+		log.WithError(err).Error("error during NAR request")
+		return nil, status.Errorf(codes.Internal, "error during NAR request")
+	}
+	defer narResp.Body.Close()
+
+	// If we can't access the NAR file that the NARInfo is referring to, this is a store inconsistency.
+	// Propagate a more serious Internal error, rather than just a NotFound.
+	if narResp.StatusCode == http.StatusNotFound || narResp.StatusCode == http.StatusForbidden {
+		log.Error("Unable to find NAR")
+		return nil, status.Errorf(codes.Internal, "NAR at URL %s does not exist", narInfo.URL)
+	}
+
+	// Any other non-success response carries an error document rather than a
+	// NAR; reject it here instead of feeding it to the importer.
+	if narResp.StatusCode < 200 || narResp.StatusCode >= 300 {
+		log.WithField("status_code", narResp.StatusCode).Error("Got non-success when trying to request NAR")
+		return nil, status.Errorf(codes.Internal, "got status code %v trying to request NAR", narResp.StatusCode)
+	}
+
+	// wrap narResp.Body with some buffer.
+	// We already defer closing the http body, so it's ok to lose io.Close here.
+	var narBody io.Reader
+	narBody = bufio.NewReaderSize(narResp.Body, 10*1024*1024)
+
+	switch narInfo.Compression {
+	case "none":
+		// Nothing to do
+	case "xz":
+		narBody, err = xz.NewReader(narBody)
+		if err != nil {
+			log.WithError(err).Error("failed to open xz")
+			return nil, status.Errorf(codes.Internal, "failed to open xz")
+		}
+	default:
+		log.WithField("nar_compression", narInfo.Compression).Error("unsupported compression")
+		return nil, status.Errorf(codes.Internal, "unsupported NAR compression: %s", narInfo.Compression)
+	}
+
+	directoriesUploader := importer.NewDirectoriesUploader(ctx, p.directoryServiceClient)
+	// Safety net for the early returns below; the success path calls Done()
+	// explicitly and inspects its result.
+	defer directoriesUploader.Done() //nolint:errcheck
+
+	blobUploaderCb := importer.GenBlobUploaderCb(ctx, p.blobServiceClient)
+
+	pathInfo, err := importer.Import(
+		ctx,
+		narBody,
+		func(blobReader io.Reader) ([]byte, error) {
+			blobDigest, err := blobUploaderCb(blobReader)
+			if err != nil {
+				return nil, err
+			}
+			log.WithField("blob_digest", base64.StdEncoding.EncodeToString(blobDigest)).Debug("upload blob")
+			return blobDigest, nil
+		},
+		func(directory *castorev1pb.Directory) ([]byte, error) {
+			directoryDigest, err := directoriesUploader.Put(directory)
+			if err != nil {
+				return nil, err
+			}
+			log.WithField("directory_digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("upload directory")
+			return directoryDigest, nil
+		},
+	)
+
+	if err != nil {
+		log.WithError(err).Error("error during NAR import")
+		return nil, status.Error(codes.Internal, "error during NAR import")
+	}
+
+	// Close the directories uploader. This ensures the DirectoryService has
+	// properly persisted all Directory messages sent.
+	directoriesPutResponse, err := directoriesUploader.Done()
+	if err != nil {
+		log.WithError(err).Error("error during directory upload")
+
+		return nil, status.Error(codes.Internal, "error during directory upload")
+	}
+
+	// If we uploaded directories (so directoriesPutResponse doesn't return null),
+	// the RootDigest field in directoriesPutResponse should match the digest
+	// returned in importedPathInfo.
+	// This check ensures the directory service came up with the same root hash as we did.
+	if directoriesPutResponse != nil {
+		rootDigestPathInfo := pathInfo.GetNode().GetDirectory().GetDigest()
+		rootDigestDirectoriesPutResponse := directoriesPutResponse.GetRootDigest()
+
+		log := log.WithFields(logrus.Fields{
+			"root_digest_pathinfo":             rootDigestPathInfo,
+			"root_digest_directories_put_resp": rootDigestDirectoriesPutResponse,
+		})
+		if !bytes.Equal(rootDigestPathInfo, rootDigestDirectoriesPutResponse) {
+			log.Errorf("returned root digest doesn't match what's calculated")
+			return nil, status.Error(codes.Internal, "error in root digest calculation")
+		}
+	}
+
+	// Compare NAR hash in the NARInfo with the one we calculated while reading the NAR
+	// We already checked above that the digest is in sha256.
+	importedNarSha256 := pathInfo.GetNarinfo().GetNarSha256()
+	if !bytes.Equal(narInfo.NarHash.Digest(), importedNarSha256) {
+		log := log.WithField("imported_nar_sha256", base64.StdEncoding.EncodeToString(importedNarSha256))
+		log.Error("imported digest doesn't match NARInfo digest")
+
+		return nil, status.Error(codes.Internal, "imported digest doesn't match NARInfo digest")
+	}
+
+	// annotate importedPathInfo with the rest of the metadata from NARInfo.
+
+	// extract the output hashes from narInfo.References into importedPathInfo.References.
+	{
+		// Length of the hash portion of the store path in base32.
+		encodedPathHashSize := nixbase32.EncodedLen(20)
+		for _, referenceStr := range narInfo.References {
+			if len(referenceStr) < encodedPathHashSize {
+				return nil, status.Errorf(codes.Internal, "reference string '%s' is too small", referenceStr)
+			}
+
+			decodedReferenceHash, err := nixbase32.DecodeString(referenceStr[0:encodedPathHashSize])
+			if err != nil {
+				return nil, status.Errorf(codes.Internal, "unable to decode reference string '%s': %v", referenceStr, err)
+			}
+			pathInfo.References = append(pathInfo.References, decodedReferenceHash)
+		}
+	}
+	pathInfo.Narinfo.ReferenceNames = narInfo.References
+
+	for _, signature := range narInfo.Signatures {
+		pathInfo.Narinfo.Signatures = append(pathInfo.Narinfo.Signatures, &storev1pb.NARInfo_Signature{
+			Name: signature.Name,
+			Data: signature.Data,
+		})
+	}
+
+	// set the root node name to the basename of the output path in the narInfo.
+	// currently the root node has no name yet.
+	outPath, err := nixpath.FromString(narInfo.StorePath)
+	if err != nil {
+		// unreachable due to the narInfo.Check() call above
+		panic(err)
+	}
+	newName := []byte(nixbase32.EncodeToString(outPath.Digest) + "-" + string(outPath.Name))
+
+	// set the root name in all three cases.
+	if node := pathInfo.Node.GetDirectory(); node != nil {
+		node.Name = newName
+	} else if node := pathInfo.Node.GetFile(); node != nil {
+		node.Name = newName
+	} else if node := pathInfo.Node.GetSymlink(); node != nil {
+		node.Name = newName
+	} else {
+		panic("node may not be nil")
+	}
+
+	// TODO: Deriver, System, CA
+
+	return pathInfo, nil
+}
+
+// List implements storev1.PathInfoServiceServer.
+// Listing is not provided by this server (normally you can't get a listing
+// from a binary cache), so every call is answered with Unimplemented.
+func (*PathInfoServiceServer) List(*storev1pb.ListPathInfoRequest, storev1pb.PathInfoService_ListServer) error {
+	err := status.Error(codes.Unimplemented, "unimplemented")
+
+	return err
+}
+
+// Put implements storev1.PathInfoServiceServer.
+// Uploading PathInfo messages is not provided by this server; every call is
+// answered with Unimplemented.
+func (*PathInfoServiceServer) Put(context.Context, *storev1pb.PathInfo) (*storev1pb.PathInfo, error) {
+	err := status.Error(codes.Unimplemented, "unimplemented")
+
+	return nil, err
+}