diff options
author | Griffin Smith <grfn@gws.fyi> | 2020-07-25T17·49-0400 |
---|---|---|
committer | glittershark <grfn@gws.fyi> | 2020-07-25T20·18+0000 |
commit | b10970a66f39e91f36975a2e83166c0192d9476e (patch) | |
tree | 4b3d9e3c08791fab0e19f6f5a2c6a0003d345bc9 /third_party/nix | |
parent | dcaba9de64354fa699ee6b292efbedfb984582db (diff) |
feat(3p/nix): Start implementing RPC store client r/1473
Add a stub class for wrapping a gRPC client to the new, proto-backed nix store protocol, along with several methods implemented but several left throwing a not implemented exception. Paired-With: Vincent Ambo <mail@tazj.in> Paired-With: Perry Lorier <isomer@tvl.fyi> Change-Id: Id943d4f6d75084b8498786d580e6c9f7c92c104d Reviewed-on: https://cl.tvl.fyi/c/depot/+/1436 Tested-by: BuildkiteCI Reviewed-by: kanepyork <rikingcoding@gmail.com>
Diffstat (limited to 'third_party/nix')
-rw-r--r-- | third_party/nix/src/libstore/CMakeLists.txt | 2 | ||||
-rw-r--r-- | third_party/nix/src/libstore/rpc-store.cc | 282 | ||||
-rw-r--r-- | third_party/nix/src/libstore/rpc-store.hh | 142 | ||||
-rw-r--r-- | third_party/nix/src/libstore/store-api.cc | 39 | ||||
-rw-r--r-- | third_party/nix/src/proto/worker.proto | 1 |
5 files changed, 450 insertions, 16 deletions
diff --git a/third_party/nix/src/libstore/CMakeLists.txt b/third_party/nix/src/libstore/CMakeLists.txt index 83dd387e2af0..4bfbb165f6b2 100644 --- a/third_party/nix/src/libstore/CMakeLists.txt +++ b/third_party/nix/src/libstore/CMakeLists.txt @@ -38,6 +38,7 @@ set(HEADER_FILES references.hh remote-fs-accessor.hh remote-store.hh + rpc-store.hh s3-binary-cache-store.hh s3.hh serve-protocol.hh @@ -78,6 +79,7 @@ target_sources(nixstore references.cc remote-fs-accessor.cc remote-store.cc + rpc-store.cc s3-binary-cache-store.cc sqlite.cc ssh.cc diff --git a/third_party/nix/src/libstore/rpc-store.cc b/third_party/nix/src/libstore/rpc-store.cc new file mode 100644 index 000000000000..6d31f1aa81d3 --- /dev/null +++ b/third_party/nix/src/libstore/rpc-store.cc @@ -0,0 +1,282 @@ +#include "rpc-store.hh" + +#include <algorithm> +#include <memory> + +#include <absl/strings/str_cat.h> +#include <absl/strings/str_format.h> +#include <google/protobuf/empty.pb.h> +#include <google/protobuf/util/time_util.h> +#include <grpcpp/create_channel.h> +#include <grpcpp/impl/codegen/async_unary_call.h> +#include <grpcpp/impl/codegen/client_context.h> +#include <grpcpp/impl/codegen/completion_queue.h> +#include <grpcpp/impl/codegen/status.h> +#include <grpcpp/impl/codegen/sync_stream.h> +#include <grpcpp/security/credentials.h> + +#include "libproto/worker.grpc.pb.h" +#include "libproto/worker.pb.h" +#include "libstore/store-api.hh" +#include "libutil/hash.hh" +#include "libutil/types.hh" + +namespace nix { + +namespace store { + +using google::protobuf::util::TimeUtil; +using grpc::ClientContext; +using nix::proto::WorkerService; + +static google::protobuf::Empty kEmpty; +static ClientContext ctx; + +proto::StorePath StorePath(const Path& path) { + proto::StorePath store_path; + store_path.set_path(path); + return store_path; +} + +template <typename T, typename U> +T FillFrom(const U& src) { + T result; + result.insert(src.begin(), src.end()); + return result; +} + +// TODO(grfn): Obviously this should go away and be replaced by StatusOr... but +// that would require refactoring the entire store api, which we don't feel like +// doing right now. We should at some point though +void SuccessOrThrow(const grpc::Status& status) { + if (!status.ok()) { + throw Error(absl::StrFormat("Rpc call failed (%d): %s ", + status.error_code(), status.error_message())); + } +} + +bool RpcStore::isValidPathUncached(const Path& path) { + proto::IsValidPathResponse resp; + SuccessOrThrow(stub_->IsValidPath(&ctx, StorePath(path), &resp)); + return resp.is_valid(); +} + +PathSet RpcStore::queryAllValidPaths() { + proto::StorePaths paths; + SuccessOrThrow(stub_->QueryAllValidPaths(&ctx, kEmpty, &paths)); + return FillFrom<PathSet>(paths.paths()); +} + +PathSet RpcStore::queryValidPaths(const PathSet& paths, + SubstituteFlag maybeSubstitute) { + proto::StorePaths store_paths; + for (const auto& path : paths) { + store_paths.add_paths(path); + } + proto::StorePaths result_paths; + SuccessOrThrow(stub_->QueryValidPaths(&ctx, store_paths, &result_paths)); + return FillFrom<PathSet>(result_paths.paths()); +} + +void RpcStore::queryPathInfoUncached( + const Path& path, + Callback<std::shared_ptr<ValidPathInfo>> callback) noexcept { + proto::StorePath store_path; + store_path.set_path(path); + + try { + proto::PathInfo path_info; + SuccessOrThrow(stub_->QueryPathInfo(&ctx, store_path, &path_info)); + + std::shared_ptr<ValidPathInfo> info; + + if (!path_info.is_valid()) { + throw InvalidPath(absl::StrFormat("path '%s' is not valid", path)); + } + + info = std::make_shared<ValidPathInfo>(); + info->path = path; + info->deriver = path_info.deriver().path(); + if (!info->deriver.empty()) { + assertStorePath(info->deriver); + } + info->narHash = Hash(path_info.nar_hash(), htSHA256); + info->references.insert(path_info.references().begin(), + path_info.references().end()); + info->registrationTime = + TimeUtil::TimestampToTimeT(path_info.registration_time()); + info->narSize = path_info.nar_size(); + info->ultimate = path_info.ultimate(); + info->sigs.insert(path_info.sigs().begin(), path_info.sigs().end()); + info->ca = path_info.ca(); + + callback(std::move(info)); + } catch (...) { + callback.rethrow(); + } +} + +void RpcStore::queryReferrers(const Path& path, PathSet& referrers) { + proto::StorePaths paths; + SuccessOrThrow(stub_->QueryReferrers(&ctx, StorePath(path), &paths)); + referrers.insert(paths.paths().begin(), paths.paths().end()); +} + +PathSet RpcStore::queryValidDerivers(const Path& path) { + proto::StorePaths paths; + SuccessOrThrow(stub_->QueryValidDerivers(&ctx, StorePath(path), &paths)); + return FillFrom<PathSet>(paths.paths()); +} + +PathSet RpcStore::queryDerivationOutputs(const Path& path) { + proto::StorePaths paths; + SuccessOrThrow(stub_->QueryDerivationOutputs(&ctx, StorePath(path), &paths)); + return FillFrom<PathSet>(paths.paths()); +} + +StringSet RpcStore::queryDerivationOutputNames(const Path& path) { + proto::DerivationOutputNames output_names; + SuccessOrThrow( + stub_->QueryDerivationOutputNames(&ctx, StorePath(path), &output_names)); + return FillFrom<StringSet>(output_names.names()); +} + +Path RpcStore::queryPathFromHashPart(const std::string& hashPart) { + throw absl::StrCat("Not implemented ", __func__); +} + +PathSet RpcStore::querySubstitutablePaths(const PathSet& paths) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::querySubstitutablePathInfos(const PathSet& paths, + SubstitutablePathInfos& infos) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource, + RepairFlag repair, CheckSigsFlag checkSigs, + std::shared_ptr<FSAccessor> accessor) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::addToStore(const ValidPathInfo& info, + const ref<std::string>& nar, RepairFlag repair, + CheckSigsFlag checkSigs, + std::shared_ptr<FSAccessor> accessor) { + throw absl::StrCat("Not implemented ", __func__); +} + +Path RpcStore::addToStore(const std::string& name, const Path& srcPath, + bool recursive, HashType hashAlgo, PathFilter& filter, + RepairFlag repair) { + throw absl::StrCat("Not implemented ", __func__); +} + +Path RpcStore::addTextToStore(const std::string& name, const std::string& s, + const PathSet& references, RepairFlag repair) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::narFromPath(const Path& path, Sink& sink) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::buildPaths(const PathSet& paths, BuildMode buildMode) { + throw absl::StrCat("Not implemented ", __func__); +} + +BuildResult RpcStore::buildDerivation(const Path& drvPath, + const BasicDerivation& drv, + BuildMode buildMode) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::ensurePath(const Path& path) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::addTempRoot(const Path& path) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::addIndirectRoot(const Path& path) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::syncWithGC() { + throw absl::StrCat("Not implemented ", __func__); +} + +Roots RpcStore::findRoots(bool censor) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::collectGarbage(const GCOptions& options, GCResults& results) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::optimiseStore() { + throw absl::StrCat("Not implemented ", __func__); +} + +bool RpcStore::verifyStore(bool checkContents, RepairFlag repair) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::addSignatures(const Path& storePath, const StringSet& sigs) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::computeFSClosure(const PathSet& paths, PathSet& paths_, + bool flipDirection, bool includeOutputs, + bool includeDerivers) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::queryMissing(const PathSet& targets, PathSet& willBuild, + PathSet& willSubstitute, PathSet& unknown, + unsigned long long& downloadSize, + unsigned long long& narSize) { + throw absl::StrCat("Not implemented ", __func__); +} + +std::shared_ptr<std::string> RpcStore::getBuildLog(const Path& path) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::connect() { throw absl::StrCat("Not implemented ", __func__); } + +unsigned int RpcStore::getProtocol() { + throw absl::StrCat("Not implemented ", __func__); +} + +int RpcStore::getPriority() { + throw absl::StrCat("Not implemented ", __func__); +} + +Path RpcStore::toRealPath(const Path& storePath) { + throw absl::StrCat("Not implemented ", __func__); +} + +void RpcStore::createUser(const std::string& userName, uid_t userId) { + throw absl::StrCat("Not implemented ", __func__); +} + +} // namespace store + +static std::string uriScheme = "unix://"; + +// TODO(grfn): Make this a function that we call from main rather than... this +static RegisterStoreImplementation regStore([](const std::string& uri, + const Store::Params& params) + -> std::shared_ptr<Store> { + if (std::string(uri, 0, uriScheme.size()) != uriScheme) { + return nullptr; + } + auto channel = grpc::CreateChannel(uri, grpc::InsecureChannelCredentials()); + return std::make_shared<store::RpcStore>( + uri, params, proto::WorkerService::NewStub(channel)); +}); + +} // namespace nix diff --git a/third_party/nix/src/libstore/rpc-store.hh b/third_party/nix/src/libstore/rpc-store.hh new file mode 100644 index 000000000000..332d1f4ba12b --- /dev/null +++ b/third_party/nix/src/libstore/rpc-store.hh @@ -0,0 +1,142 @@ +#pragma once + +#include "libproto/worker.grpc.pb.h" +#include "libproto/worker.pb.h" +#include "libstore/remote-store.hh" +#include "libstore/store-api.hh" + +namespace nix::store { + +// TODO(grfn): Currently, since the RPCStore is only used for the connection to +// the nix daemon over a unix socket, it inherits from the LocalFSStore since it +// shares a filesystem with the daemon. This will not always be the case, at +// which point we should tease these two things apart. +class RpcStore : public LocalFSStore, public virtual Store { + public: + RpcStore(const Params& params, + std::unique_ptr<nix::proto::WorkerService::Stub> stub) + : Store(params), LocalFSStore(params), stub_(std::move(stub)) {} + + RpcStore(std::string uri, const Params& params, + std::unique_ptr<nix::proto::WorkerService::Stub> stub) + : Store(params), + LocalFSStore(params), + uri_(uri), + stub_(std::move(stub)) {} + + std::string getUri() override { + if (uri_.has_value()) { + return uri_.value(); + } else { + return "daemon"; + } + }; + + virtual PathSet queryAllValidPaths() override; + + virtual void queryReferrers(const Path& path, PathSet& referrers) override; + + virtual PathSet queryValidDerivers(const Path& path) override; + + virtual PathSet queryDerivationOutputs(const Path& path) override; + + virtual StringSet queryDerivationOutputNames(const Path& path) override; + + virtual Path queryPathFromHashPart(const std::string& hashPart) override; + + virtual PathSet querySubstitutablePaths(const PathSet& paths) override; + + virtual void querySubstitutablePathInfos( + const PathSet& paths, SubstitutablePathInfos& infos) override; + + virtual bool wantMassQuery() override { return true; } + + virtual void addToStore(const ValidPathInfo& info, Source& narSource, + RepairFlag repair = NoRepair, + CheckSigsFlag checkSigs = CheckSigs, + std::shared_ptr<FSAccessor> accessor = 0) override; + + virtual void addToStore(const ValidPathInfo& info, + const ref<std::string>& nar, + RepairFlag repair = NoRepair, + CheckSigsFlag checkSigs = CheckSigs, + std::shared_ptr<FSAccessor> accessor = 0) override; + + virtual Path addToStore(const std::string& name, const Path& srcPath, + bool recursive = true, HashType hashAlgo = htSHA256, + PathFilter& filter = defaultPathFilter, + RepairFlag repair = NoRepair) override; + + virtual Path addTextToStore(const std::string& name, const std::string& s, + const PathSet& references, + RepairFlag repair = NoRepair) override; + + virtual void narFromPath(const Path& path, Sink& sink) override; + + virtual void buildPaths(const PathSet& paths, + BuildMode buildMode = bmNormal) override; + + virtual BuildResult buildDerivation(const Path& drvPath, + const BasicDerivation& drv, + BuildMode buildMode = bmNormal) override; + + virtual void ensurePath(const Path& path) override; + + virtual void addTempRoot(const Path& path) override; + + virtual void addIndirectRoot(const Path& path) override; + + virtual void syncWithGC() override; + + virtual Roots findRoots(bool censor) override; + + virtual void collectGarbage(const GCOptions& options, + GCResults& results) override; + + virtual void optimiseStore() override; + + virtual bool verifyStore(bool checkContents, + RepairFlag repair = NoRepair) override; + + virtual void addSignatures(const Path& storePath, + const StringSet& sigs) override; + + virtual void computeFSClosure(const PathSet& paths, PathSet& paths_, + bool flipDirection = false, + bool includeOutputs = false, + bool includeDerivers = false) override; + + virtual void queryMissing(const PathSet& targets, PathSet& willBuild, + PathSet& willSubstitute, PathSet& unknown, + unsigned long long& downloadSize, + unsigned long long& narSize) override; + + virtual std::shared_ptr<std::string> getBuildLog(const Path& path) override; + + virtual void connect() override; + + virtual unsigned int getProtocol() override; + + virtual int getPriority() override; + + virtual Path toRealPath(const Path& storePath) override; + + virtual void createUser(const std::string& userName, uid_t userId) override; + + protected: + virtual bool isValidPathUncached(const Path& path) override; + + virtual PathSet queryValidPaths( + const PathSet& paths, + SubstituteFlag maybeSubstitute = NoSubstitute) override; + + virtual void queryPathInfoUncached( + const Path& path, + Callback<std::shared_ptr<ValidPathInfo>> callback) noexcept override; + + private: + std::optional<std::string> uri_; + std::unique_ptr<nix::proto::WorkerService::Stub> stub_; +}; + +} // namespace nix::store diff --git a/third_party/nix/src/libstore/store-api.cc b/third_party/nix/src/libstore/store-api.cc index 47f85e6e2610..12c60dffca35 100644 --- a/third_party/nix/src/libstore/store-api.cc +++ b/third_party/nix/src/libstore/store-api.cc @@ -7,11 +7,13 @@ #include <absl/strings/numbers.h> #include <absl/strings/str_split.h> #include <glog/logging.h> +#include <grpcpp/create_channel.h> #include "libstore/crypto.hh" #include "libstore/derivations.hh" #include "libstore/globals.hh" #include "libstore/nar-info-disk-cache.hh" +#include "libstore/rpc-store.hh" #include "libutil/json.hh" #include "libutil/thread-pool.hh" #include "libutil/util.hh" @@ -978,23 +980,28 @@ StoreType getStoreType(const std::string& uri, const std::string& stateDir) { } } -static RegisterStoreImplementation regStore([](const std::string& uri, - const Store::Params& params) - -> std::shared_ptr<Store> { - switch (getStoreType(uri, get(params, "state", settings.nixStateDir))) { - case tDaemon: - return std::shared_ptr<Store>(std::make_shared<UDSRemoteStore>(params)); - case tLocal: { - Store::Params params2 = params; - if (absl::StartsWith(uri, "/")) { - params2["root"] = uri; +static RegisterStoreImplementation regStore( + [](const std::string& uri, + const Store::Params& params) -> std::shared_ptr<Store> { + switch (getStoreType(uri, get(params, "state", settings.nixStateDir))) { + case tDaemon: { + auto daemon_socket_uri = settings.nixDaemonSocketFile; + auto channel = grpc::CreateChannel( + daemon_socket_uri, grpc::InsecureChannelCredentials()); + return std::shared_ptr<Store>(std::make_shared<nix::store::RpcStore>( + params, proto::WorkerService::NewStub(channel))); + } + case tLocal: { + Store::Params params2 = params; + if (absl::StartsWith(uri, "/")) { + params2["root"] = uri; + } + return std::shared_ptr<Store>(std::make_shared<LocalStore>(params2)); + } + default: + return nullptr; } - return std::shared_ptr<Store>(std::make_shared<LocalStore>(params2)); - } - default: - return nullptr; - } -}); + }); std::list<ref<Store>> getDefaultSubstituters() { static auto stores([]() { diff --git a/third_party/nix/src/proto/worker.proto b/third_party/nix/src/proto/worker.proto index d8d2cc8bbd2e..3ddaa575e7f4 100644 --- a/third_party/nix/src/proto/worker.proto +++ b/third_party/nix/src/proto/worker.proto @@ -251,6 +251,7 @@ message CollectGarbageResponse { } message PathInfo { + bool is_valid = 9; StorePath deriver = 1; bytes nar_hash = 2; repeated string references = 3; |