about summary refs log tree commit diff
path: root/third_party
diff options
context:
space:
mode:
authorGriffin Smith <grfn@gws.fyi>2020-07-25T17·49-0400
committerglittershark <grfn@gws.fyi>2020-07-25T20·18+0000
commitb10970a66f39e91f36975a2e83166c0192d9476e (patch)
tree4b3d9e3c08791fab0e19f6f5a2c6a0003d345bc9 /third_party
parentdcaba9de64354fa699ee6b292efbedfb984582db (diff)
feat(3p/nix): Start implementing RPC store client r/1473
Add a stub class for wrapping a gRPC client to the new, proto-backed nix
store protocol, along with several methods implemented but several left
throwing a not implemented exception.

Paired-With: Vincent Ambo <mail@tazj.in>
Paired-With: Perry Lorier <isomer@tvl.fyi>
Change-Id: Id943d4f6d75084b8498786d580e6c9f7c92c104d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/1436
Tested-by: BuildkiteCI
Reviewed-by: kanepyork <rikingcoding@gmail.com>
Diffstat (limited to 'third_party')
-rw-r--r--third_party/nix/src/libstore/CMakeLists.txt2
-rw-r--r--third_party/nix/src/libstore/rpc-store.cc282
-rw-r--r--third_party/nix/src/libstore/rpc-store.hh142
-rw-r--r--third_party/nix/src/libstore/store-api.cc39
-rw-r--r--third_party/nix/src/proto/worker.proto1
5 files changed, 450 insertions, 16 deletions
diff --git a/third_party/nix/src/libstore/CMakeLists.txt b/third_party/nix/src/libstore/CMakeLists.txt
index 83dd387e2a..4bfbb165f6 100644
--- a/third_party/nix/src/libstore/CMakeLists.txt
+++ b/third_party/nix/src/libstore/CMakeLists.txt
@@ -38,6 +38,7 @@ set(HEADER_FILES
     references.hh
     remote-fs-accessor.hh
     remote-store.hh
+    rpc-store.hh
     s3-binary-cache-store.hh
     s3.hh
     serve-protocol.hh
@@ -78,6 +79,7 @@ target_sources(nixstore
     references.cc
     remote-fs-accessor.cc
     remote-store.cc
+    rpc-store.cc
     s3-binary-cache-store.cc
     sqlite.cc
     ssh.cc
diff --git a/third_party/nix/src/libstore/rpc-store.cc b/third_party/nix/src/libstore/rpc-store.cc
new file mode 100644
index 0000000000..6d31f1aa81
--- /dev/null
+++ b/third_party/nix/src/libstore/rpc-store.cc
@@ -0,0 +1,282 @@
+#include "rpc-store.hh"
+
+#include <algorithm>
+#include <memory>
+
+#include <absl/strings/str_cat.h>
+#include <absl/strings/str_format.h>
+#include <google/protobuf/empty.pb.h>
+#include <google/protobuf/util/time_util.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/impl/codegen/async_unary_call.h>
+#include <grpcpp/impl/codegen/client_context.h>
+#include <grpcpp/impl/codegen/completion_queue.h>
+#include <grpcpp/impl/codegen/status.h>
+#include <grpcpp/impl/codegen/sync_stream.h>
+#include <grpcpp/security/credentials.h>
+
+#include "libproto/worker.grpc.pb.h"
+#include "libproto/worker.pb.h"
+#include "libstore/store-api.hh"
+#include "libutil/hash.hh"
+#include "libutil/types.hh"
+
+namespace nix {
+
+namespace store {
+
+using google::protobuf::util::TimeUtil;
+using grpc::ClientContext;
+using nix::proto::WorkerService;
+
+static google::protobuf::Empty kEmpty;
+static ClientContext ctx;
+
+proto::StorePath StorePath(const Path& path) {
+  proto::StorePath store_path;
+  store_path.set_path(path);
+  return store_path;
+}
+
+template <typename T, typename U>
+T FillFrom(const U& src) {
+  T result;
+  result.insert(src.begin(), src.end());
+  return result;
+}
+
+// TODO(grfn): Obviously this should go away and be replaced by StatusOr... but
+// that would require refactoring the entire store api, which we don't feel like
+// doing right now. We should at some point though
+void SuccessOrThrow(const grpc::Status& status) {
+  if (!status.ok()) {
+    throw Error(absl::StrFormat("Rpc call failed (%d): %s ",
+                                status.error_code(), status.error_message()));
+  }
+}
+
+bool RpcStore::isValidPathUncached(const Path& path) {
+  proto::IsValidPathResponse resp;
+  SuccessOrThrow(stub_->IsValidPath(&ctx, StorePath(path), &resp));
+  return resp.is_valid();
+}
+
+PathSet RpcStore::queryAllValidPaths() {
+  proto::StorePaths paths;
+  SuccessOrThrow(stub_->QueryAllValidPaths(&ctx, kEmpty, &paths));
+  return FillFrom<PathSet>(paths.paths());
+}
+
+PathSet RpcStore::queryValidPaths(const PathSet& paths,
+                                  SubstituteFlag maybeSubstitute) {
+  proto::StorePaths store_paths;
+  for (const auto& path : paths) {
+    store_paths.add_paths(path);
+  }
+  proto::StorePaths result_paths;
+  SuccessOrThrow(stub_->QueryValidPaths(&ctx, store_paths, &result_paths));
+  return FillFrom<PathSet>(result_paths.paths());
+}
+
+void RpcStore::queryPathInfoUncached(
+    const Path& path,
+    Callback<std::shared_ptr<ValidPathInfo>> callback) noexcept {
+  proto::StorePath store_path;
+  store_path.set_path(path);
+
+  try {
+    proto::PathInfo path_info;
+    SuccessOrThrow(stub_->QueryPathInfo(&ctx, store_path, &path_info));
+
+    std::shared_ptr<ValidPathInfo> info;
+
+    if (!path_info.is_valid()) {
+      throw InvalidPath(absl::StrFormat("path '%s' is not valid", path));
+    }
+
+    info = std::make_shared<ValidPathInfo>();
+    info->path = path;
+    info->deriver = path_info.deriver().path();
+    if (!info->deriver.empty()) {
+      assertStorePath(info->deriver);
+    }
+    info->narHash = Hash(path_info.nar_hash(), htSHA256);
+    info->references.insert(path_info.references().begin(),
+                            path_info.references().end());
+    info->registrationTime =
+        TimeUtil::TimestampToTimeT(path_info.registration_time());
+    info->narSize = path_info.nar_size();
+    info->ultimate = path_info.ultimate();
+    info->sigs.insert(path_info.sigs().begin(), path_info.sigs().end());
+    info->ca = path_info.ca();
+
+    callback(std::move(info));
+  } catch (...) {
+    callback.rethrow();
+  }
+}
+
+void RpcStore::queryReferrers(const Path& path, PathSet& referrers) {
+  proto::StorePaths paths;
+  SuccessOrThrow(stub_->QueryReferrers(&ctx, StorePath(path), &paths));
+  referrers.insert(paths.paths().begin(), paths.paths().end());
+}
+
+PathSet RpcStore::queryValidDerivers(const Path& path) {
+  proto::StorePaths paths;
+  SuccessOrThrow(stub_->QueryValidDerivers(&ctx, StorePath(path), &paths));
+  return FillFrom<PathSet>(paths.paths());
+}
+
+PathSet RpcStore::queryDerivationOutputs(const Path& path) {
+  proto::StorePaths paths;
+  SuccessOrThrow(stub_->QueryDerivationOutputs(&ctx, StorePath(path), &paths));
+  return FillFrom<PathSet>(paths.paths());
+}
+
+StringSet RpcStore::queryDerivationOutputNames(const Path& path) {
+  proto::DerivationOutputNames output_names;
+  SuccessOrThrow(
+      stub_->QueryDerivationOutputNames(&ctx, StorePath(path), &output_names));
+  return FillFrom<StringSet>(output_names.names());
+}
+
+Path RpcStore::queryPathFromHashPart(const std::string& hashPart) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+PathSet RpcStore::querySubstitutablePaths(const PathSet& paths) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::querySubstitutablePathInfos(const PathSet& paths,
+                                           SubstitutablePathInfos& infos) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource,
+                          RepairFlag repair, CheckSigsFlag checkSigs,
+                          std::shared_ptr<FSAccessor> accessor) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::addToStore(const ValidPathInfo& info,
+                          const ref<std::string>& nar, RepairFlag repair,
+                          CheckSigsFlag checkSigs,
+                          std::shared_ptr<FSAccessor> accessor) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+Path RpcStore::addToStore(const std::string& name, const Path& srcPath,
+                          bool recursive, HashType hashAlgo, PathFilter& filter,
+                          RepairFlag repair) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+Path RpcStore::addTextToStore(const std::string& name, const std::string& s,
+                              const PathSet& references, RepairFlag repair) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::narFromPath(const Path& path, Sink& sink) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::buildPaths(const PathSet& paths, BuildMode buildMode) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+BuildResult RpcStore::buildDerivation(const Path& drvPath,
+                                      const BasicDerivation& drv,
+                                      BuildMode buildMode) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::ensurePath(const Path& path) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::addTempRoot(const Path& path) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::addIndirectRoot(const Path& path) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::syncWithGC() {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+Roots RpcStore::findRoots(bool censor) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::collectGarbage(const GCOptions& options, GCResults& results) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::optimiseStore() {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+bool RpcStore::verifyStore(bool checkContents, RepairFlag repair) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::addSignatures(const Path& storePath, const StringSet& sigs) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::computeFSClosure(const PathSet& paths, PathSet& paths_,
+                                bool flipDirection, bool includeOutputs,
+                                bool includeDerivers) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::queryMissing(const PathSet& targets, PathSet& willBuild,
+                            PathSet& willSubstitute, PathSet& unknown,
+                            unsigned long long& downloadSize,
+                            unsigned long long& narSize) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+std::shared_ptr<std::string> RpcStore::getBuildLog(const Path& path) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::connect() { throw absl::StrCat("Not implemented ", __func__); }
+
+unsigned int RpcStore::getProtocol() {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+int RpcStore::getPriority() {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+Path RpcStore::toRealPath(const Path& storePath) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+void RpcStore::createUser(const std::string& userName, uid_t userId) {
+  throw absl::StrCat("Not implemented ", __func__);
+}
+
+}  // namespace store
+
+static std::string uriScheme = "unix://";
+
+// TODO(grfn): Make this a function that we call from main rather than... this
+static RegisterStoreImplementation regStore([](const std::string& uri,
+                                               const Store::Params& params)
+                                                -> std::shared_ptr<Store> {
+  if (std::string(uri, 0, uriScheme.size()) != uriScheme) {
+    return nullptr;
+  }
+  auto channel = grpc::CreateChannel(uri, grpc::InsecureChannelCredentials());
+  return std::make_shared<store::RpcStore>(
+      uri, params, proto::WorkerService::NewStub(channel));
+});
+
+}  // namespace nix
diff --git a/third_party/nix/src/libstore/rpc-store.hh b/third_party/nix/src/libstore/rpc-store.hh
new file mode 100644
index 0000000000..332d1f4ba1
--- /dev/null
+++ b/third_party/nix/src/libstore/rpc-store.hh
@@ -0,0 +1,142 @@
+#pragma once
+
+#include "libproto/worker.grpc.pb.h"
+#include "libproto/worker.pb.h"
+#include "libstore/remote-store.hh"
+#include "libstore/store-api.hh"
+
+namespace nix::store {
+
+// TODO(grfn): Currently, since the RPCStore is only used for the connection to
+// the nix daemon over a unix socket, it inherits from the LocalFSStore since it
+// shares a filesystem with the daemon. This will not always be the case, at
+// which point we should tease these two things apart.
+class RpcStore : public LocalFSStore, public virtual Store {
+ public:
+  RpcStore(const Params& params,
+           std::unique_ptr<nix::proto::WorkerService::Stub> stub)
+      : Store(params), LocalFSStore(params), stub_(std::move(stub)) {}
+
+  RpcStore(std::string uri, const Params& params,
+           std::unique_ptr<nix::proto::WorkerService::Stub> stub)
+      : Store(params),
+        LocalFSStore(params),
+        uri_(uri),
+        stub_(std::move(stub)) {}
+
+  std::string getUri() override {
+    if (uri_.has_value()) {
+      return uri_.value();
+    } else {
+      return "daemon";
+    }
+  };
+
+  virtual PathSet queryAllValidPaths() override;
+
+  virtual void queryReferrers(const Path& path, PathSet& referrers) override;
+
+  virtual PathSet queryValidDerivers(const Path& path) override;
+
+  virtual PathSet queryDerivationOutputs(const Path& path) override;
+
+  virtual StringSet queryDerivationOutputNames(const Path& path) override;
+
+  virtual Path queryPathFromHashPart(const std::string& hashPart) override;
+
+  virtual PathSet querySubstitutablePaths(const PathSet& paths) override;
+
+  virtual void querySubstitutablePathInfos(
+      const PathSet& paths, SubstitutablePathInfos& infos) override;
+
+  virtual bool wantMassQuery() override { return true; }
+
+  virtual void addToStore(const ValidPathInfo& info, Source& narSource,
+                          RepairFlag repair = NoRepair,
+                          CheckSigsFlag checkSigs = CheckSigs,
+                          std::shared_ptr<FSAccessor> accessor = 0) override;
+
+  virtual void addToStore(const ValidPathInfo& info,
+                          const ref<std::string>& nar,
+                          RepairFlag repair = NoRepair,
+                          CheckSigsFlag checkSigs = CheckSigs,
+                          std::shared_ptr<FSAccessor> accessor = 0) override;
+
+  virtual Path addToStore(const std::string& name, const Path& srcPath,
+                          bool recursive = true, HashType hashAlgo = htSHA256,
+                          PathFilter& filter = defaultPathFilter,
+                          RepairFlag repair = NoRepair) override;
+
+  virtual Path addTextToStore(const std::string& name, const std::string& s,
+                              const PathSet& references,
+                              RepairFlag repair = NoRepair) override;
+
+  virtual void narFromPath(const Path& path, Sink& sink) override;
+
+  virtual void buildPaths(const PathSet& paths,
+                          BuildMode buildMode = bmNormal) override;
+
+  virtual BuildResult buildDerivation(const Path& drvPath,
+                                      const BasicDerivation& drv,
+                                      BuildMode buildMode = bmNormal) override;
+
+  virtual void ensurePath(const Path& path) override;
+
+  virtual void addTempRoot(const Path& path) override;
+
+  virtual void addIndirectRoot(const Path& path) override;
+
+  virtual void syncWithGC() override;
+
+  virtual Roots findRoots(bool censor) override;
+
+  virtual void collectGarbage(const GCOptions& options,
+                              GCResults& results) override;
+
+  virtual void optimiseStore() override;
+
+  virtual bool verifyStore(bool checkContents,
+                           RepairFlag repair = NoRepair) override;
+
+  virtual void addSignatures(const Path& storePath,
+                             const StringSet& sigs) override;
+
+  virtual void computeFSClosure(const PathSet& paths, PathSet& paths_,
+                                bool flipDirection = false,
+                                bool includeOutputs = false,
+                                bool includeDerivers = false) override;
+
+  virtual void queryMissing(const PathSet& targets, PathSet& willBuild,
+                            PathSet& willSubstitute, PathSet& unknown,
+                            unsigned long long& downloadSize,
+                            unsigned long long& narSize) override;
+
+  virtual std::shared_ptr<std::string> getBuildLog(const Path& path) override;
+
+  virtual void connect() override;
+
+  virtual unsigned int getProtocol() override;
+
+  virtual int getPriority() override;
+
+  virtual Path toRealPath(const Path& storePath) override;
+
+  virtual void createUser(const std::string& userName, uid_t userId) override;
+
+ protected:
+  virtual bool isValidPathUncached(const Path& path) override;
+
+  virtual PathSet queryValidPaths(
+      const PathSet& paths,
+      SubstituteFlag maybeSubstitute = NoSubstitute) override;
+
+  virtual void queryPathInfoUncached(
+      const Path& path,
+      Callback<std::shared_ptr<ValidPathInfo>> callback) noexcept override;
+
+ private:
+  std::optional<std::string> uri_;
+  std::unique_ptr<nix::proto::WorkerService::Stub> stub_;
+};
+
+}  // namespace nix::store
diff --git a/third_party/nix/src/libstore/store-api.cc b/third_party/nix/src/libstore/store-api.cc
index 47f85e6e26..12c60dffca 100644
--- a/third_party/nix/src/libstore/store-api.cc
+++ b/third_party/nix/src/libstore/store-api.cc
@@ -7,11 +7,13 @@
 #include <absl/strings/numbers.h>
 #include <absl/strings/str_split.h>
 #include <glog/logging.h>
+#include <grpcpp/create_channel.h>
 
 #include "libstore/crypto.hh"
 #include "libstore/derivations.hh"
 #include "libstore/globals.hh"
 #include "libstore/nar-info-disk-cache.hh"
+#include "libstore/rpc-store.hh"
 #include "libutil/json.hh"
 #include "libutil/thread-pool.hh"
 #include "libutil/util.hh"
@@ -978,23 +980,28 @@ StoreType getStoreType(const std::string& uri, const std::string& stateDir) {
   }
 }
 
-static RegisterStoreImplementation regStore([](const std::string& uri,
-                                               const Store::Params& params)
-                                                -> std::shared_ptr<Store> {
-  switch (getStoreType(uri, get(params, "state", settings.nixStateDir))) {
-    case tDaemon:
-      return std::shared_ptr<Store>(std::make_shared<UDSRemoteStore>(params));
-    case tLocal: {
-      Store::Params params2 = params;
-      if (absl::StartsWith(uri, "/")) {
-        params2["root"] = uri;
+static RegisterStoreImplementation regStore(
+    [](const std::string& uri,
+       const Store::Params& params) -> std::shared_ptr<Store> {
+      switch (getStoreType(uri, get(params, "state", settings.nixStateDir))) {
+        case tDaemon: {
+          auto daemon_socket_uri = settings.nixDaemonSocketFile;
+          auto channel = grpc::CreateChannel(
+              daemon_socket_uri, grpc::InsecureChannelCredentials());
+          return std::shared_ptr<Store>(std::make_shared<nix::store::RpcStore>(
+              params, proto::WorkerService::NewStub(channel)));
+        }
+        case tLocal: {
+          Store::Params params2 = params;
+          if (absl::StartsWith(uri, "/")) {
+            params2["root"] = uri;
+          }
+          return std::shared_ptr<Store>(std::make_shared<LocalStore>(params2));
+        }
+        default:
+          return nullptr;
       }
-      return std::shared_ptr<Store>(std::make_shared<LocalStore>(params2));
-    }
-    default:
-      return nullptr;
-  }
-});
+    });
 
 std::list<ref<Store>> getDefaultSubstituters() {
   static auto stores([]() {
diff --git a/third_party/nix/src/proto/worker.proto b/third_party/nix/src/proto/worker.proto
index d8d2cc8bbd..3ddaa575e7 100644
--- a/third_party/nix/src/proto/worker.proto
+++ b/third_party/nix/src/proto/worker.proto
@@ -251,6 +251,7 @@ message CollectGarbageResponse {
 }
 
 message PathInfo {
+  bool is_valid = 9;
   StorePath deriver = 1;
   bytes nar_hash = 2;
   repeated string references = 3;