diff options
Diffstat (limited to 'tvix/nar-bridge')
-rw-r--r-- | tvix/nar-bridge/.gitignore | 1 | ||||
-rw-r--r-- | tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go | 117 | ||||
-rw-r--r-- | tvix/nar-bridge/default.nix | 2 | ||||
-rw-r--r-- | tvix/nar-bridge/go.mod | 1 | ||||
-rw-r--r-- | tvix/nar-bridge/go.sum | 3 | ||||
-rw-r--r-- | tvix/nar-bridge/pkg/pathinfosvc/server.go | 300 |
6 files changed, 423 insertions, 1 deletions
diff --git a/tvix/nar-bridge/.gitignore b/tvix/nar-bridge/.gitignore index 867be7a1273f..d70e1f8120cc 100644 --- a/tvix/nar-bridge/.gitignore +++ b/tvix/nar-bridge/.gitignore @@ -1 +1,2 @@ /nar-bridge-http +/nar-bridge-pathinfo diff --git a/tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go b/tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go new file mode 100644 index 000000000000..76d0fafa4cd6 --- /dev/null +++ b/tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go @@ -0,0 +1,117 @@ +package main + +import ( + "context" + "fmt" + "net" + "net/http" + "net/url" + "os" + "os/signal" + "strings" + "time" + + "github.com/alecthomas/kong" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/reflection" + + castorev1pb "code.tvl.fyi/tvix/castore/protos" + "code.tvl.fyi/tvix/nar-bridge/pkg/pathinfosvc" + storev1pb "code.tvl.fyi/tvix/store/protos" + "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" +) + +// `help:"Provide a tvix-store gRPC PathInfoService for a HTTP Nix Binary Cache"` +var cli struct { + LogLevel string `enum:"trace,debug,info,warn,error,fatal,panic" help:"The log level to log with" default:"info"` + ListenAddr string `name:"listen-addr" help:"The address this service listens on" type:"string" default:"[::]:8001"` //nolint:lll + BlobServiceAddr string `name:"blob-service-addr" env:"BLOB_SERVICE_ADDR" default:"grpc+http://[::1]:8000"` + DirectoryServiceAddr string `name:"directory-service-addr" env:"DIRECTORY_SERVICE_ADDR" default:"grpc+http://[::1]:8000"` + HTTPBinaryCacheURL *url.URL `name:"http-binary-cache-url" env:"HTTP_BINARY_CACHE_URL" help:"The URL containing the Nix HTTP Binary cache" default:"https://cache.nixos.org"` +} + +func connectService(ctx context.Context, serviceAddr string) (*grpc.ClientConn, error) { + if !strings.HasPrefix(serviceAddr, "grpc+http://") { + return nil, fmt.Errorf("invalid serviceAddr: %s", serviceAddr) + } + addr := strings.TrimPrefix(serviceAddr, "grpc+http://") + + conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + return conn, nil +} + +func main() { + _ = kong.Parse(&cli) + + logLevel, err := logrus.ParseLevel(cli.LogLevel) + if err != nil { + log.Fatal("invalid log level") + } + logrus.SetLevel(logLevel) + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + defer stop() + + // connect to the two stores + connBlobSvc, err := connectService(ctx, cli.BlobServiceAddr) + if err != nil { + log.Fatalf("unable to connect to blob service: %v", err) + } + defer connBlobSvc.Close() + + connDirectorySvc, err := connectService(ctx, cli.DirectoryServiceAddr) + if err != nil { + log.Fatalf("unable to connect to directory service: %v", err) + } + defer connDirectorySvc.Close() + + // set up pathinfoservice + var opts []grpc.ServerOption + s := grpc.NewServer(opts...) + reflection.Register(s) + + storev1pb.RegisterPathInfoServiceServer(s, + pathinfosvc.New( + cli.HTTPBinaryCacheURL, + &http.Client{}, + castorev1pb.NewDirectoryServiceClient(connDirectorySvc), + castorev1pb.NewBlobServiceClient(connBlobSvc), + ), + ) + + log.Printf("Starting nar-bridge-pathinfosvc at %v", cli.ListenAddr) + lis, err := net.Listen("tcp", cli.ListenAddr) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + go s.Serve(lis) + + // listen for the interrupt signal. + <-ctx.Done() + + // Restore default behaviour on the interrupt signal + stop() + log.Info("Received Signal, shutting down, press Ctl+C again to force.") + + stopped := make(chan interface{}) + go func() { + s.GracefulStop() + close(stopped) + }() + + t := time.NewTimer(30 * time.Second) + select { + case <-t.C: + log.Info("timeout, kicking remaining clients") + s.Stop() + case <-stopped: + log.Info("all clients left during grace period") + t.Stop() + } +} diff --git a/tvix/nar-bridge/default.nix b/tvix/nar-bridge/default.nix index 06c43bb961c1..fa99eba2f3b8 100644 --- a/tvix/nar-bridge/default.nix +++ b/tvix/nar-bridge/default.nix @@ -6,5 +6,5 @@ pkgs.buildGoModule { name = "nar-bridge"; src = depot.third_party.gitignoreSource ./.; - vendorHash = "sha256-sxyoVyHq/FJbNzVa6Xlnv0+/vxLxf0SAY4ZbvRySoFQ="; + vendorHash = "sha256-wEd3CBK7r28U77LpWc0UtbMlihkI7dEdy+ZWtJOBTSs="; } diff --git a/tvix/nar-bridge/go.mod b/tvix/nar-bridge/go.mod index 6a987adaccf5..670be0b02d0c 100644 --- a/tvix/nar-bridge/go.mod +++ b/tvix/nar-bridge/go.mod @@ -25,6 +25,7 @@ require ( github.com/multiformats/go-varint v0.0.6 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/ulikunitz/xz v0.5.11 // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/sys v0.5.0 // indirect diff --git a/tvix/nar-bridge/go.sum b/tvix/nar-bridge/go.sum index f8b9a9559c53..f148e8318b88 100644 --- a/tvix/nar-bridge/go.sum +++ b/tvix/nar-bridge/go.sum @@ -53,6 +53,7 @@ github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2 github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/nix-community/go-nix v0.0.0-20230825195510-c72199eca18e h1:15CPg2PQMyBl+TTEKuonrQqS9uOJyi7JcuU0FpvV088= github.com/nix-community/go-nix v0.0.0-20230825195510-c72199eca18e/go.mod h1:y3eASc0gMh26jjoP9Xz+qqMKjTnqJgG1RG8xvKvFR8s= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -68,6 +69,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8= +github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= diff --git a/tvix/nar-bridge/pkg/pathinfosvc/server.go b/tvix/nar-bridge/pkg/pathinfosvc/server.go new file mode 100644 index 000000000000..0b573d2131e0 --- /dev/null +++ b/tvix/nar-bridge/pkg/pathinfosvc/server.go @@ -0,0 +1,300 @@ +package pathinfosvc + +import ( + "bufio" + "bytes" + "context" + "encoding/base64" + "fmt" + "io" + "net/http" + "net/url" + + castorev1pb "code.tvl.fyi/tvix/castore/protos" + "code.tvl.fyi/tvix/nar-bridge/pkg/importer" + storev1pb "code.tvl.fyi/tvix/store/protos" + mh "github.com/multiformats/go-multihash/core" + "github.com/nix-community/go-nix/pkg/narinfo" + "github.com/nix-community/go-nix/pkg/nixbase32" + "github.com/nix-community/go-nix/pkg/nixpath" + "github.com/sirupsen/logrus" + "github.com/ulikunitz/xz" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var _ storev1pb.PathInfoServiceServer = &PathInfoServiceServer{} + +// PathInfoServiceServer exposes a Nix HTTP Binary Cache as a storev1pb.PathInfoServiceServer. +type PathInfoServiceServer struct { + storev1pb.UnimplementedPathInfoServiceServer + httpEndpoint *url.URL + httpClient *http.Client + // TODO: signatures + + directoryServiceClient castorev1pb.DirectoryServiceClient + blobServiceClient castorev1pb.BlobServiceClient +} + +func New(httpEndpoint *url.URL, httpClient *http.Client, directoryServiceClient castorev1pb.DirectoryServiceClient, blobServiceClient castorev1pb.BlobServiceClient) *PathInfoServiceServer { + return &PathInfoServiceServer{ + httpEndpoint: httpEndpoint, + httpClient: httpClient, + directoryServiceClient: directoryServiceClient, + blobServiceClient: blobServiceClient, + } +} + +// CalculateNAR implements storev1.PathInfoServiceServer. +// It returns PermissionDenied, as clients are supposed to calculate NAR hashes themselves. +func (*PathInfoServiceServer) CalculateNAR(context.Context, *castorev1pb.Node) (*storev1pb.CalculateNARResponse, error) { + return nil, status.Error(codes.PermissionDenied, "do it yourself please") +} + +// Get implements storev1.PathInfoServiceServer. +// It only supports lookup my outhash, translates them to a corresponding GET $outhash.narinfo request, +// ingests the NAR file, while populating blob and directory service, then returns the PathInfo node. +// Subsequent requests will traverse the NAR file again, so make sure to compose this with another +// PathInfoService as caching layer. +func (p *PathInfoServiceServer) Get(ctx context.Context, getPathInfoRequest *storev1pb.GetPathInfoRequest) (*storev1pb.PathInfo, error) { + outputHash := getPathInfoRequest.GetByOutputHash() + if outputHash == nil { + return nil, status.Error(codes.Unimplemented, "only by output hash supported") + } + + // construct NARInfo URL + narinfoURL := p.httpEndpoint.JoinPath(fmt.Sprintf("%v.narinfo", nixbase32.EncodeToString(outputHash))) + + log := logrus.WithField("output_hash", base64.StdEncoding.EncodeToString(outputHash)) + + // We start right with a GET request, rather than doing a HEAD request. + // If a request to the PathInfoService reaches us, an upper layer *wants* it + // from us. + // Doing a HEAD first wouldn't give us anything, we can still react on the Not + // Found situation when doing the GET request. + niRq, err := http.NewRequestWithContext(ctx, "GET", narinfoURL.String(), nil) + if err != nil { + log.WithError(err).Error("unable to construct NARInfo request") + return nil, status.Errorf(codes.Internal, "unable to construct NARInfo request") + } + + // Do the actual request; this follows redirects. + niResp, err := p.httpClient.Do(niRq) + if err != nil { + log.WithError(err).Error("unable to do NARInfo request") + return nil, status.Errorf(codes.Internal, "unable to do NARInfo request") + } + defer niResp.Body.Close() + + // In the case of a 404, return a NotFound. + // We also return a NotFound in case of a 403 - this is to match the behaviour as Nix, + // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org. + if niResp.StatusCode == http.StatusNotFound || niResp.StatusCode == http.StatusForbidden { + log.Warn("no NARInfo found") + return nil, status.Error(codes.NotFound, "no NARInfo found") + } + + if niResp.StatusCode < 200 || niResp.StatusCode >= 300 { + log.WithField("status_code", niResp.StatusCode).Warn("Got non-success when trying to request NARInfo") + return nil, status.Errorf(codes.Internal, "got status code %v trying to request NARInfo", niResp.StatusCode) + } + + // parse the NARInfo file. + narInfo, err := narinfo.Parse(niResp.Body) + if err != nil { + log.WithError(err).Warn("Unable to parse NARInfo") + return nil, status.Errorf(codes.Internal, "unable to parse NARInfo") + } + + // close niResp.Body, we're not gonna read from there anymore. + _ = niResp.Body.Close() + + // validate the NARInfo file. This ensures strings we need to parse actually parse, + // so we can just plain panic further down. + if err := narInfo.Check(); err != nil { + log.WithError(err).Warn("unable to validate NARInfo") + return nil, status.Errorf(codes.Internal, "unable to validate NARInfo: %s", err) + } + + // only allow sha256 here. Is anything else even supported by Nix? + if narInfo.NarHash.HashType != mh.SHA2_256 { + log.Error("unsupported hash type") + return nil, status.Errorf(codes.Internal, "unsuported hash type in NarHash: %s", narInfo.NarHash.SRIString()) + } + + // TODO: calculate fingerprint, check with trusted pubkeys, decide what to do on mismatch + + log = log.WithField("narinfo_narhash", narInfo.NarHash.SRIString()) + log = log.WithField("nar_url", narInfo.URL) + + // prepare the GET request for the NAR file. + narRq, err := http.NewRequestWithContext(ctx, "GET", p.httpEndpoint.JoinPath(narInfo.URL).String(), nil) + if err != nil { + log.WithError(err).Error("unable to construct NAR request") + return nil, status.Errorf(codes.Internal, "unable to construct NAR request") + } + + log.Info("requesting NAR") + narResp, err := p.httpClient.Do(narRq) + if err != nil { + log.WithError(err).Error("error during NAR request") + return nil, status.Errorf(codes.Internal, "error during NAR request") + } + defer narResp.Body.Close() + + // If we can't access the NAR file that the NARInfo is referring to, this is a store inconsistency. + // Propagate a more serious Internal error, rather than just a NotFound. + if narResp.StatusCode == http.StatusNotFound || narResp.StatusCode == http.StatusForbidden { + log.Error("Unable to find NAR") + return nil, status.Errorf(codes.Internal, "NAR at URL %s does not exist", narInfo.URL) + } + + // wrap narResp.Body with some buffer. + // We already defer closing the http body, so it's ok to loose io.Close here. + var narBody io.Reader + narBody = bufio.NewReaderSize(narResp.Body, 10*1024*1024) + + if narInfo.Compression == "none" { + // Nothing to do + } else if narInfo.Compression == "xz" { + narBody, err = xz.NewReader(narBody) + if err != nil { + log.WithError(err).Error("failed to open xz") + return nil, status.Errorf(codes.Internal, "failed to open xz") + } + } else { + log.WithField("nar_compression", narInfo.Compression).Error("unsupported compression") + return nil, fmt.Errorf("unsupported NAR compression: %s", narInfo.Compression) + } + + directoriesUploader := importer.NewDirectoriesUploader(ctx, p.directoryServiceClient) + defer directoriesUploader.Done() //nolint:errcheck + + blobUploaderCb := importer.GenBlobUploaderCb(ctx, p.blobServiceClient) + + pathInfo, err := importer.Import( + ctx, + narBody, + func(blobReader io.Reader) ([]byte, error) { + blobDigest, err := blobUploaderCb(blobReader) + if err != nil { + return nil, err + } + log.WithField("blob_digest", base64.StdEncoding.EncodeToString(blobDigest)).Debug("upload blob") + return blobDigest, nil + }, + func(directory *castorev1pb.Directory) ([]byte, error) { + directoryDigest, err := directoriesUploader.Put(directory) + if err != nil { + return nil, err + } + log.WithField("directory_digest", base64.StdEncoding.EncodeToString(directoryDigest)).Debug("upload directory") + return directoryDigest, nil + }, + ) + + if err != nil { + log.WithError(err).Error("error during NAR import") + return nil, status.Error(codes.Internal, "error during NAR import") + } + + // Close the directories uploader. This ensures the DirectoryService has + // properly persisted all Directory messages sent. + directoriesPutResponse, err := directoriesUploader.Done() + if err != nil { + log.WithError(err).Error("error during directory upload") + + return nil, status.Error(codes.Internal, "error during directory upload") + } + + // If we uploaded directories (so directoriesPutResponse doesn't return null), + // the RootDigest field in directoriesPutResponse should match the digest + // returned in importedPathInfo. + // This check ensures the directory service came up with the same root hash as we did. + if directoriesPutResponse != nil { + rootDigestPathInfo := pathInfo.GetNode().GetDirectory().GetDigest() + rootDigestDirectoriesPutResponse := directoriesPutResponse.GetRootDigest() + + log := log.WithFields(logrus.Fields{ + "root_digest_pathinfo": rootDigestPathInfo, + "root_digest_directories_put_resp": rootDigestDirectoriesPutResponse, + }) + if !bytes.Equal(rootDigestPathInfo, rootDigestDirectoriesPutResponse) { + log.Errorf("returned root digest doesn't match what's calculated") + return nil, status.Error(codes.Internal, "error in root digest calculation") + } + } + + // Compare NAR hash in the NARInfo with the one we calculated while reading the NAR + // We already checked above that the digest is in sha256. + importedNarSha256 := pathInfo.GetNarinfo().GetNarSha256() + if !bytes.Equal(narInfo.NarHash.Digest(), importedNarSha256) { + log := log.WithField("imported_nar_sha256", base64.StdEncoding.EncodeToString(importedNarSha256)) + log.Error("imported digest doesn't match NARInfo digest") + + return nil, fmt.Errorf("imported digest doesn't match NARInfo digest") + } + + // annotate importedPathInfo with the rest of the metadata from NARINfo. + + // extract the output hashes from narInfo.References into importedPathInfo.References. + { + // Length of the hash portion of the store path in base32. + encodedPathHashSize := nixbase32.EncodedLen(20) + for _, referenceStr := range narInfo.References { + if len(referenceStr) < encodedPathHashSize { + return nil, fmt.Errorf("reference string '%s' is too small", referenceStr) + } + + decodedReferenceHash, err := nixbase32.DecodeString(referenceStr[0:encodedPathHashSize]) + if err != nil { + return nil, fmt.Errorf("unable to decode reference string '%s': %w", referenceStr, err) + + } + pathInfo.References = append(pathInfo.References, decodedReferenceHash) + } + } + pathInfo.Narinfo.ReferenceNames = narInfo.References + + for _, signature := range narInfo.Signatures { + pathInfo.Narinfo.Signatures = append(pathInfo.Narinfo.Signatures, &storev1pb.NARInfo_Signature{ + Name: signature.Name, + Data: signature.Data, + }) + } + + // set the root node name to the basename of the output path in the narInfo. + // currently the root node has no name yet. + outPath, err := nixpath.FromString(narInfo.StorePath) + if err != nil { + // unreachable due to nixpath.Check() + panic(err) + } + newName := []byte(nixbase32.EncodeToString(outPath.Digest) + "-" + string(outPath.Name)) + + // set the root name in all three cases. + if node := pathInfo.Node.GetDirectory(); node != nil { + node.Name = newName + } else if node := pathInfo.Node.GetFile(); node != nil { + node.Name = newName + } else if node := pathInfo.Node.GetSymlink(); node != nil { + node.Name = newName + } else { + panic("node may not be nil") + } + + return pathInfo, nil + + // TODO: Deriver, System, CA +} + +// List implements storev1.PathInfoServiceServer. +// It returns a permission denied, because normally you can't get a listing +func (*PathInfoServiceServer) List(*storev1pb.ListPathInfoRequest, storev1pb.PathInfoService_ListServer) error { + return status.Error(codes.Unimplemented, "unimplemented") +} + +// Put implements storev1.PathInfoServiceServer. +func (*PathInfoServiceServer) Put(context.Context, *storev1pb.PathInfo) (*storev1pb.PathInfo, error) { + return nil, status.Error(codes.Unimplemented, "unimplemented") +} |