diff options
author | Griffin Smith <grfn@gws.fyi> | 2020-08-06T02·46-0400 |
---|---|---|
committer | glittershark <grfn@gws.fyi> | 2020-08-08T17·51+0000 |
commit | 7db734afad313b8669623138130797fad797daa7 (patch) | |
tree | 7acf1dd3f641e47513d6beafe68344909d3115c7 /third_party/nix | |
parent | cc01059d405e982e3a7de78e318374c3dbace48b (diff) |
feat(tvix): Implement AddToStoreNar r/1615
Implement both the client and server sides of AddToStoreNar, using a templated generalization of the sources and sinks we were using for AddToStore on both ends. Change-Id: I73d0ed34118c711b125851dff99a7518ced4af35 Reviewed-on: https://cl.tvl.fyi/c/depot/+/1686 Tested-by: BuildkiteCI Reviewed-by: kanepyork <rikingcoding@gmail.com>
Diffstat (limited to 'third_party/nix')
-rw-r--r-- | third_party/nix/src/libstore/rpc-store.cc | 60 | ||||
-rw-r--r-- | third_party/nix/src/libstore/rpc-store.hh | 6 | ||||
-rw-r--r-- | third_party/nix/src/nix-daemon/nix-daemon-proto.cc | 86 | ||||
-rw-r--r-- | third_party/nix/src/proto/worker.proto | 10 |
4 files changed, 129 insertions, 33 deletions
diff --git a/third_party/nix/src/libstore/rpc-store.cc b/third_party/nix/src/libstore/rpc-store.cc index 751eb449006d..28fc1c9eec7a 100644 --- a/third_party/nix/src/libstore/rpc-store.cc +++ b/third_party/nix/src/libstore/rpc-store.cc @@ -56,29 +56,32 @@ T FillFrom(const U& src) { return result; } -class AddToStorePathWriterSink : public BufferedSink { +template <typename Request> +class RPCSink : public BufferedSink { public: - explicit AddToStorePathWriterSink( - std::unique_ptr< - grpc_impl::ClientWriter<class nix::proto::AddToStoreRequest>>&& - writer) + using Writer = grpc::ClientWriter<Request>; + explicit RPCSink(std::unique_ptr<Writer>&& writer) : writer_(std::move(writer)), good_(true) {} bool good() override { return good_; } void write(const unsigned char* data, size_t len) override { - proto::AddToStoreRequest req; + Request req; req.set_data(data, len); if (!writer_->Write(req)) { good_ = false; } } - grpc::Status Finish() { return writer_->Finish(); } + ~RPCSink() override { flush(); } + + grpc::Status Finish() { + flush(); + return writer_->Finish(); + } private: - std::unique_ptr<grpc_impl::ClientWriter<class nix::proto::AddToStoreRequest>> - writer_; + std::unique_ptr<Writer> writer_; bool good_; }; @@ -319,14 +322,33 @@ void RpcStore::querySubstitutablePathInfos(const PathSet& paths, void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource, RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor) { - throw Unsupported(absl::StrCat("Not implemented ", __func__)); -} + ClientContext ctx; + google::protobuf::Empty response; + auto writer = stub_->AddToStoreNar(&ctx, &response); + + proto::AddToStoreNarRequest path_info_req; + path_info_req.mutable_path_info()->mutable_path()->set_path(info.path); + path_info_req.mutable_path_info()->mutable_deriver()->set_path(info.deriver); + path_info_req.mutable_path_info()->set_nar_hash( + info.narHash.to_string(Base16, false)); + for (const auto& ref : info.references) { + path_info_req.mutable_path_info()->add_references(ref); + } + *path_info_req.mutable_path_info()->mutable_registration_time() = + TimeUtil::TimeTToTimestamp(info.registrationTime); + path_info_req.mutable_path_info()->set_nar_size(info.narSize); + path_info_req.mutable_path_info()->set_ultimate(info.ultimate); + for (const auto& sig : info.sigs) { + path_info_req.mutable_path_info()->add_sigs(sig); + } + path_info_req.mutable_path_info()->set_ca(info.ca); + path_info_req.mutable_path_info()->set_repair(repair); + path_info_req.mutable_path_info()->set_check_sigs(checkSigs); -void RpcStore::addToStore(const ValidPathInfo& info, - const ref<std::string>& nar, RepairFlag repair, - CheckSigsFlag checkSigs, - std::shared_ptr<FSAccessor> accessor) { - throw Unsupported(absl::StrCat("Not implemented ", __func__)); + writer->Write(path_info_req); + + RPCSink sink(std::move(writer)); + copyNAR(narSource, sink); } Path RpcStore::addToStore(const std::string& name, const Path& srcPath, @@ -349,7 +371,7 @@ Path RpcStore::addToStore(const std::string& name, const Path& srcPath, metadata_req.mutable_meta()->set_hash_type(HashTypeToProto(hashAlgo)); writer->Write(metadata_req); - AddToStorePathWriterSink sink(std::move(writer)); + RPCSink sink(std::move(writer)); dumpPath(std::filesystem::absolute(srcPath), sink); sink.flush(); SuccessOrThrow(sink.Finish()); @@ -402,7 +424,9 @@ void RpcStore::ensurePath(const Path& path) { } void RpcStore::addTempRoot(const Path& path) { - throw Unsupported(absl::StrCat("Not implemented ", __func__)); + ClientContext ctx; + google::protobuf::Empty response; + SuccessOrThrow(stub_->AddTempRoot(&ctx, StorePath(path), &response)); } void RpcStore::addIndirectRoot(const Path& path) { diff --git a/third_party/nix/src/libstore/rpc-store.hh b/third_party/nix/src/libstore/rpc-store.hh index 0ef24e22b751..dd421caf44c4 100644 --- a/third_party/nix/src/libstore/rpc-store.hh +++ b/third_party/nix/src/libstore/rpc-store.hh @@ -56,12 +56,6 @@ class RpcStore : public LocalFSStore, public virtual Store { CheckSigsFlag checkSigs = CheckSigs, std::shared_ptr<FSAccessor> accessor = 0) override; - virtual void addToStore(const ValidPathInfo& info, - const ref<std::string>& nar, - RepairFlag repair = NoRepair, - CheckSigsFlag checkSigs = CheckSigs, - std::shared_ptr<FSAccessor> accessor = 0) override; - virtual Path addToStore(const std::string& name, const Path& srcPath, bool recursive = true, HashType hashAlgo = htSHA256, PathFilter& filter = defaultPathFilter, diff --git a/third_party/nix/src/nix-daemon/nix-daemon-proto.cc b/third_party/nix/src/nix-daemon/nix-daemon-proto.cc index b310cc92c43c..33e440dfde92 100644 --- a/third_party/nix/src/nix-daemon/nix-daemon-proto.cc +++ b/third_party/nix/src/nix-daemon/nix-daemon-proto.cc @@ -2,6 +2,7 @@ #include <filesystem> #include <sstream> +#include <string> #include <absl/strings/str_cat.h> #include <absl/strings/str_format.h> @@ -10,6 +11,7 @@ #include <grpcpp/impl/codegen/server_context.h> #include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/status_code_enum.h> +#include <grpcpp/impl/codegen/sync_stream.h> #include "libmain/shared.hh" #include "libproto/worker.grpc.pb.h" @@ -24,6 +26,7 @@ namespace nix::daemon { +using ::google::protobuf::util::TimeUtil; using ::grpc::Status; using ::nix::proto::BuildStatus; using ::nix::proto::PathInfo; @@ -31,20 +34,20 @@ using ::nix::proto::StorePath; using ::nix::proto::StorePaths; using ::nix::proto::WorkerService; -class AddToStoreRequestSource final : public Source { - using Reader = grpc::ServerReader<nix::proto::AddToStoreRequest>; - +template <typename Request> +class RPCSource final : public Source { public: - explicit AddToStoreRequestSource(Reader* reader) : reader_(reader) {} + using Reader = grpc::ServerReader<Request>; + explicit RPCSource(Reader* reader) : reader_(reader) {} size_t read(unsigned char* data, size_t len) override { auto got = buffer_.sgetn(reinterpret_cast<char*>(data), len); if (got < len) { - proto::AddToStoreRequest msg; + Request msg; if (!reader_->Read(&msg)) { return got; } - if (msg.add_oneof_case() != proto::AddToStoreRequest::kData) { + if (msg.add_oneof_case() != Request::kData) { // TODO(grfn): Make Source::read return a StatusOr and get rid of this // throw throw Error( @@ -152,7 +155,7 @@ class WorkerServiceImpl final : public WorkerService::Service { } auto meta = metadata_request.meta(); - AddToStoreRequestSource source(reader); + RPCSource source(reader); auto opt_hash_type = hash_type_from(meta.hash_type()); if (!opt_hash_type) { return Status(grpc::StatusCode::INVALID_ARGUMENT, @@ -194,6 +197,62 @@ class WorkerServiceImpl final : public WorkerService::Service { __FUNCTION__); } + Status AddToStoreNar( + grpc::ServerContext* context, + grpc::ServerReader<nix::proto::AddToStoreNarRequest>* reader, + google::protobuf::Empty*) override { + return HandleExceptions( + [&]() -> Status { + proto::AddToStoreNarRequest path_info_request; + auto has_path_info = reader->Read(&path_info_request); + if (!has_path_info || !path_info_request.has_path_info()) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Path info must be set before sending nar content"); + } + + auto path_info = path_info_request.path_info(); + + ValidPathInfo info; + info.path = path_info.path().path(); + info.deriver = path_info.deriver().path(); + + if (!info.deriver.empty()) { + ASSERT_INPUT_STORE_PATH(info.deriver); + } + + auto nar_hash = Hash::deserialize(path_info.nar_hash(), htSHA256); + + if (!nar_hash.ok()) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + std::string(nar_hash.status().message())); + } + + info.narHash = nar_hash.ConsumeValueOrDie(); + for (const auto& ref : path_info.references()) { + info.references.insert(ref); + } + info.registrationTime = + TimeUtil::TimestampToTimeT(path_info.registration_time()); + info.narSize = path_info.nar_size(); + info.ultimate = path_info.ultimate(); + for (const auto& sig : path_info.sigs()) { + info.sigs.insert(sig); + } + info.ca = path_info.ca(); + + auto repair = path_info.repair(); + auto check_sigs = path_info.check_sigs(); + + std::string saved; + RPCSource source(reader); + store_->addToStore(info, source, repair ? Repair : NoRepair, + check_sigs ? CheckSigs : NoCheckSigs, nullptr); + + return Status::OK; + }, + __FUNCTION__); + } + Status AddTextToStore(grpc::ServerContext*, const nix::proto::AddTextToStoreRequest* request, nix::proto::StorePath* response) override { @@ -228,6 +287,19 @@ class WorkerServiceImpl final : public WorkerService::Service { return Status::OK; } + Status AddTempRoot(grpc::ServerContext*, const nix::proto::StorePath* request, + google::protobuf::Empty*) override { + auto path = request->path(); + ASSERT_INPUT_STORE_PATH(path); + + return HandleExceptions( + [&]() -> Status { + store_->addTempRoot(path); + return Status::OK; + }, + __FUNCTION__); + } + Status AddIndirectRoot(grpc::ServerContext*, const nix::proto::StorePath* request, google::protobuf::Empty*) override { diff --git a/third_party/nix/src/proto/worker.proto b/third_party/nix/src/proto/worker.proto index abe8f622c9f2..cc5be46d6ba7 100644 --- a/third_party/nix/src/proto/worker.proto +++ b/third_party/nix/src/proto/worker.proto @@ -33,6 +33,8 @@ service WorkerService { rpc EnsurePath(StorePath) returns (google.protobuf.Empty); // TODO: What does this do? + // TODO(grfn): This should not actually take a StorePath, as it's not a + // StorePath rpc AddTempRoot(StorePath) returns (google.protobuf.Empty); // TODO: What does this do? @@ -260,6 +262,10 @@ message PathInfo { repeated string sigs = 7; // If non-empty, an assertion that the path is content-addressed string ca = 8; + + // Only used for AddToStoreNarRequest + bool repair = 12; + bool check_sigs = 13; } message SubstitutablePathInfos { @@ -318,9 +324,9 @@ message AddSignaturesRequest { } message AddToStoreNarRequest { - oneof add_oneoff { + oneof add_oneof { PathInfo path_info = 1; - bytes chunk = 2; + bytes data = 2; } } |