#include "nix-daemon-proto.hh" #include <filesystem> #include <ostream> #include <sstream> #include <streambuf> #include <string> #include <absl/strings/str_cat.h> #include <absl/strings/str_format.h> #include <google/protobuf/empty.pb.h> #include <google/protobuf/util/time_util.h> #include <grpcpp/impl/codegen/server_context.h> #include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/status_code_enum.h> #include <grpcpp/impl/codegen/sync_stream.h> #include "libmain/shared.hh" #include "libproto/worker.grpc.pb.h" #include "libproto/worker.pb.h" #include "libstore/derivations.hh" #include "libstore/local-store.hh" #include "libstore/store-api.hh" #include "libutil/archive.hh" #include "libutil/hash.hh" #include "libutil/proto.hh" #include "libutil/serialise.hh" #include "libutil/types.hh" namespace nix::daemon { using ::google::protobuf::util::TimeUtil; using ::grpc::Status; using ::nix::proto::PathInfo; using ::nix::proto::StorePath; using ::nix::proto::StorePaths; using ::nix::proto::WorkerService; template <typename Request> class RPCSource final : public Source { public: using Reader = grpc::ServerReader<Request>; explicit RPCSource(Reader* reader) : reader_(reader) {} size_t read(unsigned char* data, size_t len) override { auto got = buffer_.sgetn(reinterpret_cast<char*>(data), len); if (got < len) { Request msg; if (!reader_->Read(&msg)) { return got; } if (msg.add_oneof_case() != Request::kData) { // TODO(grfn): Make Source::read return a StatusOr and get rid of this // throw throw Error( "Invalid AddToStoreRequest: all messages except the first must " "contain data"); } buffer_.sputn(msg.data().data(), msg.data().length()); return got + read(data + got, len - got); } return got; }; private: std::stringbuf buffer_; Reader* reader_; }; // TODO(grfn): Make this some sort of pipe so we don't have to store data in // memory /* If the NAR archive contains a single file at top-level, then save the contents of the file to `s'. Otherwise barf. */ struct RetrieveRegularNARSink : ParseSink { bool regular{true}; std::string s; RetrieveRegularNARSink() {} void createDirectory(const Path& path) override { regular = false; } void receiveContents(unsigned char* data, unsigned int len) override { s.append(reinterpret_cast<const char*>(data), len); } void createSymlink(const Path& path, const std::string& target) override { regular = false; } }; #define ASSERT_INPUT_STORE_PATH(path) \ if (!store_->isStorePath(path)) { \ return Status(grpc::StatusCode::INVALID_ARGUMENT, \ absl::StrFormat("path '%s' is not in the Nix store", path)); \ } class BuildLogStreambuf final : public std::streambuf { public: using Writer = grpc::ServerWriter<nix::proto::BuildEvent>; explicit BuildLogStreambuf(Writer* writer) : writer_(writer) {} // TODO(grfn): buffer with a timeout so we don't have too many messages std::streamsize xsputn(const char_type* s, std::streamsize n) override { nix::proto::BuildEvent event; event.mutable_build_log()->set_line(s, n); writer_->Write(event); return n; } int_type overflow(int_type ch) override { if (ch != traits_type::eof()) { nix::proto::BuildEvent event; event.mutable_build_log()->set_line(std::string(1, ch)); writer_->Write(event); } return ch; } private: Writer* writer_{}; }; class WorkerServiceImpl final : public WorkerService::Service { public: WorkerServiceImpl(nix::Store& store) : store_(&store) {} Status IsValidPath(grpc::ServerContext* context, const StorePath* request, nix::proto::IsValidPathResponse* response) override { return HandleExceptions( [&]() -> Status { const auto& path = request->path(); response->set_is_valid(store_->isValidPath(path)); return Status::OK; }, __FUNCTION__); } Status HasSubstitutes(grpc::ServerContext* context, const StorePath* request, nix::proto::HasSubstitutesResponse* response) override { return HandleExceptions( [&]() -> Status { const auto& path = request->path(); ASSERT_INPUT_STORE_PATH(path); PathSet res = store_->querySubstitutablePaths({path}); response->set_has_substitutes(res.find(path) != res.end()); return Status::OK; }, __FUNCTION__); } Status QueryReferrers(grpc::ServerContext* context, const StorePath* request, StorePaths* response) override { return HandleExceptions( [&]() -> Status { const auto& path = request->path(); ASSERT_INPUT_STORE_PATH(path); PathSet paths; store_->queryReferrers(path, paths); for (const auto& path : paths) { response->add_paths(path); } return Status::OK; }, __FUNCTION__); } Status AddToStore(grpc::ServerContext* context, grpc::ServerReader<nix::proto::AddToStoreRequest>* reader, nix::proto::StorePath* response) override { return HandleExceptions( [&]() -> Status { proto::AddToStoreRequest metadata_request; auto has_metadata = reader->Read(&metadata_request); if (!has_metadata || !metadata_request.has_meta()) { return Status(grpc::StatusCode::INVALID_ARGUMENT, "Metadata must be set before sending file content"); } auto meta = metadata_request.meta(); RPCSource source(reader); auto opt_hash_type = hash_type_from(meta.hash_type()); if (!opt_hash_type) { return Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid hash type"); } std::string* data; RetrieveRegularNARSink nar; TeeSource saved_nar(source); if (meta.recursive()) { // TODO(grfn): Don't store the full data in memory, instead just // make addToStoreFromDump take a Source ParseSink sink; parseDump(sink, saved_nar); data = &(*saved_nar.data); } else { parseDump(nar, source); if (!nar.regular) { return Status(grpc::StatusCode::INVALID_ARGUMENT, "Regular file expected"); } data = &nar.s; } auto local_store = store_.dynamic_pointer_cast<LocalStore>(); if (!local_store) { return Status(grpc::StatusCode::FAILED_PRECONDITION, "operation is only supported by LocalStore"); } auto path = local_store->addToStoreFromDump( *data, meta.base_name(), meta.recursive(), opt_hash_type.value()); response->set_path(path); return Status::OK; }, __FUNCTION__); } Status AddToStoreNar( grpc::ServerContext* context, grpc::ServerReader<nix::proto::AddToStoreNarRequest>* reader, google::protobuf::Empty*) override { return HandleExceptions( [&]() -> Status { proto::AddToStoreNarRequest path_info_request; auto has_path_info = reader->Read(&path_info_request); if (!has_path_info || !path_info_request.has_path_info()) { return Status(grpc::StatusCode::INVALID_ARGUMENT, "Path info must be set before sending nar content"); } auto path_info = path_info_request.path_info(); ValidPathInfo info; info.path = path_info.path().path(); info.deriver = path_info.deriver().path(); if (!info.deriver.empty()) { ASSERT_INPUT_STORE_PATH(info.deriver); } auto nar_hash = Hash::deserialize(path_info.nar_hash(), htSHA256); if (!nar_hash.ok()) { return Status(grpc::StatusCode::INVALID_ARGUMENT, std::string(nar_hash.status().message())); } info.narHash = *nar_hash; for (const auto& ref : path_info.references()) { info.references.insert(ref); } info.registrationTime = TimeUtil::TimestampToTimeT(path_info.registration_time()); info.narSize = path_info.nar_size(); info.ultimate = path_info.ultimate(); for (const auto& sig : path_info.sigs()) { info.sigs.insert(sig); } info.ca = path_info.ca(); auto repair = path_info.repair(); auto check_sigs = path_info.check_sigs(); std::string saved; RPCSource source(reader); store_->addToStore(info, source, repair ? Repair : NoRepair, check_sigs ? CheckSigs : NoCheckSigs, nullptr); return Status::OK; }, __FUNCTION__); } Status AddTextToStore( grpc::ServerContext*, grpc::ServerReader<nix::proto::AddTextToStoreRequest>* reader, nix::proto::StorePath* response) override { return HandleExceptions( [&]() -> Status { proto::AddTextToStoreRequest request; auto has_metadata = reader->Read(&request); if (!has_metadata || !request.has_meta()) { return Status(grpc::StatusCode::INVALID_ARGUMENT, "Metadata must be set before sending content"); } proto::AddTextToStoreRequest_Metadata meta = request.meta(); PathSet references; for (const auto& ref : meta.references()) { references.insert(ref); } std::string content; content.reserve(meta.size()); while (reader->Read(&request)) { if (request.add_oneof_case() != request.kData) { return Status(grpc::StatusCode::INVALID_ARGUMENT, "All requests except the first must contain data"); } content.append(request.data()); } auto path = store_->addTextToStore(meta.name(), content, references); response->set_path(path); return Status::OK; }, __FUNCTION__); } Status BuildPaths( grpc::ServerContext*, const nix::proto::BuildPathsRequest* request, grpc::ServerWriter<nix::proto::BuildEvent>* writer) override { return HandleExceptions( [&]() -> Status { PathSet drvs; for (const auto& drv : request->drvs()) { drvs.insert(drv); } auto mode = BuildModeFrom(request->mode()); if (!mode.has_value()) { return Status(grpc::StatusCode::INTERNAL, "Invalid build mode"); } BuildLogStreambuf log_buffer(writer); std::ostream log_sink(&log_buffer); // TODO(grfn): If mode is repair and not trusted, we need to return an // error here (but we can't yet because we don't know anything about // trusted users) return nix::util::proto::AbslToGRPCStatus( store_->buildPaths(log_sink, drvs, mode.value())); }, __FUNCTION__); } Status EnsurePath(grpc::ServerContext* context, const nix::proto::StorePath* request, google::protobuf::Empty*) override { auto path = request->path(); ASSERT_INPUT_STORE_PATH(path); return HandleExceptions( [&]() -> Status { store_->ensurePath(path); return Status::OK; }, __FUNCTION__); } Status AddTempRoot(grpc::ServerContext*, const nix::proto::StorePath* request, google::protobuf::Empty*) override { auto path = request->path(); ASSERT_INPUT_STORE_PATH(path); return HandleExceptions( [&]() -> Status { store_->addTempRoot(path); return Status::OK; }, __FUNCTION__); } Status AddIndirectRoot(grpc::ServerContext*, const nix::proto::StorePath* request, google::protobuf::Empty*) override { auto path = std::filesystem::canonical(request->path()); ASSERT_INPUT_STORE_PATH(path); return HandleExceptions( [&]() -> Status { store_->addIndirectRoot(path); return Status::OK; }, __FUNCTION__); } Status SyncWithGC(grpc::ServerContext*, const google::protobuf::Empty*, google::protobuf::Empty*) override { return HandleExceptions( [&]() -> Status { store_->syncWithGC(); return Status::OK; }, __FUNCTION__); } Status FindRoots(grpc::ServerContext*, const google::protobuf::Empty*, nix::proto::FindRootsResponse* response) override { return HandleExceptions( [&]() -> Status { auto roots = store_->findRoots(false); for (const auto& [target, links] : roots) { StorePaths link_paths; for (const auto& link : links) { link_paths.add_paths(link); } response->mutable_roots()->insert({target, link_paths}); } return Status::OK; }, __FUNCTION__); } Status CollectGarbage(grpc::ServerContext*, const proto::CollectGarbageRequest* request, proto::CollectGarbageResponse* response) override { return HandleExceptions( [&]() -> Status { GCOptions options; auto action = GCActionFromProto(request->action()); if (!action.has_value()) { return Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid GC action"); } options.action = action.value(); for (const auto& path : request->paths_to_delete()) { options.pathsToDelete.insert(path); } options.ignoreLiveness = request->ignore_liveness(); options.maxFreed = request->max_freed(); if (options.ignoreLiveness) { return Status(grpc::StatusCode::INVALID_ARGUMENT, "you are not allowed to ignore liveness"); } GCResults results; store_->collectGarbage(options, results); for (const auto& path : results.paths) { response->add_deleted_paths(path); } response->set_bytes_freed(results.bytesFreed); return Status::OK; }, __FUNCTION__); } Status QuerySubstitutablePathInfos( grpc::ServerContext*, const StorePaths* request, nix::proto::SubstitutablePathInfos* response) override { return HandleExceptions( [&]() -> Status { SubstitutablePathInfos infos; PathSet paths; for (const auto& path : request->paths()) { paths.insert(path); } store_->querySubstitutablePathInfos(paths, infos); for (const auto& [path, path_info] : infos) { auto proto_path_info = response->add_path_infos(); proto_path_info->mutable_path()->set_path(path); proto_path_info->mutable_deriver()->set_path(path_info.deriver); for (const auto& ref : path_info.references) { proto_path_info->add_references(ref); } proto_path_info->set_download_size(path_info.downloadSize); proto_path_info->set_nar_size(path_info.narSize); } return Status::OK; }, __FUNCTION__); } Status QueryValidDerivers(grpc::ServerContext* context, const StorePath* request, StorePaths* response) override { return HandleExceptions( [&]() -> Status { const auto& path = request->path(); ASSERT_INPUT_STORE_PATH(path); PathSet paths = store_->queryValidDerivers(path); for (const auto& path : paths) { response->add_paths(path); } return Status::OK; }, __FUNCTION__); } Status QueryDerivationOutputs(grpc::ServerContext* context, const StorePath* request, StorePaths* response) override { return HandleExceptions( [&]() -> Status { const auto& path = request->path(); ASSERT_INPUT_STORE_PATH(path); PathSet paths = store_->queryDerivationOutputs(path); for (const auto& path : paths) { response->add_paths(path); } return Status::OK; }, __FUNCTION__); } Status QueryAllValidPaths(grpc::ServerContext* context, const google::protobuf::Empty* request, StorePaths* response) override { return HandleExceptions( [&]() -> Status { const auto paths = store_->queryAllValidPaths(); for (const auto& path : paths) { response->add_paths(path); } return Status::OK; }, __FUNCTION__); } Status QueryPathInfo(grpc::ServerContext* context, const StorePath* request, PathInfo* response) override { return HandleExceptions( [&]() -> Status { auto path = request->path(); ASSERT_INPUT_STORE_PATH(path); response->mutable_path()->set_path(path); try { auto info = store_->queryPathInfo(path); response->mutable_deriver()->set_path(info->deriver); response->set_nar_hash( reinterpret_cast<const char*>(&info->narHash.hash[0]), info->narHash.hashSize); for (const auto& reference : info->references) { response->add_references(reference); } *response->mutable_registration_time() = google::protobuf::util::TimeUtil::TimeTToTimestamp( info->registrationTime); response->set_nar_size(info->narSize); response->set_ultimate(info->ultimate); for (const auto& sig : info->sigs) { response->add_sigs(sig); } response->set_ca(info->ca); return Status::OK; } catch (InvalidPath& e) { return Status(grpc::StatusCode::INVALID_ARGUMENT, e.msg()); } }, __FUNCTION__); } Status QueryDerivationOutputNames( grpc::ServerContext* context, const StorePath* request, nix::proto::DerivationOutputNames* response) override { return HandleExceptions( [&]() -> Status { auto path = request->path(); ASSERT_INPUT_STORE_PATH(path); auto names = store_->queryDerivationOutputNames(path); for (const auto& name : names) { response->add_names(name); } return Status::OK; }, __FUNCTION__); } Status QueryPathFromHashPart(grpc::ServerContext* context, const nix::proto::HashPart* request, StorePath* response) override { return HandleExceptions( [&]() -> Status { auto hash_part = request->hash_part(); auto path = store_->queryPathFromHashPart(hash_part); ASSERT_INPUT_STORE_PATH(path); response->set_path(path); return Status::OK; }, __FUNCTION__); } Status QueryValidPaths(grpc::ServerContext* context, const StorePaths* request, StorePaths* response) override { return HandleExceptions( [&]() -> Status { std::set<Path> paths; for (const auto& path : request->paths()) { ASSERT_INPUT_STORE_PATH(path); paths.insert(path); } auto res = store_->queryValidPaths(paths); for (const auto& path : res) { response->add_paths(path); } return Status::OK; }, __FUNCTION__); } Status QuerySubstitutablePaths(grpc::ServerContext* context, const StorePaths* request, StorePaths* response) override { return HandleExceptions( [&]() -> Status { std::set<Path> paths; for (const auto& path : request->paths()) { ASSERT_INPUT_STORE_PATH(path); paths.insert(path); } auto res = store_->querySubstitutablePaths(paths); for (const auto& path : res) { response->add_paths(path); } return Status::OK; }, __FUNCTION__); } Status OptimiseStore(grpc::ServerContext* context, const google::protobuf::Empty* request, google::protobuf::Empty* response) override { return HandleExceptions( [&]() -> Status { store_->optimiseStore(); return Status::OK; }, __FUNCTION__); } Status VerifyStore(grpc::ServerContext* context, const nix::proto::VerifyStoreRequest* request, nix::proto::VerifyStoreResponse* response) override { return HandleExceptions( [&]() -> Status { auto errors = store_->verifyStore(request->check_contents(), static_cast<RepairFlag>(request->repair())); response->set_errors(errors); return Status::OK; }, __FUNCTION__); } Status BuildDerivation( grpc::ServerContext*, const nix::proto::BuildDerivationRequest* request, grpc::ServerWriter<nix::proto::BuildEvent>* writer) override { return HandleExceptions( [&]() -> Status { auto drv_path = request->drv_path().path(); ASSERT_INPUT_STORE_PATH(drv_path); auto drv = BasicDerivation::from_proto(&request->derivation()); auto build_mode = nix::BuildModeFrom(request->build_mode()); if (!build_mode) { return Status(grpc::StatusCode::INTERNAL, "Invalid build mode"); } BuildLogStreambuf log_buffer(writer); std::ostream log_sink(&log_buffer); BuildResult res = store_->buildDerivation(log_sink, drv_path, drv, *build_mode); proto::BuildResult proto_res{}; proto_res.set_status(res.status_to_proto()); if (!res.errorMsg.empty()) { proto_res.set_msg(res.errorMsg); } proto::BuildEvent event{}; *event.mutable_result() = proto_res; writer->Write(event); return Status::OK; }, __FUNCTION__); } Status AddSignatures(grpc::ServerContext* context, const nix::proto::AddSignaturesRequest* request, google::protobuf::Empty* response) override { return HandleExceptions( [&]() -> Status { auto path = request->path().path(); ASSERT_INPUT_STORE_PATH(path); StringSet sigs; sigs.insert(request->sigs().sigs().begin(), request->sigs().sigs().end()); store_->addSignatures(path, sigs); return Status::OK; }, __FUNCTION__); } Status QueryMissing(grpc::ServerContext* context, const StorePaths* request, nix::proto::QueryMissingResponse* response) override { return HandleExceptions( [&]() -> Status { std::set<Path> targets; for (auto& path : request->paths()) { ASSERT_INPUT_STORE_PATH(path); targets.insert(path); } PathSet will_build; PathSet will_substitute; PathSet unknown; // TODO(grfn): Switch to concrete size type unsigned long long download_size; unsigned long long nar_size; store_->queryMissing(targets, will_build, will_substitute, unknown, download_size, nar_size); for (auto& path : will_build) { response->add_will_build(path); } for (auto& path : will_substitute) { response->add_will_substitute(path); } for (auto& path : unknown) { response->add_unknown(path); } response->set_download_size(download_size); response->set_nar_size(nar_size); return Status::OK; }, __FUNCTION__); }; Status GetBuildLog(grpc::ServerContext* context, const StorePath* request, proto::BuildLog* response) override { return HandleExceptions( [&]() -> Status { const auto log = store_->getBuildLog(request->path()); if (log) { response->set_build_log(*log); } return Status::OK; }, __FUNCTION__); } private: Status HandleExceptions(std::function<Status(void)> fn, absl::string_view methodName) { try { return fn(); } catch (Unsupported& e) { return Status(grpc::StatusCode::UNIMPLEMENTED, absl::StrCat(methodName, " is not supported: ", e.what())); } catch (Error& e) { return Status(grpc::StatusCode::INTERNAL, e.what()); } // add more specific Error-Status mappings above } ref<nix::Store> store_; }; WorkerService::Service* NewWorkerService(nix::Store& store) { return new WorkerServiceImpl(store); } } // namespace nix::daemon