From 7db734afad313b8669623138130797fad797daa7 Mon Sep 17 00:00:00 2001 From: Griffin Smith Date: Wed, 5 Aug 2020 22:46:21 -0400 Subject: feat(tvix): Implement AddToStoreNar Implement both the client and server sides of AddToStoreNar, using a templated generalization of the sources and sinks we were using for AddToStore on both ends. Change-Id: I73d0ed34118c711b125851dff99a7518ced4af35 Reviewed-on: https://cl.tvl.fyi/c/depot/+/1686 Tested-by: BuildkiteCI Reviewed-by: kanepyork --- third_party/nix/src/libstore/rpc-store.cc | 60 ++++++++++----- third_party/nix/src/libstore/rpc-store.hh | 6 -- third_party/nix/src/nix-daemon/nix-daemon-proto.cc | 86 ++++++++++++++++++++-- third_party/nix/src/proto/worker.proto | 10 ++- 4 files changed, 129 insertions(+), 33 deletions(-) (limited to 'third_party') diff --git a/third_party/nix/src/libstore/rpc-store.cc b/third_party/nix/src/libstore/rpc-store.cc index 751eb44900..28fc1c9eec 100644 --- a/third_party/nix/src/libstore/rpc-store.cc +++ b/third_party/nix/src/libstore/rpc-store.cc @@ -56,29 +56,32 @@ T FillFrom(const U& src) { return result; } -class AddToStorePathWriterSink : public BufferedSink { +template +class RPCSink : public BufferedSink { public: - explicit AddToStorePathWriterSink( - std::unique_ptr< - grpc_impl::ClientWriter>&& - writer) + using Writer = grpc::ClientWriter; + explicit RPCSink(std::unique_ptr&& writer) : writer_(std::move(writer)), good_(true) {} bool good() override { return good_; } void write(const unsigned char* data, size_t len) override { - proto::AddToStoreRequest req; + Request req; req.set_data(data, len); if (!writer_->Write(req)) { good_ = false; } } - grpc::Status Finish() { return writer_->Finish(); } + ~RPCSink() override { flush(); } + + grpc::Status Finish() { + flush(); + return writer_->Finish(); + } private: - std::unique_ptr> - writer_; + std::unique_ptr writer_; bool good_; }; @@ -319,14 +322,33 @@ void RpcStore::querySubstitutablePathInfos(const PathSet& paths, void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource, RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr accessor) { - throw Unsupported(absl::StrCat("Not implemented ", __func__)); -} + ClientContext ctx; + google::protobuf::Empty response; + auto writer = stub_->AddToStoreNar(&ctx, &response); + + proto::AddToStoreNarRequest path_info_req; + path_info_req.mutable_path_info()->mutable_path()->set_path(info.path); + path_info_req.mutable_path_info()->mutable_deriver()->set_path(info.deriver); + path_info_req.mutable_path_info()->set_nar_hash( + info.narHash.to_string(Base16, false)); + for (const auto& ref : info.references) { + path_info_req.mutable_path_info()->add_references(ref); + } + *path_info_req.mutable_path_info()->mutable_registration_time() = + TimeUtil::TimeTToTimestamp(info.registrationTime); + path_info_req.mutable_path_info()->set_nar_size(info.narSize); + path_info_req.mutable_path_info()->set_ultimate(info.ultimate); + for (const auto& sig : info.sigs) { + path_info_req.mutable_path_info()->add_sigs(sig); + } + path_info_req.mutable_path_info()->set_ca(info.ca); + path_info_req.mutable_path_info()->set_repair(repair); + path_info_req.mutable_path_info()->set_check_sigs(checkSigs); -void RpcStore::addToStore(const ValidPathInfo& info, - const ref& nar, RepairFlag repair, - CheckSigsFlag checkSigs, - std::shared_ptr accessor) { - throw Unsupported(absl::StrCat("Not implemented ", __func__)); + writer->Write(path_info_req); + + RPCSink sink(std::move(writer)); + copyNAR(narSource, sink); } Path RpcStore::addToStore(const std::string& name, const Path& srcPath, @@ -349,7 +371,7 @@ Path RpcStore::addToStore(const std::string& name, const Path& srcPath, metadata_req.mutable_meta()->set_hash_type(HashTypeToProto(hashAlgo)); writer->Write(metadata_req); - AddToStorePathWriterSink sink(std::move(writer)); + RPCSink sink(std::move(writer)); dumpPath(std::filesystem::absolute(srcPath), sink); sink.flush(); SuccessOrThrow(sink.Finish()); @@ -402,7 +424,9 @@ void RpcStore::ensurePath(const Path& path) { } void RpcStore::addTempRoot(const Path& path) { - throw Unsupported(absl::StrCat("Not implemented ", __func__)); + ClientContext ctx; + google::protobuf::Empty response; + SuccessOrThrow(stub_->AddTempRoot(&ctx, StorePath(path), &response)); } void RpcStore::addIndirectRoot(const Path& path) { diff --git a/third_party/nix/src/libstore/rpc-store.hh b/third_party/nix/src/libstore/rpc-store.hh index 0ef24e22b7..dd421caf44 100644 --- a/third_party/nix/src/libstore/rpc-store.hh +++ b/third_party/nix/src/libstore/rpc-store.hh @@ -56,12 +56,6 @@ class RpcStore : public LocalFSStore, public virtual Store { CheckSigsFlag checkSigs = CheckSigs, std::shared_ptr accessor = 0) override; - virtual void addToStore(const ValidPathInfo& info, - const ref& nar, - RepairFlag repair = NoRepair, - CheckSigsFlag checkSigs = CheckSigs, - std::shared_ptr accessor = 0) override; - virtual Path addToStore(const std::string& name, const Path& srcPath, bool recursive = true, HashType hashAlgo = htSHA256, PathFilter& filter = defaultPathFilter, diff --git a/third_party/nix/src/nix-daemon/nix-daemon-proto.cc b/third_party/nix/src/nix-daemon/nix-daemon-proto.cc index b310cc92c4..33e440dfde 100644 --- a/third_party/nix/src/nix-daemon/nix-daemon-proto.cc +++ b/third_party/nix/src/nix-daemon/nix-daemon-proto.cc @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -10,6 +11,7 @@ #include #include #include +#include #include "libmain/shared.hh" #include "libproto/worker.grpc.pb.h" @@ -24,6 +26,7 @@ namespace nix::daemon { +using ::google::protobuf::util::TimeUtil; using ::grpc::Status; using ::nix::proto::BuildStatus; using ::nix::proto::PathInfo; @@ -31,20 +34,20 @@ using ::nix::proto::StorePath; using ::nix::proto::StorePaths; using ::nix::proto::WorkerService; -class AddToStoreRequestSource final : public Source { - using Reader = grpc::ServerReader; - +template +class RPCSource final : public Source { public: - explicit AddToStoreRequestSource(Reader* reader) : reader_(reader) {} + using Reader = grpc::ServerReader; + explicit RPCSource(Reader* reader) : reader_(reader) {} size_t read(unsigned char* data, size_t len) override { auto got = buffer_.sgetn(reinterpret_cast(data), len); if (got < len) { - proto::AddToStoreRequest msg; + Request msg; if (!reader_->Read(&msg)) { return got; } - if (msg.add_oneof_case() != proto::AddToStoreRequest::kData) { + if (msg.add_oneof_case() != Request::kData) { // TODO(grfn): Make Source::read return a StatusOr and get rid of this // throw throw Error( @@ -152,7 +155,7 @@ class WorkerServiceImpl final : public WorkerService::Service { } auto meta = metadata_request.meta(); - AddToStoreRequestSource source(reader); + RPCSource source(reader); auto opt_hash_type = hash_type_from(meta.hash_type()); if (!opt_hash_type) { return Status(grpc::StatusCode::INVALID_ARGUMENT, @@ -194,6 +197,62 @@ class WorkerServiceImpl final : public WorkerService::Service { __FUNCTION__); } + Status AddToStoreNar( + grpc::ServerContext* context, + grpc::ServerReader* reader, + google::protobuf::Empty*) override { + return HandleExceptions( + [&]() -> Status { + proto::AddToStoreNarRequest path_info_request; + auto has_path_info = reader->Read(&path_info_request); + if (!has_path_info || !path_info_request.has_path_info()) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Path info must be set before sending nar content"); + } + + auto path_info = path_info_request.path_info(); + + ValidPathInfo info; + info.path = path_info.path().path(); + info.deriver = path_info.deriver().path(); + + if (!info.deriver.empty()) { + ASSERT_INPUT_STORE_PATH(info.deriver); + } + + auto nar_hash = Hash::deserialize(path_info.nar_hash(), htSHA256); + + if (!nar_hash.ok()) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + std::string(nar_hash.status().message())); + } + + info.narHash = nar_hash.ConsumeValueOrDie(); + for (const auto& ref : path_info.references()) { + info.references.insert(ref); + } + info.registrationTime = + TimeUtil::TimestampToTimeT(path_info.registration_time()); + info.narSize = path_info.nar_size(); + info.ultimate = path_info.ultimate(); + for (const auto& sig : path_info.sigs()) { + info.sigs.insert(sig); + } + info.ca = path_info.ca(); + + auto repair = path_info.repair(); + auto check_sigs = path_info.check_sigs(); + + std::string saved; + RPCSource source(reader); + store_->addToStore(info, source, repair ? Repair : NoRepair, + check_sigs ? CheckSigs : NoCheckSigs, nullptr); + + return Status::OK; + }, + __FUNCTION__); + } + Status AddTextToStore(grpc::ServerContext*, const nix::proto::AddTextToStoreRequest* request, nix::proto::StorePath* response) override { @@ -228,6 +287,19 @@ class WorkerServiceImpl final : public WorkerService::Service { return Status::OK; } + Status AddTempRoot(grpc::ServerContext*, const nix::proto::StorePath* request, + google::protobuf::Empty*) override { + auto path = request->path(); + ASSERT_INPUT_STORE_PATH(path); + + return HandleExceptions( + [&]() -> Status { + store_->addTempRoot(path); + return Status::OK; + }, + __FUNCTION__); + } + Status AddIndirectRoot(grpc::ServerContext*, const nix::proto::StorePath* request, google::protobuf::Empty*) override { diff --git a/third_party/nix/src/proto/worker.proto b/third_party/nix/src/proto/worker.proto index abe8f622c9..cc5be46d6b 100644 --- a/third_party/nix/src/proto/worker.proto +++ b/third_party/nix/src/proto/worker.proto @@ -33,6 +33,8 @@ service WorkerService { rpc EnsurePath(StorePath) returns (google.protobuf.Empty); // TODO: What does this do? + // TODO(grfn): This should not actually take a StorePath, as it's not a + // StorePath rpc AddTempRoot(StorePath) returns (google.protobuf.Empty); // TODO: What does this do? @@ -260,6 +262,10 @@ message PathInfo { repeated string sigs = 7; // If non-empty, an assertion that the path is content-addressed string ca = 8; + + // Only used for AddToStoreNarRequest + bool repair = 12; + bool check_sigs = 13; } message SubstitutablePathInfos { @@ -318,9 +324,9 @@ message AddSignaturesRequest { } message AddToStoreNarRequest { - oneof add_oneoff { + oneof add_oneof { PathInfo path_info = 1; - bytes chunk = 2; + bytes data = 2; } } -- cgit 1.4.1