#include "rpc-store.hh"
#include <algorithm>
#include <filesystem>
#include <memory>
#include <absl/status/status.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/str_format.h>
#include <google/protobuf/empty.pb.h>
#include <google/protobuf/util/time_util.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/client_context.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/sync_stream.h>
#include <grpcpp/security/credentials.h>
#include <sys/ucontext.h>
#include "libproto/worker.grpc.pb.h"
#include "libproto/worker.pb.h"
#include "libstore/store-api.hh"
#include "libutil/archive.hh"
#include "libutil/hash.hh"
#include "libutil/types.hh"
namespace nix {
namespace store {
using google::protobuf::util::TimeUtil;
using grpc::ClientContext;
using nix::proto::WorkerService;
static google::protobuf::Empty kEmpty;
proto::StorePath StorePath(const Path& path) {
proto::StorePath store_path;
store_path.set_path(path);
return store_path;
}
proto::StorePaths StorePaths(const PathSet& paths) {
proto::StorePaths result;
for (const auto& path : paths) {
result.add_paths(path);
}
return result;
}
template <typename T, typename U>
T FillFrom(const U& src) {
T result;
result.insert(src.begin(), src.end());
return result;
}
template <typename Request>
class RPCSink : public BufferedSink {
public:
using Writer = grpc::ClientWriter<Request>;
explicit RPCSink(std::unique_ptr<Writer>&& writer)
: writer_(std::move(writer)), good_(true) {}
bool good() override { return good_; }
void write(const unsigned char* data, size_t len) override {
Request req;
req.set_data(data, len);
if (!writer_->Write(req)) {
good_ = false;
}
}
~RPCSink() override { flush(); }
grpc::Status Finish() {
flush();
return writer_->Finish();
}
private:
std::unique_ptr<Writer> writer_;
bool good_;
};
constexpr absl::StatusCode GRPCStatusCodeToAbsl(grpc::StatusCode code) {
switch (code) {
case grpc::StatusCode::OK:
return absl::StatusCode::kOk;
case grpc::StatusCode::CANCELLED:
return absl::StatusCode::kCancelled;
case grpc::StatusCode::UNKNOWN:
return absl::StatusCode::kUnknown;
case grpc::StatusCode::INVALID_ARGUMENT:
return absl::StatusCode::kInvalidArgument;
case grpc::StatusCode::DEADLINE_EXCEEDED:
return absl::StatusCode::kDeadlineExceeded;
case grpc::StatusCode::NOT_FOUND:
return absl::StatusCode::kNotFound;
case grpc::StatusCode::ALREADY_EXISTS:
return absl::StatusCode::kAlreadyExists;
case grpc::StatusCode::PERMISSION_DENIED:
return absl::StatusCode::kPermissionDenied;
case grpc::StatusCode::UNAUTHENTICATED:
return absl::StatusCode::kUnauthenticated;
case grpc::StatusCode::RESOURCE_EXHAUSTED:
return absl::StatusCode::kResourceExhausted;
case grpc::StatusCode::FAILED_PRECONDITION:
return absl::StatusCode::kFailedPrecondition;
case grpc::StatusCode::ABORTED:
return absl::StatusCode::kAborted;
case grpc::StatusCode::OUT_OF_RANGE:
return absl::StatusCode::kOutOfRange;
case grpc::StatusCode::UNIMPLEMENTED:
return absl::StatusCode::kUnimplemented;
case grpc::StatusCode::INTERNAL:
return absl::StatusCode::kInternal;
case grpc::StatusCode::UNAVAILABLE:
return absl::StatusCode::kUnavailable;
case grpc::StatusCode::DATA_LOSS:
return absl::StatusCode::kDataLoss;
default:
return absl::StatusCode::kInternal;
}
}
constexpr absl::string_view GRPCStatusCodeDescription(grpc::StatusCode code) {
switch (code) {
case grpc::StatusCode::OK:
return "OK";
case grpc::StatusCode::CANCELLED:
return "CANCELLED";
case grpc::StatusCode::UNKNOWN:
return "UNKNOWN";
case grpc::StatusCode::INVALID_ARGUMENT:
return "INVALID_ARGUMENT";
case grpc::StatusCode::DEADLINE_EXCEEDED:
return "DEADLINE_EXCEEDED";
case grpc::StatusCode::NOT_FOUND:
return "NOT_FOUND";
case grpc::StatusCode::ALREADY_EXISTS:
return "ALREADY_EXISTS";
case grpc::StatusCode::PERMISSION_DENIED:
return "PERMISSION_DENIED";
case grpc::StatusCode::UNAUTHENTICATED:
return "UNAUTHENTICATED";
case grpc::StatusCode::RESOURCE_EXHAUSTED:
return "RESOURCE_EXHAUSTED";
case grpc::StatusCode::FAILED_PRECONDITION:
return "FAILED_PRECONDITION";
case grpc::StatusCode::ABORTED:
return "ABORTED";
case grpc::StatusCode::OUT_OF_RANGE:
return "OUT_OF_RANGE";
case grpc::StatusCode::UNIMPLEMENTED:
return "UNIMPLEMENTED";
case grpc::StatusCode::INTERNAL:
return "INTERNAL";
case grpc::StatusCode::UNAVAILABLE:
return "UNAVAILABLE";
case grpc::StatusCode::DATA_LOSS:
return "DATA_LOSS";
default:
return "<BAD ERROR CODE>";
};
}
// TODO(grfn): Obviously this should go away and be replaced by StatusOr... but
// that would require refactoring the entire store api, which we don't feel like
// doing right now. We should at some point though
void const RpcStore::SuccessOrThrow(const grpc::Status& status,
const absl::string_view& call) const {
if (!status.ok()) {
auto uri = uri_.value_or("unknown URI");
switch (status.error_code()) {
case grpc::StatusCode::UNIMPLEMENTED:
throw Unsupported(
absl::StrFormat("operation is not supported by store at %s: %s",
uri, status.error_message()));
default:
throw Error(
absl::StrFormat("Rpc call to %s failed (%s): %s ", uri,
GRPCStatusCodeDescription(status.error_code()),
status.error_message()));
}
}
}
bool RpcStore::isValidPathUncached(const Path& path) {
ClientContext ctx;
proto::IsValidPathResponse resp;
SuccessOrThrow(stub_->IsValidPath(&ctx, StorePath(path), &resp),
__FUNCTION__);
return resp.is_valid();
}
PathSet RpcStore::queryAllValidPaths() {
ClientContext ctx;
proto::StorePaths paths;
SuccessOrThrow(stub_->QueryAllValidPaths(&ctx, kEmpty, &paths), __FUNCTION__);
return FillFrom<PathSet>(paths.paths());
}
PathSet RpcStore::queryValidPaths(const PathSet& paths,
SubstituteFlag maybeSubstitute) {
ClientContext ctx;
proto::StorePaths store_paths;
for (const auto& path : paths) {
store_paths.add_paths(path);
}
proto::StorePaths result_paths;
SuccessOrThrow(stub_->QueryValidPaths(&ctx, store_paths, &result_paths),
__FUNCTION__);
return FillFrom<PathSet>(result_paths.paths());
}
void RpcStore::queryPathInfoUncached(
const Path& path,
Callback<std::shared_ptr<ValidPathInfo>> callback) noexcept {
ClientContext ctx;
proto::StorePath store_path;
store_path.set_path(path);
try {
proto::PathInfo path_info;
SuccessOrThrow(stub_->QueryPathInfo(&ctx, store_path, &path_info),
__FUNCTION__);
std::shared_ptr<ValidPathInfo> info;
if (!path_info.is_valid()) {
throw InvalidPath(absl::StrFormat("path '%s' is not valid", path));
}
info = std::make_shared<ValidPathInfo>();
info->path = path;
info->deriver = path_info.deriver().path();
if (!info->deriver.empty()) {
assertStorePath(info->deriver);
}
auto hash_ = Hash::deserialize(path_info.nar_hash(), htSHA256);
info->narHash = Hash::unwrap_throw(hash_);
info->references.insert(path_info.references().begin(),
path_info.references().end());
info->registrationTime =
TimeUtil::TimestampToTimeT(path_info.registration_time());
info->narSize = path_info.nar_size();
info->ultimate = path_info.ultimate();
info->sigs.insert(path_info.sigs().begin(), path_info.sigs().end());
info->ca = path_info.ca();
callback(std::move(info));
} catch (...) {
callback.rethrow();
}
}
void RpcStore::queryReferrers(const Path& path, PathSet& referrers) {
ClientContext ctx;
proto::StorePaths paths;
SuccessOrThrow(stub_->QueryReferrers(&ctx, StorePath(path), &paths),
__FUNCTION__);
referrers.insert(paths.paths().begin(), paths.paths().end());
}
PathSet RpcStore::queryValidDerivers(const Path& path) {
ClientContext ctx;
proto::StorePaths paths;
SuccessOrThrow(stub_->QueryValidDerivers(&ctx, StorePath(path), &paths),
__FUNCTION__);
return FillFrom<PathSet>(paths.paths());
}
PathSet RpcStore::queryDerivationOutputs(const Path& path) {
ClientContext ctx;
proto::StorePaths paths;
SuccessOrThrow(stub_->QueryDerivationOutputs(&ctx, StorePath(path), &paths),
__FUNCTION__);
return FillFrom<PathSet>(paths.paths());
}
StringSet RpcStore::queryDerivationOutputNames(const Path& path) {
ClientContext ctx;
proto::DerivationOutputNames output_names;
SuccessOrThrow(
stub_->QueryDerivationOutputNames(&ctx, StorePath(path), &output_names));
return FillFrom<StringSet>(output_names.names());
}
Path RpcStore::queryPathFromHashPart(const std::string& hashPart) {
ClientContext ctx;
proto::StorePath path;
proto::HashPart proto_hash_part;
proto_hash_part.set_hash_part(hashPart);
SuccessOrThrow(stub_->QueryPathFromHashPart(&ctx, proto_hash_part, &path),
__FUNCTION__);
return path.path();
}
PathSet RpcStore::querySubstitutablePaths(const PathSet& paths) {
ClientContext ctx;
proto::StorePaths result;
SuccessOrThrow(
stub_->QuerySubstitutablePaths(&ctx, StorePaths(paths), &result));
return FillFrom<PathSet>(result.paths());
}
void RpcStore::querySubstitutablePathInfos(const PathSet& paths,
SubstitutablePathInfos& infos) {
ClientContext ctx;
proto::SubstitutablePathInfos result;
SuccessOrThrow(
stub_->QuerySubstitutablePathInfos(&ctx, StorePaths(paths), &result));
for (const auto& path_info : result.path_infos()) {
auto path = path_info.path().path();
SubstitutablePathInfo& info(infos[path]);
info.deriver = path_info.deriver().path();
if (!info.deriver.empty()) {
assertStorePath(info.deriver);
}
info.references = FillFrom<PathSet>(path_info.references());
info.downloadSize = path_info.download_size();
info.narSize = path_info.nar_size();
}
}
void RpcStore::addToStore(const ValidPathInfo& info, Source& narSource,
RepairFlag repair, CheckSigsFlag checkSigs,
std::shared_ptr<FSAccessor> accessor) {
ClientContext ctx;
google::protobuf::Empty response;
auto writer = stub_->AddToStoreNar(&ctx, &response);
proto::AddToStoreNarRequest path_info_req;
path_info_req.mutable_path_info()->mutable_path()->set_path(info.path);
path_info_req.mutable_path_info()->mutable_deriver()->set_path(info.deriver);
path_info_req.mutable_path_info()->set_nar_hash(
info.narHash.to_string(Base16, false));
for (const auto& ref : info.references) {
path_info_req.mutable_path_info()->add_references(ref);
}
*path_info_req.mutable_path_info()->mutable_registration_time() =
TimeUtil::TimeTToTimestamp(info.registrationTime);
path_info_req.mutable_path_info()->set_nar_size(info.narSize);
path_info_req.mutable_path_info()->set_ultimate(info.ultimate);
for (const auto& sig : info.sigs) {
path_info_req.mutable_path_info()->add_sigs(sig);
}
path_info_req.mutable_path_info()->set_ca(info.ca);
path_info_req.mutable_path_info()->set_repair(repair);
path_info_req.mutable_path_info()->set_check_sigs(checkSigs);
writer->Write(path_info_req);
RPCSink sink(std::move(writer));
copyNAR(narSource, sink);
SuccessOrThrow(sink.Finish(), __FUNCTION__);
}
Path RpcStore::addToStore(const std::string& name, const Path& srcPath,
bool recursive, HashType hashAlgo, PathFilter& filter,
RepairFlag repair) {
if (repair != 0u) {
throw Error(
"repairing is not supported when building through the Nix daemon");
}
ClientContext ctx;
proto::StorePath response;
auto writer = stub_->AddToStore(&ctx, &response);
proto::AddToStoreRequest metadata_req;
metadata_req.mutable_meta()->set_base_name(name);
// TODO(grfn): what is fixed?
metadata_req.mutable_meta()->set_fixed(!(hashAlgo == htSHA256 && recursive));
metadata_req.mutable_meta()->set_recursive(recursive);
metadata_req.mutable_meta()->set_hash_type(HashTypeToProto(hashAlgo));
writer->Write(metadata_req);
RPCSink sink(std::move(writer));
dumpPath(std::filesystem::absolute(srcPath), sink);
sink.flush();
SuccessOrThrow(sink.Finish(), __FUNCTION__);
return response.path();
}
Path RpcStore::addTextToStore(const std::string& name,
const std::string& content,
const PathSet& references, RepairFlag repair) {
if (repair != 0u) {
throw Error(
"repairing is not supported when building through the Nix daemon");
}
ClientContext ctx;
proto::AddTextToStoreRequest request;
request.set_name(name);
request.set_content(content);
for (const auto& ref : references) {
request.add_references(ref);
}
proto::StorePath result;
SuccessOrThrow(stub_->AddTextToStore(&ctx, request, &result), __FUNCTION__);
return result.path();
}
void RpcStore::narFromPath(const Path& path, Sink& sink) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::buildPaths(const PathSet& paths, BuildMode buildMode) {
ClientContext ctx;
proto::BuildPathsRequest request;
for (const auto& path : paths) {
request.add_drvs(path);
}
google::protobuf::Empty response;
request.set_mode(nix::BuildModeToProto(buildMode));
SuccessOrThrow(stub_->BuildPaths(&ctx, request, &response), __FUNCTION__);
}
BuildResult RpcStore::buildDerivation(const Path& drvPath,
const BasicDerivation& drv,
BuildMode buildMode) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::ensurePath(const Path& path) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::addTempRoot(const Path& path) {
ClientContext ctx;
google::protobuf::Empty response;
SuccessOrThrow(stub_->AddTempRoot(&ctx, StorePath(path), &response),
__FUNCTION__);
}
void RpcStore::addIndirectRoot(const Path& path) {
ClientContext ctx;
google::protobuf::Empty response;
SuccessOrThrow(stub_->AddIndirectRoot(&ctx, StorePath(path), &response),
__FUNCTION__);
}
void RpcStore::syncWithGC() {
ClientContext ctx;
google::protobuf::Empty response;
SuccessOrThrow(stub_->SyncWithGC(&ctx, kEmpty, &response), __FUNCTION__);
}
Roots RpcStore::findRoots(bool censor) {
ClientContext ctx;
proto::FindRootsResponse response;
SuccessOrThrow(stub_->FindRoots(&ctx, kEmpty, &response), __FUNCTION__);
Roots result;
for (const auto& [target, links] : response.roots()) {
auto link_paths = FillFrom<std::unordered_set<std::string>>(links.paths());
result.insert({target, link_paths});
}
return result;
}
void RpcStore::collectGarbage(const GCOptions& options, GCResults& results) {
ClientContext ctx;
proto::CollectGarbageRequest request;
request.set_action(options.ActionToProto());
for (const auto& path : options.pathsToDelete) {
request.add_paths_to_delete(path);
}
request.set_ignore_liveness(options.ignoreLiveness);
request.set_max_freed(options.maxFreed);
proto::CollectGarbageResponse response;
SuccessOrThrow(stub_->CollectGarbage(&ctx, request, &response), __FUNCTION__);
for (const auto& path : response.deleted_paths()) {
results.paths.insert(path);
}
results.bytesFreed = response.bytes_freed();
}
void RpcStore::optimiseStore() {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
bool RpcStore::verifyStore(bool checkContents, RepairFlag repair) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::addSignatures(const Path& storePath, const StringSet& sigs) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::computeFSClosure(const PathSet& paths, PathSet& paths_,
bool flipDirection, bool includeOutputs,
bool includeDerivers) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::queryMissing(const PathSet& targets, PathSet& willBuild,
PathSet& willSubstitute, PathSet& unknown,
unsigned long long& downloadSize,
unsigned long long& narSize) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
std::shared_ptr<std::string> RpcStore::getBuildLog(const Path& path) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::connect() {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
unsigned int RpcStore::getProtocol() {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
int RpcStore::getPriority() {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
Path RpcStore::toRealPath(const Path& storePath) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
void RpcStore::createUser(const std::string& userName, uid_t userId) {
throw Unsupported(absl::StrCat("Not implemented ", __func__));
}
} // namespace store
static std::string uriScheme = "unix://";
// TODO(grfn): Make this a function that we call from main rather than... this
static RegisterStoreImplementation regStore([](const std::string& uri,
const Store::Params& params)
-> std::shared_ptr<Store> {
if (std::string(uri, 0, uriScheme.size()) != uriScheme) {
return nullptr;
}
auto channel = grpc::CreateChannel(uri, grpc::InsecureChannelCredentials());
return std::make_shared<store::RpcStore>(
uri, params, proto::WorkerService::NewStub(channel));
});
} // namespace nix