about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libmain/shared.cc1
-rw-r--r--src/libstore/remote-store.cc465
-rw-r--r--src/libstore/remote-store.hh29
-rw-r--r--src/libutil/pool.hh151
-rw-r--r--src/libutil/ref.hh67
-rw-r--r--src/libutil/serialise.cc22
-rw-r--r--src/libutil/serialise.hh23
-rw-r--r--src/libutil/sync.hh78
-rw-r--r--src/libutil/types.hh61
9 files changed, 584 insertions, 313 deletions
diff --git a/src/libmain/shared.cc b/src/libmain/shared.cc
index 8f2aa842036a..c27302227304 100644
--- a/src/libmain/shared.cc
+++ b/src/libmain/shared.cc
@@ -126,6 +126,7 @@ void initNix()
     std::cerr.rdbuf()->pubsetbuf(buf, sizeof(buf));
 #endif
 
+    // FIXME: do we need this? It's not thread-safe.
     std::ios::sync_with_stdio(false);
 
     if (getEnv("IN_SYSTEMD") == "1")
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index ab2ebb9aecc3..2f540c640e0b 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -6,6 +6,7 @@
 #include "affinity.hh"
 #include "globals.hh"
 #include "derivations.hh"
+#include "pool.hh"
 
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -13,9 +14,8 @@
 #include <sys/un.h>
 #include <errno.h>
 #include <fcntl.h>
-
-#include <iostream>
 #include <unistd.h>
+
 #include <cstring>
 
 namespace nix {
@@ -39,62 +39,25 @@ template<class T> T readStorePaths(Source & from)
 template PathSet readStorePaths(Source & from);
 
 
-RemoteStore::RemoteStore()
+RemoteStore::RemoteStore(size_t maxConnections)
+    : connections(make_ref<Pool<Connection>>(
+            maxConnections,
+            [this]() { return openConnection(); },
+            [](const ref<Connection> & r) { return r->to.good() && r->from.good(); }
+            ))
 {
-    initialised = false;
 }
 
 
-void RemoteStore::openConnection(bool reserveSpace)
+ref<RemoteStore::Connection> RemoteStore::openConnection(bool reserveSpace)
 {
-    if (initialised) return;
-    initialised = true;
+    auto conn = make_ref<Connection>();
 
     /* Connect to a daemon that does the privileged work for us. */
-    connectToDaemon();
-
-    from.fd = fdSocket;
-    to.fd = fdSocket;
-
-    /* Send the magic greeting, check for the reply. */
-    try {
-        to << WORKER_MAGIC_1;
-        to.flush();
-        unsigned int magic = readInt(from);
-        if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
-
-        daemonVersion = readInt(from);
-        if (GET_PROTOCOL_MAJOR(daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
-            throw Error("Nix daemon protocol version not supported");
-        to << PROTOCOL_VERSION;
-
-        if (GET_PROTOCOL_MINOR(daemonVersion) >= 14) {
-            int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
-            if (cpu != -1)
-                to << 1 << cpu;
-            else
-                to << 0;
-        }
-
-        if (GET_PROTOCOL_MINOR(daemonVersion) >= 11)
-            to << reserveSpace;
-
-        processStderr();
-    }
-    catch (Error & e) {
-        throw Error(format("cannot start daemon worker: %1%") % e.msg());
-    }
-
-    setOptions();
-}
-
-
-void RemoteStore::connectToDaemon()
-{
-    fdSocket = socket(PF_UNIX, SOCK_STREAM, 0);
-    if (fdSocket == -1)
+    conn->fd = socket(PF_UNIX, SOCK_STREAM, 0);
+    if (conn->fd == -1)
         throw SysError("cannot create Unix domain socket");
-    closeOnExec(fdSocket);
+    closeOnExec(conn->fd);
 
     string socketPath = settings.nixDaemonSocketFile;
 
@@ -111,111 +74,135 @@ void RemoteStore::connectToDaemon()
     addr.sun_family = AF_UNIX;
     if (socketPathRel.size() >= sizeof(addr.sun_path))
         throw Error(format("socket path ‘%1%’ is too long") % socketPathRel);
-    using namespace std;
     strcpy(addr.sun_path, socketPathRel.c_str());
 
-    if (connect(fdSocket, (struct sockaddr *) &addr, sizeof(addr)) == -1)
+    if (connect(conn->fd, (struct sockaddr *) &addr, sizeof(addr)) == -1)
         throw SysError(format("cannot connect to daemon at ‘%1%’") % socketPath);
 
     if (fchdir(fdPrevDir) == -1)
         throw SysError("couldn't change back to previous directory");
-}
 
+    conn->from.fd = conn->fd;
+    conn->to.fd = conn->fd;
 
-RemoteStore::~RemoteStore()
-{
+    /* Send the magic greeting, check for the reply. */
     try {
-        to.flush();
-        fdSocket.close();
-    } catch (...) {
-        ignoreException();
+        conn->to << WORKER_MAGIC_1;
+        conn->to.flush();
+        unsigned int magic = readInt(conn->from);
+        if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
+
+        conn->daemonVersion = readInt(conn->from);
+        if (GET_PROTOCOL_MAJOR(conn->daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
+            throw Error("Nix daemon protocol version not supported");
+        conn->to << PROTOCOL_VERSION;
+
+        if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 14) {
+            int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
+            if (cpu != -1)
+                conn->to << 1 << cpu;
+            else
+                conn->to << 0;
+        }
+
+        if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 11)
+            conn->to << reserveSpace;
+
+        conn->processStderr();
     }
+    catch (Error & e) {
+        throw Error(format("cannot start daemon worker: %1%") % e.msg());
+    }
+
+    setOptions(conn);
+
+    return conn;
 }
 
 
-void RemoteStore::setOptions()
+void RemoteStore::setOptions(ref<Connection> conn)
 {
-    to << wopSetOptions
+    conn->to << wopSetOptions
        << settings.keepFailed
        << settings.keepGoing
        << settings.tryFallback
        << verbosity
        << settings.maxBuildJobs
        << settings.maxSilentTime;
-    if (GET_PROTOCOL_MINOR(daemonVersion) >= 2)
-        to << settings.useBuildHook;
-    if (GET_PROTOCOL_MINOR(daemonVersion) >= 4)
-        to << settings.buildVerbosity
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 2)
+        conn->to << settings.useBuildHook;
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 4)
+        conn->to << settings.buildVerbosity
            << logType
            << settings.printBuildTrace;
-    if (GET_PROTOCOL_MINOR(daemonVersion) >= 6)
-        to << settings.buildCores;
-    if (GET_PROTOCOL_MINOR(daemonVersion) >= 10)
-        to << settings.useSubstitutes;
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 6)
+        conn->to << settings.buildCores;
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 10)
+        conn->to << settings.useSubstitutes;
 
-    if (GET_PROTOCOL_MINOR(daemonVersion) >= 12) {
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 12) {
         Settings::SettingsMap overrides = settings.getOverrides();
         if (overrides["ssh-auth-sock"] == "")
             overrides["ssh-auth-sock"] = getEnv("SSH_AUTH_SOCK");
-        to << overrides.size();
+        conn->to << overrides.size();
         for (auto & i : overrides)
-            to << i.first << i.second;
+            conn->to << i.first << i.second;
     }
 
-    processStderr();
+    conn->processStderr();
 }
 
 
 bool RemoteStore::isValidPath(const Path & path)
 {
-    openConnection();
-    to << wopIsValidPath << path;
-    processStderr();
-    unsigned int reply = readInt(from);
+    auto conn(connections->get());
+    conn->to << wopIsValidPath << path;
+    conn->processStderr();
+    unsigned int reply = readInt(conn->from);
     return reply != 0;
 }
 
 
 PathSet RemoteStore::queryValidPaths(const PathSet & paths)
 {
-    openConnection();
-    if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+    auto conn(connections->get());
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
         PathSet res;
         for (auto & i : paths)
             if (isValidPath(i)) res.insert(i);
         return res;
     } else {
-        to << wopQueryValidPaths << paths;
-        processStderr();
-        return readStorePaths<PathSet>(from);
+        conn->to << wopQueryValidPaths << paths;
+        conn->processStderr();
+        return readStorePaths<PathSet>(conn->from);
     }
 }
 
 
 PathSet RemoteStore::queryAllValidPaths()
 {
-    openConnection();
-    to << wopQueryAllValidPaths;
-    processStderr();
-    return readStorePaths<PathSet>(from);
+    auto conn(connections->get());
+    conn->to << wopQueryAllValidPaths;
+    conn->processStderr();
+    return readStorePaths<PathSet>(conn->from);
 }
 
 
 PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths)
 {
-    openConnection();
-    if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+    auto conn(connections->get());
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
         PathSet res;
         for (auto & i : paths) {
-            to << wopHasSubstitutes << i;
-            processStderr();
-            if (readInt(from)) res.insert(i);
+            conn->to << wopHasSubstitutes << i;
+            conn->processStderr();
+            if (readInt(conn->from)) res.insert(i);
         }
         return res;
     } else {
-        to << wopQuerySubstitutablePaths << paths;
-        processStderr();
-        return readStorePaths<PathSet>(from);
+        conn->to << wopQuerySubstitutablePaths << paths;
+        conn->processStderr();
+        return readStorePaths<PathSet>(conn->from);
     }
 }
 
@@ -225,39 +212,39 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
 {
     if (paths.empty()) return;
 
-    openConnection();
+    auto conn(connections->get());
 
-    if (GET_PROTOCOL_MINOR(daemonVersion) < 3) return;
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 3) return;
 
-    if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
 
         for (auto & i : paths) {
             SubstitutablePathInfo info;
-            to << wopQuerySubstitutablePathInfo << i;
-            processStderr();
-            unsigned int reply = readInt(from);
+            conn->to << wopQuerySubstitutablePathInfo << i;
+            conn->processStderr();
+            unsigned int reply = readInt(conn->from);
             if (reply == 0) continue;
-            info.deriver = readString(from);
+            info.deriver = readString(conn->from);
             if (info.deriver != "") assertStorePath(info.deriver);
-            info.references = readStorePaths<PathSet>(from);
-            info.downloadSize = readLongLong(from);
-            info.narSize = GET_PROTOCOL_MINOR(daemonVersion) >= 7 ? readLongLong(from) : 0;
+            info.references = readStorePaths<PathSet>(conn->from);
+            info.downloadSize = readLongLong(conn->from);
+            info.narSize = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 7 ? readLongLong(conn->from) : 0;
             infos[i] = info;
         }
 
     } else {
 
-        to << wopQuerySubstitutablePathInfos << paths;
-        processStderr();
-        unsigned int count = readInt(from);
+        conn->to << wopQuerySubstitutablePathInfos << paths;
+        conn->processStderr();
+        unsigned int count = readInt(conn->from);
         for (unsigned int n = 0; n < count; n++) {
-            Path path = readStorePath(from);
+            Path path = readStorePath(conn->from);
             SubstitutablePathInfo & info(infos[path]);
-            info.deriver = readString(from);
+            info.deriver = readString(conn->from);
             if (info.deriver != "") assertStorePath(info.deriver);
-            info.references = readStorePaths<PathSet>(from);
-            info.downloadSize = readLongLong(from);
-            info.narSize = readLongLong(from);
+            info.references = readStorePaths<PathSet>(conn->from);
+            info.downloadSize = readLongLong(conn->from);
+            info.narSize = readLongLong(conn->from);
         }
 
     }
@@ -266,27 +253,27 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
 
 ValidPathInfo RemoteStore::queryPathInfo(const Path & path)
 {
-    openConnection();
-    to << wopQueryPathInfo << path;
-    processStderr();
+    auto conn(connections->get());
+    conn->to << wopQueryPathInfo << path;
+    conn->processStderr();
     ValidPathInfo info;
     info.path = path;
-    info.deriver = readString(from);
+    info.deriver = readString(conn->from);
     if (info.deriver != "") assertStorePath(info.deriver);
-    info.narHash = parseHash(htSHA256, readString(from));
-    info.references = readStorePaths<PathSet>(from);
-    info.registrationTime = readInt(from);
-    info.narSize = readLongLong(from);
+    info.narHash = parseHash(htSHA256, readString(conn->from));
+    info.references = readStorePaths<PathSet>(conn->from);
+    info.registrationTime = readInt(conn->from);
+    info.narSize = readLongLong(conn->from);
     return info;
 }
 
 
 Hash RemoteStore::queryPathHash(const Path & path)
 {
-    openConnection();
-    to << wopQueryPathHash << path;
-    processStderr();
-    string hash = readString(from);
+    auto conn(connections->get());
+    conn->to << wopQueryPathHash << path;
+    conn->processStderr();
+    string hash = readString(conn->from);
     return parseHash(htSHA256, hash);
 }
 
@@ -294,10 +281,10 @@ Hash RemoteStore::queryPathHash(const Path & path)
 void RemoteStore::queryReferences(const Path & path,
     PathSet & references)
 {
-    openConnection();
-    to << wopQueryReferences << path;
-    processStderr();
-    PathSet references2 = readStorePaths<PathSet>(from);
+    auto conn(connections->get());
+    conn->to << wopQueryReferences << path;
+    conn->processStderr();
+    PathSet references2 = readStorePaths<PathSet>(conn->from);
     references.insert(references2.begin(), references2.end());
 }
 
@@ -305,20 +292,20 @@ void RemoteStore::queryReferences(const Path & path,
 void RemoteStore::queryReferrers(const Path & path,
     PathSet & referrers)
 {
-    openConnection();
-    to << wopQueryReferrers << path;
-    processStderr();
-    PathSet referrers2 = readStorePaths<PathSet>(from);
+    auto conn(connections->get());
+    conn->to << wopQueryReferrers << path;
+    conn->processStderr();
+    PathSet referrers2 = readStorePaths<PathSet>(conn->from);
     referrers.insert(referrers2.begin(), referrers2.end());
 }
 
 
 Path RemoteStore::queryDeriver(const Path & path)
 {
-    openConnection();
-    to << wopQueryDeriver << path;
-    processStderr();
-    Path drvPath = readString(from);
+    auto conn(connections->get());
+    conn->to << wopQueryDeriver << path;
+    conn->processStderr();
+    Path drvPath = readString(conn->from);
     if (drvPath != "") assertStorePath(drvPath);
     return drvPath;
 }
@@ -326,37 +313,37 @@ Path RemoteStore::queryDeriver(const Path & path)
 
 PathSet RemoteStore::queryValidDerivers(const Path & path)
 {
-    openConnection();
-    to << wopQueryValidDerivers << path;
-    processStderr();
-    return readStorePaths<PathSet>(from);
+    auto conn(connections->get());
+    conn->to << wopQueryValidDerivers << path;
+    conn->processStderr();
+    return readStorePaths<PathSet>(conn->from);
 }
 
 
 PathSet RemoteStore::queryDerivationOutputs(const Path & path)
 {
-    openConnection();
-    to << wopQueryDerivationOutputs << path;
-    processStderr();
-    return readStorePaths<PathSet>(from);
+    auto conn(connections->get());
+    conn->to << wopQueryDerivationOutputs << path;
+    conn->processStderr();
+    return readStorePaths<PathSet>(conn->from);
 }
 
 
 PathSet RemoteStore::queryDerivationOutputNames(const Path & path)
 {
-    openConnection();
-    to << wopQueryDerivationOutputNames << path;
-    processStderr();
-    return readStrings<PathSet>(from);
+    auto conn(connections->get());
+    conn->to << wopQueryDerivationOutputNames << path;
+    conn->processStderr();
+    return readStrings<PathSet>(conn->from);
 }
 
 
 Path RemoteStore::queryPathFromHashPart(const string & hashPart)
 {
-    openConnection();
-    to << wopQueryPathFromHashPart << hashPart;
-    processStderr();
-    Path path = readString(from);
+    auto conn(connections->get());
+    conn->to << wopQueryPathFromHashPart << hashPart;
+    conn->processStderr();
+    Path path = readString(conn->from);
     if (!path.empty()) assertStorePath(path);
     return path;
 }
@@ -367,32 +354,32 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
 {
     if (repair) throw Error("repairing is not supported when building through the Nix daemon");
 
-    openConnection();
+    auto conn(connections->get());
 
     Path srcPath(absPath(_srcPath));
 
-    to << wopAddToStore << name
+    conn->to << wopAddToStore << name
        << ((hashAlgo == htSHA256 && recursive) ? 0 : 1) /* backwards compatibility hack */
        << (recursive ? 1 : 0)
        << printHashType(hashAlgo);
 
     try {
-        to.written = 0;
-        to.warn = true;
-        dumpPath(srcPath, to, filter);
-        to.warn = false;
-        processStderr();
+        conn->to.written = 0;
+        conn->to.warn = true;
+        dumpPath(srcPath, conn->to, filter);
+        conn->to.warn = false;
+        conn->processStderr();
     } catch (SysError & e) {
         /* Daemon closed while we were sending the path. Probably OOM
            or I/O error. */
         if (e.errNo == EPIPE)
             try {
-                processStderr();
+                conn->processStderr();
             } catch (EndOfFile & e) { }
         throw;
     }
 
-    return readStorePath(from);
+    return readStorePath(conn->from);
 }
 
 
@@ -401,43 +388,43 @@ Path RemoteStore::addTextToStore(const string & name, const string & s,
 {
     if (repair) throw Error("repairing is not supported when building through the Nix daemon");
 
-    openConnection();
-    to << wopAddTextToStore << name << s << references;
+    auto conn(connections->get());
+    conn->to << wopAddTextToStore << name << s << references;
 
-    processStderr();
-    return readStorePath(from);
+    conn->processStderr();
+    return readStorePath(conn->from);
 }
 
 
 void RemoteStore::exportPath(const Path & path, bool sign,
     Sink & sink)
 {
-    openConnection();
-    to << wopExportPath << path << (sign ? 1 : 0);
-    processStderr(&sink); /* sink receives the actual data */
-    readInt(from);
+    auto conn(connections->get());
+    conn->to << wopExportPath << path << (sign ? 1 : 0);
+    conn->processStderr(&sink); /* sink receives the actual data */
+    readInt(conn->from);
 }
 
 
 Paths RemoteStore::importPaths(bool requireSignature, Source & source)
 {
-    openConnection();
-    to << wopImportPaths;
+    auto conn(connections->get());
+    conn->to << wopImportPaths;
     /* We ignore requireSignature, since the worker forces it to true
        anyway. */
-    processStderr(0, &source);
-    return readStorePaths<Paths>(from);
+    conn->processStderr(0, &source);
+    return readStorePaths<Paths>(conn->from);
 }
 
 
 void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
 {
-    openConnection();
-    to << wopBuildPaths;
-    if (GET_PROTOCOL_MINOR(daemonVersion) >= 13) {
-        to << drvPaths;
-        if (GET_PROTOCOL_MINOR(daemonVersion) >= 15)
-            to << buildMode;
+    auto conn(connections->get());
+    conn->to << wopBuildPaths;
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) {
+        conn->to << drvPaths;
+        if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15)
+            conn->to << buildMode;
         else
             /* Old daemons did not take a 'buildMode' parameter, so we
                need to validate it here on the client side.  */
@@ -449,22 +436,22 @@ void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
         PathSet drvPaths2;
         for (auto & i : drvPaths)
             drvPaths2.insert(string(i, 0, i.find('!')));
-        to << drvPaths2;
+        conn->to << drvPaths2;
     }
-    processStderr();
-    readInt(from);
+    conn->processStderr();
+    readInt(conn->from);
 }
 
 
 BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv,
     BuildMode buildMode)
 {
-    openConnection();
-    to << wopBuildDerivation << drvPath << drv << buildMode;
-    processStderr();
+    auto conn(connections->get());
+    conn->to << wopBuildDerivation << drvPath << drv << buildMode;
+    conn->processStderr();
     BuildResult res;
     unsigned int status;
-    from >> status >> res.errorMsg;
+    conn->from >> status >> res.errorMsg;
     res.status = (BuildResult::Status) status;
     return res;
 }
@@ -472,50 +459,50 @@ BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDeriva
 
 void RemoteStore::ensurePath(const Path & path)
 {
-    openConnection();
-    to << wopEnsurePath << path;
-    processStderr();
-    readInt(from);
+    auto conn(connections->get());
+    conn->to << wopEnsurePath << path;
+    conn->processStderr();
+    readInt(conn->from);
 }
 
 
 void RemoteStore::addTempRoot(const Path & path)
 {
-    openConnection();
-    to << wopAddTempRoot << path;
-    processStderr();
-    readInt(from);
+    auto conn(connections->get());
+    conn->to << wopAddTempRoot << path;
+    conn->processStderr();
+    readInt(conn->from);
 }
 
 
 void RemoteStore::addIndirectRoot(const Path & path)
 {
-    openConnection();
-    to << wopAddIndirectRoot << path;
-    processStderr();
-    readInt(from);
+    auto conn(connections->get());
+    conn->to << wopAddIndirectRoot << path;
+    conn->processStderr();
+    readInt(conn->from);
 }
 
 
 void RemoteStore::syncWithGC()
 {
-    openConnection();
-    to << wopSyncWithGC;
-    processStderr();
-    readInt(from);
+    auto conn(connections->get());
+    conn->to << wopSyncWithGC;
+    conn->processStderr();
+    readInt(conn->from);
 }
 
 
 Roots RemoteStore::findRoots()
 {
-    openConnection();
-    to << wopFindRoots;
-    processStderr();
-    unsigned int count = readInt(from);
+    auto conn(connections->get());
+    conn->to << wopFindRoots;
+    conn->processStderr();
+    unsigned int count = readInt(conn->from);
     Roots result;
     while (count--) {
-        Path link = readString(from);
-        Path target = readStorePath(from);
+        Path link = readString(conn->from);
+        Path target = readStorePath(conn->from);
         result[link] = target;
     }
     return result;
@@ -524,56 +511,68 @@ Roots RemoteStore::findRoots()
 
 void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
 {
-    openConnection(false);
+    auto conn(connections->get());
 
-    to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
+    conn->to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
        << options.maxFreed << 0;
-    if (GET_PROTOCOL_MINOR(daemonVersion) >= 5)
+    if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 5)
         /* removed options */
-        to << 0 << 0;
+        conn->to << 0 << 0;
 
-    processStderr();
+    conn->processStderr();
 
-    results.paths = readStrings<PathSet>(from);
-    results.bytesFreed = readLongLong(from);
-    readLongLong(from); // obsolete
+    results.paths = readStrings<PathSet>(conn->from);
+    results.bytesFreed = readLongLong(conn->from);
+    readLongLong(conn->from); // obsolete
 }
 
 
 PathSet RemoteStore::queryFailedPaths()
 {
-    openConnection();
-    to << wopQueryFailedPaths;
-    processStderr();
-    return readStorePaths<PathSet>(from);
+    auto conn(connections->get());
+    conn->to << wopQueryFailedPaths;
+    conn->processStderr();
+    return readStorePaths<PathSet>(conn->from);
 }
 
 
 void RemoteStore::clearFailedPaths(const PathSet & paths)
 {
-    openConnection();
-    to << wopClearFailedPaths << paths;
-    processStderr();
-    readInt(from);
+    auto conn(connections->get());
+    conn->to << wopClearFailedPaths << paths;
+    conn->processStderr();
+    readInt(conn->from);
 }
 
 void RemoteStore::optimiseStore()
 {
-    openConnection();
-    to << wopOptimiseStore;
-    processStderr();
-    readInt(from);
+    auto conn(connections->get());
+    conn->to << wopOptimiseStore;
+    conn->processStderr();
+    readInt(conn->from);
 }
 
 bool RemoteStore::verifyStore(bool checkContents, bool repair)
 {
-    openConnection();
-    to << wopVerifyStore << checkContents << repair;
-    processStderr();
-    return readInt(from) != 0;
+    auto conn(connections->get());
+    conn->to << wopVerifyStore << checkContents << repair;
+    conn->processStderr();
+    return readInt(conn->from) != 0;
+}
+
+
+RemoteStore::Connection::~Connection()
+{
+    try {
+        to.flush();
+        fd.close();
+    } catch (...) {
+        ignoreException();
+    }
 }
 
-void RemoteStore::processStderr(Sink * sink, Source * source)
+
+void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
 {
     to.flush();
     unsigned int msg;
diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh
index f15182285e8c..af417b84ff9c 100644
--- a/src/libstore/remote-store.hh
+++ b/src/libstore/remote-store.hh
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <limits>
 #include <string>
 
 #include "store-api.hh"
@@ -12,15 +13,14 @@ class Pipe;
 class Pid;
 struct FdSink;
 struct FdSource;
+template<typename T> class Pool;
 
 
 class RemoteStore : public Store
 {
 public:
 
-    RemoteStore();
-
-    ~RemoteStore();
+    RemoteStore(size_t maxConnections = std::numeric_limits<size_t>::max());
 
     /* Implementations of abstract store API methods. */
 
@@ -91,19 +91,24 @@ public:
     bool verifyStore(bool checkContents, bool repair) override;
 
 private:
-    AutoCloseFD fdSocket;
-    FdSink to;
-    FdSource from;
-    unsigned int daemonVersion;
-    bool initialised;
 
-    void openConnection(bool reserveSpace = true);
+    struct Connection
+    {
+        AutoCloseFD fd;
+        FdSink to;
+        FdSource from;
+        unsigned int daemonVersion;
+
+        ~Connection();
+
+        void processStderr(Sink * sink = 0, Source * source = 0);
+    };
 
-    void processStderr(Sink * sink = 0, Source * source = 0);
+    ref<Pool<Connection>> connections;
 
-    void connectToDaemon();
+    ref<Connection> openConnection(bool reserveSpace = true);
 
-    void setOptions();
+    void setOptions(ref<Connection> conn);
 };
 
 
diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh
new file mode 100644
index 000000000000..f291cd578388
--- /dev/null
+++ b/src/libutil/pool.hh
@@ -0,0 +1,151 @@
+#pragma once
+
+#include <functional>
+#include <limits>
+#include <list>
+#include <memory>
+#include <cassert>
+
+#include "sync.hh"
+#include "ref.hh"
+
+namespace nix {
+
+/* This template class implements a simple pool manager of resources
+   of some type R, such as database connections. It is used as
+   follows:
+
+     class Connection { ... };
+
+     Pool<Connection> pool;
+
+     {
+       auto conn(pool.get());
+       conn->exec("select ...");
+     }
+
+   Here, the Connection object referenced by ‘conn’ is automatically
+   returned to the pool when ‘conn’ goes out of scope.
+*/
+
+template <class R>
+class Pool
+{
+public:
+
+    /* A function that produces new instances of R on demand. */
+    typedef std::function<ref<R>()> Factory;
+
+    /* A function that checks whether an instance of R is still
+       usable. Unusable instances are removed from the pool. */
+    typedef std::function<bool(const ref<R> &)> Validator;
+
+private:
+
+    Factory factory;
+    Validator validator;
+
+    struct State
+    {
+        size_t inUse = 0;
+        size_t max;
+        std::vector<ref<R>> idle;
+    };
+
+    Sync<State> state;
+
+    std::condition_variable wakeup;
+
+public:
+
+    Pool(size_t max = std::numeric_limits<size_t>::max(),
+        const Factory & factory = []() { return make_ref<R>(); },
+        const Validator & validator = [](ref<R> r) { return true; })
+        : factory(factory)
+        , validator(validator)
+    {
+        auto state_(state.lock());
+        state_->max = max;
+    }
+
+    ~Pool()
+    {
+        auto state_(state.lock());
+        assert(!state_->inUse);
+        state_->max = 0;
+        state_->idle.clear();
+    }
+
+    class Handle
+    {
+    private:
+        Pool & pool;
+        std::shared_ptr<R> r;
+
+        friend Pool;
+
+        Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { }
+
+    public:
+        Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); }
+
+        Handle(const Handle & l) = delete;
+
+        ~Handle()
+        {
+            if (!r) return;
+            {
+                auto state_(pool.state.lock());
+                state_->idle.push_back(ref<R>(r));
+                assert(state_->inUse);
+                state_->inUse--;
+            }
+            pool.wakeup.notify_one();
+        }
+
+        R * operator -> () { return &*r; }
+        R & operator * () { return *r; }
+    };
+
+    Handle get()
+    {
+        {
+            auto state_(state.lock());
+
+            /* If we're over the maximum number of instance, we need
+               to wait until a slot becomes available. */
+            while (state_->idle.empty() && state_->inUse >= state_->max)
+                state_.wait(wakeup);
+
+            while (!state_->idle.empty()) {
+                auto p = state_->idle.back();
+                state_->idle.pop_back();
+                if (validator(p)) {
+                    state_->inUse++;
+                    return Handle(*this, p);
+                }
+            }
+
+            state_->inUse++;
+        }
+
+        /* We need to create a new instance. Because that might take a
+           while, we don't hold the lock in the meantime. */
+        try {
+            Handle h(*this, factory());
+            return h;
+        } catch (...) {
+            auto state_(state.lock());
+            state_->inUse--;
+            throw;
+        }
+    }
+
+    unsigned int count()
+    {
+        auto state_(state.lock());
+        return state_->idle.size() + state_->inUse;
+    }
+};
+
+}
diff --git a/src/libutil/ref.hh b/src/libutil/ref.hh
new file mode 100644
index 000000000000..a6d338d79622
--- /dev/null
+++ b/src/libutil/ref.hh
@@ -0,0 +1,67 @@
+#pragma once
+
+#include <memory>
+#include <exception>
+
+namespace nix {
+
+/* A simple non-nullable reference-counted pointer. Actually a wrapper
+   around std::shared_ptr that prevents non-null constructions. */
+template<typename T>
+class ref
+{
+private:
+
+    std::shared_ptr<T> p;
+
+public:
+
+    ref<T>(const ref<T> & r)
+        : p(r.p)
+    { }
+
+    explicit ref<T>(const std::shared_ptr<T> & p)
+        : p(p)
+    {
+        if (!p)
+            throw std::invalid_argument("null pointer cast to ref");
+    }
+
+    T* operator ->() const
+    {
+        return &*p;
+    }
+
+    T& operator *() const
+    {
+        return *p;
+    }
+
+    operator std::shared_ptr<T> ()
+    {
+        return p;
+    }
+
+    template<typename T2>
+    operator ref<T2> ()
+    {
+        return ref<T2>((std::shared_ptr<T2>) p);
+    }
+
+private:
+
+    template<typename T2, typename... Args>
+    friend ref<T2>
+    make_ref(Args&&... args);
+
+};
+
+template<typename T, typename... Args>
+inline ref<T>
+make_ref(Args&&... args)
+{
+    auto p = std::make_shared<T>(std::forward<Args>(args)...);
+    return ref<T>(p);
+}
+
+}
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index f136a13248ba..c9620e2bf32a 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -72,7 +72,17 @@ void FdSink::write(const unsigned char * data, size_t len)
             warned = true;
         }
     }
-    writeFull(fd, data, len);
+    try {
+        writeFull(fd, data, len);
+    } catch (SysError & e) {
+        _good = true;
+    }
+}
+
+
+bool FdSink::good()
+{
+    return _good;
 }
 
 
@@ -119,12 +129,18 @@ size_t FdSource::readUnbuffered(unsigned char * data, size_t len)
         checkInterrupt();
         n = ::read(fd, (char *) data, bufSize);
     } while (n == -1 && errno == EINTR);
-    if (n == -1) throw SysError("reading from file");
-    if (n == 0) throw EndOfFile("unexpected end-of-file");
+    if (n == -1) { _good = false; throw SysError("reading from file"); }
+    if (n == 0) { _good = false; throw EndOfFile("unexpected end-of-file"); }
     return n;
 }
 
 
+bool FdSource::good()
+{
+    return _good;
+}
+
+
 size_t StringSource::read(unsigned char * data, size_t len)
 {
     if (pos == s.size()) throw EndOfFile("end of string reached");
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 979ff849fcaf..9e269f3923ea 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -12,6 +12,7 @@ struct Sink
 {
     virtual ~Sink() { }
     virtual void operator () (const unsigned char * data, size_t len) = 0;
+    virtual bool good() { return true; }
 };
 
 
@@ -25,7 +26,7 @@ struct BufferedSink : Sink
         : bufSize(bufSize), bufPos(0), buffer(0) { }
     ~BufferedSink();
 
-    void operator () (const unsigned char * data, size_t len);
+    void operator () (const unsigned char * data, size_t len) override;
 
     void flush();
 
@@ -47,6 +48,8 @@ struct Source
        return the number of bytes stored.  If blocks until at least
        one byte is available. */
     virtual size_t read(unsigned char * data, size_t len) = 0;
+
+    virtual bool good() { return true; }
 };
 
 
@@ -60,7 +63,7 @@ struct BufferedSource : Source
         : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { }
     ~BufferedSource();
 
-    size_t read(unsigned char * data, size_t len);
+    size_t read(unsigned char * data, size_t len) override;
 
     /* Underlying read call, to be overridden. */
     virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0;
@@ -80,7 +83,12 @@ struct FdSink : BufferedSink
     FdSink(int fd) : fd(fd), warn(false), written(0) { }
     ~FdSink();
 
-    void write(const unsigned char * data, size_t len);
+    void write(const unsigned char * data, size_t len) override;
+
+    bool good() override;
+
+private:
+    bool _good = true;
 };
 
 
@@ -90,7 +98,10 @@ struct FdSource : BufferedSource
     int fd;
     FdSource() : fd(-1) { }
     FdSource(int fd) : fd(fd) { }
-    size_t readUnbuffered(unsigned char * data, size_t len);
+    size_t readUnbuffered(unsigned char * data, size_t len) override;
+    bool good() override;
+private:
+    bool _good = true;
 };
 
 
@@ -98,7 +109,7 @@ struct FdSource : BufferedSource
 struct StringSink : Sink
 {
     string s;
-    void operator () (const unsigned char * data, size_t len);
+    void operator () (const unsigned char * data, size_t len) override;
 };
 
 
@@ -108,7 +119,7 @@ struct StringSource : Source
     const string & s;
     size_t pos;
     StringSource(const string & _s) : s(_s), pos(0) { }
-    size_t read(unsigned char * data, size_t len);
+    size_t read(unsigned char * data, size_t len) override;
 };
 
 
diff --git a/src/libutil/sync.hh b/src/libutil/sync.hh
new file mode 100644
index 000000000000..c99c098ac9c6
--- /dev/null
+++ b/src/libutil/sync.hh
@@ -0,0 +1,78 @@
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+#include <cassert>
+
+namespace nix {
+
+/* This template class ensures synchronized access to a value of type
+   T. It is used as follows:
+
+     struct Data { int x; ... };
+
+     Sync<Data> data;
+
+     {
+       auto data_(data.lock());
+       data_->x = 123;
+     }
+
+   Here, "data" is automatically unlocked when "data_" goes out of
+   scope.
+*/
+
+template<class T>
+class Sync
+{
+private:
+    std::mutex mutex;
+    T data;
+
+public:
+
+    Sync() { }
+    Sync(const T & data) : data(data) { }
+
+    class Lock
+    {
+    private:
+        Sync * s;
+        std::unique_lock<std::mutex> lk;
+        friend Sync;
+        Lock(Sync * s) : s(s), lk(s->mutex) { }
+    public:
+        Lock(Lock && l) : s(l.s) { abort(); }
+        Lock(const Lock & l) = delete;
+        ~Lock() { }
+        T * operator -> () { return &s->data; }
+        T & operator * () { return s->data; }
+
+        void wait(std::condition_variable & cv)
+        {
+            assert(s);
+            cv.wait(lk);
+        }
+
+        template<class Rep, class Period, class Predicate>
+        bool wait_for(std::condition_variable & cv,
+            const std::chrono::duration<Rep, Period> & duration,
+            Predicate pred)
+        {
+            assert(s);
+            return cv.wait_for(lk, duration, pred);
+        }
+
+        template<class Clock, class Duration>
+        std::cv_status wait_until(std::condition_variable & cv,
+            const std::chrono::time_point<Clock, Duration> & duration)
+        {
+            assert(s);
+            return cv.wait_until(lk, duration);
+        }
+    };
+
+    Lock lock() { return Lock(this); }
+};
+
+}
diff --git a/src/libutil/types.hh b/src/libutil/types.hh
index 0eae46c5fe93..33aaf5fc9c4d 100644
--- a/src/libutil/types.hh
+++ b/src/libutil/types.hh
@@ -2,6 +2,8 @@
 
 #include "config.h"
 
+#include "ref.hh"
+
 #include <string>
 #include <list>
 #include <set>
@@ -97,63 +99,4 @@ typedef enum {
 } Verbosity;
 
 
-/* A simple non-nullable reference-counted pointer. Actually a wrapper
-   around std::shared_ptr that prevents non-null constructions. */
-template<typename T>
-class ref
-{
-private:
-
-    std::shared_ptr<T> p;
-
-public:
-
-    ref<T>(const ref<T> & r)
-        : p(r.p)
-    { }
-
-    explicit ref<T>(const std::shared_ptr<T> & p)
-        : p(p)
-    {
-        if (!p)
-            throw std::invalid_argument("null pointer cast to ref");
-    }
-
-    T* operator ->() const
-    {
-        return &*p;
-    }
-
-    T& operator *() const
-    {
-        return *p;
-    }
-
-    operator std::shared_ptr<T> ()
-    {
-        return p;
-    }
-
-    template<typename T2>
-    operator ref<T2> ()
-    {
-        return ref<T2>((std::shared_ptr<T2>) p);
-    }
-
-private:
-
-    template<typename T2, typename... Args>
-    friend ref<T2>
-    make_ref(Args&&... args);
-
-};
-
-template<typename T, typename... Args>
-inline ref<T>
-make_ref(Args&&... args)
-{
-    auto p = std::make_shared<T>(std::forward<Args>(args)...);
-    return ref<T>(p);
-}
-
 }