diff options
Diffstat (limited to 'third_party/nix/src/libstore/rpc-store.cc')
-rw-r--r-- | third_party/nix/src/libstore/rpc-store.cc | 549 |
1 files changed, 0 insertions, 549 deletions
diff --git a/third_party/nix/src/libstore/rpc-store.cc b/third_party/nix/src/libstore/rpc-store.cc deleted file mode 100644 index c29bd059de9b..000000000000 --- a/third_party/nix/src/libstore/rpc-store.cc +++ /dev/null @@ -1,549 +0,0 @@ -#include "rpc-store.hh" - -#include <algorithm> -#include <filesystem> -#include <memory> -#include <optional> -#include <ostream> -#include <string_view> - -#include <absl/status/status.h> -#include <absl/strings/str_cat.h> -#include <absl/strings/str_format.h> -#include <absl/strings/string_view.h> -#include <glog/logging.h> -#include <google/protobuf/empty.pb.h> -#include <google/protobuf/util/time_util.h> -#include <grpcpp/create_channel.h> -#include <grpcpp/impl/codegen/async_unary_call.h> -#include <grpcpp/impl/codegen/client_context.h> -#include <grpcpp/impl/codegen/completion_queue.h> -#include <grpcpp/impl/codegen/status.h> -#include <grpcpp/impl/codegen/status_code_enum.h> -#include <grpcpp/impl/codegen/sync_stream.h> -#include <grpcpp/security/credentials.h> -#include <sys/ucontext.h> - -#include "libproto/worker.grpc.pb.h" -#include "libproto/worker.pb.h" -#include "libstore/derivations.hh" -#include "libstore/store-api.hh" -#include "libstore/worker-protocol.hh" -#include "libutil/archive.hh" -#include "libutil/hash.hh" -#include "libutil/proto.hh" -#include "libutil/types.hh" - -namespace nix { - -namespace store { - -// Should be set to the bandwidth delay product between the client and the -// daemon. The current value, which should eventually be determined dynamically, -// has currently been set to a developer's deskop computer, rounded up -constexpr size_t kChunkSize = 1024 * 64; - -using google::protobuf::util::TimeUtil; -using grpc::ClientContext; -using nix::proto::WorkerService; - -static google::protobuf::Empty kEmpty; - -template <typename Request> -class RPCSink : public BufferedSink { - public: - using Writer = grpc::ClientWriter<Request>; - explicit RPCSink(std::unique_ptr<Writer>&& writer) - : writer_(std::move(writer)), good_(true) {} - - bool good() override { return good_; } - - void write(const unsigned char* data, size_t len) override { - Request req; - req.set_data(data, len); - if (!writer_->Write(req)) { - good_ = false; - } - } - - ~RPCSink() override { flush(); } - - grpc::Status Finish() { - flush(); - return writer_->Finish(); - } - - private: - std::unique_ptr<Writer> writer_; - bool good_; -}; - -// TODO(grfn): Obviously this should go away and be replaced by StatusOr... but -// that would require refactoring the entire store api, which we don't feel like -// doing right now. We should at some point though -void const RpcStore::SuccessOrThrow(const grpc::Status& status, - const absl::string_view& call) const { - if (!status.ok()) { - auto uri = uri_.value_or("unknown URI"); - switch (status.error_code()) { - case grpc::StatusCode::UNIMPLEMENTED: - throw Unsupported( - absl::StrFormat("operation %s is not supported by store at %s: %s", - call, uri, status.error_message())); - default: - throw Error(absl::StrFormat( - "Rpc call %s to %s failed (%s): %s ", call, uri, - util::proto::GRPCStatusCodeDescription(status.error_code()), - status.error_message())); - } - } -} - -bool RpcStore::isValidPathUncached(const Path& path) { - ClientContext ctx; - proto::IsValidPathResponse resp; - SuccessOrThrow(stub_->IsValidPath(&ctx, util::proto::StorePath(path), &resp), - __FUNCTION__); - return resp.is_valid(); -} - -PathSet RpcStore::queryAllValidPaths() { - ClientContext ctx; - proto::StorePaths paths; - SuccessOrThrow(stub_->QueryAllValidPaths(&ctx, kEmpty, &paths), __FUNCTION__); - return util::proto::FillFrom<PathSet>(paths.paths()); -} - -PathSet RpcStore::queryValidPaths(const PathSet& paths, - SubstituteFlag maybeSubstitute) { - ClientContext ctx; - proto::StorePaths store_paths; - for (const auto& path : paths) { - store_paths.add_paths(path); - } - proto::StorePaths result_paths; - SuccessOrThrow(stub_->QueryValidPaths(&ctx, store_paths, &result_paths), - __FUNCTION__); - return util::proto::FillFrom<PathSet>(result_paths.paths()); -} - -void RpcStore::queryPathInfoUncached( - const Path& path, - Callback<std::shared_ptr<ValidPathInfo>> callback) noexcept { - ClientContext ctx; - proto::StorePath store_path; - store_path.set_path(path); - - try { - proto::PathInfo path_info; - auto result = stub_->QueryPathInfo(&ctx, store_path, &path_info); - if (result.error_code() == grpc::INVALID_ARGUMENT) { - throw InvalidPath(absl::StrFormat("path '%s' is not valid", path)); - } - SuccessOrThrow(result); - - std::shared_ptr<ValidPathInfo> info; - - if (!path_info.is_valid()) { - throw InvalidPath(absl::StrFormat("path '%s' is not valid", path)); - } - - info = std::make_shared<ValidPathInfo>(); - info->path = path; - info->deriver = path_info.deriver().path(); - if (!info->deriver.empty()) { - assertStorePath(info->deriver); - } - auto hash_ = Hash::deserialize(path_info.nar_hash(), htSHA256); - info->narHash = Hash::unwrap_throw(hash_); - info->references.insert(path_info.references().begin(), - path_info.references().end()); - info->registrationTime = - TimeUtil::TimestampToTimeT(path_info.registration_time()); - info->narSize = path_info.nar_size(); - info->ultimate = path_info.ultimate(); - info->sigs.insert(path_info.sigs().begin(), path_info.sigs().end()); - info->ca = path_info.ca(); - - callback(std::move(info)); - } catch (...) { - callback.rethrow(); - } -} - -void RpcStore::queryReferrers(const Path& path, PathSet& referrers) { - ClientContext ctx; - proto::StorePaths paths; - SuccessOrThrow( - stub_->QueryReferrers(&ctx, util::proto::StorePath(path), &paths), - __FUNCTION__); - referrers.insert(paths.paths().begin(), paths.paths().end()); -} - -PathSet RpcStore::queryValidDerivers(const Path& path) { - ClientContext ctx; - proto::StorePaths paths; - SuccessOrThrow( - stub_->QueryValidDerivers(&ctx, util::proto::StorePath(path), &paths), - __FUNCTION__); - return util::proto::FillFrom<PathSet>(paths.paths()); -} - -PathSet RpcStore::queryDerivationOutputs(const Path& path) { - ClientContext ctx; - proto::StorePaths paths; - SuccessOrThrow( - stub_->QueryDerivationOutputs(&ctx, util::proto::StorePath(path), &paths), - __FUNCTION__); - return util::proto::FillFrom<PathSet>(paths.paths()); -} - -StringSet RpcStore::queryDerivationOutputNames(const Path& path) { - ClientContext ctx; - proto::DerivationOutputNames output_names; - SuccessOrThrow(stub_->QueryDerivationOutputNames( - &ctx, util::proto::StorePath(path), &output_names)); - return util::proto::FillFrom<StringSet>(output_names.names()); -} - -Path RpcStore::queryPathFromHashPart(const std::string& hashPart) { - ClientContext ctx; - proto::StorePath path; - proto::HashPart proto_hash_part; - proto_hash_part.set_hash_part(hashPart); - SuccessOrThrow(stub_->QueryPathFromHashPart(&ctx, proto_hash_part, &path), - __FUNCTION__); - return path.path(); -} - -PathSet RpcStore::querySubstitutablePaths(const PathSet& paths) { - ClientContext ctx; - proto::StorePaths result; - SuccessOrThrow(stub_->QuerySubstitutablePaths( - &ctx, util::proto::StorePaths(paths), &result)); - return util::proto::FillFrom<PathSet>(result.paths()); -} - -void RpcStore::querySubstitutablePathInfos(const PathSet& paths, - SubstitutablePathInfos& infos) { - ClientContext ctx; - proto::SubstitutablePathInfos result; - SuccessOrThrow(stub_->QuerySubstitutablePathInfos( - &ctx, util::proto::StorePaths(paths), &result)); - - for (const auto& path_info : result.path_infos()) { - auto path = path_info.path().path(); - SubstitutablePathInfo& info(infos[path]); - info.deriver = path_info.deriver().path(); - if (!info.deriver.empty()) { - assertStorePath(info.deriver); - } - info.references = util::proto::FillFrom<PathSet>(path_info.references()); - info.downloadSize = path_info.download_size(); - info.narSize = path_info.nar_size(); - } -} - -void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource, - RepairFlag repair, CheckSigsFlag checkSigs, - std::shared_ptr<FSAccessor> accessor) { - ClientContext ctx; - google::protobuf::Empty response; - auto writer = stub_->AddToStoreNar(&ctx, &response); - - proto::AddToStoreNarRequest path_info_req; - path_info_req.mutable_path_info()->mutable_path()->set_path(info.path); - path_info_req.mutable_path_info()->mutable_deriver()->set_path(info.deriver); - path_info_req.mutable_path_info()->set_nar_hash( - info.narHash.to_string(Base16, false)); - for (const auto& ref : info.references) { - path_info_req.mutable_path_info()->add_references(ref); - } - *path_info_req.mutable_path_info()->mutable_registration_time() = - TimeUtil::TimeTToTimestamp(info.registrationTime); - path_info_req.mutable_path_info()->set_nar_size(info.narSize); - path_info_req.mutable_path_info()->set_ultimate(info.ultimate); - for (const auto& sig : info.sigs) { - path_info_req.mutable_path_info()->add_sigs(sig); - } - path_info_req.mutable_path_info()->set_ca(info.ca); - path_info_req.mutable_path_info()->set_repair(repair); - path_info_req.mutable_path_info()->set_check_sigs(checkSigs); - - if (!writer->Write(path_info_req)) { - throw Error("Could not write to nix daemon"); - } - - RPCSink sink(std::move(writer)); - copyNAR(narSource, sink); - SuccessOrThrow(sink.Finish(), __FUNCTION__); -} - -Path RpcStore::addToStore(const std::string& name, const Path& srcPath, - bool recursive, HashType hashAlgo, PathFilter& filter, - RepairFlag repair) { - if (repair != 0u) { - throw Error( - "repairing is not supported when building through the Nix daemon"); - } - - ClientContext ctx; - proto::StorePath response; - auto writer = stub_->AddToStore(&ctx, &response); - - proto::AddToStoreRequest metadata_req; - metadata_req.mutable_meta()->set_base_name(name); - // TODO(grfn): what is fixed? - metadata_req.mutable_meta()->set_fixed(!(hashAlgo == htSHA256 && recursive)); - metadata_req.mutable_meta()->set_recursive(recursive); - metadata_req.mutable_meta()->set_hash_type(HashTypeToProto(hashAlgo)); - - if (!writer->Write(metadata_req)) { - throw Error("Could not write to nix daemon"); - } - - RPCSink sink(std::move(writer)); - dumpPath(std::filesystem::absolute(srcPath), sink); - sink.flush(); - SuccessOrThrow(sink.Finish(), __FUNCTION__); - - return response.path(); -} - -Path RpcStore::addTextToStore(const std::string& name, - const std::string& content, - const PathSet& references, RepairFlag repair) { - if (repair != 0u) { - throw Error( - "repairing is not supported when building through the Nix daemon"); - } - ClientContext ctx; - proto::StorePath result; - auto writer = stub_->AddTextToStore(&ctx, &result); - - proto::AddTextToStoreRequest meta; - meta.mutable_meta()->set_name(name); - meta.mutable_meta()->set_size(content.size()); - for (const auto& ref : references) { - meta.mutable_meta()->add_references(ref); - } - writer->Write(meta); - - for (int i = 0; i <= content.size(); i += kChunkSize) { - auto len = std::min(kChunkSize, content.size() - i); - proto::AddTextToStoreRequest data; - data.set_data(content.data() + i, len); - if (!writer->Write(data)) { - // Finish() below will error - break; - } - } - - writer->WritesDone(); - SuccessOrThrow(writer->Finish(), __FUNCTION__); - return result.path(); -} - -absl::Status RpcStore::buildPaths(std::ostream& log_sink, const PathSet& paths, - BuildMode build_mode) { - ClientContext ctx; - proto::BuildPathsRequest request; - for (const auto& path : paths) { - request.add_drvs(path); - } - - google::protobuf::Empty response; - request.set_mode(nix::BuildModeToProto(build_mode)); - - std::unique_ptr<grpc::ClientReader<proto::BuildEvent>> reader = - stub_->BuildPaths(&ctx, request); - - proto::BuildEvent event; - while (reader->Read(&event)) { - if (event.has_build_log()) { - // TODO(tazjin): Include .path()? - log_sink << event.build_log().line(); - } else { - log_sink << "Building path: " << event.building_path().path() - << std::endl; - } - - // has_result() is not in use in this call (for now) - } - - return nix::util::proto::GRPCStatusToAbsl(reader->Finish()); -} - -BuildResult RpcStore::buildDerivation(std::ostream& log_sink, - const Path& drvPath, - const BasicDerivation& drv, - BuildMode buildMode) { - ClientContext ctx; - proto::BuildDerivationRequest request; - request.mutable_drv_path()->set_path(drvPath); - proto::Derivation proto_drv = drv.to_proto(); - *request.mutable_derivation() = proto_drv; - request.set_build_mode(BuildModeToProto(buildMode)); - - std::unique_ptr<grpc::ClientReader<proto::BuildEvent>> reader = - stub_->BuildDerivation(&ctx, request); - - std::optional<BuildResult> result; - - proto::BuildEvent event; - while (reader->Read(&event)) { - if (event.has_build_log()) { - log_sink << event.build_log().line(); - } else if (event.has_result()) { - result = BuildResult::FromProto(event.result()); - } - } - SuccessOrThrow(reader->Finish(), __FUNCTION__); - - if (!result.has_value()) { - throw Error("Invalid response from daemon for buildDerivation"); - } - return result.value(); -} - -void RpcStore::ensurePath(const Path& path) { - ClientContext ctx; - google::protobuf::Empty response; - SuccessOrThrow( - stub_->EnsurePath(&ctx, util::proto::StorePath(path), &response), - __FUNCTION__); -} - -void RpcStore::addTempRoot(const Path& path) { - ClientContext ctx; - google::protobuf::Empty response; - SuccessOrThrow( - stub_->AddTempRoot(&ctx, util::proto::StorePath(path), &response), - __FUNCTION__); -} - -void RpcStore::addIndirectRoot(const Path& path) { - ClientContext ctx; - google::protobuf::Empty response; - SuccessOrThrow( - stub_->AddIndirectRoot(&ctx, util::proto::StorePath(path), &response), - __FUNCTION__); -} - -void RpcStore::syncWithGC() { - ClientContext ctx; - google::protobuf::Empty response; - SuccessOrThrow(stub_->SyncWithGC(&ctx, kEmpty, &response), __FUNCTION__); -} - -Roots RpcStore::findRoots(bool censor) { - ClientContext ctx; - proto::FindRootsResponse response; - SuccessOrThrow(stub_->FindRoots(&ctx, kEmpty, &response), __FUNCTION__); - Roots result; - - for (const auto& [target, links] : response.roots()) { - auto link_paths = - util::proto::FillFrom<std::unordered_set<std::string>>(links.paths()); - result.insert({target, link_paths}); - } - - return result; -} - -void RpcStore::collectGarbage(const GCOptions& options, GCResults& results) { - ClientContext ctx; - proto::CollectGarbageRequest request; - request.set_action(options.ActionToProto()); - for (const auto& path : options.pathsToDelete) { - request.add_paths_to_delete(path); - } - request.set_ignore_liveness(options.ignoreLiveness); - request.set_max_freed(options.maxFreed); - - proto::CollectGarbageResponse response; - SuccessOrThrow(stub_->CollectGarbage(&ctx, request, &response), __FUNCTION__); - - for (const auto& path : response.deleted_paths()) { - results.paths.insert(path); - } - results.bytesFreed = response.bytes_freed(); -} - -void RpcStore::optimiseStore() { - ClientContext ctx; - google::protobuf::Empty response; - SuccessOrThrow(stub_->OptimiseStore(&ctx, kEmpty, &response), __FUNCTION__); -} - -bool RpcStore::verifyStore(bool checkContents, RepairFlag repair) { - ClientContext ctx; - proto::VerifyStoreRequest request; - request.set_check_contents(checkContents); - request.set_repair(repair); - proto::VerifyStoreResponse response; - SuccessOrThrow(stub_->VerifyStore(&ctx, request, &response), __FUNCTION__); - return response.errors(); -} - -void RpcStore::addSignatures(const Path& storePath, const StringSet& sigs) { - ClientContext ctx; - proto::AddSignaturesRequest request; - request.mutable_path()->set_path(storePath); - for (const auto& sig : sigs) { - request.mutable_sigs()->add_sigs(sig); - } - google::protobuf::Empty response; - SuccessOrThrow(stub_->AddSignatures(&ctx, request, &response), __FUNCTION__); -} - -void RpcStore::queryMissing(const PathSet& targets, PathSet& willBuild, - PathSet& willSubstitute, PathSet& unknown, - unsigned long long& downloadSize, - unsigned long long& narSize) { - ClientContext ctx; - proto::QueryMissingResponse response; - SuccessOrThrow( - stub_->QueryMissing(&ctx, util::proto::StorePaths(targets), &response), - __FUNCTION__); - - willBuild = util::proto::FillFrom<PathSet>(response.will_build()); - willSubstitute = util::proto::FillFrom<PathSet>(response.will_substitute()); - unknown = util::proto::FillFrom<PathSet>(response.unknown()); - downloadSize = response.download_size(); - narSize = response.nar_size(); -} - -std::shared_ptr<std::string> RpcStore::getBuildLog(const Path& path) { - ClientContext ctx; - proto::BuildLog response; - SuccessOrThrow( - stub_->GetBuildLog(&ctx, util::proto::StorePath(path), &response), - __FUNCTION__); - - auto build_log = response.build_log(); - if (build_log.empty()) { - return nullptr; - } - return std::make_shared<std::string>(build_log); -} - -unsigned int RpcStore::getProtocol() { return PROTOCOL_VERSION; } - -} // namespace store - -constexpr std::string_view kUriScheme = "unix://"; - -// TODO(grfn): Make this a function that we call from main rather than... this -static RegisterStoreImplementation regStore([](const std::string& uri, - const Store::Params& params) - -> std::shared_ptr<Store> { - if (std::string(uri, 0, kUriScheme.size()) != kUriScheme) { - return nullptr; - } - auto channel = grpc::CreateChannel(uri, grpc::InsecureChannelCredentials()); - return std::make_shared<store::RpcStore>( - uri, params, proto::WorkerService::NewStub(channel)); -}); - -} // namespace nix |