package main
import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"time"
"github.com/alecthomas/kong"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"
castorev1pb "code.tvl.fyi/tvix/castore/protos"
"code.tvl.fyi/tvix/nar-bridge/pkg/pathinfosvc"
storev1pb "code.tvl.fyi/tvix/store/protos"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
// cli holds the command-line options that kong.Parse fills in at the start of
// main. Several fields can also be set through the environment variable named
// in their `env` tag.
//
// Purpose of the program: provide a tvix-store gRPC PathInfoService for a HTTP
// Nix Binary Cache.
var cli struct {
// LogLevel is the logrus level name; main feeds it to logrus.ParseLevel.
LogLevel string `enum:"trace,debug,info,warn,error,fatal,panic" help:"The log level to log with" default:"info"`
// ListenAddr is the TCP address main passes to net.Listen for the gRPC server.
ListenAddr string `name:"listen-addr" help:"The address this service listens on" type:"string" default:"[::]:8001"` //nolint:lll
// BlobServiceAddr is the address of the blob service. It must start with
// "grpc+http://", the only scheme connectService accepts.
BlobServiceAddr string `name:"blob-service-addr" env:"BLOB_SERVICE_ADDR" default:"grpc+http://[::1]:8000"`
// DirectoryServiceAddr is the address of the directory service. It must start
// with "grpc+http://", the only scheme connectService accepts.
DirectoryServiceAddr string `name:"directory-service-addr" env:"DIRECTORY_SERVICE_ADDR" default:"grpc+http://[::1]:8000"`
// HTTPBinaryCacheURL is the binary cache URL handed to pathinfosvc.New.
HTTPBinaryCacheURL *url.URL `name:"http-binary-cache-url" env:"HTTP_BINARY_CACHE_URL" help:"The URL containing the Nix HTTP Binary cache" default:"https://cache.nixos.org"`
}
// connectService opens a gRPC client connection without transport security to
// the service described by serviceAddr.
//
// serviceAddr must start with the "grpc+http://" scheme; the remainder of the
// string is used as the dial target. ctx is forwarded to grpc.DialContext.
//
// It returns an error if the scheme is not supported or if the client
// connection cannot be created. Failures are returned rather than logged, so
// that the caller decides how to report them and can say which service was
// affected.
func connectService(ctx context.Context, serviceAddr string) (*grpc.ClientConn, error) {
	if !strings.HasPrefix(serviceAddr, "grpc+http://") {
		return nil, fmt.Errorf("invalid serviceAddr: %s", serviceAddr)
	}
	addr := strings.TrimPrefix(serviceAddr, "grpc+http://")

	conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, fmt.Errorf("dialing %q: %w", addr, err)
	}
	return conn, nil
}
// main parses the command line, connects to the blob and directory services,
// and serves the PathInfoService (plus gRPC reflection) on cli.ListenAddr.
//
// It runs until the first interrupt signal, then attempts a graceful shutdown
// for up to 30 seconds before forcibly stopping the server. A failure of the
// gRPC server itself terminates the process immediately.
func main() {
	_ = kong.Parse(&cli)

	logLevel, err := logrus.ParseLevel(cli.LogLevel)
	if err != nil {
		log.Fatalf("invalid log level %q: %v", cli.LogLevel, err)
	}
	logrus.SetLevel(logLevel)

	// ctx is cancelled as soon as the first interrupt signal arrives.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// connect to the two stores
	connBlobSvc, err := connectService(ctx, cli.BlobServiceAddr)
	if err != nil {
		log.Fatalf("unable to connect to blob service: %v", err)
	}
	defer connBlobSvc.Close()

	connDirectorySvc, err := connectService(ctx, cli.DirectoryServiceAddr)
	if err != nil {
		log.Fatalf("unable to connect to directory service: %v", err)
	}
	defer connDirectorySvc.Close()

	// set up pathinfoservice
	s := grpc.NewServer()
	reflection.Register(s)

	// NOTE(review): this http.Client has no timeout configured; confirm
	// whether the requests made by pathinfosvc should be bounded.
	storev1pb.RegisterPathInfoServiceServer(s,
		pathinfosvc.New(
			cli.HTTPBinaryCacheURL,
			&http.Client{},
			castorev1pb.NewDirectoryServiceClient(connDirectorySvc),
			castorev1pb.NewBlobServiceClient(connBlobSvc),
		),
	)

	log.Printf("Starting nar-bridge-pathinfosvc at %v", cli.ListenAddr)
	lis, err := net.Listen("tcp", cli.ListenAddr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Run the server in the background and hand its result back to main.
	// The channel is buffered so the goroutine can always finish, even when
	// nobody reads the result during a regular shutdown.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- s.Serve(lis)
	}()

	// Wait for either the interrupt signal or an unexpected server failure.
	// Serve only returns before Stop/GracefulStop is called when it failed.
	select {
	case err := <-serveErr:
		log.Fatalf("failed to serve: %v", err)
	case <-ctx.Done():
	}

	// Restore default behaviour on the interrupt signal
	stop()
	log.Info("Received Signal, shutting down, press Ctl+C again to force.")

	// stopped is closed once GracefulStop has returned.
	stopped := make(chan struct{})
	go func() {
		s.GracefulStop()
		close(stopped)
	}()

	t := time.NewTimer(30 * time.Second)
	defer t.Stop()

	select {
	case <-t.C:
		log.Info("timeout, kicking remaining clients")
		s.Stop()
	case <-stopped:
		log.Info("all clients left during grace period")
	}
}