diff options
Diffstat (limited to 'third_party/nix/src/nix-daemon')
-rw-r--r-- | third_party/nix/src/nix-daemon/CMakeLists.txt | 29 | ||||
-rw-r--r-- | third_party/nix/src/nix-daemon/nix-daemon-legacy.cc | 1185 | ||||
-rw-r--r-- | third_party/nix/src/nix-daemon/nix-daemon-proto.cc | 799 | ||||
-rw-r--r-- | third_party/nix/src/nix-daemon/nix-daemon-proto.hh | 12 | ||||
-rw-r--r-- | third_party/nix/src/nix-daemon/nix-daemon.cc | 201 |
5 files changed, 2226 insertions, 0 deletions
diff --git a/third_party/nix/src/nix-daemon/CMakeLists.txt b/third_party/nix/src/nix-daemon/CMakeLists.txt new file mode 100644 index 000000000000..63125a9b26b2 --- /dev/null +++ b/third_party/nix/src/nix-daemon/CMakeLists.txt @@ -0,0 +1,29 @@ +# -*- mode: cmake; -*- + +# The nix-daemon is the binary running the gRPC server component to +# which other components of Nix talk to perform store and builder +# operations. + +add_executable(nix-daemon) +include_directories(${PROJECT_BINARY_DIR}) # for config.h +set_property(TARGET nix-daemon PROPERTY CXX_STANDARD 17) + +pkg_check_modules(systemd REQUIRED) + +target_sources(nix-daemon + PRIVATE + nix-daemon-proto.hh + nix-daemon-proto.cc + nix-daemon.cc +) + +target_link_libraries(nix-daemon + nixutil + nixstore + nixmain + absl::flags + absl::flags_parse + systemd +) + +install(TARGETS nix-daemon DESTINATION bin) diff --git a/third_party/nix/src/nix-daemon/nix-daemon-legacy.cc b/third_party/nix/src/nix-daemon/nix-daemon-legacy.cc new file mode 100644 index 000000000000..97cf5195d35c --- /dev/null +++ b/third_party/nix/src/nix-daemon/nix-daemon-legacy.cc @@ -0,0 +1,1185 @@ +/* + NOTE: You are looking at the *previous* implementation of the Nix + daemon. This file is not in use, is only left in here for reference + and will be deleted from the codebase eventually. + */ + +#include <algorithm> +#include <cerrno> +#include <climits> +#include <csignal> +#include <cstring> + +#include <fcntl.h> +#include <glog/logging.h> +#include <grp.h> +#include <pwd.h> +#include <sys/socket.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/un.h> +#include <sys/wait.h> +#include <unistd.h> + +#include "libmain/shared.hh" +#include "libproto/worker.pb.h" +#include "libstore/derivations.hh" +#include "libstore/globals.hh" +#include "libstore/local-store.hh" +#include "libstore/worker-protocol.hh" +#include "libutil/affinity.hh" +#include "libutil/archive.hh" +#include "libutil/finally.hh" +#include "libutil/monitor-fd.hh" +#include "libutil/serialise.hh" +#include "libutil/util.hh" +#include "nix/legacy.hh" + +using namespace nix; + +#ifndef __linux__ +#define SPLICE_F_MOVE 0 +static ssize_t splice(int fd_in, void* off_in, int fd_out, void* off_out, + size_t len, unsigned int flags) { + /* We ignore most parameters, we just have them for conformance with the linux + * syscall */ + std::vector<char> buf(8192); + auto read_count = read(fd_in, buf.data(), buf.size()); + if (read_count == -1) { + return read_count; + } + auto write_count = decltype(read_count)(0); + while (write_count < read_count) { + auto res = + write(fd_out, buf.data() + write_count, read_count - write_count); + if (res == -1) { + return res; + } + write_count += res; + } + return read_count; +} +#endif + +static FdSource from(STDIN_FILENO); +static FdSink to(STDOUT_FILENO); + +/* Logger that forwards log messages to the client, *if* we're in a + state where the protocol allows it (i.e., when canSendStderr is + true). */ +struct TunnelLogger { + struct State { + bool canSendStderr = false; + std::vector<std::string> pendingMsgs; + }; + + Sync<State> state_; + + unsigned int clientVersion; + + explicit TunnelLogger(unsigned int clientVersion) + : clientVersion(clientVersion) {} + + void enqueueMsg(const std::string& s) { + auto state(state_.lock()); + + if (state->canSendStderr) { + assert(state->pendingMsgs.empty()); + try { + to(s); + to.flush(); + } catch (...) { + /* Write failed; that means that the other side is + gone. */ + state->canSendStderr = false; + throw; + } + } else { + state->pendingMsgs.push_back(s); + } + } + + void log(const FormatOrString& fs) { + StringSink buf; + buf << STDERR_NEXT << (fs.s + "\n"); + enqueueMsg(*buf.s); + } + + /* startWork() means that we're starting an operation for which we + want to send out stderr to the client. */ + void startWork() { + auto state(state_.lock()); + state->canSendStderr = true; + + for (auto& msg : state->pendingMsgs) { + to(msg); + } + + state->pendingMsgs.clear(); + + to.flush(); + } + + /* stopWork() means that we're done; stop sending stderr to the + client. */ + void stopWork(bool success = true, const std::string& msg = "", + unsigned int status = 0) { + auto state(state_.lock()); + + state->canSendStderr = false; + + if (success) { + to << STDERR_LAST; + } else { + to << STDERR_ERROR << msg; + if (status != 0) { + to << status; + } + } + } + + void startActivity(const std::string& s) { + DLOG(INFO) << "startActivity(" << s << ")"; + if (GET_PROTOCOL_MINOR(clientVersion) < 20) { + if (!s.empty()) { + LOG(INFO) << s; + } + return; + } + + StringSink buf; + buf << STDERR_START_ACTIVITY << s; + enqueueMsg(*buf.s); + } +}; + +struct TunnelSink : Sink { + Sink& to; + explicit TunnelSink(Sink& to) : to(to) {} + void operator()(const unsigned char* data, size_t len) override { + to << STDERR_WRITE; + writeString(data, len, to); + } +}; + +struct TunnelSource : BufferedSource { + Source& from; + explicit TunnelSource(Source& from) : from(from) {} + + protected: + size_t readUnbuffered(unsigned char* data, size_t len) override { + to << STDERR_READ << len; + to.flush(); + size_t n = readString(data, len, from); + if (n == 0) { + throw EndOfFile("unexpected end-of-file"); + } + return n; + } +}; + +/* If the NAR archive contains a single file at top-level, then save + the contents of the file to `s'. Otherwise barf. */ +struct RetrieveRegularNARSink : ParseSink { + bool regular{true}; + std::string s; + + RetrieveRegularNARSink() {} + + void createDirectory(const Path& path) override { regular = false; } + + void receiveContents(unsigned char* data, unsigned int len) override { + s.append((const char*)data, len); + } + + void createSymlink(const Path& path, const std::string& target) override { + regular = false; + } +}; + +static void performOp(TunnelLogger* logger, const ref<Store>& store, + bool trusted, unsigned int clientVersion, Source& from, + Sink& to, unsigned int op) { + switch (op) { + case wopIsValidPath: { + /* 'readStorePath' could raise an error leading to the connection + being closed. To be able to recover from an invalid path error, + call 'startWork' early, and do 'assertStorePath' afterwards so + that the 'Error' exception handler doesn't close the + connection. */ + Path path = readString(from); + logger->startWork(); + store->assertStorePath(path); + bool result = store->isValidPath(path); + logger->stopWork(); + to << static_cast<uint64_t>(result); + break; + } + + case wopQueryValidPaths: { + auto paths = readStorePaths<PathSet>(*store, from); + logger->startWork(); + PathSet res = store->queryValidPaths(paths); + logger->stopWork(); + to << res; + break; + } + + case wopHasSubstitutes: { + Path path = readStorePath(*store, from); + logger->startWork(); + PathSet res = store->querySubstitutablePaths({path}); + logger->stopWork(); + to << static_cast<uint64_t>(res.find(path) != res.end()); + break; + } + + case wopQuerySubstitutablePaths: { + auto paths = readStorePaths<PathSet>(*store, from); + logger->startWork(); + PathSet res = store->querySubstitutablePaths(paths); + logger->stopWork(); + to << res; + break; + } + + case wopQueryPathHash: { + Path path = readStorePath(*store, from); + logger->startWork(); + auto hash = store->queryPathInfo(path)->narHash; + logger->stopWork(); + to << hash.to_string(Base16, false); + break; + } + + case wopQueryReferences: + case wopQueryReferrers: + case wopQueryValidDerivers: + case wopQueryDerivationOutputs: { + Path path = readStorePath(*store, from); + logger->startWork(); + PathSet paths; + if (op == wopQueryReferences) { + paths = store->queryPathInfo(path)->references; + } else if (op == wopQueryReferrers) { + store->queryReferrers(path, paths); + } else if (op == wopQueryValidDerivers) { + paths = store->queryValidDerivers(path); + } else { + paths = store->queryDerivationOutputs(path); + } + logger->stopWork(); + to << paths; + break; + } + + case wopQueryDerivationOutputNames: { + Path path = readStorePath(*store, from); + logger->startWork(); + StringSet names; + names = store->queryDerivationOutputNames(path); + logger->stopWork(); + to << names; + break; + } + + case wopQueryDeriver: { + Path path = readStorePath(*store, from); + logger->startWork(); + auto deriver = store->queryPathInfo(path)->deriver; + logger->stopWork(); + to << deriver; + break; + } + + case wopQueryPathFromHashPart: { + std::string hashPart = readString(from); + logger->startWork(); + Path path = store->queryPathFromHashPart(hashPart); + logger->stopWork(); + to << path; + break; + } + + case wopAddToStore: { + bool fixed = false; + bool recursive = false; + std::string hashType; + std::string baseName; + from >> baseName >> fixed /* obsolete */ >> recursive >> hashType; + /* Compatibility hack. */ + if (!fixed) { + hashType = "sha256"; + recursive = true; + } + HashType hashAlgo = parseHashType(hashType); + + TeeSource savedNAR(from); + RetrieveRegularNARSink savedRegular; + + if (recursive) { + /* Get the entire NAR dump from the client and save it to + a string so that we can pass it to + addToStoreFromDump(). */ + ParseSink sink; /* null sink; just parse the NAR */ + parseDump(sink, savedNAR); + } else { + parseDump(savedRegular, from); + } + + logger->startWork(); + if (!savedRegular.regular) { + throw Error("regular file expected"); + } + + auto store2 = store.dynamic_pointer_cast<LocalStore>(); + if (!store2) { + throw Error("operation is only supported by LocalStore"); + } + + Path path = store2->addToStoreFromDump( + recursive ? *savedNAR.data : savedRegular.s, baseName, recursive, + hashAlgo); + logger->stopWork(); + + to << path; + break; + } + + case wopAddTextToStore: { + std::string suffix = readString(from); + std::string s = readString(from); + auto refs = readStorePaths<PathSet>(*store, from); + logger->startWork(); + Path path = store->addTextToStore(suffix, s, refs, NoRepair); + logger->stopWork(); + to << path; + break; + } + + case wopExportPath: { + Path path = readStorePath(*store, from); + readInt(from); // obsolete + logger->startWork(); + TunnelSink sink(to); + store->exportPath(path, sink); + logger->stopWork(); + to << 1; + break; + } + + case wopImportPaths: { + logger->startWork(); + TunnelSource source(from); + Paths paths = store->importPaths(source, nullptr, + trusted ? NoCheckSigs : CheckSigs); + logger->stopWork(); + to << paths; + break; + } + + case wopBuildPaths: { + auto drvs = readStorePaths<PathSet>(*store, from); + BuildMode mode = bmNormal; + if (GET_PROTOCOL_MINOR(clientVersion) >= 15) { + mode = (BuildMode)readInt(from); + + /* Repairing is not atomic, so disallowed for "untrusted" + clients. */ + if (mode == bmRepair && !trusted) { + throw Error( + "repairing is not allowed because you are not in " + "'trusted-users'"); + } + } + logger->startWork(); + store->buildPaths(drvs, mode); + logger->stopWork(); + to << 1; + break; + } + + case wopBuildDerivation: { + Path drvPath = readStorePath(*store, from); + BasicDerivation drv; + readDerivation(from, *store, drv); + auto buildMode = (BuildMode)readInt(from); + logger->startWork(); + if (!trusted) { + throw Error("you are not privileged to build derivations"); + } + auto res = store->buildDerivation(drvPath, drv, buildMode); + logger->stopWork(); + to << res.status << res.errorMsg; + break; + } + + case wopEnsurePath: { + Path path = readStorePath(*store, from); + logger->startWork(); + store->ensurePath(path); + logger->stopWork(); + to << 1; + break; + } + + case wopAddTempRoot: { + Path path = readStorePath(*store, from); + logger->startWork(); + store->addTempRoot(path); + logger->stopWork(); + to << 1; + break; + } + + case wopAddIndirectRoot: { + Path path = absPath(readString(from)); + logger->startWork(); + store->addIndirectRoot(path); + logger->stopWork(); + to << 1; + break; + } + + case wopSyncWithGC: { + logger->startWork(); + store->syncWithGC(); + logger->stopWork(); + to << 1; + break; + } + + case wopFindRoots: { + logger->startWork(); + Roots roots = store->findRoots(!trusted); + logger->stopWork(); + + size_t size = 0; + for (auto& i : roots) { + size += i.second.size(); + } + + to << size; + + for (auto& [target, links] : roots) { + for (auto& link : links) { + to << link << target; + } + } + + break; + } + + case wopCollectGarbage: { + GCOptions options; + options.action = (GCOptions::GCAction)readInt(from); + options.pathsToDelete = readStorePaths<PathSet>(*store, from); + from >> options.ignoreLiveness >> options.maxFreed; + // obsolete fields + readInt(from); + readInt(from); + readInt(from); + + GCResults results; + + logger->startWork(); + if (options.ignoreLiveness) { + throw Error("you are not allowed to ignore liveness"); + } + store->collectGarbage(options, results); + logger->stopWork(); + + to << results.paths << results.bytesFreed << 0 /* obsolete */; + + break; + } + + case wopSetOptions: { + settings.keepFailed = readInt(from) != 0u; + settings.keepGoing = readInt(from) != 0u; + settings.tryFallback = readInt(from) != 0u; + readInt(from); // obsolete verbosity + settings.maxBuildJobs.assign(readInt(from)); + settings.maxSilentTime = readInt(from); + readInt(from); // obsolete useBuildHook + settings.verboseBuild = 0 == readInt(from); + readInt(from); // obsolete logType + readInt(from); // obsolete printBuildTrace + settings.buildCores = readInt(from); + settings.useSubstitutes = readInt(from) != 0u; + + StringMap overrides; + if (GET_PROTOCOL_MINOR(clientVersion) >= 12) { + unsigned int n = readInt(from); + for (unsigned int i = 0; i < n; i++) { + std::string name = readString(from); + std::string value = readString(from); + overrides.emplace(name, value); + } + } + + logger->startWork(); + + for (auto& i : overrides) { + auto& name(i.first); + auto& value(i.second); + + auto setSubstituters = [&](Setting<Strings>& res) { + if (name != res.name && res.aliases.count(name) == 0) { + return false; + } + StringSet trusted = settings.trustedSubstituters; + for (auto& s : settings.substituters.get()) { + trusted.insert(s); + } + Strings subs; + Strings ss = absl::StrSplit(value, absl::ByAnyChar(" \t\n\r"), + absl::SkipEmpty()); + for (auto& s : ss) { + if (trusted.count(s) != 0u) { + subs.push_back(s); + } else { + LOG(WARNING) << "ignoring untrusted substituter '" << s << "'"; + } + } + res = subs; + return true; + }; + + try { + if (name == "ssh-auth-sock") { // obsolete + ; + } else if (trusted || name == settings.buildTimeout.name || + name == "connect-timeout" || + (name == "builders" && value.empty())) { + settings.set(name, value); + } else if (setSubstituters(settings.substituters)) { + ; + } else if (setSubstituters(settings.extraSubstituters)) { + ; + } else { + LOG(WARNING) << "ignoring the user-specified setting '" << name + << "', because it is a " + << "restricted setting and you are not a trusted user"; + } + } catch (UsageError& e) { + LOG(WARNING) << e.what(); + } + } + + logger->stopWork(); + break; + } + + case wopQuerySubstitutablePathInfo: { + Path path = absPath(readString(from)); + logger->startWork(); + SubstitutablePathInfos infos; + store->querySubstitutablePathInfos({path}, infos); + logger->stopWork(); + auto i = infos.find(path); + if (i == infos.end()) { + to << 0; + } else { + to << 1 << i->second.deriver << i->second.references + << i->second.downloadSize << i->second.narSize; + } + break; + } + + case wopQuerySubstitutablePathInfos: { + auto paths = readStorePaths<PathSet>(*store, from); + logger->startWork(); + SubstitutablePathInfos infos; + store->querySubstitutablePathInfos(paths, infos); + logger->stopWork(); + to << infos.size(); + for (auto& i : infos) { + to << i.first << i.second.deriver << i.second.references + << i.second.downloadSize << i.second.narSize; + } + break; + } + + case wopQueryAllValidPaths: { + logger->startWork(); + PathSet paths = store->queryAllValidPaths(); + logger->stopWork(); + to << paths; + break; + } + + case wopQueryPathInfo: { + Path path = readStorePath(*store, from); + std::shared_ptr<const ValidPathInfo> info; + logger->startWork(); + try { + info = store->queryPathInfo(path); + } catch (InvalidPath&) { + if (GET_PROTOCOL_MINOR(clientVersion) < 17) { + throw; + } + } + logger->stopWork(); + if (info) { + if (GET_PROTOCOL_MINOR(clientVersion) >= 17) { + to << 1; + } + to << info->deriver << info->narHash.to_string(Base16, false) + << info->references << info->registrationTime << info->narSize; + if (GET_PROTOCOL_MINOR(clientVersion) >= 16) { + to << static_cast<uint64_t>(info->ultimate) << info->sigs << info->ca; + } + } else { + assert(GET_PROTOCOL_MINOR(clientVersion) >= 17); + to << 0; + } + break; + } + + case wopOptimiseStore: { + logger->startWork(); + store->optimiseStore(); + logger->stopWork(); + to << 1; + break; + } + + case wopVerifyStore: { + bool checkContents; + bool repair; + from >> checkContents >> repair; + logger->startWork(); + if (repair && !trusted) { + throw Error("you are not privileged to repair paths"); + } + bool errors = store->verifyStore(checkContents, (RepairFlag)repair); + logger->stopWork(); + to << static_cast<uint64_t>(errors); + break; + } + + case wopAddSignatures: { + Path path = readStorePath(*store, from); + auto sigs = readStrings<StringSet>(from); + logger->startWork(); + if (!trusted) { + throw Error("you are not privileged to add signatures"); + } + store->addSignatures(path, sigs); + logger->stopWork(); + to << 1; + break; + } + + case wopNarFromPath: { + auto path = readStorePath(*store, from); + logger->startWork(); + logger->stopWork(); + dumpPath(path, to); + break; + } + + case wopAddToStoreNar: { + bool repair; + bool dontCheckSigs; + ValidPathInfo info; + info.path = readStorePath(*store, from); + from >> info.deriver; + if (!info.deriver.empty()) { + store->assertStorePath(info.deriver); + } + info.narHash = Hash(readString(from), htSHA256); + info.references = readStorePaths<PathSet>(*store, from); + from >> info.registrationTime >> info.narSize >> info.ultimate; + info.sigs = readStrings<StringSet>(from); + from >> info.ca >> repair >> dontCheckSigs; + if (!trusted && dontCheckSigs) { + dontCheckSigs = false; + } + if (!trusted) { + info.ultimate = false; + } + + std::string saved; + std::unique_ptr<Source> source; + if (GET_PROTOCOL_MINOR(clientVersion) >= 21) { + source = std::make_unique<TunnelSource>(from); + } else { + TeeSink tee(from); + parseDump(tee, tee.source); + saved = std::move(*tee.source.data); + source = std::make_unique<StringSource>(saved); + } + + logger->startWork(); + + // FIXME: race if addToStore doesn't read source? + store->addToStore(info, *source, (RepairFlag)repair, + dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr); + + logger->stopWork(); + break; + } + + case wopQueryMissing: { + auto targets = readStorePaths<PathSet>(*store, from); + logger->startWork(); + PathSet willBuild; + PathSet willSubstitute; + PathSet unknown; + unsigned long long downloadSize; + unsigned long long narSize; + store->queryMissing(targets, willBuild, willSubstitute, unknown, + downloadSize, narSize); + logger->stopWork(); + to << willBuild << willSubstitute << unknown << downloadSize << narSize; + break; + } + + default: + throw Error(format("invalid operation %1%") % op); + } +} + +static void processConnection(bool trusted, const std::string& userName, + uid_t userId) { + MonitorFdHup monitor(from.fd); + + /* Exchange the greeting. */ + unsigned int magic = readInt(from); + if (magic != WORKER_MAGIC_1) { + throw Error("protocol mismatch"); + } + to << WORKER_MAGIC_2 << PROTOCOL_VERSION; + to.flush(); + unsigned int clientVersion = readInt(from); + + if (clientVersion < 0x10a) { + throw Error("the Nix client version is too old"); + } + + auto tunnelLogger = new TunnelLogger(clientVersion); + // logger = tunnelLogger; + + unsigned int opCount = 0; + + Finally finally([&]() { + _isInterrupted = false; + DLOG(INFO) << opCount << " operations"; + }); + + if (GET_PROTOCOL_MINOR(clientVersion) >= 14 && (readInt(from) != 0u)) { + setAffinityTo(readInt(from)); + } + + readInt(from); // obsolete reserveSpace + + /* Send startup error messages to the client. */ + tunnelLogger->startWork(); + + try { + /* If we can't accept clientVersion, then throw an error + *here* (not above). */ + +#if 0 + /* Prevent users from doing something very dangerous. */ + if (geteuid() == 0 && + querySetting("build-users-group", "") == "") + throw Error("if you run 'nix-daemon' as root, then you MUST set 'build-users-group'!"); +#endif + + /* Open the store. */ + Store::Params params; // FIXME: get params from somewhere + // Disable caching since the client already does that. + params["path-info-cache-size"] = "0"; + auto store = openStore(settings.storeUri, params); + + store->createUser(userName, userId); + + tunnelLogger->stopWork(); + to.flush(); + + /* Process client requests. */ + while (true) { + WorkerOp op; + try { + op = (WorkerOp)readInt(from); + } catch (Interrupted& e) { + break; + } catch (EndOfFile& e) { + break; + } + + opCount++; + + try { + performOp(tunnelLogger, store, trusted, clientVersion, from, to, op); + } catch (Error& e) { + /* If we're not in a state where we can send replies, then + something went wrong processing the input of the + client. This can happen especially if I/O errors occur + during addTextToStore() / importPath(). If that + happens, just send the error message and exit. */ + bool errorAllowed = tunnelLogger->state_.lock()->canSendStderr; + tunnelLogger->stopWork(false, e.msg(), e.status); + if (!errorAllowed) { + throw; + } + } catch (std::bad_alloc& e) { + tunnelLogger->stopWork(false, "Nix daemon out of memory", 1); + throw; + } + + to.flush(); + + assert(!tunnelLogger->state_.lock()->canSendStderr); + }; + + } catch (std::exception& e) { + tunnelLogger->stopWork(false, e.what(), 1); + to.flush(); + return; + } +} + +static void sigChldHandler(int sigNo) { + // Ensure we don't modify errno of whatever we've interrupted + auto saved_errno = errno; + /* Reap all dead children. */ + while (waitpid(-1, nullptr, WNOHANG) > 0) { + ; + } + errno = saved_errno; +} + +static void setSigChldAction(bool autoReap) { + struct sigaction act; + struct sigaction oact; + act.sa_handler = autoReap ? sigChldHandler : SIG_DFL; + sigfillset(&act.sa_mask); + act.sa_flags = 0; + if (sigaction(SIGCHLD, &act, &oact) != 0) { + throw SysError("setting SIGCHLD handler"); + } +} + +bool matchUser(const std::string& user, const std::string& group, + const Strings& users) { + if (find(users.begin(), users.end(), "*") != users.end()) { + return true; + } + + if (find(users.begin(), users.end(), user) != users.end()) { + return true; + } + + for (auto& i : users) { + if (std::string(i, 0, 1) == "@") { + if (group == std::string(i, 1)) { + return true; + } + struct group* gr = getgrnam(i.c_str() + 1); + if (gr == nullptr) { + continue; + } + for (char** mem = gr->gr_mem; *mem != nullptr; mem++) { + if (user == std::string(*mem)) { + return true; + } + } + } + } + + return false; +} + +struct PeerInfo { + bool pidKnown; + pid_t pid; + bool uidKnown; + uid_t uid; + bool gidKnown; + gid_t gid; +}; + +/* Get the identity of the caller, if possible. */ +static PeerInfo getPeerInfo(int remote) { + PeerInfo peer = {false, 0, false, 0, false, 0}; + +#if defined(SO_PEERCRED) + + ucred cred; + socklen_t credLen = sizeof(cred); + if (getsockopt(remote, SOL_SOCKET, SO_PEERCRED, &cred, &credLen) == -1) { + throw SysError("getting peer credentials"); + } + peer = {true, cred.pid, true, cred.uid, true, cred.gid}; + +#elif defined(LOCAL_PEERCRED) + +#if !defined(SOL_LOCAL) +#define SOL_LOCAL 0 +#endif + + xucred cred; + socklen_t credLen = sizeof(cred); + if (getsockopt(remote, SOL_LOCAL, LOCAL_PEERCRED, &cred, &credLen) == -1) + throw SysError("getting peer credentials"); + peer = {false, 0, true, cred.cr_uid, false, 0}; + +#endif + + return peer; +} + +#define SD_LISTEN_FDS_START 3 + +static void daemonLoop(char** argv) { + if (chdir("/") == -1) { + throw SysError("cannot change current directory"); + } + + /* Get rid of children automatically; don't let them become + zombies. */ + setSigChldAction(true); + + AutoCloseFD fdSocket; + + /* Handle socket-based activation by systemd. */ + if (!getEnv("LISTEN_FDS").empty()) { + if (getEnv("LISTEN_PID") != std::to_string(getpid()) || + getEnv("LISTEN_FDS") != "1") { + throw Error("unexpected systemd environment variables"); + } + fdSocket = SD_LISTEN_FDS_START; + } + + /* Otherwise, create and bind to a Unix domain socket. */ + else { + /* Create and bind to a Unix domain socket. */ + fdSocket = socket(PF_UNIX, SOCK_STREAM, 0); + if (!fdSocket) { + throw SysError("cannot create Unix domain socket"); + } + + std::string socketPath = settings.nixDaemonSocketFile; + + createDirs(dirOf(socketPath)); + + /* Urgh, sockaddr_un allows path names of only 108 characters. + So chdir to the socket directory so that we can pass a + relative path name. */ + if (chdir(dirOf(socketPath).c_str()) == -1) { + throw SysError("cannot change current directory"); + } + Path socketPathRel = "./" + baseNameOf(socketPath); + + struct sockaddr_un addr; + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, socketPathRel.c_str(), sizeof(addr.sun_path)); + if (addr.sun_path[sizeof(addr.sun_path) - 1] != '\0') { + throw Error(format("socket path '%1%' is too long") % socketPathRel); + } + + unlink(socketPath.c_str()); + + /* Make sure that the socket is created with 0666 permission + (everybody can connect --- provided they have access to the + directory containing the socket). */ + mode_t oldMode = umask(0111); + int res = bind(fdSocket.get(), (struct sockaddr*)&addr, sizeof(addr)); + umask(oldMode); + if (res == -1) { + throw SysError(format("cannot bind to socket '%1%'") % socketPath); + } + + if (chdir("/") == -1) { /* back to the root */ + throw SysError("cannot change current directory"); + } + + if (listen(fdSocket.get(), 5) == -1) { + throw SysError(format("cannot listen on socket '%1%'") % socketPath); + } + } + + closeOnExec(fdSocket.get()); + + /* Loop accepting connections. */ + while (true) { + try { + /* Accept a connection. */ + struct sockaddr_un remoteAddr; + socklen_t remoteAddrLen = sizeof(remoteAddr); + + AutoCloseFD remote = + accept(fdSocket.get(), (struct sockaddr*)&remoteAddr, &remoteAddrLen); + checkInterrupt(); + if (!remote) { + if (errno == EINTR) { + continue; + } + throw SysError("accepting connection"); + } + + closeOnExec(remote.get()); + + bool trusted = false; + PeerInfo peer = getPeerInfo(remote.get()); + + struct passwd* pw = peer.uidKnown ? getpwuid(peer.uid) : nullptr; + std::string user = pw != nullptr ? pw->pw_name : std::to_string(peer.uid); + + struct group* gr = peer.gidKnown ? getgrgid(peer.gid) : nullptr; + std::string group = + gr != nullptr ? gr->gr_name : std::to_string(peer.gid); + + Strings trustedUsers = settings.trustedUsers; + Strings allowedUsers = settings.allowedUsers; + + if (matchUser(user, group, trustedUsers)) { + trusted = true; + } + + if ((!trusted && !matchUser(user, group, allowedUsers)) || + group == settings.buildUsersGroup) { + throw Error( + format("user '%1%' is not allowed to connect to the Nix daemon") % + user); + } + + LOG(INFO) << "accepted connection from pid " + << (peer.pidKnown ? std::to_string(peer.pid) : "<unknown>") + << ", user " << (peer.uidKnown ? user : "<unknown>") + << (trusted ? " (trusted)" : ""); + + /* Fork a child to handle the connection. */ + ProcessOptions options; + options.errorPrefix = "unexpected Nix daemon error: "; + options.dieWithParent = false; + options.runExitHandlers = true; + startProcess( + [&]() { + fdSocket = -1; + + /* Background the daemon. */ + if (setsid() == -1) { + throw SysError(format("creating a new session")); + } + + /* Restore normal handling of SIGCHLD. */ + setSigChldAction(false); + + /* For debugging, stuff the pid into argv[1]. */ + if (peer.pidKnown && (argv[1] != nullptr)) { + std::string processName = std::to_string(peer.pid); + strncpy(argv[1], processName.c_str(), strlen(argv[1])); + } + + /* Handle the connection. */ + from.fd = remote.get(); + to.fd = remote.get(); + processConnection(trusted, user, peer.uid); + + exit(0); + }, + options); + + } catch (Interrupted& e) { + return; + } catch (Error& e) { + LOG(ERROR) << "error processing connection: " << e.msg(); + } + } +} + +static int _main(int argc, char** argv) { + { + auto stdio = false; + + parseCmdLine(argc, argv, + [&](Strings::iterator& arg, const Strings::iterator& end) { + if (*arg == "--daemon") { + ; /* ignored for backwards compatibility */ + } else if (*arg == "--help") { + showManPage("nix-daemon"); + } else if (*arg == "--version") { + printVersion("nix-daemon"); + } else if (*arg == "--stdio") { + stdio = true; + } else { + return false; + } + return true; + }); + + if (stdio) { + if (getStoreType() == tDaemon) { + /* Forward on this connection to the real daemon */ + auto socketPath = settings.nixDaemonSocketFile; + auto s = socket(PF_UNIX, SOCK_STREAM, 0); + if (s == -1) { + throw SysError("creating Unix domain socket"); + } + + auto socketDir = dirOf(socketPath); + if (chdir(socketDir.c_str()) == -1) { + throw SysError(format("changing to socket directory '%1%'") % + socketDir); + } + + auto socketName = baseNameOf(socketPath); + auto addr = sockaddr_un{}; + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, socketName.c_str(), sizeof(addr.sun_path)); + if (addr.sun_path[sizeof(addr.sun_path) - 1] != '\0') { + throw Error(format("socket name %1% is too long") % socketName); + } + + if (connect(s, (struct sockaddr*)&addr, sizeof(addr)) == -1) { + throw SysError(format("cannot connect to daemon at %1%") % + socketPath); + } + + auto nfds = (s > STDIN_FILENO ? s : STDIN_FILENO) + 1; + while (true) { + fd_set fds; + FD_ZERO(&fds); + FD_SET(s, &fds); + FD_SET(STDIN_FILENO, &fds); + if (select(nfds, &fds, nullptr, nullptr, nullptr) == -1) { + throw SysError("waiting for data from client or server"); + } + if (FD_ISSET(s, &fds)) { + auto res = splice(s, nullptr, STDOUT_FILENO, nullptr, SSIZE_MAX, + SPLICE_F_MOVE); + if (res == -1) { + throw SysError("splicing data from daemon socket to stdout"); + } + if (res == 0) { + throw EndOfFile("unexpected EOF from daemon socket"); + } + } + if (FD_ISSET(STDIN_FILENO, &fds)) { + auto res = splice(STDIN_FILENO, nullptr, s, nullptr, SSIZE_MAX, + SPLICE_F_MOVE); + if (res == -1) { + throw SysError("splicing data from stdin to daemon socket"); + } + if (res == 0) { + return 0; + } + } + } + } else { + processConnection(true, "root", 0); + } + } else { + daemonLoop(argv); + } + + return 0; + } +} + +static RegisterLegacyCommand s1("nix-daemon", _main); diff --git a/third_party/nix/src/nix-daemon/nix-daemon-proto.cc b/third_party/nix/src/nix-daemon/nix-daemon-proto.cc new file mode 100644 index 000000000000..d6498e77c241 --- /dev/null +++ b/third_party/nix/src/nix-daemon/nix-daemon-proto.cc @@ -0,0 +1,799 @@ +#include "nix-daemon-proto.hh" + +#include <filesystem> +#include <ostream> +#include <sstream> +#include <streambuf> +#include <string> + +#include <absl/strings/str_cat.h> +#include <absl/strings/str_format.h> +#include <google/protobuf/empty.pb.h> +#include <google/protobuf/util/time_util.h> +#include <grpcpp/impl/codegen/server_context.h> +#include <grpcpp/impl/codegen/status.h> +#include <grpcpp/impl/codegen/status_code_enum.h> +#include <grpcpp/impl/codegen/sync_stream.h> + +#include "libmain/shared.hh" +#include "libproto/worker.grpc.pb.h" +#include "libproto/worker.pb.h" +#include "libstore/derivations.hh" +#include "libstore/local-store.hh" +#include "libstore/store-api.hh" +#include "libutil/archive.hh" +#include "libutil/hash.hh" +#include "libutil/proto.hh" +#include "libutil/serialise.hh" +#include "libutil/types.hh" + +namespace nix::daemon { + +using ::google::protobuf::util::TimeUtil; +using ::grpc::Status; +using ::nix::proto::PathInfo; +using ::nix::proto::StorePath; +using ::nix::proto::StorePaths; +using ::nix::proto::WorkerService; + +template <typename Request> +class RPCSource final : public Source { + public: + using Reader = grpc::ServerReader<Request>; + explicit RPCSource(Reader* reader) : reader_(reader) {} + + size_t read(unsigned char* data, size_t len) override { + auto got = buffer_.sgetn(reinterpret_cast<char*>(data), len); + if (got < len) { + Request msg; + if (!reader_->Read(&msg)) { + return got; + } + if (msg.add_oneof_case() != Request::kData) { + // TODO(grfn): Make Source::read return a StatusOr and get rid of this + // throw + throw Error( + "Invalid AddToStoreRequest: all messages except the first must " + "contain data"); + } + buffer_.sputn(msg.data().data(), msg.data().length()); + return got + read(data + got, len - got); + } + return got; + }; + + private: + std::stringbuf buffer_; + Reader* reader_; +}; + +// TODO(grfn): Make this some sort of pipe so we don't have to store data in +// memory +/* If the NAR archive contains a single file at top-level, then save + the contents of the file to `s'. Otherwise barf. */ +struct RetrieveRegularNARSink : ParseSink { + bool regular{true}; + std::string s; + + RetrieveRegularNARSink() {} + + void createDirectory(const Path& path) override { regular = false; } + + void receiveContents(unsigned char* data, unsigned int len) override { + s.append(reinterpret_cast<const char*>(data), len); + } + + void createSymlink(const Path& path, const std::string& target) override { + regular = false; + } +}; + +#define ASSERT_INPUT_STORE_PATH(path) \ + if (!store_->isStorePath(path)) { \ + return Status(grpc::StatusCode::INVALID_ARGUMENT, \ + absl::StrFormat("path '%s' is not in the Nix store", path)); \ + } + +class BuildLogStreambuf final : public std::streambuf { + public: + using Writer = grpc::ServerWriter<nix::proto::BuildEvent>; + explicit BuildLogStreambuf(Writer* writer) : writer_(writer) {} + + // TODO(grfn): buffer with a timeout so we don't have too many messages + std::streamsize xsputn(const char_type* s, std::streamsize n) override { + nix::proto::BuildEvent event; + event.mutable_build_log()->set_line(s, n); + writer_->Write(event); + return n; + } + + int_type overflow(int_type ch) override { + if (ch != traits_type::eof()) { + nix::proto::BuildEvent event; + event.mutable_build_log()->set_line(std::string(1, ch)); + writer_->Write(event); + } + return ch; + } + + private: + Writer* writer_{}; +}; + +class WorkerServiceImpl final : public WorkerService::Service { + public: + WorkerServiceImpl(nix::Store& store) : store_(&store) {} + + Status IsValidPath(grpc::ServerContext* context, const StorePath* request, + nix::proto::IsValidPathResponse* response) override { + return HandleExceptions( + [&]() -> Status { + const auto& path = request->path(); + response->set_is_valid(store_->isValidPath(path)); + + return Status::OK; + }, + __FUNCTION__); + } + + Status HasSubstitutes(grpc::ServerContext* context, const StorePath* request, + nix::proto::HasSubstitutesResponse* response) override { + return HandleExceptions( + [&]() -> Status { + const auto& path = request->path(); + ASSERT_INPUT_STORE_PATH(path); + PathSet res = store_->querySubstitutablePaths({path}); + response->set_has_substitutes(res.find(path) != res.end()); + + return Status::OK; + }, + __FUNCTION__); + } + + Status QueryReferrers(grpc::ServerContext* context, const StorePath* request, + StorePaths* response) override { + return HandleExceptions( + [&]() -> Status { + const auto& path = request->path(); + ASSERT_INPUT_STORE_PATH(path); + + PathSet paths; + store_->queryReferrers(path, paths); + + for (const auto& path : paths) { + response->add_paths(path); + } + + return Status::OK; + }, + __FUNCTION__); + } + + Status AddToStore(grpc::ServerContext* context, + grpc::ServerReader<nix::proto::AddToStoreRequest>* reader, + nix::proto::StorePath* response) override { + return HandleExceptions( + [&]() -> Status { + proto::AddToStoreRequest metadata_request; + auto has_metadata = reader->Read(&metadata_request); + + if (!has_metadata || !metadata_request.has_meta()) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Metadata must be set before sending file content"); + } + + auto meta = metadata_request.meta(); + RPCSource source(reader); + auto opt_hash_type = hash_type_from(meta.hash_type()); + if (!opt_hash_type) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Invalid hash type"); + } + + std::string* data; + RetrieveRegularNARSink nar; + TeeSource saved_nar(source); + + if (meta.recursive()) { + // TODO(grfn): Don't store the full data in memory, instead just + // make addToStoreFromDump take a Source + ParseSink sink; + parseDump(sink, saved_nar); + data = &(*saved_nar.data); + } else { + parseDump(nar, source); + if (!nar.regular) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Regular file expected"); + } + data = &nar.s; + } + + auto local_store = store_.dynamic_pointer_cast<LocalStore>(); + if (!local_store) { + return Status(grpc::StatusCode::FAILED_PRECONDITION, + "operation is only supported by LocalStore"); + } + + auto path = local_store->addToStoreFromDump( + *data, meta.base_name(), meta.recursive(), opt_hash_type.value()); + + response->set_path(path); + + return Status::OK; + }, + __FUNCTION__); + } + + Status AddToStoreNar( + grpc::ServerContext* context, + grpc::ServerReader<nix::proto::AddToStoreNarRequest>* reader, + google::protobuf::Empty*) override { + return HandleExceptions( + [&]() -> Status { + proto::AddToStoreNarRequest path_info_request; + auto has_path_info = reader->Read(&path_info_request); + if (!has_path_info || !path_info_request.has_path_info()) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Path info must be set before sending nar content"); + } + + auto path_info = path_info_request.path_info(); + + ValidPathInfo info; + info.path = path_info.path().path(); + info.deriver = path_info.deriver().path(); + + if (!info.deriver.empty()) { + ASSERT_INPUT_STORE_PATH(info.deriver); + } + + auto nar_hash = Hash::deserialize(path_info.nar_hash(), htSHA256); + + if (!nar_hash.ok()) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + std::string(nar_hash.status().message())); + } + + info.narHash = *nar_hash; + for (const auto& ref : path_info.references()) { + info.references.insert(ref); + } + info.registrationTime = + TimeUtil::TimestampToTimeT(path_info.registration_time()); + info.narSize = path_info.nar_size(); + info.ultimate = path_info.ultimate(); + for (const auto& sig : path_info.sigs()) { + info.sigs.insert(sig); + } + info.ca = path_info.ca(); + + auto repair = path_info.repair(); + auto check_sigs = path_info.check_sigs(); + + std::string saved; + RPCSource source(reader); + store_->addToStore(info, source, repair ? Repair : NoRepair, + check_sigs ? CheckSigs : NoCheckSigs, nullptr); + + return Status::OK; + }, + __FUNCTION__); + } + + Status AddTextToStore( + grpc::ServerContext*, + grpc::ServerReader<nix::proto::AddTextToStoreRequest>* reader, + nix::proto::StorePath* response) override { + return HandleExceptions( + [&]() -> Status { + proto::AddTextToStoreRequest request; + auto has_metadata = reader->Read(&request); + if (!has_metadata || !request.has_meta()) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Metadata must be set before sending content"); + } + + proto::AddTextToStoreRequest_Metadata meta = request.meta(); + + PathSet references; + for (const auto& ref : meta.references()) { + references.insert(ref); + } + + std::string content; + content.reserve(meta.size()); + while (reader->Read(&request)) { + if (request.add_oneof_case() != request.kData) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "All requests except the first must contain data"); + } + + content.append(request.data()); + } + + auto path = store_->addTextToStore(meta.name(), content, references); + response->set_path(path); + return Status::OK; + }, + __FUNCTION__); + } + + Status BuildPaths( + grpc::ServerContext*, const nix::proto::BuildPathsRequest* request, + grpc::ServerWriter<nix::proto::BuildEvent>* writer) override { + return HandleExceptions( + [&]() -> Status { + PathSet drvs; + for (const auto& drv : request->drvs()) { + drvs.insert(drv); + } + auto mode = BuildModeFrom(request->mode()); + + if (!mode.has_value()) { + return Status(grpc::StatusCode::INTERNAL, "Invalid build mode"); + } + + BuildLogStreambuf log_buffer(writer); + std::ostream log_sink(&log_buffer); + + // TODO(grfn): If mode is repair and not trusted, we need to return an + // error here (but we can't yet because we don't know anything about + // trusted users) + return nix::util::proto::AbslToGRPCStatus( + store_->buildPaths(log_sink, drvs, mode.value())); + }, + __FUNCTION__); + } + + Status EnsurePath(grpc::ServerContext* context, + const nix::proto::StorePath* request, + google::protobuf::Empty*) override { + auto path = request->path(); + ASSERT_INPUT_STORE_PATH(path); + return HandleExceptions( + [&]() -> Status { + store_->ensurePath(path); + return Status::OK; + }, + __FUNCTION__); + } + + Status AddTempRoot(grpc::ServerContext*, const nix::proto::StorePath* request, + google::protobuf::Empty*) override { + auto path = request->path(); + ASSERT_INPUT_STORE_PATH(path); + + return HandleExceptions( + [&]() -> Status { + store_->addTempRoot(path); + return Status::OK; + }, + __FUNCTION__); + } + + Status AddIndirectRoot(grpc::ServerContext*, + const nix::proto::StorePath* request, + google::protobuf::Empty*) override { + auto path = std::filesystem::canonical(request->path()); + ASSERT_INPUT_STORE_PATH(path); + + return HandleExceptions( + [&]() -> Status { + store_->addIndirectRoot(path); + return Status::OK; + }, + __FUNCTION__); + } + + Status SyncWithGC(grpc::ServerContext*, const google::protobuf::Empty*, + google::protobuf::Empty*) override { + return HandleExceptions( + [&]() -> Status { + store_->syncWithGC(); + return Status::OK; + }, + __FUNCTION__); + } + + Status FindRoots(grpc::ServerContext*, const google::protobuf::Empty*, + nix::proto::FindRootsResponse* response) override { + return HandleExceptions( + [&]() -> Status { + auto roots = store_->findRoots(false); + for (const auto& [target, links] : roots) { + StorePaths link_paths; + for (const auto& link : links) { + link_paths.add_paths(link); + } + response->mutable_roots()->insert({target, link_paths}); + } + + return Status::OK; + }, + __FUNCTION__); + } + + Status CollectGarbage(grpc::ServerContext*, + const proto::CollectGarbageRequest* request, + proto::CollectGarbageResponse* response) override { + return HandleExceptions( + [&]() -> Status { + GCOptions options; + auto action = GCActionFromProto(request->action()); + if (!action.has_value()) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Invalid GC action"); + } + + options.action = action.value(); + for (const auto& path : request->paths_to_delete()) { + options.pathsToDelete.insert(path); + } + options.ignoreLiveness = request->ignore_liveness(); + options.maxFreed = request->max_freed(); + + if (options.ignoreLiveness) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "you are not allowed to ignore liveness"); + } + + GCResults results; + store_->collectGarbage(options, results); + + for (const auto& path : results.paths) { + response->add_deleted_paths(path); + } + response->set_bytes_freed(results.bytesFreed); + + return Status::OK; + }, + __FUNCTION__); + } + + Status QuerySubstitutablePathInfos( + grpc::ServerContext*, const StorePaths* request, + nix::proto::SubstitutablePathInfos* response) override { + return HandleExceptions( + [&]() -> Status { + SubstitutablePathInfos infos; + PathSet paths; + for (const auto& path : request->paths()) { + paths.insert(path); + } + store_->querySubstitutablePathInfos(paths, infos); + for (const auto& [path, path_info] : infos) { + auto proto_path_info = response->add_path_infos(); + proto_path_info->mutable_path()->set_path(path); + proto_path_info->mutable_deriver()->set_path(path_info.deriver); + for (const auto& ref : path_info.references) { + proto_path_info->add_references(ref); + } + proto_path_info->set_download_size(path_info.downloadSize); + proto_path_info->set_nar_size(path_info.narSize); + } + + return Status::OK; + }, + __FUNCTION__); + } + + Status QueryValidDerivers(grpc::ServerContext* context, + const StorePath* request, + StorePaths* response) override { + return HandleExceptions( + [&]() -> Status { + const auto& path = request->path(); + ASSERT_INPUT_STORE_PATH(path); + + PathSet paths = store_->queryValidDerivers(path); + + for (const auto& path : paths) { + response->add_paths(path); + } + + return Status::OK; + }, + __FUNCTION__); + } + + Status QueryDerivationOutputs(grpc::ServerContext* context, + const StorePath* request, + StorePaths* response) override { + return HandleExceptions( + [&]() -> Status { + const auto& path = request->path(); + ASSERT_INPUT_STORE_PATH(path); + + PathSet paths = store_->queryDerivationOutputs(path); + + for (const auto& path : paths) { + response->add_paths(path); + } + + return Status::OK; + }, + __FUNCTION__); + } + + Status QueryAllValidPaths(grpc::ServerContext* context, + const google::protobuf::Empty* request, + StorePaths* response) override { + return HandleExceptions( + [&]() -> Status { + const auto paths = store_->queryAllValidPaths(); + for (const auto& path : paths) { + response->add_paths(path); + } + + return Status::OK; + }, + __FUNCTION__); + } + + Status QueryPathInfo(grpc::ServerContext* context, const StorePath* request, + PathInfo* response) override { + return HandleExceptions( + [&]() -> Status { + auto path = request->path(); + ASSERT_INPUT_STORE_PATH(path); + + response->mutable_path()->set_path(path); + try { + auto info = store_->queryPathInfo(path); + response->mutable_deriver()->set_path(info->deriver); + response->set_nar_hash( + reinterpret_cast<const char*>(&info->narHash.hash[0]), + info->narHash.hashSize); + + for (const auto& reference : info->references) { + response->add_references(reference); + } + + *response->mutable_registration_time() = + google::protobuf::util::TimeUtil::TimeTToTimestamp( + info->registrationTime); + + response->set_nar_size(info->narSize); + response->set_ultimate(info->ultimate); + + for (const auto& sig : info->sigs) { + response->add_sigs(sig); + } + + response->set_ca(info->ca); + + return Status::OK; + } catch (InvalidPath& e) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, e.msg()); + } + }, + __FUNCTION__); + } + + Status QueryDerivationOutputNames( + grpc::ServerContext* context, const StorePath* request, + nix::proto::DerivationOutputNames* response) override { + return HandleExceptions( + [&]() -> Status { + auto path = request->path(); + ASSERT_INPUT_STORE_PATH(path); + auto names = store_->queryDerivationOutputNames(path); + for (const auto& name : names) { + response->add_names(name); + } + + return Status::OK; + }, + __FUNCTION__); + } + + Status QueryPathFromHashPart(grpc::ServerContext* context, + const nix::proto::HashPart* request, + StorePath* response) override { + return HandleExceptions( + [&]() -> Status { + auto hash_part = request->hash_part(); + auto path = store_->queryPathFromHashPart(hash_part); + ASSERT_INPUT_STORE_PATH(path); + response->set_path(path); + return Status::OK; + }, + __FUNCTION__); + } + + Status QueryValidPaths(grpc::ServerContext* context, + const StorePaths* request, + StorePaths* response) override { + return HandleExceptions( + [&]() -> Status { + std::set<Path> paths; + for (const auto& path : request->paths()) { + ASSERT_INPUT_STORE_PATH(path); + paths.insert(path); + } + + auto res = store_->queryValidPaths(paths); + + for (const auto& path : res) { + response->add_paths(path); + } + + return Status::OK; + }, + __FUNCTION__); + } + + Status QuerySubstitutablePaths(grpc::ServerContext* context, + const StorePaths* request, + StorePaths* response) override { + return HandleExceptions( + [&]() -> Status { + std::set<Path> paths; + for (const auto& path : request->paths()) { + ASSERT_INPUT_STORE_PATH(path); + paths.insert(path); + } + + auto res = store_->querySubstitutablePaths(paths); + + for (const auto& path : res) { + response->add_paths(path); + } + + return Status::OK; + }, + __FUNCTION__); + } + + Status OptimiseStore(grpc::ServerContext* context, + const google::protobuf::Empty* request, + google::protobuf::Empty* response) override { + return HandleExceptions( + [&]() -> Status { + store_->optimiseStore(); + return Status::OK; + }, + __FUNCTION__); + } + + Status VerifyStore(grpc::ServerContext* context, + const nix::proto::VerifyStoreRequest* request, + nix::proto::VerifyStoreResponse* response) override { + return HandleExceptions( + [&]() -> Status { + auto errors = + store_->verifyStore(request->check_contents(), + static_cast<RepairFlag>(request->repair())); + + response->set_errors(errors); + + return Status::OK; + }, + __FUNCTION__); + } + + Status BuildDerivation( + grpc::ServerContext*, const nix::proto::BuildDerivationRequest* request, + grpc::ServerWriter<nix::proto::BuildEvent>* writer) override { + return HandleExceptions( + [&]() -> Status { + auto drv_path = request->drv_path().path(); + ASSERT_INPUT_STORE_PATH(drv_path); + auto drv = BasicDerivation::from_proto(&request->derivation()); + + auto build_mode = nix::BuildModeFrom(request->build_mode()); + if (!build_mode) { + return Status(grpc::StatusCode::INTERNAL, "Invalid build mode"); + } + + BuildLogStreambuf log_buffer(writer); + std::ostream log_sink(&log_buffer); + BuildResult res = + store_->buildDerivation(log_sink, drv_path, drv, *build_mode); + + proto::BuildResult proto_res{}; + proto_res.set_status(res.status_to_proto()); + + if (!res.errorMsg.empty()) { + proto_res.set_msg(res.errorMsg); + } + + proto::BuildEvent event{}; + *event.mutable_result() = proto_res; + + writer->Write(event); + return Status::OK; + }, + __FUNCTION__); + } + + Status AddSignatures(grpc::ServerContext* context, + const nix::proto::AddSignaturesRequest* request, + google::protobuf::Empty* response) override { + return HandleExceptions( + [&]() -> Status { + auto path = request->path().path(); + ASSERT_INPUT_STORE_PATH(path); + + StringSet sigs; + sigs.insert(request->sigs().sigs().begin(), + request->sigs().sigs().end()); + + store_->addSignatures(path, sigs); + + return Status::OK; + }, + __FUNCTION__); + } + + Status QueryMissing(grpc::ServerContext* context, const StorePaths* request, + nix::proto::QueryMissingResponse* response) override { + return HandleExceptions( + [&]() -> Status { + std::set<Path> targets; + for (auto& path : request->paths()) { + ASSERT_INPUT_STORE_PATH(path); + targets.insert(path); + } + PathSet will_build; + PathSet will_substitute; + PathSet unknown; + // TODO(grfn): Switch to concrete size type + unsigned long long download_size; + unsigned long long nar_size; + + store_->queryMissing(targets, will_build, will_substitute, unknown, + download_size, nar_size); + for (auto& path : will_build) { + response->add_will_build(path); + } + for (auto& path : will_substitute) { + response->add_will_substitute(path); + } + for (auto& path : unknown) { + response->add_unknown(path); + } + response->set_download_size(download_size); + response->set_nar_size(nar_size); + + return Status::OK; + }, + __FUNCTION__); + }; + + Status GetBuildLog(grpc::ServerContext* context, const StorePath* request, + proto::BuildLog* response) override { + return HandleExceptions( + [&]() -> Status { + const auto log = store_->getBuildLog(request->path()); + if (log) { + response->set_build_log(*log); + } + return Status::OK; + }, + __FUNCTION__); + } + + private: + Status HandleExceptions(std::function<Status(void)> fn, + absl::string_view methodName) { + try { + return fn(); + } catch (Unsupported& e) { + return Status(grpc::StatusCode::UNIMPLEMENTED, + absl::StrCat(methodName, " is not supported: ", e.what())); + } catch (Error& e) { + return Status(grpc::StatusCode::INTERNAL, e.what()); + } + // add more specific Error-Status mappings above + } + + ref<nix::Store> store_; +}; + +WorkerService::Service* NewWorkerService(nix::Store& store) { + return new WorkerServiceImpl(store); +} + +} // namespace nix::daemon diff --git a/third_party/nix/src/nix-daemon/nix-daemon-proto.hh b/third_party/nix/src/nix-daemon/nix-daemon-proto.hh new file mode 100644 index 000000000000..ca871213eb60 --- /dev/null +++ b/third_party/nix/src/nix-daemon/nix-daemon-proto.hh @@ -0,0 +1,12 @@ +#pragma once + +#include <memory> + +#include "libproto/worker.grpc.pb.h" +#include "libstore/store-api.hh" + +namespace nix::daemon { + +nix::proto::WorkerService::Service* NewWorkerService(nix::Store&); + +} // namespace nix::daemon diff --git a/third_party/nix/src/nix-daemon/nix-daemon.cc b/third_party/nix/src/nix-daemon/nix-daemon.cc new file mode 100644 index 000000000000..0551625a3e13 --- /dev/null +++ b/third_party/nix/src/nix-daemon/nix-daemon.cc @@ -0,0 +1,201 @@ +#include <filesystem> + +#include <absl/flags/flag.h> +#include <absl/flags/parse.h> +#include <absl/flags/usage_config.h> +#include <absl/strings/str_format.h> +#include <fcntl.h> +#include <glog/logging.h> +#include <grpcpp/security/server_credentials.h> +#include <grpcpp/server.h> +#include <grpcpp/server_builder.h> +#include <grpcpp/server_posix.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <systemd/sd-daemon.h> + +#include "libmain/shared.hh" // TODO(tazjin): can this be removed? +#include "libstore/globals.hh" +#include "libstore/store-api.hh" +#include "libutil/util.hh" +#include "nix-daemon-proto.hh" +#include "nix-daemon/nix-daemon-proto.hh" +#include "nix/legacy.hh" + +ABSL_FLAG(bool, pipe, false, "Use pipes for daemon communication"); + +namespace nix::daemon { + +using grpc::Server; +using grpc::ServerBuilder; + +namespace { + +// TODO(grfn): There has to be a better way to do this - this was ported +// verbatim from the old daemon implementation without much critical evaluation. +static int ForwardToSocket(nix::Path socket_path) { + // Forward on this connection to the real daemon + int sockfd = socket(PF_UNIX, SOCK_STREAM, 0); + if (sockfd == -1) { + throw SysError("creating Unix domain socket"); + } + + auto socketDir = dirOf(socket_path); + if (chdir(socketDir.c_str()) == -1) { + throw SysError(format("changing to socket directory '%1%'") % socketDir); + } + + auto socketName = baseNameOf(socket_path); + auto addr = sockaddr_un{}; + addr.sun_family = AF_UNIX; + if (socketName.size() + 1 >= sizeof(addr.sun_path)) { + throw Error(format("socket name %1% is too long") % socketName); + } + strncpy(addr.sun_path, socketName.c_str(), sizeof(addr.sun_family)); + + if (connect(sockfd, reinterpret_cast<struct sockaddr*>(&addr), + sizeof(addr)) == -1) { + throw SysError(format("cannot connect to daemon at %1%") % socket_path); + } + + auto nfds = (sockfd > STDIN_FILENO ? sockfd : STDIN_FILENO) + 1; + while (true) { + fd_set fds; + FD_ZERO(&fds); + FD_SET(sockfd, &fds); + FD_SET(STDIN_FILENO, &fds); + if (select(nfds, &fds, nullptr, nullptr, nullptr) == -1) { + throw SysError("waiting for data from client or server"); + } + if (FD_ISSET(sockfd, &fds)) { + auto res = splice(sockfd, nullptr, STDOUT_FILENO, nullptr, SSIZE_MAX, + SPLICE_F_MOVE); + if (res == -1) { + throw SysError("splicing data from daemon socket to stdout"); + } + if (res == 0) { + throw EndOfFile("unexpected EOF from daemon socket"); + } + } + if (FD_ISSET(STDIN_FILENO, &fds)) { + auto res = splice(STDIN_FILENO, nullptr, sockfd, nullptr, SSIZE_MAX, + SPLICE_F_MOVE); + if (res == -1) { + throw SysError("splicing data from stdin to daemon socket"); + } + if (res == 0) { + return 0; + } + } + } +} + +void SetNonBlocking(int fd) { + int flags = fcntl(fd, F_GETFL); // NOLINT + PCHECK(flags != 0) << "Error getting socket flags"; + PCHECK(fcntl( // NOLINT + fd, F_SETFL, flags | O_NONBLOCK) == 0) + << "Could not set socket flags"; +} + +} // namespace + +int RunServer() { + Store::Params params; + params["path-info-cache-size"] = "0"; + auto store = openStore(settings.storeUri, params); + auto worker = NewWorkerService(*store); + ServerBuilder builder; + builder.RegisterService(worker); + + auto n_fds = sd_listen_fds(0); + + if (n_fds > 1) { + LOG(FATAL) << "Too many file descriptors (" << n_fds + << ") received from systemd socket activation"; + } + + std::filesystem::path socket_path; + + if (n_fds == 0) { + socket_path = settings.nixDaemonSocketFile; + std::filesystem::create_directories(socket_path.parent_path()); + auto socket_addr = absl::StrFormat("unix://%s", socket_path); + builder.AddListeningPort(socket_addr, grpc::InsecureServerCredentials()); + } + + std::unique_ptr<Server> server(builder.BuildAndStart()); + + if (!server) { + LOG(FATAL) << "Error building server"; + return 1; + } + + // We have been systemd socket-activated - instead of asking grpc to make the + // socket path for us, start our own accept loop and pass file descriptors to + // grpc. + // + // This approach was *somewhat* adapted from + // https://gist.github.com/yorickvP/8d523a4df2b10c5812fa7789e82b7c1b - at some + // point we'd like gRPC to do it for us, though - see + // https://github.com/grpc/grpc/issues/19133 + if (n_fds == 1) { + int socket_fd = SD_LISTEN_FDS_START; + // Only used for logging + socket_path = readLink(absl::StrFormat("/proc/self/fd/%d", socket_fd)); + + PCHECK(sd_notify(0, "READY=1") == 0) << "Error notifying systemd"; + for (;;) { + try { + struct sockaddr_un remote_addr {}; + socklen_t remote_addr_len = sizeof(remote_addr); + int remote_fd = + accept(socket_fd, + reinterpret_cast<struct sockaddr*>(&remote_addr), // NOLINT + &remote_addr_len); + checkInterrupt(); + if (!remote_fd) { + if (errno == EINTR) { + continue; + } + PCHECK(false) << "error accepting connection"; + } + + LOG(INFO) << "Accepted remote connection on fd " << remote_fd; + SetNonBlocking(remote_fd); + grpc::AddInsecureChannelFromFd(server.get(), remote_fd); + } catch (Interrupted& e) { + return -1; + } catch (Error& e) { + LOG(ERROR) << "error processing connection: " << e.msg(); + } + } + } + + LOG(INFO) << "Nix daemon listening at " << socket_path; + server->Wait(); + return 0; +} + +} // namespace nix::daemon + +int main(int argc, char** argv) { // NOLINT + FLAGS_logtostderr = true; + google::InitGoogleLogging(argv[0]); // NOLINT + + absl::SetFlagsUsageConfig({.version_string = [] { return nix::nixVersion; }}); + absl::ParseCommandLine(argc, argv); + + if (absl::GetFlag(FLAGS_pipe)) { + if (nix::getStoreType() == nix::tDaemon) { + return nix::daemon::ForwardToSocket(nix::settings.nixDaemonSocketFile); + } else { + // TODO(grfn): Need to launch a server on stdin here - upstream calls + // processConnection(true, "root", 0); + LOG(ERROR) << "not implemented"; + return 1; + } + } + + return nix::daemon::RunServer(); +} |