From 74a8c3d3591801eea4ad00c74b98f0043f20d4cc Mon Sep 17 00:00:00 2001 From: Griffin Smith Date: Fri, 21 Aug 2020 18:58:58 -0400 Subject: fix(tvix): Chunk the AddTextToStore request Rather than sending the entire AddTextToStore request along in a single message, send it in a stream of chunks using the same metadata-first approach we've been using for the other store gRPC requests. This fixes a bug where certain builds could send more data than the maximum gRPC request size (4194304 bytes, it would appear), resulting in a RESOURCE_EXHAUSTED error. The initial chunk size, which is currently constant but should be made dynamic at some point in the future, has been chosen based on the IPC bandwidth delay product for tazjin's desktop, rounded up. Change-Id: I6f0232cdbc98653484816b39855126873fc59a03 Reviewed-on: https://cl.tvl.fyi/c/depot/+/1835 Tested-by: BuildkiteCI Reviewed-by: tazjin Reviewed-by: kanepyork --- third_party/nix/src/libstore/rpc-store.cc | 33 ++++++++++++++++++---- third_party/nix/src/nix-daemon/nix-daemon-proto.cc | 33 ++++++++++++++++++---- third_party/nix/src/proto/worker.proto | 15 +++++++--- 3 files changed, 65 insertions(+), 16 deletions(-) diff --git a/third_party/nix/src/libstore/rpc-store.cc b/third_party/nix/src/libstore/rpc-store.cc index 92ad4e762bf9..bc1fcc73d818 100644 --- a/third_party/nix/src/libstore/rpc-store.cc +++ b/third_party/nix/src/libstore/rpc-store.cc @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -36,6 +37,11 @@ namespace nix { namespace store { +// Should be set to the bandwidth delay product between the client and the +// daemon. The current value, which should eventually be determined dynamically, +// has currently been set to a developer's deskop computer, rounded up +constexpr size_t kChunkSize = 1024 * 64; + using google::protobuf::util::TimeUtil; using grpc::ClientContext; using nix::proto::WorkerService; @@ -308,14 +314,29 @@ Path RpcStore::addTextToStore(const std::string& name, "repairing is not supported when building through the Nix daemon"); } ClientContext ctx; - proto::AddTextToStoreRequest request; - request.set_name(name); - request.set_content(content); + proto::StorePath result; + auto writer = stub_->AddTextToStore(&ctx, &result); + + proto::AddTextToStoreRequest meta; + meta.mutable_meta()->set_name(name); + meta.mutable_meta()->set_size(content.size()); for (const auto& ref : references) { - request.add_references(ref); + meta.mutable_meta()->add_references(ref); } - proto::StorePath result; - SuccessOrThrow(stub_->AddTextToStore(&ctx, request, &result), __FUNCTION__); + writer->Write(meta); + + for (int i = 0; i <= content.size(); i += kChunkSize) { + auto len = std::min(kChunkSize, content.size() - i); + proto::AddTextToStoreRequest data; + data.set_data(content.data() + i, len); + if (!writer->Write(data)) { + // Finish() below will error + break; + } + } + + writer->WritesDone(); + SuccessOrThrow(writer->Finish(), __FUNCTION__); return result.path(); } diff --git a/third_party/nix/src/nix-daemon/nix-daemon-proto.cc b/third_party/nix/src/nix-daemon/nix-daemon-proto.cc index 6d18fc40964f..ed859f584e39 100644 --- a/third_party/nix/src/nix-daemon/nix-daemon-proto.cc +++ b/third_party/nix/src/nix-daemon/nix-daemon-proto.cc @@ -271,17 +271,38 @@ class WorkerServiceImpl final : public WorkerService::Service { __FUNCTION__); } - Status AddTextToStore(grpc::ServerContext*, - const nix::proto::AddTextToStoreRequest* request, - nix::proto::StorePath* response) override { + Status AddTextToStore( + grpc::ServerContext*, + grpc::ServerReader* reader, + nix::proto::StorePath* response) override { return HandleExceptions( [&]() -> Status { + proto::AddTextToStoreRequest request; + auto has_metadata = reader->Read(&request); + if (!has_metadata || !request.has_meta()) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Metadata must be set before sending content"); + } + + proto::AddTextToStoreRequest_Metadata meta = request.meta(); + PathSet references; - for (const auto& ref : request->references()) { + for (const auto& ref : meta.references()) { references.insert(ref); } - auto path = store_->addTextToStore(request->name(), - request->content(), references); + + std::string content; + content.reserve(meta.size()); + while (reader->Read(&request)) { + if (request.add_oneof_case() != request.kData) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "All requests except the first must contain data"); + } + + content.append(request.data()); + } + + auto path = store_->addTextToStore(meta.name(), content, references); response->set_path(path); return Status::OK; }, diff --git a/third_party/nix/src/proto/worker.proto b/third_party/nix/src/proto/worker.proto index 6a80b3e32b42..b0dc32afbe5f 100644 --- a/third_party/nix/src/proto/worker.proto +++ b/third_party/nix/src/proto/worker.proto @@ -23,7 +23,7 @@ service WorkerService { rpc AddToStore(stream AddToStoreRequest) returns (StorePath); // Adds the supplied string to the store, as a text file. - rpc AddTextToStore(AddTextToStoreRequest) returns (StorePath); + rpc AddTextToStore(stream AddTextToStoreRequest) returns (StorePath); // Build the specified derivations in one of the specified build // modes, defaulting to a normal build. @@ -223,9 +223,16 @@ message AddToStoreRequest { } message AddTextToStoreRequest { - string name = 1; - string content = 2; - repeated string references = 3; + message Metadata { + string name = 1; + repeated string references = 2; + uint64 size = 3; + } + + oneof add_oneof { + Metadata meta = 4; + bytes data = 5; + } } message BuildPathsRequest { -- cgit 1.4.1