diff options
-rw-r--r-- | src/libmain/shared.cc | 1 | ||||
-rw-r--r-- | src/libstore/remote-store.cc | 465 | ||||
-rw-r--r-- | src/libstore/remote-store.hh | 29 | ||||
-rw-r--r-- | src/libutil/pool.hh | 151 | ||||
-rw-r--r-- | src/libutil/ref.hh | 67 | ||||
-rw-r--r-- | src/libutil/serialise.cc | 22 | ||||
-rw-r--r-- | src/libutil/serialise.hh | 23 | ||||
-rw-r--r-- | src/libutil/sync.hh | 78 | ||||
-rw-r--r-- | src/libutil/types.hh | 61 |
9 files changed, 584 insertions, 313 deletions
diff --git a/src/libmain/shared.cc b/src/libmain/shared.cc index 8f2aa842036a..c27302227304 100644 --- a/src/libmain/shared.cc +++ b/src/libmain/shared.cc @@ -126,6 +126,7 @@ void initNix() std::cerr.rdbuf()->pubsetbuf(buf, sizeof(buf)); #endif + // FIXME: do we need this? It's not thread-safe. std::ios::sync_with_stdio(false); if (getEnv("IN_SYSTEMD") == "1") diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index ab2ebb9aecc3..2f540c640e0b 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -6,6 +6,7 @@ #include "affinity.hh" #include "globals.hh" #include "derivations.hh" +#include "pool.hh" #include <sys/types.h> #include <sys/stat.h> @@ -13,9 +14,8 @@ #include <sys/un.h> #include <errno.h> #include <fcntl.h> - -#include <iostream> #include <unistd.h> + #include <cstring> namespace nix { @@ -39,62 +39,25 @@ template<class T> T readStorePaths(Source & from) template PathSet readStorePaths(Source & from); -RemoteStore::RemoteStore() +RemoteStore::RemoteStore(size_t maxConnections) + : connections(make_ref<Pool<Connection>>( + maxConnections, + [this]() { return openConnection(); }, + [](const ref<Connection> & r) { return r->to.good() && r->from.good(); } + )) { - initialised = false; } -void RemoteStore::openConnection(bool reserveSpace) +ref<RemoteStore::Connection> RemoteStore::openConnection(bool reserveSpace) { - if (initialised) return; - initialised = true; + auto conn = make_ref<Connection>(); /* Connect to a daemon that does the privileged work for us. */ - connectToDaemon(); - - from.fd = fdSocket; - to.fd = fdSocket; - - /* Send the magic greeting, check for the reply. */ - try { - to << WORKER_MAGIC_1; - to.flush(); - unsigned int magic = readInt(from); - if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch"); - - daemonVersion = readInt(from); - if (GET_PROTOCOL_MAJOR(daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION)) - throw Error("Nix daemon protocol version not supported"); - to << PROTOCOL_VERSION; - - if (GET_PROTOCOL_MINOR(daemonVersion) >= 14) { - int cpu = settings.lockCPU ? lockToCurrentCPU() : -1; - if (cpu != -1) - to << 1 << cpu; - else - to << 0; - } - - if (GET_PROTOCOL_MINOR(daemonVersion) >= 11) - to << reserveSpace; - - processStderr(); - } - catch (Error & e) { - throw Error(format("cannot start daemon worker: %1%") % e.msg()); - } - - setOptions(); -} - - -void RemoteStore::connectToDaemon() -{ - fdSocket = socket(PF_UNIX, SOCK_STREAM, 0); - if (fdSocket == -1) + conn->fd = socket(PF_UNIX, SOCK_STREAM, 0); + if (conn->fd == -1) throw SysError("cannot create Unix domain socket"); - closeOnExec(fdSocket); + closeOnExec(conn->fd); string socketPath = settings.nixDaemonSocketFile; @@ -111,111 +74,135 @@ void RemoteStore::connectToDaemon() addr.sun_family = AF_UNIX; if (socketPathRel.size() >= sizeof(addr.sun_path)) throw Error(format("socket path ‘%1%’ is too long") % socketPathRel); - using namespace std; strcpy(addr.sun_path, socketPathRel.c_str()); - if (connect(fdSocket, (struct sockaddr *) &addr, sizeof(addr)) == -1) + if (connect(conn->fd, (struct sockaddr *) &addr, sizeof(addr)) == -1) throw SysError(format("cannot connect to daemon at ‘%1%’") % socketPath); if (fchdir(fdPrevDir) == -1) throw SysError("couldn't change back to previous directory"); -} + conn->from.fd = conn->fd; + conn->to.fd = conn->fd; -RemoteStore::~RemoteStore() -{ + /* Send the magic greeting, check for the reply. */ try { - to.flush(); - fdSocket.close(); - } catch (...) { - ignoreException(); + conn->to << WORKER_MAGIC_1; + conn->to.flush(); + unsigned int magic = readInt(conn->from); + if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch"); + + conn->daemonVersion = readInt(conn->from); + if (GET_PROTOCOL_MAJOR(conn->daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION)) + throw Error("Nix daemon protocol version not supported"); + conn->to << PROTOCOL_VERSION; + + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 14) { + int cpu = settings.lockCPU ? lockToCurrentCPU() : -1; + if (cpu != -1) + conn->to << 1 << cpu; + else + conn->to << 0; + } + + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 11) + conn->to << reserveSpace; + + conn->processStderr(); } + catch (Error & e) { + throw Error(format("cannot start daemon worker: %1%") % e.msg()); + } + + setOptions(conn); + + return conn; } -void RemoteStore::setOptions() +void RemoteStore::setOptions(ref<Connection> conn) { - to << wopSetOptions + conn->to << wopSetOptions << settings.keepFailed << settings.keepGoing << settings.tryFallback << verbosity << settings.maxBuildJobs << settings.maxSilentTime; - if (GET_PROTOCOL_MINOR(daemonVersion) >= 2) - to << settings.useBuildHook; - if (GET_PROTOCOL_MINOR(daemonVersion) >= 4) - to << settings.buildVerbosity + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 2) + conn->to << settings.useBuildHook; + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 4) + conn->to << settings.buildVerbosity << logType << settings.printBuildTrace; - if (GET_PROTOCOL_MINOR(daemonVersion) >= 6) - to << settings.buildCores; - if (GET_PROTOCOL_MINOR(daemonVersion) >= 10) - to << settings.useSubstitutes; + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 6) + conn->to << settings.buildCores; + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 10) + conn->to << settings.useSubstitutes; - if (GET_PROTOCOL_MINOR(daemonVersion) >= 12) { + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 12) { Settings::SettingsMap overrides = settings.getOverrides(); if (overrides["ssh-auth-sock"] == "") overrides["ssh-auth-sock"] = getEnv("SSH_AUTH_SOCK"); - to << overrides.size(); + conn->to << overrides.size(); for (auto & i : overrides) - to << i.first << i.second; + conn->to << i.first << i.second; } - processStderr(); + conn->processStderr(); } bool RemoteStore::isValidPath(const Path & path) { - openConnection(); - to << wopIsValidPath << path; - processStderr(); - unsigned int reply = readInt(from); + auto conn(connections->get()); + conn->to << wopIsValidPath << path; + conn->processStderr(); + unsigned int reply = readInt(conn->from); return reply != 0; } PathSet RemoteStore::queryValidPaths(const PathSet & paths) { - openConnection(); - if (GET_PROTOCOL_MINOR(daemonVersion) < 12) { + auto conn(connections->get()); + if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { PathSet res; for (auto & i : paths) if (isValidPath(i)) res.insert(i); return res; } else { - to << wopQueryValidPaths << paths; - processStderr(); - return readStorePaths<PathSet>(from); + conn->to << wopQueryValidPaths << paths; + conn->processStderr(); + return readStorePaths<PathSet>(conn->from); } } PathSet RemoteStore::queryAllValidPaths() { - openConnection(); - to << wopQueryAllValidPaths; - processStderr(); - return readStorePaths<PathSet>(from); + auto conn(connections->get()); + conn->to << wopQueryAllValidPaths; + conn->processStderr(); + return readStorePaths<PathSet>(conn->from); } PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths) { - openConnection(); - if (GET_PROTOCOL_MINOR(daemonVersion) < 12) { + auto conn(connections->get()); + if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { PathSet res; for (auto & i : paths) { - to << wopHasSubstitutes << i; - processStderr(); - if (readInt(from)) res.insert(i); + conn->to << wopHasSubstitutes << i; + conn->processStderr(); + if (readInt(conn->from)) res.insert(i); } return res; } else { - to << wopQuerySubstitutablePaths << paths; - processStderr(); - return readStorePaths<PathSet>(from); + conn->to << wopQuerySubstitutablePaths << paths; + conn->processStderr(); + return readStorePaths<PathSet>(conn->from); } } @@ -225,39 +212,39 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths, { if (paths.empty()) return; - openConnection(); + auto conn(connections->get()); - if (GET_PROTOCOL_MINOR(daemonVersion) < 3) return; + if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 3) return; - if (GET_PROTOCOL_MINOR(daemonVersion) < 12) { + if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { for (auto & i : paths) { SubstitutablePathInfo info; - to << wopQuerySubstitutablePathInfo << i; - processStderr(); - unsigned int reply = readInt(from); + conn->to << wopQuerySubstitutablePathInfo << i; + conn->processStderr(); + unsigned int reply = readInt(conn->from); if (reply == 0) continue; - info.deriver = readString(from); + info.deriver = readString(conn->from); if (info.deriver != "") assertStorePath(info.deriver); - info.references = readStorePaths<PathSet>(from); - info.downloadSize = readLongLong(from); - info.narSize = GET_PROTOCOL_MINOR(daemonVersion) >= 7 ? readLongLong(from) : 0; + info.references = readStorePaths<PathSet>(conn->from); + info.downloadSize = readLongLong(conn->from); + info.narSize = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 7 ? readLongLong(conn->from) : 0; infos[i] = info; } } else { - to << wopQuerySubstitutablePathInfos << paths; - processStderr(); - unsigned int count = readInt(from); + conn->to << wopQuerySubstitutablePathInfos << paths; + conn->processStderr(); + unsigned int count = readInt(conn->from); for (unsigned int n = 0; n < count; n++) { - Path path = readStorePath(from); + Path path = readStorePath(conn->from); SubstitutablePathInfo & info(infos[path]); - info.deriver = readString(from); + info.deriver = readString(conn->from); if (info.deriver != "") assertStorePath(info.deriver); - info.references = readStorePaths<PathSet>(from); - info.downloadSize = readLongLong(from); - info.narSize = readLongLong(from); + info.references = readStorePaths<PathSet>(conn->from); + info.downloadSize = readLongLong(conn->from); + info.narSize = readLongLong(conn->from); } } @@ -266,27 +253,27 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths, ValidPathInfo RemoteStore::queryPathInfo(const Path & path) { - openConnection(); - to << wopQueryPathInfo << path; - processStderr(); + auto conn(connections->get()); + conn->to << wopQueryPathInfo << path; + conn->processStderr(); ValidPathInfo info; info.path = path; - info.deriver = readString(from); + info.deriver = readString(conn->from); if (info.deriver != "") assertStorePath(info.deriver); - info.narHash = parseHash(htSHA256, readString(from)); - info.references = readStorePaths<PathSet>(from); - info.registrationTime = readInt(from); - info.narSize = readLongLong(from); + info.narHash = parseHash(htSHA256, readString(conn->from)); + info.references = readStorePaths<PathSet>(conn->from); + info.registrationTime = readInt(conn->from); + info.narSize = readLongLong(conn->from); return info; } Hash RemoteStore::queryPathHash(const Path & path) { - openConnection(); - to << wopQueryPathHash << path; - processStderr(); - string hash = readString(from); + auto conn(connections->get()); + conn->to << wopQueryPathHash << path; + conn->processStderr(); + string hash = readString(conn->from); return parseHash(htSHA256, hash); } @@ -294,10 +281,10 @@ Hash RemoteStore::queryPathHash(const Path & path) void RemoteStore::queryReferences(const Path & path, PathSet & references) { - openConnection(); - to << wopQueryReferences << path; - processStderr(); - PathSet references2 = readStorePaths<PathSet>(from); + auto conn(connections->get()); + conn->to << wopQueryReferences << path; + conn->processStderr(); + PathSet references2 = readStorePaths<PathSet>(conn->from); references.insert(references2.begin(), references2.end()); } @@ -305,20 +292,20 @@ void RemoteStore::queryReferences(const Path & path, void RemoteStore::queryReferrers(const Path & path, PathSet & referrers) { - openConnection(); - to << wopQueryReferrers << path; - processStderr(); - PathSet referrers2 = readStorePaths<PathSet>(from); + auto conn(connections->get()); + conn->to << wopQueryReferrers << path; + conn->processStderr(); + PathSet referrers2 = readStorePaths<PathSet>(conn->from); referrers.insert(referrers2.begin(), referrers2.end()); } Path RemoteStore::queryDeriver(const Path & path) { - openConnection(); - to << wopQueryDeriver << path; - processStderr(); - Path drvPath = readString(from); + auto conn(connections->get()); + conn->to << wopQueryDeriver << path; + conn->processStderr(); + Path drvPath = readString(conn->from); if (drvPath != "") assertStorePath(drvPath); return drvPath; } @@ -326,37 +313,37 @@ Path RemoteStore::queryDeriver(const Path & path) PathSet RemoteStore::queryValidDerivers(const Path & path) { - openConnection(); - to << wopQueryValidDerivers << path; - processStderr(); - return readStorePaths<PathSet>(from); + auto conn(connections->get()); + conn->to << wopQueryValidDerivers << path; + conn->processStderr(); + return readStorePaths<PathSet>(conn->from); } PathSet RemoteStore::queryDerivationOutputs(const Path & path) { - openConnection(); - to << wopQueryDerivationOutputs << path; - processStderr(); - return readStorePaths<PathSet>(from); + auto conn(connections->get()); + conn->to << wopQueryDerivationOutputs << path; + conn->processStderr(); + return readStorePaths<PathSet>(conn->from); } PathSet RemoteStore::queryDerivationOutputNames(const Path & path) { - openConnection(); - to << wopQueryDerivationOutputNames << path; - processStderr(); - return readStrings<PathSet>(from); + auto conn(connections->get()); + conn->to << wopQueryDerivationOutputNames << path; + conn->processStderr(); + return readStrings<PathSet>(conn->from); } Path RemoteStore::queryPathFromHashPart(const string & hashPart) { - openConnection(); - to << wopQueryPathFromHashPart << hashPart; - processStderr(); - Path path = readString(from); + auto conn(connections->get()); + conn->to << wopQueryPathFromHashPart << hashPart; + conn->processStderr(); + Path path = readString(conn->from); if (!path.empty()) assertStorePath(path); return path; } @@ -367,32 +354,32 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath, { if (repair) throw Error("repairing is not supported when building through the Nix daemon"); - openConnection(); + auto conn(connections->get()); Path srcPath(absPath(_srcPath)); - to << wopAddToStore << name + conn->to << wopAddToStore << name << ((hashAlgo == htSHA256 && recursive) ? 0 : 1) /* backwards compatibility hack */ << (recursive ? 1 : 0) << printHashType(hashAlgo); try { - to.written = 0; - to.warn = true; - dumpPath(srcPath, to, filter); - to.warn = false; - processStderr(); + conn->to.written = 0; + conn->to.warn = true; + dumpPath(srcPath, conn->to, filter); + conn->to.warn = false; + conn->processStderr(); } catch (SysError & e) { /* Daemon closed while we were sending the path. Probably OOM or I/O error. */ if (e.errNo == EPIPE) try { - processStderr(); + conn->processStderr(); } catch (EndOfFile & e) { } throw; } - return readStorePath(from); + return readStorePath(conn->from); } @@ -401,43 +388,43 @@ Path RemoteStore::addTextToStore(const string & name, const string & s, { if (repair) throw Error("repairing is not supported when building through the Nix daemon"); - openConnection(); - to << wopAddTextToStore << name << s << references; + auto conn(connections->get()); + conn->to << wopAddTextToStore << name << s << references; - processStderr(); - return readStorePath(from); + conn->processStderr(); + return readStorePath(conn->from); } void RemoteStore::exportPath(const Path & path, bool sign, Sink & sink) { - openConnection(); - to << wopExportPath << path << (sign ? 1 : 0); - processStderr(&sink); /* sink receives the actual data */ - readInt(from); + auto conn(connections->get()); + conn->to << wopExportPath << path << (sign ? 1 : 0); + conn->processStderr(&sink); /* sink receives the actual data */ + readInt(conn->from); } Paths RemoteStore::importPaths(bool requireSignature, Source & source) { - openConnection(); - to << wopImportPaths; + auto conn(connections->get()); + conn->to << wopImportPaths; /* We ignore requireSignature, since the worker forces it to true anyway. */ - processStderr(0, &source); - return readStorePaths<Paths>(from); + conn->processStderr(0, &source); + return readStorePaths<Paths>(conn->from); } void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode) { - openConnection(); - to << wopBuildPaths; - if (GET_PROTOCOL_MINOR(daemonVersion) >= 13) { - to << drvPaths; - if (GET_PROTOCOL_MINOR(daemonVersion) >= 15) - to << buildMode; + auto conn(connections->get()); + conn->to << wopBuildPaths; + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) { + conn->to << drvPaths; + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15) + conn->to << buildMode; else /* Old daemons did not take a 'buildMode' parameter, so we need to validate it here on the client side. */ @@ -449,22 +436,22 @@ void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode) PathSet drvPaths2; for (auto & i : drvPaths) drvPaths2.insert(string(i, 0, i.find('!'))); - to << drvPaths2; + conn->to << drvPaths2; } - processStderr(); - readInt(from); + conn->processStderr(); + readInt(conn->from); } BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv, BuildMode buildMode) { - openConnection(); - to << wopBuildDerivation << drvPath << drv << buildMode; - processStderr(); + auto conn(connections->get()); + conn->to << wopBuildDerivation << drvPath << drv << buildMode; + conn->processStderr(); BuildResult res; unsigned int status; - from >> status >> res.errorMsg; + conn->from >> status >> res.errorMsg; res.status = (BuildResult::Status) status; return res; } @@ -472,50 +459,50 @@ BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDeriva void RemoteStore::ensurePath(const Path & path) { - openConnection(); - to << wopEnsurePath << path; - processStderr(); - readInt(from); + auto conn(connections->get()); + conn->to << wopEnsurePath << path; + conn->processStderr(); + readInt(conn->from); } void RemoteStore::addTempRoot(const Path & path) { - openConnection(); - to << wopAddTempRoot << path; - processStderr(); - readInt(from); + auto conn(connections->get()); + conn->to << wopAddTempRoot << path; + conn->processStderr(); + readInt(conn->from); } void RemoteStore::addIndirectRoot(const Path & path) { - openConnection(); - to << wopAddIndirectRoot << path; - processStderr(); - readInt(from); + auto conn(connections->get()); + conn->to << wopAddIndirectRoot << path; + conn->processStderr(); + readInt(conn->from); } void RemoteStore::syncWithGC() { - openConnection(); - to << wopSyncWithGC; - processStderr(); - readInt(from); + auto conn(connections->get()); + conn->to << wopSyncWithGC; + conn->processStderr(); + readInt(conn->from); } Roots RemoteStore::findRoots() { - openConnection(); - to << wopFindRoots; - processStderr(); - unsigned int count = readInt(from); + auto conn(connections->get()); + conn->to << wopFindRoots; + conn->processStderr(); + unsigned int count = readInt(conn->from); Roots result; while (count--) { - Path link = readString(from); - Path target = readStorePath(from); + Path link = readString(conn->from); + Path target = readStorePath(conn->from); result[link] = target; } return result; @@ -524,56 +511,68 @@ Roots RemoteStore::findRoots() void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results) { - openConnection(false); + auto conn(connections->get()); - to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness + conn->to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness << options.maxFreed << 0; - if (GET_PROTOCOL_MINOR(daemonVersion) >= 5) + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 5) /* removed options */ - to << 0 << 0; + conn->to << 0 << 0; - processStderr(); + conn->processStderr(); - results.paths = readStrings<PathSet>(from); - results.bytesFreed = readLongLong(from); - readLongLong(from); // obsolete + results.paths = readStrings<PathSet>(conn->from); + results.bytesFreed = readLongLong(conn->from); + readLongLong(conn->from); // obsolete } PathSet RemoteStore::queryFailedPaths() { - openConnection(); - to << wopQueryFailedPaths; - processStderr(); - return readStorePaths<PathSet>(from); + auto conn(connections->get()); + conn->to << wopQueryFailedPaths; + conn->processStderr(); + return readStorePaths<PathSet>(conn->from); } void RemoteStore::clearFailedPaths(const PathSet & paths) { - openConnection(); - to << wopClearFailedPaths << paths; - processStderr(); - readInt(from); + auto conn(connections->get()); + conn->to << wopClearFailedPaths << paths; + conn->processStderr(); + readInt(conn->from); } void RemoteStore::optimiseStore() { - openConnection(); - to << wopOptimiseStore; - processStderr(); - readInt(from); + auto conn(connections->get()); + conn->to << wopOptimiseStore; + conn->processStderr(); + readInt(conn->from); } bool RemoteStore::verifyStore(bool checkContents, bool repair) { - openConnection(); - to << wopVerifyStore << checkContents << repair; - processStderr(); - return readInt(from) != 0; + auto conn(connections->get()); + conn->to << wopVerifyStore << checkContents << repair; + conn->processStderr(); + return readInt(conn->from) != 0; +} + + +RemoteStore::Connection::~Connection() +{ + try { + to.flush(); + fd.close(); + } catch (...) { + ignoreException(); + } } -void RemoteStore::processStderr(Sink * sink, Source * source) + +void RemoteStore::Connection::processStderr(Sink * sink, Source * source) { to.flush(); unsigned int msg; diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index f15182285e8c..af417b84ff9c 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -1,5 +1,6 @@ #pragma once +#include <limits> #include <string> #include "store-api.hh" @@ -12,15 +13,14 @@ class Pipe; class Pid; struct FdSink; struct FdSource; +template<typename T> class Pool; class RemoteStore : public Store { public: - RemoteStore(); - - ~RemoteStore(); + RemoteStore(size_t maxConnections = std::numeric_limits<size_t>::max()); /* Implementations of abstract store API methods. */ @@ -91,19 +91,24 @@ public: bool verifyStore(bool checkContents, bool repair) override; private: - AutoCloseFD fdSocket; - FdSink to; - FdSource from; - unsigned int daemonVersion; - bool initialised; - void openConnection(bool reserveSpace = true); + struct Connection + { + AutoCloseFD fd; + FdSink to; + FdSource from; + unsigned int daemonVersion; + + ~Connection(); + + void processStderr(Sink * sink = 0, Source * source = 0); + }; - void processStderr(Sink * sink = 0, Source * source = 0); + ref<Pool<Connection>> connections; - void connectToDaemon(); + ref<Connection> openConnection(bool reserveSpace = true); - void setOptions(); + void setOptions(ref<Connection> conn); }; diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh new file mode 100644 index 000000000000..f291cd578388 --- /dev/null +++ b/src/libutil/pool.hh @@ -0,0 +1,151 @@ +#pragma once + +#include <functional> +#include <limits> +#include <list> +#include <memory> +#include <cassert> + +#include "sync.hh" +#include "ref.hh" + +namespace nix { + +/* This template class implements a simple pool manager of resources + of some type R, such as database connections. It is used as + follows: + + class Connection { ... }; + + Pool<Connection> pool; + + { + auto conn(pool.get()); + conn->exec("select ..."); + } + + Here, the Connection object referenced by ‘conn’ is automatically + returned to the pool when ‘conn’ goes out of scope. +*/ + +template <class R> +class Pool +{ +public: + + /* A function that produces new instances of R on demand. */ + typedef std::function<ref<R>()> Factory; + + /* A function that checks whether an instance of R is still + usable. Unusable instances are removed from the pool. */ + typedef std::function<bool(const ref<R> &)> Validator; + +private: + + Factory factory; + Validator validator; + + struct State + { + size_t inUse = 0; + size_t max; + std::vector<ref<R>> idle; + }; + + Sync<State> state; + + std::condition_variable wakeup; + +public: + + Pool(size_t max = std::numeric_limits<size_t>::max(), + const Factory & factory = []() { return make_ref<R>(); }, + const Validator & validator = [](ref<R> r) { return true; }) + : factory(factory) + , validator(validator) + { + auto state_(state.lock()); + state_->max = max; + } + + ~Pool() + { + auto state_(state.lock()); + assert(!state_->inUse); + state_->max = 0; + state_->idle.clear(); + } + + class Handle + { + private: + Pool & pool; + std::shared_ptr<R> r; + + friend Pool; + + Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { } + + public: + Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); } + + Handle(const Handle & l) = delete; + + ~Handle() + { + if (!r) return; + { + auto state_(pool.state.lock()); + state_->idle.push_back(ref<R>(r)); + assert(state_->inUse); + state_->inUse--; + } + pool.wakeup.notify_one(); + } + + R * operator -> () { return &*r; } + R & operator * () { return *r; } + }; + + Handle get() + { + { + auto state_(state.lock()); + + /* If we're over the maximum number of instance, we need + to wait until a slot becomes available. */ + while (state_->idle.empty() && state_->inUse >= state_->max) + state_.wait(wakeup); + + while (!state_->idle.empty()) { + auto p = state_->idle.back(); + state_->idle.pop_back(); + if (validator(p)) { + state_->inUse++; + return Handle(*this, p); + } + } + + state_->inUse++; + } + + /* We need to create a new instance. Because that might take a + while, we don't hold the lock in the meantime. */ + try { + Handle h(*this, factory()); + return h; + } catch (...) { + auto state_(state.lock()); + state_->inUse--; + throw; + } + } + + unsigned int count() + { + auto state_(state.lock()); + return state_->idle.size() + state_->inUse; + } +}; + +} diff --git a/src/libutil/ref.hh b/src/libutil/ref.hh new file mode 100644 index 000000000000..a6d338d79622 --- /dev/null +++ b/src/libutil/ref.hh @@ -0,0 +1,67 @@ +#pragma once + +#include <memory> +#include <exception> + +namespace nix { + +/* A simple non-nullable reference-counted pointer. Actually a wrapper + around std::shared_ptr that prevents non-null constructions. */ +template<typename T> +class ref +{ +private: + + std::shared_ptr<T> p; + +public: + + ref<T>(const ref<T> & r) + : p(r.p) + { } + + explicit ref<T>(const std::shared_ptr<T> & p) + : p(p) + { + if (!p) + throw std::invalid_argument("null pointer cast to ref"); + } + + T* operator ->() const + { + return &*p; + } + + T& operator *() const + { + return *p; + } + + operator std::shared_ptr<T> () + { + return p; + } + + template<typename T2> + operator ref<T2> () + { + return ref<T2>((std::shared_ptr<T2>) p); + } + +private: + + template<typename T2, typename... Args> + friend ref<T2> + make_ref(Args&&... args); + +}; + +template<typename T, typename... Args> +inline ref<T> +make_ref(Args&&... args) +{ + auto p = std::make_shared<T>(std::forward<Args>(args)...); + return ref<T>(p); +} + +} diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index f136a13248ba..c9620e2bf32a 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -72,7 +72,17 @@ void FdSink::write(const unsigned char * data, size_t len) warned = true; } } - writeFull(fd, data, len); + try { + writeFull(fd, data, len); + } catch (SysError & e) { + _good = true; + } +} + + +bool FdSink::good() +{ + return _good; } @@ -119,12 +129,18 @@ size_t FdSource::readUnbuffered(unsigned char * data, size_t len) checkInterrupt(); n = ::read(fd, (char *) data, bufSize); } while (n == -1 && errno == EINTR); - if (n == -1) throw SysError("reading from file"); - if (n == 0) throw EndOfFile("unexpected end-of-file"); + if (n == -1) { _good = false; throw SysError("reading from file"); } + if (n == 0) { _good = false; throw EndOfFile("unexpected end-of-file"); } return n; } +bool FdSource::good() +{ + return _good; +} + + size_t StringSource::read(unsigned char * data, size_t len) { if (pos == s.size()) throw EndOfFile("end of string reached"); diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 979ff849fcaf..9e269f3923ea 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -12,6 +12,7 @@ struct Sink { virtual ~Sink() { } virtual void operator () (const unsigned char * data, size_t len) = 0; + virtual bool good() { return true; } }; @@ -25,7 +26,7 @@ struct BufferedSink : Sink : bufSize(bufSize), bufPos(0), buffer(0) { } ~BufferedSink(); - void operator () (const unsigned char * data, size_t len); + void operator () (const unsigned char * data, size_t len) override; void flush(); @@ -47,6 +48,8 @@ struct Source return the number of bytes stored. If blocks until at least one byte is available. */ virtual size_t read(unsigned char * data, size_t len) = 0; + + virtual bool good() { return true; } }; @@ -60,7 +63,7 @@ struct BufferedSource : Source : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { } ~BufferedSource(); - size_t read(unsigned char * data, size_t len); + size_t read(unsigned char * data, size_t len) override; /* Underlying read call, to be overridden. */ virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0; @@ -80,7 +83,12 @@ struct FdSink : BufferedSink FdSink(int fd) : fd(fd), warn(false), written(0) { } ~FdSink(); - void write(const unsigned char * data, size_t len); + void write(const unsigned char * data, size_t len) override; + + bool good() override; + +private: + bool _good = true; }; @@ -90,7 +98,10 @@ struct FdSource : BufferedSource int fd; FdSource() : fd(-1) { } FdSource(int fd) : fd(fd) { } - size_t readUnbuffered(unsigned char * data, size_t len); + size_t readUnbuffered(unsigned char * data, size_t len) override; + bool good() override; +private: + bool _good = true; }; @@ -98,7 +109,7 @@ struct FdSource : BufferedSource struct StringSink : Sink { string s; - void operator () (const unsigned char * data, size_t len); + void operator () (const unsigned char * data, size_t len) override; }; @@ -108,7 +119,7 @@ struct StringSource : Source const string & s; size_t pos; StringSource(const string & _s) : s(_s), pos(0) { } - size_t read(unsigned char * data, size_t len); + size_t read(unsigned char * data, size_t len) override; }; diff --git a/src/libutil/sync.hh b/src/libutil/sync.hh new file mode 100644 index 000000000000..c99c098ac9c6 --- /dev/null +++ b/src/libutil/sync.hh @@ -0,0 +1,78 @@ +#pragma once + +#include <mutex> +#include <condition_variable> +#include <cassert> + +namespace nix { + +/* This template class ensures synchronized access to a value of type + T. It is used as follows: + + struct Data { int x; ... }; + + Sync<Data> data; + + { + auto data_(data.lock()); + data_->x = 123; + } + + Here, "data" is automatically unlocked when "data_" goes out of + scope. +*/ + +template<class T> +class Sync +{ +private: + std::mutex mutex; + T data; + +public: + + Sync() { } + Sync(const T & data) : data(data) { } + + class Lock + { + private: + Sync * s; + std::unique_lock<std::mutex> lk; + friend Sync; + Lock(Sync * s) : s(s), lk(s->mutex) { } + public: + Lock(Lock && l) : s(l.s) { abort(); } + Lock(const Lock & l) = delete; + ~Lock() { } + T * operator -> () { return &s->data; } + T & operator * () { return s->data; } + + void wait(std::condition_variable & cv) + { + assert(s); + cv.wait(lk); + } + + template<class Rep, class Period, class Predicate> + bool wait_for(std::condition_variable & cv, + const std::chrono::duration<Rep, Period> & duration, + Predicate pred) + { + assert(s); + return cv.wait_for(lk, duration, pred); + } + + template<class Clock, class Duration> + std::cv_status wait_until(std::condition_variable & cv, + const std::chrono::time_point<Clock, Duration> & duration) + { + assert(s); + return cv.wait_until(lk, duration); + } + }; + + Lock lock() { return Lock(this); } +}; + +} diff --git a/src/libutil/types.hh b/src/libutil/types.hh index 0eae46c5fe93..33aaf5fc9c4d 100644 --- a/src/libutil/types.hh +++ b/src/libutil/types.hh @@ -2,6 +2,8 @@ #include "config.h" +#include "ref.hh" + #include <string> #include <list> #include <set> @@ -97,63 +99,4 @@ typedef enum { } Verbosity; -/* A simple non-nullable reference-counted pointer. Actually a wrapper - around std::shared_ptr that prevents non-null constructions. */ -template<typename T> -class ref -{ -private: - - std::shared_ptr<T> p; - -public: - - ref<T>(const ref<T> & r) - : p(r.p) - { } - - explicit ref<T>(const std::shared_ptr<T> & p) - : p(p) - { - if (!p) - throw std::invalid_argument("null pointer cast to ref"); - } - - T* operator ->() const - { - return &*p; - } - - T& operator *() const - { - return *p; - } - - operator std::shared_ptr<T> () - { - return p; - } - - template<typename T2> - operator ref<T2> () - { - return ref<T2>((std::shared_ptr<T2>) p); - } - -private: - - template<typename T2, typename... Args> - friend ref<T2> - make_ref(Args&&... args); - -}; - -template<typename T, typename... Args> -inline ref<T> -make_ref(Args&&... args) -{ - auto p = std::make_shared<T>(std::forward<Args>(args)...); - return ref<T>(p); -} - } |