diff options
author | Florian Klink <flokli@flokli.de> | 2023-09-03T14·09+0300 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-09-05T20·46+0000 |
commit | da9d706e0a5e4e37087e4841a8fc8edf0da35e77 (patch) | |
tree | d6de951e1458bc0a56aff5a1feb463ee6eeb9592 /tvix/store | |
parent | e41b5ae3f0cd0814367f1e2f4f256669c849fde7 (diff) |
feat(tvix/store/pathinfosvc): provide listing r/6555
This provides an additional method in the PathInfoService trait, as well as an RPC method on the gRPC layer to list all PathInfo objects in a PathInfoService. Change-Id: I7378f6bbd334bd6ac4e9be92505bd099a1c2b19a Reviewed-on: https://cl.tvl.fyi/c/depot/+/9216 Reviewed-by: tazjin <tazjin@tvl.su> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/protos/rpc_pathinfo.pb.go | 133 | ||||
-rw-r--r-- | tvix/store/protos/rpc_pathinfo.proto | 8 | ||||
-rw-r--r-- | tvix/store/protos/rpc_pathinfo_grpc.pb.go | 71 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 64 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 12 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 4 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 25 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs | 30 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_pathinfoservice.rs | 4 |
9 files changed, 310 insertions, 41 deletions
diff --git a/tvix/store/protos/rpc_pathinfo.pb.go b/tvix/store/protos/rpc_pathinfo.pb.go index bc4da2a127f9..293cb5a7c321 100644 --- a/tvix/store/protos/rpc_pathinfo.pb.go +++ b/tvix/store/protos/rpc_pathinfo.pb.go @@ -96,6 +96,46 @@ type GetPathInfoRequest_ByOutputHash struct { func (*GetPathInfoRequest_ByOutputHash) isGetPathInfoRequest_ByWhat() {} +// The parameters that can be used to lookup (multiple) PathInfo objects. +// Currently no filtering is possible, all objects are returned. +type ListPathInfoRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ListPathInfoRequest) Reset() { + *x = ListPathInfoRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListPathInfoRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListPathInfoRequest) ProtoMessage() {} + +func (x *ListPathInfoRequest) ProtoReflect() protoreflect.Message { + mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListPathInfoRequest.ProtoReflect.Descriptor instead. +func (*ListPathInfoRequest) Descriptor() ([]byte, []int) { + return file_tvix_store_protos_rpc_pathinfo_proto_rawDescGZIP(), []int{1} +} + // CalculateNARResponse is the response returned by the CalculateNAR request. // // It contains the size of the NAR representation (in bytes), and the sha56 @@ -114,7 +154,7 @@ type CalculateNARResponse struct { func (x *CalculateNARResponse) Reset() { *x = CalculateNARResponse{} if protoimpl.UnsafeEnabled { - mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[1] + mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -127,7 +167,7 @@ func (x *CalculateNARResponse) String() string { func (*CalculateNARResponse) ProtoMessage() {} func (x *CalculateNARResponse) ProtoReflect() protoreflect.Message { - mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[1] + mi := &file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -140,7 +180,7 @@ func (x *CalculateNARResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CalculateNARResponse.ProtoReflect.Descriptor instead. func (*CalculateNARResponse) Descriptor() ([]byte, []int) { - return file_tvix_store_protos_rpc_pathinfo_proto_rawDescGZIP(), []int{1} + return file_tvix_store_protos_rpc_pathinfo_proto_rawDescGZIP(), []int{2} } func (x *CalculateNARResponse) GetNarSize() uint64 { @@ -170,28 +210,34 @@ var file_tvix_store_protos_rpc_pathinfo_proto_rawDesc = []byte{ 0x0e, 0x62, 0x79, 0x5f, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x0c, 0x62, 0x79, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x48, 0x61, 0x73, 0x68, 0x42, 0x09, 0x0a, 0x07, 0x62, 0x79, 0x5f, 0x77, 0x68, 0x61, 0x74, - 0x22, 0x50, 0x0a, 0x14, 0x43, 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x4e, 0x41, 0x52, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x6e, 0x61, 0x72, 0x5f, - 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x6e, 0x61, 0x72, 0x53, - 0x69, 0x7a, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x6e, 0x61, 0x72, 0x5f, 0x73, 0x68, 0x61, 0x32, 0x35, - 0x36, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6e, 0x61, 0x72, 0x53, 0x68, 0x61, 0x32, - 0x35, 0x36, 0x32, 0xd7, 0x01, 0x0a, 0x0f, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x53, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x41, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x21, 0x2e, - 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, - 0x74, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x22, 0x15, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x50, 0x0a, 0x14, 0x43, 0x61, 0x6c, 0x63, 0x75, + 0x6c, 0x61, 0x74, 0x65, 0x4e, 0x41, 0x52, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x19, 0x0a, 0x08, 0x6e, 0x61, 0x72, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x07, 0x6e, 0x61, 0x72, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x6e, 0x61, + 0x72, 0x5f, 0x73, 0x68, 0x61, 0x32, 0x35, 0x36, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, + 0x6e, 0x61, 0x72, 0x53, 0x68, 0x61, 0x32, 0x35, 0x36, 0x32, 0x9e, 0x02, 0x0a, 0x0f, 0x50, 0x61, + 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x41, 0x0a, + 0x03, 0x47, 0x65, 0x74, 0x12, 0x21, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, + 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, + 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, + 0x12, 0x37, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x17, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, + 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x1a, 0x17, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, - 0x2e, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x37, 0x0a, 0x03, 0x50, 0x75, 0x74, - 0x12, 0x17, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, - 0x2e, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x1a, 0x17, 0x2e, 0x74, 0x76, 0x69, 0x78, - 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, - 0x66, 0x6f, 0x12, 0x48, 0x0a, 0x0c, 0x43, 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x4e, - 0x41, 0x52, 0x12, 0x13, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, - 0x76, 0x31, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x1a, 0x23, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, - 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, - 0x65, 0x4e, 0x41, 0x52, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x28, 0x5a, 0x26, - 0x63, 0x6f, 0x64, 0x65, 0x2e, 0x74, 0x76, 0x6c, 0x2e, 0x66, 0x79, 0x69, 0x2f, 0x74, 0x76, 0x69, - 0x78, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x3b, 0x73, - 0x74, 0x6f, 0x72, 0x65, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2e, 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x48, 0x0a, 0x0c, 0x43, 0x61, 0x6c, + 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x4e, 0x41, 0x52, 0x12, 0x13, 0x2e, 0x74, 0x76, 0x69, 0x78, + 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x1a, 0x23, + 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x43, + 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x4e, 0x41, 0x52, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x22, 0x2e, 0x74, 0x76, + 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, + 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x17, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, + 0x50, 0x61, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x30, 0x01, 0x42, 0x28, 0x5a, 0x26, 0x63, 0x6f, + 0x64, 0x65, 0x2e, 0x74, 0x76, 0x6c, 0x2e, 0x66, 0x79, 0x69, 0x2f, 0x74, 0x76, 0x69, 0x78, 0x2f, + 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x3b, 0x73, 0x74, 0x6f, + 0x72, 0x65, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -206,22 +252,25 @@ func file_tvix_store_protos_rpc_pathinfo_proto_rawDescGZIP() []byte { return file_tvix_store_protos_rpc_pathinfo_proto_rawDescData } -var file_tvix_store_protos_rpc_pathinfo_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_tvix_store_protos_rpc_pathinfo_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_tvix_store_protos_rpc_pathinfo_proto_goTypes = []interface{}{ (*GetPathInfoRequest)(nil), // 0: tvix.store.v1.GetPathInfoRequest - (*CalculateNARResponse)(nil), // 1: tvix.store.v1.CalculateNARResponse - (*PathInfo)(nil), // 2: tvix.store.v1.PathInfo - (*Node)(nil), // 3: tvix.store.v1.Node + (*ListPathInfoRequest)(nil), // 1: tvix.store.v1.ListPathInfoRequest + (*CalculateNARResponse)(nil), // 2: tvix.store.v1.CalculateNARResponse + (*PathInfo)(nil), // 3: tvix.store.v1.PathInfo + (*Node)(nil), // 4: tvix.store.v1.Node } var file_tvix_store_protos_rpc_pathinfo_proto_depIdxs = []int32{ 0, // 0: tvix.store.v1.PathInfoService.Get:input_type -> tvix.store.v1.GetPathInfoRequest - 2, // 1: tvix.store.v1.PathInfoService.Put:input_type -> tvix.store.v1.PathInfo - 3, // 2: tvix.store.v1.PathInfoService.CalculateNAR:input_type -> tvix.store.v1.Node - 2, // 3: tvix.store.v1.PathInfoService.Get:output_type -> tvix.store.v1.PathInfo - 2, // 4: tvix.store.v1.PathInfoService.Put:output_type -> tvix.store.v1.PathInfo - 1, // 5: tvix.store.v1.PathInfoService.CalculateNAR:output_type -> tvix.store.v1.CalculateNARResponse - 3, // [3:6] is the sub-list for method output_type - 0, // [0:3] is the sub-list for method input_type + 3, // 1: tvix.store.v1.PathInfoService.Put:input_type -> tvix.store.v1.PathInfo + 4, // 2: tvix.store.v1.PathInfoService.CalculateNAR:input_type -> tvix.store.v1.Node + 1, // 3: tvix.store.v1.PathInfoService.List:input_type -> tvix.store.v1.ListPathInfoRequest + 3, // 4: tvix.store.v1.PathInfoService.Get:output_type -> tvix.store.v1.PathInfo + 3, // 5: tvix.store.v1.PathInfoService.Put:output_type -> tvix.store.v1.PathInfo + 2, // 6: tvix.store.v1.PathInfoService.CalculateNAR:output_type -> tvix.store.v1.CalculateNARResponse + 3, // 7: tvix.store.v1.PathInfoService.List:output_type -> tvix.store.v1.PathInfo + 4, // [4:8] is the sub-list for method output_type + 0, // [0:4] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -247,6 +296,18 @@ func file_tvix_store_protos_rpc_pathinfo_proto_init() { } } file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListPathInfoRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_tvix_store_protos_rpc_pathinfo_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CalculateNARResponse); i { case 0: return &v.state @@ -268,7 +329,7 @@ func file_tvix_store_protos_rpc_pathinfo_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_tvix_store_protos_rpc_pathinfo_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, diff --git a/tvix/store/protos/rpc_pathinfo.proto b/tvix/store/protos/rpc_pathinfo.proto index fecf682d168d..e1d6cd774144 100644 --- a/tvix/store/protos/rpc_pathinfo.proto +++ b/tvix/store/protos/rpc_pathinfo.proto @@ -41,6 +41,10 @@ service PathInfoService { // It can also be used to calculate arbitrary NAR hashes of output paths, // in case a legacy Nix Binary Cache frontend is provided. rpc CalculateNAR(Node) returns (CalculateNARResponse); + + // Return a stream of PathInfo messages matching the criteria specified in + // ListPathInfoRequest. + rpc List(ListPathInfoRequest) returns (stream PathInfo); } // The parameters that can be used to lookup a (single) PathInfo object. @@ -55,6 +59,10 @@ message GetPathInfoRequest { }; } +// The parameters that can be used to lookup (multiple) PathInfo objects. +// Currently no filtering is possible, all objects are returned. +message ListPathInfoRequest { } + // CalculateNARResponse is the response returned by the CalculateNAR request. // // It contains the size of the NAR representation (in bytes), and the sha56 diff --git a/tvix/store/protos/rpc_pathinfo_grpc.pb.go b/tvix/store/protos/rpc_pathinfo_grpc.pb.go index bf83a9b7d98b..d7b6711c0310 100644 --- a/tvix/store/protos/rpc_pathinfo_grpc.pb.go +++ b/tvix/store/protos/rpc_pathinfo_grpc.pb.go @@ -25,6 +25,7 @@ const ( PathInfoService_Get_FullMethodName = "/tvix.store.v1.PathInfoService/Get" PathInfoService_Put_FullMethodName = "/tvix.store.v1.PathInfoService/Put" PathInfoService_CalculateNAR_FullMethodName = "/tvix.store.v1.PathInfoService/CalculateNAR" + PathInfoService_List_FullMethodName = "/tvix.store.v1.PathInfoService/List" ) // PathInfoServiceClient is the client API for PathInfoService service. @@ -60,6 +61,9 @@ type PathInfoServiceClient interface { // It can also be used to calculate arbitrary NAR hashes of output paths, // in case a legacy Nix Binary Cache frontend is provided. CalculateNAR(ctx context.Context, in *Node, opts ...grpc.CallOption) (*CalculateNARResponse, error) + // Return a stream of PathInfo messages matching the criteria specified in + // ListPathInfoRequest. + List(ctx context.Context, in *ListPathInfoRequest, opts ...grpc.CallOption) (PathInfoService_ListClient, error) } type pathInfoServiceClient struct { @@ -97,6 +101,38 @@ func (c *pathInfoServiceClient) CalculateNAR(ctx context.Context, in *Node, opts return out, nil } +func (c *pathInfoServiceClient) List(ctx context.Context, in *ListPathInfoRequest, opts ...grpc.CallOption) (PathInfoService_ListClient, error) { + stream, err := c.cc.NewStream(ctx, &PathInfoService_ServiceDesc.Streams[0], PathInfoService_List_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &pathInfoServiceListClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type PathInfoService_ListClient interface { + Recv() (*PathInfo, error) + grpc.ClientStream +} + +type pathInfoServiceListClient struct { + grpc.ClientStream +} + +func (x *pathInfoServiceListClient) Recv() (*PathInfo, error) { + m := new(PathInfo) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // PathInfoServiceServer is the server API for PathInfoService service. // All implementations must embed UnimplementedPathInfoServiceServer // for forward compatibility @@ -130,6 +166,9 @@ type PathInfoServiceServer interface { // It can also be used to calculate arbitrary NAR hashes of output paths, // in case a legacy Nix Binary Cache frontend is provided. CalculateNAR(context.Context, *Node) (*CalculateNARResponse, error) + // Return a stream of PathInfo messages matching the criteria specified in + // ListPathInfoRequest. + List(*ListPathInfoRequest, PathInfoService_ListServer) error mustEmbedUnimplementedPathInfoServiceServer() } @@ -146,6 +185,9 @@ func (UnimplementedPathInfoServiceServer) Put(context.Context, *PathInfo) (*Path func (UnimplementedPathInfoServiceServer) CalculateNAR(context.Context, *Node) (*CalculateNARResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method CalculateNAR not implemented") } +func (UnimplementedPathInfoServiceServer) List(*ListPathInfoRequest, PathInfoService_ListServer) error { + return status.Errorf(codes.Unimplemented, "method List not implemented") +} func (UnimplementedPathInfoServiceServer) mustEmbedUnimplementedPathInfoServiceServer() {} // UnsafePathInfoServiceServer may be embedded to opt out of forward compatibility for this service. @@ -213,6 +255,27 @@ func _PathInfoService_CalculateNAR_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _PathInfoService_List_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ListPathInfoRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(PathInfoServiceServer).List(m, &pathInfoServiceListServer{stream}) +} + +type PathInfoService_ListServer interface { + Send(*PathInfo) error + grpc.ServerStream +} + +type pathInfoServiceListServer struct { + grpc.ServerStream +} + +func (x *pathInfoServiceListServer) Send(m *PathInfo) error { + return x.ServerStream.SendMsg(m) +} + // PathInfoService_ServiceDesc is the grpc.ServiceDesc for PathInfoService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -233,6 +296,12 @@ var PathInfoService_ServiceDesc = grpc.ServiceDesc{ Handler: _PathInfoService_CalculateNAR_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "List", + Handler: _PathInfoService_List_Handler, + ServerStreams: true, + }, + }, Metadata: "tvix/store/protos/rpc_pathinfo.proto", } diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index c98a89c4b8e7..2bd766697bd1 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -1,8 +1,12 @@ use super::PathInfoService; -use crate::{blobservice::BlobService, directoryservice::DirectoryService, proto}; +use crate::{ + blobservice::BlobService, + directoryservice::DirectoryService, + proto::{self, ListPathInfoRequest}, +}; use std::sync::Arc; use tokio::net::UnixStream; -use tonic::{transport::Channel, Code, Status}; +use tonic::{transport::Channel, Code, Status, Streaming}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] @@ -160,6 +164,62 @@ impl PathInfoService for GRPCPathInfoService { Ok((resp.nar_size, nar_sha256)) } + + fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + + let task: tokio::task::JoinHandle<Result<_, Status>> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .list(ListPathInfoRequest::default()) + .await? + .into_inner(); + + Ok(s) + }); + + let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); + + Box::new(StreamIterator::new(self.tokio_handle.clone(), stream)) + } +} + +pub struct StreamIterator { + tokio_handle: tokio::runtime::Handle, + stream: Streaming<proto::PathInfo>, +} + +impl StreamIterator { + pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self { + Self { + tokio_handle, + stream, + } + } +} + +impl Iterator for StreamIterator { + type Item = Result<proto::PathInfo, crate::Error>; + + fn next(&mut self) -> Option<Self::Item> { + match self.tokio_handle.block_on(self.stream.message()) { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + return Some(Err(crate::Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))); + } + Some(Ok(pathinfo)) + } + None => None, + }, + Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), + } + } } #[cfg(test)] diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index f7abb2180ef7..aba1216c6e96 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -85,6 +85,18 @@ impl PathInfoService for MemoryPathInfoService { ) .map_err(|e| Error::StorageError(e.to_string())) } + + fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_> { + let db = self.db.read().unwrap(); + + // Copy all elements into a list. + // This is a bit ugly, because we can't have db escape the lifetime + // of this function, but elements need to be returned owned anyways, and this in- + // memory impl is only for testing purposes anyways. + let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); + + Box::new(items.into_iter()) + } } #[cfg(test)] diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index 191e8dbd602e..51d3e51115eb 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -39,4 +39,8 @@ pub trait PathInfoService: Send + Sync { /// This can be used to calculate NAR-based output paths, /// and implementations are encouraged to cache it. fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>; + + /// Iterate over all PathInfo objects in the store. + /// Implementations can decide to disallow listing. + fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_>; } diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 2448b073c622..4f327626d19d 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -136,6 +136,31 @@ impl PathInfoService for SledPathInfoService { ) .map_err(|e| Error::StorageError(e.to_string())) } + + fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send> { + Box::new(self.db.iter().values().map(|v| match v { + Ok(data) => { + // we retrieved some bytes + match proto::PathInfo::decode(&*data) { + Ok(path_info) => Ok(path_info), + Err(e) => { + warn!("failed to decode stored PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to decode stored PathInfo: {}", + e + ))) + } + } + } + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to retrieve PathInfo: {}", + e + ))) + } + })) + } } #[cfg(test)] diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index bbf235f8ace9..16a2fd51d0df 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -2,11 +2,14 @@ use crate::nar::RenderError; use crate::pathinfoservice::PathInfoService; use crate::proto; use std::sync::Arc; +use tokio::task; +use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Result, Status}; -use tracing::{instrument, warn}; +use tracing::{debug, instrument, warn}; pub struct GRPCPathInfoServiceWrapper { path_info_service: Arc<dyn PathInfoService>, + // FUTUREWORK: allow exposing without allowing listing } impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper { @@ -19,6 +22,8 @@ impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper { #[async_trait] impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper { + type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>; + #[instrument(skip(self))] async fn get( &self, @@ -78,6 +83,29 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra } } } + + #[instrument(skip(self))] + async fn list( + &self, + _request: Request<proto::ListPathInfoRequest>, + ) -> Result<Response<Self::ListStream>, Status> { + let (tx, rx) = tokio::sync::mpsc::channel(5); + + let path_info_service = self.path_info_service.clone(); + + let _task = task::spawn(async move { + for e in path_info_service.list() { + let res = e.map_err(|e| Status::internal(e.to_string())); + if tx.send(res).await.is_err() { + debug!("receiver dropped"); + break; + } + } + }); + + let receiver_stream = ReceiverStream::new(rx); + Ok(Response::new(receiver_stream)) + } } impl From<RenderError> for tonic::Status { diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index 95d97bb12890..114e89cacc10 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -9,6 +9,7 @@ use crate::tests::utils::gen_blob_service; use crate::tests::utils::gen_directory_service; use crate::tests::utils::gen_pathinfo_service; use std::sync::Arc; +use tokio_stream::wrappers::ReceiverStream; use tonic::Request; /// generates a GRPCPathInfoService out of blob, directory and pathinfo services. @@ -16,7 +17,8 @@ use tonic::Request; /// We only interact with it via the PathInfo GRPC interface. /// It uses the NonCachingNARCalculationService NARCalculationService to /// calculate NARs. -fn gen_grpc_service() -> Arc<dyn GRPCPathInfoService> { +fn gen_grpc_service( +) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> { let blob_service = gen_blob_service(); let directory_service = gen_directory_service(); Arc::new(GRPCPathInfoServiceWrapper::from(gen_pathinfo_service( |