about summary refs log tree commit diff
diff options
context:
space:
mode:
authorGriffin Smith <grfn@gws.fyi>2020-08-06T02·46-0400
committerglittershark <grfn@gws.fyi>2020-08-08T17·51+0000
commit7db734afad313b8669623138130797fad797daa7 (patch)
tree7acf1dd3f641e47513d6beafe68344909d3115c7
parentcc01059d405e982e3a7de78e318374c3dbace48b (diff)
feat(tvix): Implement AddToStoreNar r/1615
Implement both the client and server sides of AddToStoreNar, using a
templated generalization of the sources and sinks we were using for
AddToStore on both ends.

Change-Id: I73d0ed34118c711b125851dff99a7518ced4af35
Reviewed-on: https://cl.tvl.fyi/c/depot/+/1686
Tested-by: BuildkiteCI
Reviewed-by: kanepyork <rikingcoding@gmail.com>
-rw-r--r--third_party/nix/src/libstore/rpc-store.cc60
-rw-r--r--third_party/nix/src/libstore/rpc-store.hh6
-rw-r--r--third_party/nix/src/nix-daemon/nix-daemon-proto.cc86
-rw-r--r--third_party/nix/src/proto/worker.proto10
4 files changed, 129 insertions, 33 deletions
diff --git a/third_party/nix/src/libstore/rpc-store.cc b/third_party/nix/src/libstore/rpc-store.cc
index 751eb449006d..28fc1c9eec7a 100644
--- a/third_party/nix/src/libstore/rpc-store.cc
+++ b/third_party/nix/src/libstore/rpc-store.cc
@@ -56,29 +56,32 @@ T FillFrom(const U& src) {
   return result;
 }
 
-class AddToStorePathWriterSink : public BufferedSink {
+template <typename Request>
+class RPCSink : public BufferedSink {
  public:
-  explicit AddToStorePathWriterSink(
-      std::unique_ptr<
-          grpc_impl::ClientWriter<class nix::proto::AddToStoreRequest>>&&
-          writer)
+  using Writer = grpc::ClientWriter<Request>;
+  explicit RPCSink(std::unique_ptr<Writer>&& writer)
       : writer_(std::move(writer)), good_(true) {}
 
   bool good() override { return good_; }
 
   void write(const unsigned char* data, size_t len) override {
-    proto::AddToStoreRequest req;
+    Request req;
     req.set_data(data, len);
     if (!writer_->Write(req)) {
       good_ = false;
     }
   }
 
-  grpc::Status Finish() { return writer_->Finish(); }
+  ~RPCSink() override { flush(); }
+
+  grpc::Status Finish() {
+    flush();
+    return writer_->Finish();
+  }
 
  private:
-  std::unique_ptr<grpc_impl::ClientWriter<class nix::proto::AddToStoreRequest>>
-      writer_;
+  std::unique_ptr<Writer> writer_;
   bool good_;
 };
 
@@ -319,14 +322,33 @@ void RpcStore::querySubstitutablePathInfos(const PathSet& paths,
 void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource,
                           RepairFlag repair, CheckSigsFlag checkSigs,
                           std::shared_ptr<FSAccessor> accessor) {
-  throw Unsupported(absl::StrCat("Not implemented ", __func__));
-}
+  ClientContext ctx;
+  google::protobuf::Empty response;
+  auto writer = stub_->AddToStoreNar(&ctx, &response);
+
+  proto::AddToStoreNarRequest path_info_req;
+  path_info_req.mutable_path_info()->mutable_path()->set_path(info.path);
+  path_info_req.mutable_path_info()->mutable_deriver()->set_path(info.deriver);
+  path_info_req.mutable_path_info()->set_nar_hash(
+      info.narHash.to_string(Base16, false));
+  for (const auto& ref : info.references) {
+    path_info_req.mutable_path_info()->add_references(ref);
+  }
+  *path_info_req.mutable_path_info()->mutable_registration_time() =
+      TimeUtil::TimeTToTimestamp(info.registrationTime);
+  path_info_req.mutable_path_info()->set_nar_size(info.narSize);
+  path_info_req.mutable_path_info()->set_ultimate(info.ultimate);
+  for (const auto& sig : info.sigs) {
+    path_info_req.mutable_path_info()->add_sigs(sig);
+  }
+  path_info_req.mutable_path_info()->set_ca(info.ca);
+  path_info_req.mutable_path_info()->set_repair(repair);
+  path_info_req.mutable_path_info()->set_check_sigs(checkSigs);
 
-void RpcStore::addToStore(const ValidPathInfo& info,
-                          const ref<std::string>& nar, RepairFlag repair,
-                          CheckSigsFlag checkSigs,
-                          std::shared_ptr<FSAccessor> accessor) {
-  throw Unsupported(absl::StrCat("Not implemented ", __func__));
+  writer->Write(path_info_req);
+
+  RPCSink sink(std::move(writer));
+  copyNAR(narSource, sink);
 }
 
 Path RpcStore::addToStore(const std::string& name, const Path& srcPath,
@@ -349,7 +371,7 @@ Path RpcStore::addToStore(const std::string& name, const Path& srcPath,
   metadata_req.mutable_meta()->set_hash_type(HashTypeToProto(hashAlgo));
   writer->Write(metadata_req);
 
-  AddToStorePathWriterSink sink(std::move(writer));
+  RPCSink sink(std::move(writer));
   dumpPath(std::filesystem::absolute(srcPath), sink);
   sink.flush();
   SuccessOrThrow(sink.Finish());
@@ -402,7 +424,9 @@ void RpcStore::ensurePath(const Path& path) {
 }
 
 void RpcStore::addTempRoot(const Path& path) {
-  throw Unsupported(absl::StrCat("Not implemented ", __func__));
+  ClientContext ctx;
+  google::protobuf::Empty response;
+  SuccessOrThrow(stub_->AddTempRoot(&ctx, StorePath(path), &response));
 }
 
 void RpcStore::addIndirectRoot(const Path& path) {
diff --git a/third_party/nix/src/libstore/rpc-store.hh b/third_party/nix/src/libstore/rpc-store.hh
index 0ef24e22b751..dd421caf44c4 100644
--- a/third_party/nix/src/libstore/rpc-store.hh
+++ b/third_party/nix/src/libstore/rpc-store.hh
@@ -56,12 +56,6 @@ class RpcStore : public LocalFSStore, public virtual Store {
                           CheckSigsFlag checkSigs = CheckSigs,
                           std::shared_ptr<FSAccessor> accessor = 0) override;
 
-  virtual void addToStore(const ValidPathInfo& info,
-                          const ref<std::string>& nar,
-                          RepairFlag repair = NoRepair,
-                          CheckSigsFlag checkSigs = CheckSigs,
-                          std::shared_ptr<FSAccessor> accessor = 0) override;
-
   virtual Path addToStore(const std::string& name, const Path& srcPath,
                           bool recursive = true, HashType hashAlgo = htSHA256,
                           PathFilter& filter = defaultPathFilter,
diff --git a/third_party/nix/src/nix-daemon/nix-daemon-proto.cc b/third_party/nix/src/nix-daemon/nix-daemon-proto.cc
index b310cc92c43c..33e440dfde92 100644
--- a/third_party/nix/src/nix-daemon/nix-daemon-proto.cc
+++ b/third_party/nix/src/nix-daemon/nix-daemon-proto.cc
@@ -2,6 +2,7 @@
 
 #include <filesystem>
 #include <sstream>
+#include <string>
 
 #include <absl/strings/str_cat.h>
 #include <absl/strings/str_format.h>
@@ -10,6 +11,7 @@
 #include <grpcpp/impl/codegen/server_context.h>
 #include <grpcpp/impl/codegen/status.h>
 #include <grpcpp/impl/codegen/status_code_enum.h>
+#include <grpcpp/impl/codegen/sync_stream.h>
 
 #include "libmain/shared.hh"
 #include "libproto/worker.grpc.pb.h"
@@ -24,6 +26,7 @@
 
 namespace nix::daemon {
 
+using ::google::protobuf::util::TimeUtil;
 using ::grpc::Status;
 using ::nix::proto::BuildStatus;
 using ::nix::proto::PathInfo;
@@ -31,20 +34,20 @@ using ::nix::proto::StorePath;
 using ::nix::proto::StorePaths;
 using ::nix::proto::WorkerService;
 
-class AddToStoreRequestSource final : public Source {
-  using Reader = grpc::ServerReader<nix::proto::AddToStoreRequest>;
-
+template <typename Request>
+class RPCSource final : public Source {
  public:
-  explicit AddToStoreRequestSource(Reader* reader) : reader_(reader) {}
+  using Reader = grpc::ServerReader<Request>;
+  explicit RPCSource(Reader* reader) : reader_(reader) {}
 
   size_t read(unsigned char* data, size_t len) override {
     auto got = buffer_.sgetn(reinterpret_cast<char*>(data), len);
     if (got < len) {
-      proto::AddToStoreRequest msg;
+      Request msg;
       if (!reader_->Read(&msg)) {
         return got;
       }
-      if (msg.add_oneof_case() != proto::AddToStoreRequest::kData) {
+      if (msg.add_oneof_case() != Request::kData) {
         // TODO(grfn): Make Source::read return a StatusOr and get rid of this
         // throw
         throw Error(
@@ -152,7 +155,7 @@ class WorkerServiceImpl final : public WorkerService::Service {
           }
 
           auto meta = metadata_request.meta();
-          AddToStoreRequestSource source(reader);
+          RPCSource source(reader);
           auto opt_hash_type = hash_type_from(meta.hash_type());
           if (!opt_hash_type) {
             return Status(grpc::StatusCode::INVALID_ARGUMENT,
@@ -194,6 +197,62 @@ class WorkerServiceImpl final : public WorkerService::Service {
         __FUNCTION__);
   }
 
+  Status AddToStoreNar(
+      grpc::ServerContext* context,
+      grpc::ServerReader<nix::proto::AddToStoreNarRequest>* reader,
+      google::protobuf::Empty*) override {
+    return HandleExceptions(
+        [&]() -> Status {
+          proto::AddToStoreNarRequest path_info_request;
+          auto has_path_info = reader->Read(&path_info_request);
+          if (!has_path_info || !path_info_request.has_path_info()) {
+            return Status(grpc::StatusCode::INVALID_ARGUMENT,
+                          "Path info must be set before sending nar content");
+          }
+
+          auto path_info = path_info_request.path_info();
+
+          ValidPathInfo info;
+          info.path = path_info.path().path();
+          info.deriver = path_info.deriver().path();
+
+          if (!info.deriver.empty()) {
+            ASSERT_INPUT_STORE_PATH(info.deriver);
+          }
+
+          auto nar_hash = Hash::deserialize(path_info.nar_hash(), htSHA256);
+
+          if (!nar_hash.ok()) {
+            return Status(grpc::StatusCode::INVALID_ARGUMENT,
+                          std::string(nar_hash.status().message()));
+          }
+
+          info.narHash = nar_hash.ConsumeValueOrDie();
+          for (const auto& ref : path_info.references()) {
+            info.references.insert(ref);
+          }
+          info.registrationTime =
+              TimeUtil::TimestampToTimeT(path_info.registration_time());
+          info.narSize = path_info.nar_size();
+          info.ultimate = path_info.ultimate();
+          for (const auto& sig : path_info.sigs()) {
+            info.sigs.insert(sig);
+          }
+          info.ca = path_info.ca();
+
+          auto repair = path_info.repair();
+          auto check_sigs = path_info.check_sigs();
+
+          std::string saved;
+          RPCSource source(reader);
+          store_->addToStore(info, source, repair ? Repair : NoRepair,
+                             check_sigs ? CheckSigs : NoCheckSigs, nullptr);
+
+          return Status::OK;
+        },
+        __FUNCTION__);
+  }
+
   Status AddTextToStore(grpc::ServerContext*,
                         const nix::proto::AddTextToStoreRequest* request,
                         nix::proto::StorePath* response) override {
@@ -228,6 +287,19 @@ class WorkerServiceImpl final : public WorkerService::Service {
     return Status::OK;
   }
 
+  Status AddTempRoot(grpc::ServerContext*, const nix::proto::StorePath* request,
+                     google::protobuf::Empty*) override {
+    auto path = request->path();
+    ASSERT_INPUT_STORE_PATH(path);
+
+    return HandleExceptions(
+        [&]() -> Status {
+          store_->addTempRoot(path);
+          return Status::OK;
+        },
+        __FUNCTION__);
+  }
+
   Status AddIndirectRoot(grpc::ServerContext*,
                          const nix::proto::StorePath* request,
                          google::protobuf::Empty*) override {
diff --git a/third_party/nix/src/proto/worker.proto b/third_party/nix/src/proto/worker.proto
index abe8f622c9f2..cc5be46d6ba7 100644
--- a/third_party/nix/src/proto/worker.proto
+++ b/third_party/nix/src/proto/worker.proto
@@ -33,6 +33,8 @@ service WorkerService {
   rpc EnsurePath(StorePath) returns (google.protobuf.Empty);
 
   // TODO: What does this do?
+  // TODO(grfn): This should not actually take a StorePath, as it's not a
+  // StorePath
   rpc AddTempRoot(StorePath) returns (google.protobuf.Empty);
 
   // TODO: What does this do?
@@ -260,6 +262,10 @@ message PathInfo {
   repeated string sigs = 7;
   // If non-empty, an assertion that the path is content-addressed
   string ca = 8;
+
+  // Only used for AddToStoreNarRequest
+  bool repair = 12;
+  bool check_sigs = 13;
 }
 
 message SubstitutablePathInfos {
@@ -318,9 +324,9 @@ message AddSignaturesRequest {
 }
 
 message AddToStoreNarRequest {
-  oneof add_oneoff {
+  oneof add_oneof {
     PathInfo path_info = 1;
-    bytes chunk = 2;
+    bytes data = 2;
   }
 }