diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/libstore/build.cc | 6 | ||||
-rw-r--r-- | src/libstore/download.cc | 2 | ||||
-rw-r--r-- | src/libstore/misc.cc | 266 | ||||
-rw-r--r-- | src/libstore/nar-info.cc | 4 | ||||
-rw-r--r-- | src/libutil/hash.cc | 1 | ||||
-rw-r--r-- | src/libutil/thread-pool.cc | 6 | ||||
-rw-r--r-- | src/libutil/thread-pool.hh | 2 |
7 files changed, 167 insertions, 120 deletions
diff --git a/src/libstore/build.cc b/src/libstore/build.cc index 0a4df8ca7af4..af2c908c30db 100644 --- a/src/libstore/build.cc +++ b/src/libstore/build.cc @@ -1075,8 +1075,10 @@ void DerivationGoal::outputsSubstituted() { trace("all outputs substituted (maybe)"); - if (nrFailed > 0 && nrFailed > nrNoSubstituters + nrIncompleteClosure && !settings.tryFallback) - throw Error(format("some substitutes for the outputs of derivation ‘%1%’ failed (usually happens due to networking issues); try ‘--fallback’ to build derivation from source ") % drvPath); + if (nrFailed > 0 && nrFailed > nrNoSubstituters + nrIncompleteClosure && !settings.tryFallback) { + done(BuildResult::TransientFailure, (format("some substitutes for the outputs of derivation ‘%1%’ failed (usually happens due to networking issues); try ‘--fallback’ to build derivation from source ") % drvPath).str()); + return; + } /* If the substitutes form an incomplete closure, then we should build the dependencies of this derivation, but after that, we diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 6e39330e40d9..04a2b325c651 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -157,7 +157,7 @@ struct CurlDownloader : public Downloader curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0); } - data->clear(); + data = make_ref<std::string>(); if (requestHeaders) { curl_slist_free_all(requestHeaders); diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc index 4bb043aaf17e..da654ba0d2c3 100644 --- a/src/libstore/misc.cc +++ b/src/libstore/misc.cc @@ -2,6 +2,7 @@ #include "globals.hh" #include "local-store.hh" #include "store-api.hh" +#include "thread-pool.hh" namespace nix { @@ -10,157 +11,196 @@ namespace nix { void Store::computeFSClosure(const Path & path, PathSet & paths, bool flipDirection, bool includeOutputs, bool includeDerivers) { - if (paths.find(path) != paths.end()) return; - paths.insert(path); + ThreadPool pool; - PathSet edges; + Sync<bool> state_; - if (flipDirection) { - queryReferrers(path, edges); + std::function<void(Path)> doPath; - if (includeOutputs) { - PathSet derivers = queryValidDerivers(path); - for (auto & i : derivers) - edges.insert(i); + doPath = [&](const Path & path) { + { + auto state(state_.lock()); + if (paths.count(path)) return; + paths.insert(path); } - if (includeDerivers && isDerivation(path)) { - PathSet outputs = queryDerivationOutputs(path); - for (auto & i : outputs) - if (isValidPath(i) && queryPathInfo(i)->deriver == path) - edges.insert(i); - } - - } else { auto info = queryPathInfo(path); - edges = info->references; - if (includeOutputs && isDerivation(path)) { - PathSet outputs = queryDerivationOutputs(path); - for (auto & i : outputs) - if (isValidPath(i)) edges.insert(i); + if (flipDirection) { + + PathSet referrers; + queryReferrers(path, referrers); + for (auto & ref : referrers) + if (ref != path) + pool.enqueue(std::bind(doPath, ref)); + + if (includeOutputs) { + PathSet derivers = queryValidDerivers(path); + for (auto & i : derivers) + pool.enqueue(std::bind(doPath, i)); + } + + if (includeDerivers && isDerivation(path)) { + PathSet outputs = queryDerivationOutputs(path); + for (auto & i : outputs) + if (isValidPath(i) && queryPathInfo(i)->deriver == path) + pool.enqueue(std::bind(doPath, i)); + } + + } else { + + for (auto & ref : info->references) + if (ref != path) + pool.enqueue(std::bind(doPath, ref)); + + if (includeOutputs && isDerivation(path)) { + PathSet outputs = queryDerivationOutputs(path); + for (auto & i : outputs) + if (isValidPath(i)) pool.enqueue(std::bind(doPath, i)); + } + + if (includeDerivers && isValidPath(info->deriver)) + pool.enqueue(std::bind(doPath, info->deriver)); + } + }; - if (includeDerivers && isValidPath(info->deriver)) - edges.insert(info->deriver); - } + pool.enqueue(std::bind(doPath, path)); - for (auto & i : edges) - computeFSClosure(i, paths, flipDirection, includeOutputs, includeDerivers); + pool.process(); } void Store::queryMissing(const PathSet & targets, - PathSet & willBuild, PathSet & willSubstitute, PathSet & unknown, - unsigned long long & downloadSize, unsigned long long & narSize) + PathSet & willBuild_, PathSet & willSubstitute_, PathSet & unknown_, + unsigned long long & downloadSize_, unsigned long long & narSize_) { - downloadSize = narSize = 0; + downloadSize_ = narSize_ = 0; - PathSet todo(targets.begin(), targets.end()), done; + ThreadPool pool; - /* Getting substitute info has high latency when using the binary - cache substituter. Thus it's essential to do substitute - queries in parallel as much as possible. To accomplish this - we do the following: - - - For all paths still to be processed (‘todo’), we add all - paths for which we need info to the set ‘query’. For an - unbuilt derivation this is the output paths; otherwise, it's - the path itself. - - - We get info about all paths in ‘query’ in parallel. + struct State + { + PathSet done; + PathSet & unknown, & willSubstitute, & willBuild; + unsigned long long & downloadSize; + unsigned long long & narSize; + }; - - We process the results and add new items to ‘todo’ if - necessary. E.g. if a path is substitutable, then we need to - get info on its references. + struct DrvState + { + size_t left; + bool done = false; + PathSet outPaths; + DrvState(size_t left) : left(left) { } + }; - - Repeat until ‘todo’ is empty. - */ + Sync<State> state_(State{PathSet(), unknown_, willSubstitute_, willBuild_, downloadSize_, narSize_}); - while (!todo.empty()) { + std::function<void(Path)> doPath; - PathSet query, todoDrv, todoNonDrv; + auto mustBuildDrv = [&](const Path & drvPath, const Derivation & drv) { + { + auto state(state_.lock()); + state->willBuild.insert(drvPath); + } - for (auto & i : todo) { - if (done.find(i) != done.end()) continue; - done.insert(i); + for (auto & i : drv.inputDrvs) + pool.enqueue(std::bind(doPath, makeDrvPathWithOutputs(i.first, i.second))); + }; - DrvPathWithOutputs i2 = parseDrvPathWithOutputs(i); + auto checkOutput = [&]( + const Path & drvPath, ref<Derivation> drv, const Path & outPath, ref<Sync<DrvState>> drvState_) + { + if (drvState_->lock()->done) return; - if (isDerivation(i2.first)) { - if (!isValidPath(i2.first)) { - // FIXME: we could try to substitute p. - unknown.insert(i); - continue; + SubstitutablePathInfos infos; + querySubstitutablePathInfos({outPath}, infos); + + if (infos.empty()) { + drvState_->lock()->done = true; + mustBuildDrv(drvPath, *drv); + } else { + { + auto drvState(drvState_->lock()); + if (drvState->done) return; + assert(drvState->left); + drvState->left--; + drvState->outPaths.insert(outPath); + if (!drvState->left) { + for (auto & path : drvState->outPaths) + pool.enqueue(std::bind(doPath, path)); } - Derivation drv = derivationFromPath(i2.first); - - PathSet invalid; - for (auto & j : drv.outputs) - if (wantOutput(j.first, i2.second) - && !isValidPath(j.second.path)) - invalid.insert(j.second.path); - if (invalid.empty()) continue; - - todoDrv.insert(i); - if (settings.useSubstitutes && drv.substitutesAllowed()) - query.insert(invalid.begin(), invalid.end()); - } - - else { - if (isValidPath(i)) continue; - query.insert(i); - todoNonDrv.insert(i); } } + }; - todo.clear(); + doPath = [&](const Path & path) { - SubstitutablePathInfos infos; - querySubstitutablePathInfos(query, infos); + { + auto state(state_.lock()); + if (state->done.count(path)) return; + state->done.insert(path); + } + + DrvPathWithOutputs i2 = parseDrvPathWithOutputs(path); - for (auto & i : todoDrv) { - DrvPathWithOutputs i2 = parseDrvPathWithOutputs(i); + if (isDerivation(i2.first)) { + if (!isValidPath(i2.first)) { + // FIXME: we could try to substitute the derivation. + auto state(state_.lock()); + state->unknown.insert(path); + return; + } - // FIXME: cache this Derivation drv = derivationFromPath(i2.first); - PathSet outputs; - bool mustBuild = false; + PathSet invalid; + for (auto & j : drv.outputs) + if (wantOutput(j.first, i2.second) + && !isValidPath(j.second.path)) + invalid.insert(j.second.path); + if (invalid.empty()) return; + if (settings.useSubstitutes && drv.substitutesAllowed()) { - for (auto & j : drv.outputs) { - if (!wantOutput(j.first, i2.second)) continue; - if (!isValidPath(j.second.path)) { - if (infos.find(j.second.path) == infos.end()) - mustBuild = true; - else - outputs.insert(j.second.path); - } - } + auto drvState = make_ref<Sync<DrvState>>(DrvState(invalid.size())); + for (auto & output : invalid) + pool.enqueue(std::bind(checkOutput, i2.first, make_ref<Derivation>(drv), output, drvState)); } else - mustBuild = true; + mustBuildDrv(i2.first, drv); - if (mustBuild) { - willBuild.insert(i2.first); - todo.insert(drv.inputSrcs.begin(), drv.inputSrcs.end()); - for (auto & j : drv.inputDrvs) - todo.insert(makeDrvPathWithOutputs(j.first, j.second)); - } else - todoNonDrv.insert(outputs.begin(), outputs.end()); - } + } else { - for (auto & i : todoNonDrv) { - done.insert(i); - SubstitutablePathInfos::iterator info = infos.find(i); - if (info != infos.end()) { - willSubstitute.insert(i); - downloadSize += info->second.downloadSize; - narSize += info->second.narSize; - todo.insert(info->second.references.begin(), info->second.references.end()); - } else - unknown.insert(i); + if (isValidPath(path)) return; + + SubstitutablePathInfos infos; + querySubstitutablePathInfos({path}, infos); + + if (infos.empty()) { + auto state(state_.lock()); + state->unknown.insert(path); + return; + } + + auto info = infos.find(path); + assert(info != infos.end()); + + { + auto state(state_.lock()); + state->willSubstitute.insert(path); + state->downloadSize += info->second.downloadSize; + state->narSize += info->second.narSize; + } + + for (auto & ref : info->second.references) + pool.enqueue(std::bind(doPath, ref)); } - } + }; + + for (auto & path : targets) + pool.enqueue(std::bind(doPath, path)); + + pool.process(); } diff --git a/src/libstore/nar-info.cc b/src/libstore/nar-info.cc index 1aacca84b800..b0a8d77c2fba 100644 --- a/src/libstore/nar-info.cc +++ b/src/libstore/nar-info.cc @@ -6,7 +6,7 @@ namespace nix { NarInfo::NarInfo(const Store & store, const std::string & s, const std::string & whence) { auto corrupt = [&]() { - throw Error("NAR info file ‘%1%’ is corrupt"); + throw Error(format("NAR info file ‘%1%’ is corrupt") % whence); }; auto parseHashField = [&](const string & s) { @@ -73,7 +73,7 @@ NarInfo::NarInfo(const Store & store, const std::string & s, const std::string & if (compression == "") compression = "bzip2"; - if (path.empty() || url.empty()) corrupt(); + if (path.empty() || url.empty() || narSize == 0 || !narHash) corrupt(); } std::string NarInfo::to_string() const diff --git a/src/libutil/hash.cc b/src/libutil/hash.cc index fa42587777cd..81aced0fde16 100644 --- a/src/libutil/hash.cc +++ b/src/libutil/hash.cc @@ -121,6 +121,7 @@ const string base32Chars = "0123456789abcdfghijklmnpqrsvwxyz"; string printHash32(const Hash & hash) { + assert(hash.hashSize); size_t len = hash.base32Len(); assert(len); diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc index 32363ecf0098..696ecd6c38c8 100644 --- a/src/libutil/thread-pool.cc +++ b/src/libutil/thread-pool.cc @@ -36,7 +36,8 @@ ThreadPool::~ThreadPool() void ThreadPool::enqueue(const work_t & t) { auto state(state_.lock()); - assert(!state->quit); + if (state->quit) + throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down"); state->left.push(t); if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads) state->workers.emplace_back(&ThreadPool::workerEntry, this); @@ -84,7 +85,8 @@ void ThreadPool::workerEntry() } catch (std::exception & e) { auto state(state_.lock()); if (state->exception) { - if (!dynamic_cast<Interrupted*>(&e)) + if (!dynamic_cast<Interrupted*>(&e) && + !dynamic_cast<ThreadPoolShutDown*>(&e)) printMsg(lvlError, format("error: %s") % e.what()); } else { state->exception = std::current_exception(); diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index 78b63467d62e..b64dc52d473e 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -10,6 +10,8 @@ namespace nix { +MakeError(ThreadPoolShutDown, Error) + /* A simple thread pool that executes a queue of work items (lambdas). */ class ThreadPool |