package main import ( "context" "fmt" "net" "net/http" "net/url" "os" "os/signal" "strings" "time" "github.com/alecthomas/kong" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/reflection" castorev1pb "code.tvl.fyi/tvix/castore/protos" "code.tvl.fyi/tvix/nar-bridge/pkg/pathinfosvc" storev1pb "code.tvl.fyi/tvix/store/protos" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) // `help:"Provide a tvix-store gRPC PathInfoService for a HTTP Nix Binary Cache"` var cli struct { LogLevel string `enum:"trace,debug,info,warn,error,fatal,panic" help:"The log level to log with" default:"info"` ListenAddr string `name:"listen-addr" help:"The address this service listens on" type:"string" default:"[::]:8001"` //nolint:lll BlobServiceAddr string `name:"blob-service-addr" env:"BLOB_SERVICE_ADDR" default:"grpc+http://[::1]:8000"` DirectoryServiceAddr string `name:"directory-service-addr" env:"DIRECTORY_SERVICE_ADDR" default:"grpc+http://[::1]:8000"` HTTPBinaryCacheURL *url.URL `name:"http-binary-cache-url" env:"HTTP_BINARY_CACHE_URL" help:"The URL containing the Nix HTTP Binary cache" default:"https://cache.nixos.org"` } func connectService(ctx context.Context, serviceAddr string) (*grpc.ClientConn, error) { if !strings.HasPrefix(serviceAddr, "grpc+http://") { return nil, fmt.Errorf("invalid serviceAddr: %s", serviceAddr) } addr := strings.TrimPrefix(serviceAddr, "grpc+http://") conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("did not connect: %v", err) } return conn, nil } func main() { _ = kong.Parse(&cli) logLevel, err := logrus.ParseLevel(cli.LogLevel) if err != nil { log.Fatal("invalid log level") } logrus.SetLevel(logLevel) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() // connect to the two stores connBlobSvc, err := connectService(ctx, cli.BlobServiceAddr) if err != nil { log.Fatalf("unable to connect to blob service: %v", err) } defer connBlobSvc.Close() connDirectorySvc, err := connectService(ctx, cli.DirectoryServiceAddr) if err != nil { log.Fatalf("unable to connect to directory service: %v", err) } defer connDirectorySvc.Close() // set up pathinfoservice var opts []grpc.ServerOption s := grpc.NewServer(opts...) reflection.Register(s) storev1pb.RegisterPathInfoServiceServer(s, pathinfosvc.New( cli.HTTPBinaryCacheURL, &http.Client{}, castorev1pb.NewDirectoryServiceClient(connDirectorySvc), castorev1pb.NewBlobServiceClient(connBlobSvc), ), ) log.Printf("Starting nar-bridge-pathinfosvc at %v", cli.ListenAddr) lis, err := net.Listen("tcp", cli.ListenAddr) if err != nil { log.Fatalf("failed to listen: %v", err) } go s.Serve(lis) // listen for the interrupt signal. <-ctx.Done() // Restore default behaviour on the interrupt signal stop() log.Info("Received Signal, shutting down, press Ctl+C again to force.") stopped := make(chan interface{}) go func() { s.GracefulStop() close(stopped) }() t := time.NewTimer(30 * time.Second) select { case <-t.C: log.Info("timeout, kicking remaining clients") s.Stop() case <-stopped: log.Info("all clients left during grace period") t.Stop() } }