about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2017-08-28T12·17+0200
committerEelco Dolstra <edolstra@gmail.com>2017-08-28T12·17+0200
commit8fff3e7bb50377e48fb9c672e6551abae0fdf03d (patch)
tree6e5351a7b62c6ee24d6d85c4f02fc867d6327d08
parent94a0548dc420e23c64b18d36ea8d50a2afbce6fb (diff)
Make TunnelLogger thread-safe
Now that we use threads in lots of places, it's possible for
TunnelLogger::log() to be called asynchronously from other threads
than the main loop. So we need to ensure that STDERR_NEXT messages
don't clobber other messages.
-rw-r--r--src/nix-daemon/nix-daemon.cc238
1 files changed, 129 insertions, 109 deletions
diff --git a/src/nix-daemon/nix-daemon.cc b/src/nix-daemon/nix-daemon.cc
index 7e6f3aa25b5e..3237333f4d06 100644
--- a/src/nix-daemon/nix-daemon.cc
+++ b/src/nix-daemon/nix-daemon.cc
@@ -8,6 +8,7 @@
 #include "globals.hh"
 #include "monitor-fd.hh"
 #include "derivations.hh"
+#include "finally.hh"
 
 #include <algorithm>
 
@@ -54,57 +55,73 @@ static ssize_t splice(int fd_in, void *off_in, int fd_out, void *off_out, size_t
 static FdSource from(STDIN_FILENO);
 static FdSink to(STDOUT_FILENO);
 
-static bool canSendStderr;
-
-static Logger * defaultLogger;
-
 
 /* Logger that forwards log messages to the client, *if* we're in a
    state where the protocol allows it (i.e., when canSendStderr is
    true). */
-class TunnelLogger : public Logger
+struct TunnelLogger : public Logger
 {
+    struct State
+    {
+        bool canSendStderr = false;
+        std::vector<std::string> pendingMsgs;
+    };
+
+    Sync<State> state_;
+
     void log(Verbosity lvl, const FormatOrString & fs) override
     {
         if (lvl > verbosity) return;
 
-        if (canSendStderr) {
+        auto state(state_.lock());
+
+        if (state->canSendStderr) {
             try {
                 to << STDERR_NEXT << (fs.s + "\n");
                 to.flush();
             } catch (...) {
                 /* Write failed; that means that the other side is
                    gone. */
-                canSendStderr = false;
+                state->canSendStderr = false;
                 throw;
             }
         } else
-            defaultLogger->log(lvl, fs);
+            state->pendingMsgs.push_back(fs.s);
     }
-};
 
+    /* startWork() means that we're starting an operation for which we
+      want to send out stderr to the client. */
+    void startWork()
+    {
+        std::vector<std::string> pendingMsgs;
 
-/* startWork() means that we're starting an operation for which we
-   want to send out stderr to the client. */
-static void startWork()
-{
-    canSendStderr = true;
-}
+        auto state(state_.lock());
+        state->canSendStderr = true;
 
+        for (auto & msg : state->pendingMsgs)
+            to << STDERR_NEXT << (msg + "\n");
 
-/* stopWork() means that we're done; stop sending stderr to the
-   client. */
-static void stopWork(bool success = true, const string & msg = "", unsigned int status = 0)
-{
-    canSendStderr = false;
+        state->pendingMsgs.clear();
 
-    if (success)
-        to << STDERR_LAST;
-    else {
-        to << STDERR_ERROR << msg;
-        if (status != 0) to << status;
+        to.flush();
     }
-}
+
+    /* stopWork() means that we're done; stop sending stderr to the
+       client. */
+    void stopWork(bool success = true, const string & msg = "", unsigned int status = 0)
+    {
+        auto state(state_.lock());
+
+        state->canSendStderr = false;
+
+        if (success)
+            to << STDERR_LAST;
+        else {
+            to << STDERR_ERROR << msg;
+            if (status != 0) to << status;
+        }
+    }
+};
 
 
 struct TunnelSink : Sink
@@ -160,7 +177,8 @@ struct RetrieveRegularNARSink : ParseSink
 };
 
 
-static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVersion,
+static void performOp(TunnelLogger * logger, ref<LocalStore> store,
+    bool trusted, unsigned int clientVersion,
     Source & from, Sink & to, unsigned int op)
 {
     switch (op) {
@@ -172,46 +190,46 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
            that the 'Error' exception handler doesn't close the
            connection.  */
         Path path = readString(from);
-        startWork();
+        logger->startWork();
         store->assertStorePath(path);
         bool result = store->isValidPath(path);
-        stopWork();
+        logger->stopWork();
         to << result;
         break;
     }
 
     case wopQueryValidPaths: {
         PathSet paths = readStorePaths<PathSet>(*store, from);
-        startWork();
+        logger->startWork();
         PathSet res = store->queryValidPaths(paths);
-        stopWork();
+        logger->stopWork();
         to << res;
         break;
     }
 
     case wopHasSubstitutes: {
         Path path = readStorePath(*store, from);
-        startWork();
+        logger->startWork();
         PathSet res = store->querySubstitutablePaths({path});
-        stopWork();
+        logger->stopWork();
         to << (res.find(path) != res.end());
         break;
     }
 
     case wopQuerySubstitutablePaths: {
         PathSet paths = readStorePaths<PathSet>(*store, from);
-        startWork();
+        logger->startWork();
         PathSet res = store->querySubstitutablePaths(paths);
-        stopWork();
+        logger->stopWork();
         to << res;
         break;
     }
 
     case wopQueryPathHash: {
         Path path = readStorePath(*store, from);
-        startWork();
+        logger->startWork();
         auto hash = store->queryPathInfo(path)->narHash;
-        stopWork();
+        logger->stopWork();
         to << hash.to_string(Base16, false);
         break;
     }
@@ -221,7 +239,7 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
     case wopQueryValidDerivers:
     case wopQueryDerivationOutputs: {
         Path path = readStorePath(*store, from);
-        startWork();
+        logger->startWork();
         PathSet paths;
         if (op == wopQueryReferences)
             paths = store->queryPathInfo(path)->references;
@@ -230,35 +248,35 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
         else if (op == wopQueryValidDerivers)
             paths = store->queryValidDerivers(path);
         else paths = store->queryDerivationOutputs(path);
-        stopWork();
+        logger->stopWork();
         to << paths;
         break;
     }
 
     case wopQueryDerivationOutputNames: {
         Path path = readStorePath(*store, from);
-        startWork();
+        logger->startWork();
         StringSet names;
         names = store->queryDerivationOutputNames(path);
-        stopWork();
+        logger->stopWork();
         to << names;
         break;
     }
 
     case wopQueryDeriver: {
         Path path = readStorePath(*store, from);
-        startWork();
+        logger->startWork();
         auto deriver = store->queryPathInfo(path)->deriver;
-        stopWork();
+        logger->stopWork();
         to << deriver;
         break;
     }
 
     case wopQueryPathFromHashPart: {
         string hashPart = readString(from);
-        startWork();
+        logger->startWork();
         Path path = store->queryPathFromHashPart(hashPart);
-        stopWork();
+        logger->stopWork();
         to << path;
         break;
     }
@@ -286,10 +304,10 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
         } else
             parseDump(savedRegular, from);
 
-        startWork();
+        logger->startWork();
         if (!savedRegular.regular) throw Error("regular file expected");
         Path path = store->addToStoreFromDump(recursive ? *savedNAR.data : savedRegular.s, baseName, recursive, hashAlgo);
-        stopWork();
+        logger->stopWork();
 
         to << path;
         break;
@@ -299,9 +317,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
         string suffix = readString(from);
         string s = readString(from);
         PathSet refs = readStorePaths<PathSet>(*store, from);
-        startWork();
+        logger->startWork();
         Path path = store->addTextToStore(suffix, s, refs, NoRepair);
-        stopWork();
+        logger->stopWork();
         to << path;
         break;
     }
@@ -309,20 +327,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
     case wopExportPath: {
         Path path = readStorePath(*store, from);
         readInt(from); // obsolete
-        startWork();
+        logger->startWork();
         TunnelSink sink(to);
         store->exportPath(path, sink);
-        stopWork();
+        logger->stopWork();
         to << 1;
         break;
     }
 
     case wopImportPaths: {
-        startWork();
+        logger->startWork();
         TunnelSource source(from);
         Paths paths = store->importPaths(source, nullptr,
             trusted ? NoCheckSigs : CheckSigs);
-        stopWork();
+        logger->stopWork();
         to << paths;
         break;
     }
@@ -338,9 +356,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
             if (mode == bmRepair && !trusted)
                 throw Error("repairing is not supported when building through the Nix daemon");
         }
-        startWork();
+        logger->startWork();
         store->buildPaths(drvs, mode);
-        stopWork();
+        logger->stopWork();
         to << 1;
         break;
     }
@@ -350,54 +368,54 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
         BasicDerivation drv;
         readDerivation(from, *store, drv);
         BuildMode buildMode = (BuildMode) readInt(from);
-        startWork();
+        logger->startWork();
         if (!trusted)
             throw Error("you are not privileged to build derivations");
         auto res = store->buildDerivation(drvPath, drv, buildMode);
-        stopWork();
+        logger->stopWork();
         to << res.status << res.errorMsg;
         break;
     }
 
     case wopEnsurePath: {
         Path path = readStorePath(*store, from);
-        startWork();
+        logger->startWork();
         store->ensurePath(path);
-        stopWork();
+        logger->stopWork();
         to << 1;
         break;
     }
 
     case wopAddTempRoot: {
         Path path = readStorePath(*store, from);
-        startWork();
+        logger->startWork();
         store->addTempRoot(path);
-        stopWork();
+        logger->stopWork();
         to << 1;
         break;
     }
 
     case wopAddIndirectRoot: {
         Path path = absPath(readString(from));
-        startWork();
+        logger->startWork();
         store->addIndirectRoot(path);
-        stopWork();
+        logger->stopWork();
         to << 1;
         break;
     }
 
     case wopSyncWithGC: {
-        startWork();
+        logger->startWork();
         store->syncWithGC();
-        stopWork();
+        logger->stopWork();
         to << 1;
         break;
     }
 
     case wopFindRoots: {
-        startWork();
+        logger->startWork();
         Roots roots = store->findRoots();
-        stopWork();
+        logger->stopWork();
         to << roots.size();
         for (auto & i : roots)
             to << i.first << i.second;
@@ -416,11 +434,11 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
 
         GCResults results;
 
-        startWork();
+        logger->startWork();
         if (options.ignoreLiveness)
             throw Error("you are not allowed to ignore liveness");
         store->collectGarbage(options, results);
-        stopWork();
+        logger->stopWork();
 
         to << results.paths << results.bytesFreed << 0 /* obsolete */;
 
@@ -451,7 +469,7 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
             }
         }
 
-        startWork();
+        logger->startWork();
 
         for (auto & i : overrides) {
             auto & name(i.first);
@@ -492,16 +510,16 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
             }
         }
 
-        stopWork();
+        logger->stopWork();
         break;
     }
 
     case wopQuerySubstitutablePathInfo: {
         Path path = absPath(readString(from));
-        startWork();
+        logger->startWork();
         SubstitutablePathInfos infos;
         store->querySubstitutablePathInfos({path}, infos);
-        stopWork();
+        logger->stopWork();
         SubstitutablePathInfos::iterator i = infos.find(path);
         if (i == infos.end())
             to << 0;
@@ -513,10 +531,10 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
 
     case wopQuerySubstitutablePathInfos: {
         PathSet paths = readStorePaths<PathSet>(*store, from);
-        startWork();
+        logger->startWork();
         SubstitutablePathInfos infos;
         store->querySubstitutablePathInfos(paths, infos);
-        stopWork();
+        logger->stopWork();
         to << infos.size();
         for (auto & i : infos) {
             to << i.first << i.second.deriver << i.second.references
@@ -526,9 +544,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
     }
 
     case wopQueryAllValidPaths: {
-        startWork();
+        logger->startWork();
         PathSet paths = store->queryAllValidPaths();
-        stopWork();
+        logger->stopWork();
         to << paths;
         break;
     }
@@ -536,13 +554,13 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
     case wopQueryPathInfo: {
         Path path = readStorePath(*store, from);
         std::shared_ptr<const ValidPathInfo> info;
-        startWork();
+        logger->startWork();
         try {
             info = store->queryPathInfo(path);
         } catch (InvalidPath &) {
             if (GET_PROTOCOL_MINOR(clientVersion) < 17) throw;
         }
-        stopWork();
+        logger->stopWork();
         if (info) {
             if (GET_PROTOCOL_MINOR(clientVersion) >= 17)
                 to << 1;
@@ -561,20 +579,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
     }
 
     case wopOptimiseStore:
-        startWork();
+        logger->startWork();
         store->optimiseStore();
-        stopWork();
+        logger->stopWork();
         to << 1;
         break;
 
     case wopVerifyStore: {
         bool checkContents, repair;
         from >> checkContents >> repair;
-        startWork();
+        logger->startWork();
         if (repair && !trusted)
             throw Error("you are not privileged to repair paths");
         bool errors = store->verifyStore(checkContents, (RepairFlag) repair);
-        stopWork();
+        logger->stopWork();
         to << errors;
         break;
     }
@@ -582,19 +600,19 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
     case wopAddSignatures: {
         Path path = readStorePath(*store, from);
         StringSet sigs = readStrings<StringSet>(from);
-        startWork();
+        logger->startWork();
         if (!trusted)
             throw Error("you are not privileged to add signatures");
         store->addSignatures(path, sigs);
-        stopWork();
+        logger->stopWork();
         to << 1;
         break;
     }
 
     case wopNarFromPath: {
         auto path = readStorePath(*store, from);
-        startWork();
-        stopWork();
+        logger->startWork();
+        logger->stopWork();
         dumpPath(path, to);
         break;
     }
@@ -619,20 +637,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
         TeeSink tee(from);
         parseDump(tee, tee.source);
 
-        startWork();
+        logger->startWork();
         store->addToStore(info, tee.source.data, (RepairFlag) repair,
             dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr);
-        stopWork();
+        logger->stopWork();
         break;
     }
 
     case wopQueryMissing: {
         PathSet targets = readStorePaths<PathSet>(*store, from);
-        startWork();
+        logger->startWork();
         PathSet willBuild, willSubstitute, unknown;
         unsigned long long downloadSize, narSize;
         store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
-        stopWork();
+        logger->stopWork();
         to << willBuild << willSubstitute << unknown << downloadSize << narSize;
         break;
     }
@@ -647,9 +665,17 @@ static void processConnection(bool trusted)
 {
     MonitorFdHup monitor(from.fd);
 
-    canSendStderr = false;
-    defaultLogger = logger;
-    logger = new TunnelLogger();
+    TunnelLogger tunnelLogger;
+    auto prevLogger = nix::logger;
+    logger = &tunnelLogger;
+
+    unsigned int opCount = 0;
+
+    Finally finally([&]() {
+        logger = prevLogger;
+        _isInterrupted = false;
+        debug("%d operations", opCount);
+    });
 
     /* Exchange the greeting. */
     unsigned int magic = readInt(from);
@@ -667,7 +693,7 @@ static void processConnection(bool trusted)
     readInt(from); // obsolete reserveSpace
 
     /* Send startup error messages to the client. */
-    startWork();
+    tunnelLogger.startWork();
 
     try {
 
@@ -687,12 +713,10 @@ static void processConnection(bool trusted)
         params["path-info-cache-size"] = "0";
         auto store = make_ref<LocalStore>(params);
 
-        stopWork();
+        tunnelLogger.stopWork();
         to.flush();
 
         /* Process client requests. */
-        unsigned int opCount = 0;
-
         while (true) {
             WorkerOp op;
             try {
@@ -706,32 +730,28 @@ static void processConnection(bool trusted)
             opCount++;
 
             try {
-                performOp(store, trusted, clientVersion, from, to, op);
+                performOp(&tunnelLogger, store, trusted, clientVersion, from, to, op);
             } catch (Error & e) {
                 /* If we're not in a state where we can send replies, then
                    something went wrong processing the input of the
                    client.  This can happen especially if I/O errors occur
                    during addTextToStore() / importPath().  If that
                    happens, just send the error message and exit. */
-                bool errorAllowed = canSendStderr;
-                stopWork(false, e.msg(), e.status);
+                bool errorAllowed = tunnelLogger.state_.lock()->canSendStderr;
+                tunnelLogger.stopWork(false, e.msg(), e.status);
                 if (!errorAllowed) throw;
             } catch (std::bad_alloc & e) {
-                stopWork(false, "Nix daemon out of memory", 1);
+                tunnelLogger.stopWork(false, "Nix daemon out of memory", 1);
                 throw;
             }
 
             to.flush();
 
-            assert(!canSendStderr);
+            assert(!tunnelLogger.state_.lock()->canSendStderr);
         };
 
-        canSendStderr = false;
-        _isInterrupted = false;
-        debug(format("%1% operations") % opCount);
-
-    } catch (Error & e) {
-        stopWork(false, e.msg(), 1);
+    } catch (std::exception & e) {
+        tunnelLogger.stopWork(false, e.what(), 1);
         to.flush();
         return;
     }