diff options
author | Eelco Dolstra <edolstra@gmail.com> | 2017-08-28T12·17+0200 |
---|---|---|
committer | Eelco Dolstra <edolstra@gmail.com> | 2017-08-28T12·17+0200 |
commit | 8fff3e7bb50377e48fb9c672e6551abae0fdf03d (patch) | |
tree | 6e5351a7b62c6ee24d6d85c4f02fc867d6327d08 | |
parent | 94a0548dc420e23c64b18d36ea8d50a2afbce6fb (diff) |
Make TunnelLogger thread-safe
Now that we use threads in lots of places, it's possible for TunnelLogger::log() to be called asynchronously from other threads than the main loop. So we need to ensure that STDERR_NEXT messages don't clobber other messages.
-rw-r--r-- | src/nix-daemon/nix-daemon.cc | 238 |
1 files changed, 129 insertions, 109 deletions
diff --git a/src/nix-daemon/nix-daemon.cc b/src/nix-daemon/nix-daemon.cc index 7e6f3aa25b5e..3237333f4d06 100644 --- a/src/nix-daemon/nix-daemon.cc +++ b/src/nix-daemon/nix-daemon.cc @@ -8,6 +8,7 @@ #include "globals.hh" #include "monitor-fd.hh" #include "derivations.hh" +#include "finally.hh" #include <algorithm> @@ -54,57 +55,73 @@ static ssize_t splice(int fd_in, void *off_in, int fd_out, void *off_out, size_t static FdSource from(STDIN_FILENO); static FdSink to(STDOUT_FILENO); -static bool canSendStderr; - -static Logger * defaultLogger; - /* Logger that forwards log messages to the client, *if* we're in a state where the protocol allows it (i.e., when canSendStderr is true). */ -class TunnelLogger : public Logger +struct TunnelLogger : public Logger { + struct State + { + bool canSendStderr = false; + std::vector<std::string> pendingMsgs; + }; + + Sync<State> state_; + void log(Verbosity lvl, const FormatOrString & fs) override { if (lvl > verbosity) return; - if (canSendStderr) { + auto state(state_.lock()); + + if (state->canSendStderr) { try { to << STDERR_NEXT << (fs.s + "\n"); to.flush(); } catch (...) { /* Write failed; that means that the other side is gone. */ - canSendStderr = false; + state->canSendStderr = false; throw; } } else - defaultLogger->log(lvl, fs); + state->pendingMsgs.push_back(fs.s); } -}; + /* startWork() means that we're starting an operation for which we + want to send out stderr to the client. */ + void startWork() + { + std::vector<std::string> pendingMsgs; -/* startWork() means that we're starting an operation for which we - want to send out stderr to the client. */ -static void startWork() -{ - canSendStderr = true; -} + auto state(state_.lock()); + state->canSendStderr = true; + for (auto & msg : state->pendingMsgs) + to << STDERR_NEXT << (msg + "\n"); -/* stopWork() means that we're done; stop sending stderr to the - client. */ -static void stopWork(bool success = true, const string & msg = "", unsigned int status = 0) -{ - canSendStderr = false; + state->pendingMsgs.clear(); - if (success) - to << STDERR_LAST; - else { - to << STDERR_ERROR << msg; - if (status != 0) to << status; + to.flush(); } -} + + /* stopWork() means that we're done; stop sending stderr to the + client. */ + void stopWork(bool success = true, const string & msg = "", unsigned int status = 0) + { + auto state(state_.lock()); + + state->canSendStderr = false; + + if (success) + to << STDERR_LAST; + else { + to << STDERR_ERROR << msg; + if (status != 0) to << status; + } + } +}; struct TunnelSink : Sink @@ -160,7 +177,8 @@ struct RetrieveRegularNARSink : ParseSink }; -static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVersion, +static void performOp(TunnelLogger * logger, ref<LocalStore> store, + bool trusted, unsigned int clientVersion, Source & from, Sink & to, unsigned int op) { switch (op) { @@ -172,46 +190,46 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe that the 'Error' exception handler doesn't close the connection. */ Path path = readString(from); - startWork(); + logger->startWork(); store->assertStorePath(path); bool result = store->isValidPath(path); - stopWork(); + logger->stopWork(); to << result; break; } case wopQueryValidPaths: { PathSet paths = readStorePaths<PathSet>(*store, from); - startWork(); + logger->startWork(); PathSet res = store->queryValidPaths(paths); - stopWork(); + logger->stopWork(); to << res; break; } case wopHasSubstitutes: { Path path = readStorePath(*store, from); - startWork(); + logger->startWork(); PathSet res = store->querySubstitutablePaths({path}); - stopWork(); + logger->stopWork(); to << (res.find(path) != res.end()); break; } case wopQuerySubstitutablePaths: { PathSet paths = readStorePaths<PathSet>(*store, from); - startWork(); + logger->startWork(); PathSet res = store->querySubstitutablePaths(paths); - stopWork(); + logger->stopWork(); to << res; break; } case wopQueryPathHash: { Path path = readStorePath(*store, from); - startWork(); + logger->startWork(); auto hash = store->queryPathInfo(path)->narHash; - stopWork(); + logger->stopWork(); to << hash.to_string(Base16, false); break; } @@ -221,7 +239,7 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe case wopQueryValidDerivers: case wopQueryDerivationOutputs: { Path path = readStorePath(*store, from); - startWork(); + logger->startWork(); PathSet paths; if (op == wopQueryReferences) paths = store->queryPathInfo(path)->references; @@ -230,35 +248,35 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe else if (op == wopQueryValidDerivers) paths = store->queryValidDerivers(path); else paths = store->queryDerivationOutputs(path); - stopWork(); + logger->stopWork(); to << paths; break; } case wopQueryDerivationOutputNames: { Path path = readStorePath(*store, from); - startWork(); + logger->startWork(); StringSet names; names = store->queryDerivationOutputNames(path); - stopWork(); + logger->stopWork(); to << names; break; } case wopQueryDeriver: { Path path = readStorePath(*store, from); - startWork(); + logger->startWork(); auto deriver = store->queryPathInfo(path)->deriver; - stopWork(); + logger->stopWork(); to << deriver; break; } case wopQueryPathFromHashPart: { string hashPart = readString(from); - startWork(); + logger->startWork(); Path path = store->queryPathFromHashPart(hashPart); - stopWork(); + logger->stopWork(); to << path; break; } @@ -286,10 +304,10 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe } else parseDump(savedRegular, from); - startWork(); + logger->startWork(); if (!savedRegular.regular) throw Error("regular file expected"); Path path = store->addToStoreFromDump(recursive ? *savedNAR.data : savedRegular.s, baseName, recursive, hashAlgo); - stopWork(); + logger->stopWork(); to << path; break; @@ -299,9 +317,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe string suffix = readString(from); string s = readString(from); PathSet refs = readStorePaths<PathSet>(*store, from); - startWork(); + logger->startWork(); Path path = store->addTextToStore(suffix, s, refs, NoRepair); - stopWork(); + logger->stopWork(); to << path; break; } @@ -309,20 +327,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe case wopExportPath: { Path path = readStorePath(*store, from); readInt(from); // obsolete - startWork(); + logger->startWork(); TunnelSink sink(to); store->exportPath(path, sink); - stopWork(); + logger->stopWork(); to << 1; break; } case wopImportPaths: { - startWork(); + logger->startWork(); TunnelSource source(from); Paths paths = store->importPaths(source, nullptr, trusted ? NoCheckSigs : CheckSigs); - stopWork(); + logger->stopWork(); to << paths; break; } @@ -338,9 +356,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe if (mode == bmRepair && !trusted) throw Error("repairing is not supported when building through the Nix daemon"); } - startWork(); + logger->startWork(); store->buildPaths(drvs, mode); - stopWork(); + logger->stopWork(); to << 1; break; } @@ -350,54 +368,54 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe BasicDerivation drv; readDerivation(from, *store, drv); BuildMode buildMode = (BuildMode) readInt(from); - startWork(); + logger->startWork(); if (!trusted) throw Error("you are not privileged to build derivations"); auto res = store->buildDerivation(drvPath, drv, buildMode); - stopWork(); + logger->stopWork(); to << res.status << res.errorMsg; break; } case wopEnsurePath: { Path path = readStorePath(*store, from); - startWork(); + logger->startWork(); store->ensurePath(path); - stopWork(); + logger->stopWork(); to << 1; break; } case wopAddTempRoot: { Path path = readStorePath(*store, from); - startWork(); + logger->startWork(); store->addTempRoot(path); - stopWork(); + logger->stopWork(); to << 1; break; } case wopAddIndirectRoot: { Path path = absPath(readString(from)); - startWork(); + logger->startWork(); store->addIndirectRoot(path); - stopWork(); + logger->stopWork(); to << 1; break; } case wopSyncWithGC: { - startWork(); + logger->startWork(); store->syncWithGC(); - stopWork(); + logger->stopWork(); to << 1; break; } case wopFindRoots: { - startWork(); + logger->startWork(); Roots roots = store->findRoots(); - stopWork(); + logger->stopWork(); to << roots.size(); for (auto & i : roots) to << i.first << i.second; @@ -416,11 +434,11 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe GCResults results; - startWork(); + logger->startWork(); if (options.ignoreLiveness) throw Error("you are not allowed to ignore liveness"); store->collectGarbage(options, results); - stopWork(); + logger->stopWork(); to << results.paths << results.bytesFreed << 0 /* obsolete */; @@ -451,7 +469,7 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe } } - startWork(); + logger->startWork(); for (auto & i : overrides) { auto & name(i.first); @@ -492,16 +510,16 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe } } - stopWork(); + logger->stopWork(); break; } case wopQuerySubstitutablePathInfo: { Path path = absPath(readString(from)); - startWork(); + logger->startWork(); SubstitutablePathInfos infos; store->querySubstitutablePathInfos({path}, infos); - stopWork(); + logger->stopWork(); SubstitutablePathInfos::iterator i = infos.find(path); if (i == infos.end()) to << 0; @@ -513,10 +531,10 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe case wopQuerySubstitutablePathInfos: { PathSet paths = readStorePaths<PathSet>(*store, from); - startWork(); + logger->startWork(); SubstitutablePathInfos infos; store->querySubstitutablePathInfos(paths, infos); - stopWork(); + logger->stopWork(); to << infos.size(); for (auto & i : infos) { to << i.first << i.second.deriver << i.second.references @@ -526,9 +544,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe } case wopQueryAllValidPaths: { - startWork(); + logger->startWork(); PathSet paths = store->queryAllValidPaths(); - stopWork(); + logger->stopWork(); to << paths; break; } @@ -536,13 +554,13 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe case wopQueryPathInfo: { Path path = readStorePath(*store, from); std::shared_ptr<const ValidPathInfo> info; - startWork(); + logger->startWork(); try { info = store->queryPathInfo(path); } catch (InvalidPath &) { if (GET_PROTOCOL_MINOR(clientVersion) < 17) throw; } - stopWork(); + logger->stopWork(); if (info) { if (GET_PROTOCOL_MINOR(clientVersion) >= 17) to << 1; @@ -561,20 +579,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe } case wopOptimiseStore: - startWork(); + logger->startWork(); store->optimiseStore(); - stopWork(); + logger->stopWork(); to << 1; break; case wopVerifyStore: { bool checkContents, repair; from >> checkContents >> repair; - startWork(); + logger->startWork(); if (repair && !trusted) throw Error("you are not privileged to repair paths"); bool errors = store->verifyStore(checkContents, (RepairFlag) repair); - stopWork(); + logger->stopWork(); to << errors; break; } @@ -582,19 +600,19 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe case wopAddSignatures: { Path path = readStorePath(*store, from); StringSet sigs = readStrings<StringSet>(from); - startWork(); + logger->startWork(); if (!trusted) throw Error("you are not privileged to add signatures"); store->addSignatures(path, sigs); - stopWork(); + logger->stopWork(); to << 1; break; } case wopNarFromPath: { auto path = readStorePath(*store, from); - startWork(); - stopWork(); + logger->startWork(); + logger->stopWork(); dumpPath(path, to); break; } @@ -619,20 +637,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe TeeSink tee(from); parseDump(tee, tee.source); - startWork(); + logger->startWork(); store->addToStore(info, tee.source.data, (RepairFlag) repair, dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr); - stopWork(); + logger->stopWork(); break; } case wopQueryMissing: { PathSet targets = readStorePaths<PathSet>(*store, from); - startWork(); + logger->startWork(); PathSet willBuild, willSubstitute, unknown; unsigned long long downloadSize, narSize; store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize); - stopWork(); + logger->stopWork(); to << willBuild << willSubstitute << unknown << downloadSize << narSize; break; } @@ -647,9 +665,17 @@ static void processConnection(bool trusted) { MonitorFdHup monitor(from.fd); - canSendStderr = false; - defaultLogger = logger; - logger = new TunnelLogger(); + TunnelLogger tunnelLogger; + auto prevLogger = nix::logger; + logger = &tunnelLogger; + + unsigned int opCount = 0; + + Finally finally([&]() { + logger = prevLogger; + _isInterrupted = false; + debug("%d operations", opCount); + }); /* Exchange the greeting. */ unsigned int magic = readInt(from); @@ -667,7 +693,7 @@ static void processConnection(bool trusted) readInt(from); // obsolete reserveSpace /* Send startup error messages to the client. */ - startWork(); + tunnelLogger.startWork(); try { @@ -687,12 +713,10 @@ static void processConnection(bool trusted) params["path-info-cache-size"] = "0"; auto store = make_ref<LocalStore>(params); - stopWork(); + tunnelLogger.stopWork(); to.flush(); /* Process client requests. */ - unsigned int opCount = 0; - while (true) { WorkerOp op; try { @@ -706,32 +730,28 @@ static void processConnection(bool trusted) opCount++; try { - performOp(store, trusted, clientVersion, from, to, op); + performOp(&tunnelLogger, store, trusted, clientVersion, from, to, op); } catch (Error & e) { /* If we're not in a state where we can send replies, then something went wrong processing the input of the client. This can happen especially if I/O errors occur during addTextToStore() / importPath(). If that happens, just send the error message and exit. */ - bool errorAllowed = canSendStderr; - stopWork(false, e.msg(), e.status); + bool errorAllowed = tunnelLogger.state_.lock()->canSendStderr; + tunnelLogger.stopWork(false, e.msg(), e.status); if (!errorAllowed) throw; } catch (std::bad_alloc & e) { - stopWork(false, "Nix daemon out of memory", 1); + tunnelLogger.stopWork(false, "Nix daemon out of memory", 1); throw; } to.flush(); - assert(!canSendStderr); + assert(!tunnelLogger.state_.lock()->canSendStderr); }; - canSendStderr = false; - _isInterrupted = false; - debug(format("%1% operations") % opCount); - - } catch (Error & e) { - stopWork(false, e.msg(), 1); + } catch (std::exception & e) { + tunnelLogger.stopWork(false, e.what(), 1); to.flush(); return; } |