about summary refs log tree commit diff
path: root/src/libstore/remote-store.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r--src/libstore/remote-store.cc255
1 files changed, 159 insertions, 96 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 080cef93d214..15faf78a526d 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -7,6 +7,7 @@
 #include "globals.hh"
 #include "derivations.hh"
 #include "pool.hh"
+#include "finally.hh"
 
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -160,7 +161,8 @@ void RemoteStore::initConnection(Connection & conn)
         if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 11)
             conn.to << false;
 
-        conn.processStderr();
+        auto ex = conn.processStderr();
+        if (ex) std::rethrow_exception(ex);
     }
     catch (Error & e) {
         throw Error("cannot open connection to remote store '%s': %s", getUri(), e.what());
@@ -187,28 +189,75 @@ void RemoteStore::setOptions(Connection & conn)
        << settings.useSubstitutes;
 
     if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 12) {
-        auto overrides = settings.getSettings(true);
+        std::map<std::string, Config::SettingInfo> overrides;
+        globalConfig.getSettings(overrides, true);
         conn.to << overrides.size();
         for (auto & i : overrides)
-            conn.to << i.first << i.second;
+            conn.to << i.first << i.second.value;
     }
 
-    conn.processStderr();
+    auto ex = conn.processStderr();
+    if (ex) std::rethrow_exception(ex);
+}
+
+
+/* A wrapper around Pool<RemoteStore::Connection>::Handle that marks
+   the connection as bad (causing it to be closed) if a non-daemon
+   exception is thrown before the handle is closed. Such an exception
+   causes a deviation from the expected protocol and therefore a
+   desynchronization between the client and daemon. */
+struct ConnectionHandle
+{
+    Pool<RemoteStore::Connection>::Handle handle;
+    bool daemonException = false;
+
+    ConnectionHandle(Pool<RemoteStore::Connection>::Handle && handle)
+        : handle(std::move(handle))
+    { }
+
+    ConnectionHandle(ConnectionHandle && h)
+        : handle(std::move(h.handle))
+    { }
+
+    ~ConnectionHandle()
+    {
+        if (!daemonException && std::uncaught_exception()) {
+            handle.markBad();
+            debug("closing daemon connection because of an exception");
+        }
+    }
+
+    RemoteStore::Connection * operator -> () { return &*handle; }
+
+    void processStderr(Sink * sink = 0, Source * source = 0)
+    {
+        auto ex = handle->processStderr(sink, source);
+        if (ex) {
+            daemonException = true;
+            std::rethrow_exception(ex);
+        }
+    }
+};
+
+
+ConnectionHandle RemoteStore::getConnection()
+{
+    return ConnectionHandle(connections->get());
 }
 
 
 bool RemoteStore::isValidPathUncached(const Path & path)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopIsValidPath << path;
-    conn->processStderr();
+    conn.processStderr();
     return readInt(conn->from);
 }
 
 
 PathSet RemoteStore::queryValidPaths(const PathSet & paths, SubstituteFlag maybeSubstitute)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
         PathSet res;
         for (auto & i : paths)
@@ -216,7 +265,7 @@ PathSet RemoteStore::queryValidPaths(const PathSet & paths, SubstituteFlag maybe
         return res;
     } else {
         conn->to << wopQueryValidPaths << paths;
-        conn->processStderr();
+        conn.processStderr();
         return readStorePaths<PathSet>(*this, conn->from);
     }
 }
@@ -224,27 +273,27 @@ PathSet RemoteStore::queryValidPaths(const PathSet & paths, SubstituteFlag maybe
 
 PathSet RemoteStore::queryAllValidPaths()
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopQueryAllValidPaths;
-    conn->processStderr();
+    conn.processStderr();
     return readStorePaths<PathSet>(*this, conn->from);
 }
 
 
 PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
         PathSet res;
         for (auto & i : paths) {
             conn->to << wopHasSubstitutes << i;
-            conn->processStderr();
+            conn.processStderr();
             if (readInt(conn->from)) res.insert(i);
         }
         return res;
     } else {
         conn->to << wopQuerySubstitutablePaths << paths;
-        conn->processStderr();
+        conn.processStderr();
         return readStorePaths<PathSet>(*this, conn->from);
     }
 }
@@ -255,14 +304,14 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
 {
     if (paths.empty()) return;
 
-    auto conn(connections->get());
+    auto conn(getConnection());
 
     if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
 
         for (auto & i : paths) {
             SubstitutablePathInfo info;
             conn->to << wopQuerySubstitutablePathInfo << i;
-            conn->processStderr();
+            conn.processStderr();
             unsigned int reply = readInt(conn->from);
             if (reply == 0) continue;
             info.deriver = readString(conn->from);
@@ -276,7 +325,7 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
     } else {
 
         conn->to << wopQuerySubstitutablePathInfos << paths;
-        conn->processStderr();
+        conn.processStderr();
         size_t count = readNum<size_t>(conn->from);
         for (size_t n = 0; n < count; n++) {
             Path path = readStorePath(*this, conn->from);
@@ -293,47 +342,49 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
 
 
 void RemoteStore::queryPathInfoUncached(const Path & path,
-    std::function<void(std::shared_ptr<ValidPathInfo>)> success,
-    std::function<void(std::exception_ptr exc)> failure)
-{
-    sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() {
-        auto conn(connections->get());
-        conn->to << wopQueryPathInfo << path;
-        try {
-            conn->processStderr();
-        } catch (Error & e) {
-            // Ugly backwards compatibility hack.
-            if (e.msg().find("is not valid") != std::string::npos)
-                throw InvalidPath(e.what());
-            throw;
-        }
-        if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) {
-            bool valid; conn->from >> valid;
-            if (!valid) throw InvalidPath(format("path '%s' is not valid") % path);
-        }
-        auto info = std::make_shared<ValidPathInfo>();
-        info->path = path;
-        info->deriver = readString(conn->from);
-        if (info->deriver != "") assertStorePath(info->deriver);
-        info->narHash = Hash(readString(conn->from), htSHA256);
-        info->references = readStorePaths<PathSet>(*this, conn->from);
-        conn->from >> info->registrationTime >> info->narSize;
-        if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
-            conn->from >> info->ultimate;
-            info->sigs = readStrings<StringSet>(conn->from);
-            conn->from >> info->ca;
+    Callback<std::shared_ptr<ValidPathInfo>> callback)
+{
+    try {
+        std::shared_ptr<ValidPathInfo> info;
+        {
+            auto conn(getConnection());
+            conn->to << wopQueryPathInfo << path;
+            try {
+                conn.processStderr();
+            } catch (Error & e) {
+                // Ugly backwards compatibility hack.
+                if (e.msg().find("is not valid") != std::string::npos)
+                    throw InvalidPath(e.what());
+                throw;
+            }
+            if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) {
+                bool valid; conn->from >> valid;
+                if (!valid) throw InvalidPath(format("path '%s' is not valid") % path);
+            }
+            info = std::make_shared<ValidPathInfo>();
+            info->path = path;
+            info->deriver = readString(conn->from);
+            if (info->deriver != "") assertStorePath(info->deriver);
+            info->narHash = Hash(readString(conn->from), htSHA256);
+            info->references = readStorePaths<PathSet>(*this, conn->from);
+            conn->from >> info->registrationTime >> info->narSize;
+            if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
+                conn->from >> info->ultimate;
+                info->sigs = readStrings<StringSet>(conn->from);
+                conn->from >> info->ca;
+            }
         }
-        return info;
-    });
+        callback(std::move(info));
+    } catch (...) { callback.rethrow(); }
 }
 
 
 void RemoteStore::queryReferrers(const Path & path,
     PathSet & referrers)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopQueryReferrers << path;
-    conn->processStderr();
+    conn.processStderr();
     PathSet referrers2 = readStorePaths<PathSet>(*this, conn->from);
     referrers.insert(referrers2.begin(), referrers2.end());
 }
@@ -341,36 +392,36 @@ void RemoteStore::queryReferrers(const Path & path,
 
 PathSet RemoteStore::queryValidDerivers(const Path & path)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopQueryValidDerivers << path;
-    conn->processStderr();
+    conn.processStderr();
     return readStorePaths<PathSet>(*this, conn->from);
 }
 
 
 PathSet RemoteStore::queryDerivationOutputs(const Path & path)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopQueryDerivationOutputs << path;
-    conn->processStderr();
+    conn.processStderr();
     return readStorePaths<PathSet>(*this, conn->from);
 }
 
 
 PathSet RemoteStore::queryDerivationOutputNames(const Path & path)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopQueryDerivationOutputNames << path;
-    conn->processStderr();
+    conn.processStderr();
     return readStrings<PathSet>(conn->from);
 }
 
 
 Path RemoteStore::queryPathFromHashPart(const string & hashPart)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopQueryPathFromHashPart << hashPart;
-    conn->processStderr();
+    conn.processStderr();
     Path path = readString(conn->from);
     if (!path.empty()) assertStorePath(path);
     return path;
@@ -380,7 +431,7 @@ Path RemoteStore::queryPathFromHashPart(const string & hashPart)
 void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
     RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
 
     if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) {
         conn->to << wopImportPaths;
@@ -399,7 +450,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
                 ;
         });
 
-        conn->processStderr(0, source2.get());
+        conn.processStderr(0, source2.get());
 
         auto importedPaths = readStorePaths<PathSet>(*this, conn->from);
         assert(importedPaths.size() <= 1);
@@ -411,8 +462,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
                  << info.references << info.registrationTime << info.narSize
                  << info.ultimate << info.sigs << info.ca
                  << repair << !checkSigs;
-        copyNAR(source, conn->to);
-        conn->processStderr();
+        bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21;
+        if (!tunnel) copyNAR(source, conn->to);
+        conn.processStderr(0, tunnel ? &source : nullptr);
     }
 }
 
@@ -422,7 +474,7 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
 {
     if (repair) throw Error("repairing is not supported when building through the Nix daemon");
 
-    auto conn(connections->get());
+    auto conn(getConnection());
 
     Path srcPath(absPath(_srcPath));
 
@@ -435,16 +487,18 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
         conn->to.written = 0;
         conn->to.warn = true;
         connections->incCapacity();
-        dumpPath(srcPath, conn->to, filter);
-        connections->decCapacity();
+        {
+            Finally cleanup([&]() { connections->decCapacity(); });
+            dumpPath(srcPath, conn->to, filter);
+        }
         conn->to.warn = false;
-        conn->processStderr();
+        conn.processStderr();
     } catch (SysError & e) {
         /* Daemon closed while we were sending the path. Probably OOM
            or I/O error. */
         if (e.errNo == EPIPE)
             try {
-                conn->processStderr();
+                conn.processStderr();
             } catch (EndOfFile & e) { }
         throw;
     }
@@ -458,17 +512,17 @@ Path RemoteStore::addTextToStore(const string & name, const string & s,
 {
     if (repair) throw Error("repairing is not supported when building through the Nix daemon");
 
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopAddTextToStore << name << s << references;
 
-    conn->processStderr();
+    conn.processStderr();
     return readStorePath(*this, conn->from);
 }
 
 
 void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopBuildPaths;
     if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) {
         conn->to << drvPaths;
@@ -487,7 +541,7 @@ void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
             drvPaths2.insert(string(i, 0, i.find('!')));
         conn->to << drvPaths2;
     }
-    conn->processStderr();
+    conn.processStderr();
     readInt(conn->from);
 }
 
@@ -495,9 +549,9 @@ void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
 BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv,
     BuildMode buildMode)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopBuildDerivation << drvPath << drv << buildMode;
-    conn->processStderr();
+    conn.processStderr();
     BuildResult res;
     unsigned int status;
     conn->from >> status >> res.errorMsg;
@@ -508,51 +562,51 @@ BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDeriva
 
 void RemoteStore::ensurePath(const Path & path)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopEnsurePath << path;
-    conn->processStderr();
+    conn.processStderr();
     readInt(conn->from);
 }
 
 
 void RemoteStore::addTempRoot(const Path & path)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopAddTempRoot << path;
-    conn->processStderr();
+    conn.processStderr();
     readInt(conn->from);
 }
 
 
 void RemoteStore::addIndirectRoot(const Path & path)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopAddIndirectRoot << path;
-    conn->processStderr();
+    conn.processStderr();
     readInt(conn->from);
 }
 
 
 void RemoteStore::syncWithGC()
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopSyncWithGC;
-    conn->processStderr();
+    conn.processStderr();
     readInt(conn->from);
 }
 
 
-Roots RemoteStore::findRoots()
+Roots RemoteStore::findRoots(bool censor)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopFindRoots;
-    conn->processStderr();
+    conn.processStderr();
     size_t count = readNum<size_t>(conn->from);
     Roots result;
     while (count--) {
         Path link = readString(conn->from);
         Path target = readStorePath(*this, conn->from);
-        result[link] = target;
+        result[target].emplace(link);
     }
     return result;
 }
@@ -560,7 +614,7 @@ Roots RemoteStore::findRoots()
 
 void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
 
     conn->to
         << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
@@ -568,7 +622,7 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
         /* removed options */
         << 0 << 0 << 0;
 
-    conn->processStderr();
+    conn.processStderr();
 
     results.paths = readStrings<PathSet>(conn->from);
     results.bytesFreed = readLongLong(conn->from);
@@ -583,27 +637,27 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
 
 void RemoteStore::optimiseStore()
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopOptimiseStore;
-    conn->processStderr();
+    conn.processStderr();
     readInt(conn->from);
 }
 
 
 bool RemoteStore::verifyStore(bool checkContents, RepairFlag repair)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopVerifyStore << checkContents << repair;
-    conn->processStderr();
+    conn.processStderr();
     return readInt(conn->from);
 }
 
 
 void RemoteStore::addSignatures(const Path & storePath, const StringSet & sigs)
 {
-    auto conn(connections->get());
+    auto conn(getConnection());
     conn->to << wopAddSignatures << storePath << sigs;
-    conn->processStderr();
+    conn.processStderr();
     readInt(conn->from);
 }
 
@@ -613,13 +667,13 @@ void RemoteStore::queryMissing(const PathSet & targets,
     unsigned long long & downloadSize, unsigned long long & narSize)
 {
     {
-        auto conn(connections->get());
+        auto conn(getConnection());
         if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 19)
             // Don't hold the connection handle in the fallback case
             // to prevent a deadlock.
             goto fallback;
         conn->to << wopQueryMissing << targets;
-        conn->processStderr();
+        conn.processStderr();
         willBuild = readStorePaths<PathSet>(*this, conn->from);
         willSubstitute = readStorePaths<PathSet>(*this, conn->from);
         unknown = readStorePaths<PathSet>(*this, conn->from);
@@ -635,7 +689,14 @@ void RemoteStore::queryMissing(const PathSet & targets,
 
 void RemoteStore::connect()
 {
+    auto conn(getConnection());
+}
+
+
+unsigned int RemoteStore::getProtocol()
+{
     auto conn(connections->get());
+    return conn->daemonVersion;
 }
 
 
@@ -672,7 +733,7 @@ static Logger::Fields readFields(Source & from)
 }
 
 
-void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
+std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * source)
 {
     to.flush();
 
@@ -697,7 +758,7 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
         else if (msg == STDERR_ERROR) {
             string error = readString(from);
             unsigned int status = readInt(from);
-            throw Error(status, error);
+            return std::make_exception_ptr(Error(status, error));
         }
 
         else if (msg == STDERR_NEXT)
@@ -731,6 +792,8 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
         else
             throw Error("got unknown message type %x from Nix daemon", msg);
     }
+
+    return nullptr;
 }
 
 static std::string uriScheme = "unix://";