From c04041a0017359811d91406e9484ed65b76ba33b Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 3 Oct 2023 19:08:34 +0300 Subject: feat(tvix/nar-bridge): add nar-bridge-pathinfo command MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This adds an additional nar-bridge-pathinfo command. It exposes a PathInfoService for a HTTP Binary Cache, ingesting data into a BlobService/DirectoryService as it goes through the NAR file. It does this whenever it receives a Get request for a specific output path, and waits returning with the PathInfo response until it ingested the data. It does not do any sort of caching - this means it re-downloads NAR files again whenever the PathInfo is requested again, so you most likely do not want to use this currently. It's one building component as soon as we have store composition (which we currently don't, so don't use this). It can be used as an alternative mechanism to ingest data (Blobs and Directories) of a given store path from a binary cache into tvix-store. ``` ❯ nix-build -A third_party.nixpkgs.hello /nix/store/mdi7lvrn2mx7rfzv3fdq3v5yw8swiks6-hello-2.12.1 ❯ nix hash to-sri --type sha1 mdi7lvrn2mx7rfzv3fdq3v5yw8swiks6 sha1-Rs/INeK+7IGbG/u7fHoVNm96Yqs= ❯ out=$(mg build //tvix/nar-bridge) $out/bin/nar-bridge-pathinfo --log-level debug & INFO[0000] Starting nar-bridge-pathinfosvc at [::]:8001 ❯ mg run //tvix:store -- daemon & [mg] building target //tvix:store [mg] running target //tvix:store 2023-10-03T16:21:57.433739Z INFO tvix_store: tvix-store listening on [::]:8000 at src/bin/tvix-store.rs:229 ❯ evans --host localhost --port 8001 -r repl […] tvix.store.v1.PathInfoService@localhost:8001> call Get ✔ by_output_hash by_output_hash (TYPE_BYTES) => Rs/INeK+7IGbG/u7fHoVNm96Yqs= { "narinfo": { "narSha256": "sXrPtjqhSoc2u0YfM1HVZThknkSYuRuHdtKCB6wkDFo=", "narSize": "226552", "referenceNames": [ "aw2fw9ag10wr9pf0qk4nk5sxi0q0bn56-glibc-2.37-8", "mdi7lvrn2mx7rfzv3fdq3v5yw8swiks6-hello-2.12.1" ], "signatures": [ { "data": "7guDbfaF2Q29HY0c5axhtuacfxN6uxuEqeUfncDiSvMSAWvfHVMppB89ILqV8FE58pEQ04tSbMnRhR3FGPV0AA==", "name": "cache.nixos.org-1" } ] }, "node": { "directory": { "digest": "xvo6BYbYaDw76IibLu5sr+VZoj9iM0ET2RUuYSYLwKE=", "name": "bWRpN2x2cm4ybXg3cmZ6djNmZHEzdjV5dzhzd2lrczYtaGVsbG8tMi4xMi4x", "size": 141 } }, "references": [ "ptgFMIhdl2nJxMDdlDkITyXuBFc=", "Rs/INeK+7IGbG/u7fHoVNm96Yqs=" ] } ``` Change-Id: I50167d0ac081c91adf5cf2733bbc4dc0993bd46e Reviewed-on: https://cl.tvl.fyi/c/depot/+/9539 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: Connor Brewster Reviewed-by: Brian Olsen --- tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go | 117 ++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go (limited to 'tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go') diff --git a/tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go b/tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go new file mode 100644 index 0000000000..76d0fafa4c --- /dev/null +++ b/tvix/nar-bridge/cmd/nar-bridge-pathinfo/main.go @@ -0,0 +1,117 @@ +package main + +import ( + "context" + "fmt" + "net" + "net/http" + "net/url" + "os" + "os/signal" + "strings" + "time" + + "github.com/alecthomas/kong" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/reflection" + + castorev1pb "code.tvl.fyi/tvix/castore/protos" + "code.tvl.fyi/tvix/nar-bridge/pkg/pathinfosvc" + storev1pb "code.tvl.fyi/tvix/store/protos" + "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" +) + +// `help:"Provide a tvix-store gRPC PathInfoService for a HTTP Nix Binary Cache"` +var cli struct { + LogLevel string `enum:"trace,debug,info,warn,error,fatal,panic" help:"The log level to log with" default:"info"` + ListenAddr string `name:"listen-addr" help:"The address this service listens on" type:"string" default:"[::]:8001"` //nolint:lll + BlobServiceAddr string `name:"blob-service-addr" env:"BLOB_SERVICE_ADDR" default:"grpc+http://[::1]:8000"` + DirectoryServiceAddr string `name:"directory-service-addr" env:"DIRECTORY_SERVICE_ADDR" default:"grpc+http://[::1]:8000"` + HTTPBinaryCacheURL *url.URL `name:"http-binary-cache-url" env:"HTTP_BINARY_CACHE_URL" help:"The URL containing the Nix HTTP Binary cache" default:"https://cache.nixos.org"` +} + +func connectService(ctx context.Context, serviceAddr string) (*grpc.ClientConn, error) { + if !strings.HasPrefix(serviceAddr, "grpc+http://") { + return nil, fmt.Errorf("invalid serviceAddr: %s", serviceAddr) + } + addr := strings.TrimPrefix(serviceAddr, "grpc+http://") + + conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + return conn, nil +} + +func main() { + _ = kong.Parse(&cli) + + logLevel, err := logrus.ParseLevel(cli.LogLevel) + if err != nil { + log.Fatal("invalid log level") + } + logrus.SetLevel(logLevel) + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + defer stop() + + // connect to the two stores + connBlobSvc, err := connectService(ctx, cli.BlobServiceAddr) + if err != nil { + log.Fatalf("unable to connect to blob service: %v", err) + } + defer connBlobSvc.Close() + + connDirectorySvc, err := connectService(ctx, cli.DirectoryServiceAddr) + if err != nil { + log.Fatalf("unable to connect to directory service: %v", err) + } + defer connDirectorySvc.Close() + + // set up pathinfoservice + var opts []grpc.ServerOption + s := grpc.NewServer(opts...) + reflection.Register(s) + + storev1pb.RegisterPathInfoServiceServer(s, + pathinfosvc.New( + cli.HTTPBinaryCacheURL, + &http.Client{}, + castorev1pb.NewDirectoryServiceClient(connDirectorySvc), + castorev1pb.NewBlobServiceClient(connBlobSvc), + ), + ) + + log.Printf("Starting nar-bridge-pathinfosvc at %v", cli.ListenAddr) + lis, err := net.Listen("tcp", cli.ListenAddr) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + go s.Serve(lis) + + // listen for the interrupt signal. + <-ctx.Done() + + // Restore default behaviour on the interrupt signal + stop() + log.Info("Received Signal, shutting down, press Ctl+C again to force.") + + stopped := make(chan interface{}) + go func() { + s.GracefulStop() + close(stopped) + }() + + t := time.NewTimer(30 * time.Second) + select { + case <-t.C: + log.Info("timeout, kicking remaining clients") + s.Stop() + case <-stopped: + log.Info("all clients left during grace period") + t.Stop() + } +} -- cgit 1.4.1