diff options
author | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-09-16T16·54+0200 |
---|---|---|
committer | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-09-16T16·54+0200 |
commit | 75989bdca773eedb8b8d1cc8a7675900358acd25 (patch) | |
tree | 2d1dce1431662f441cead67d8754e96eb4db6807 /src/libstore/misc.cc | |
parent | 054be5025762c5e1c7e853c4fa5d7eed8da1727f (diff) |
Make computeFSClosure() single-threaded again
The fact that queryPathInfo() is synchronous meant that we needed a thread for every concurrent binary cache lookup, even though they end up being handled by the same download thread. Requiring hundreds of threads is not a good idea. So now there is an asynchronous version of queryPathInfo() that takes a callback function to process the result. Similarly, enqueueDownload() now takes a callback rather than returning a future. Thus, a command like nix path-info --store https://cache.nixos.org/ -r /nix/store/slljrzwmpygy1daay14kjszsr9xix063-nixos-16.09beta231.dccf8c5 that returns 4941 paths now takes 1.87s using only 2 threads (the main thread and the downloader thread). (This is with a prewarmed CloudFront.)
Diffstat (limited to 'src/libstore/misc.cc')
-rw-r--r-- | src/libstore/misc.cc | 104 |
1 files changed, 64 insertions, 40 deletions
diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc index da654ba0d2c3..0c2c49e5531f 100644 --- a/src/libstore/misc.cc +++ b/src/libstore/misc.cc @@ -8,66 +8,90 @@ namespace nix { -void Store::computeFSClosure(const Path & path, - PathSet & paths, bool flipDirection, bool includeOutputs, bool includeDerivers) +void Store::computeFSClosure(const Path & startPath, + PathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) { - ThreadPool pool; + struct State + { + size_t pending; + PathSet & paths; + std::exception_ptr exc; + }; - Sync<bool> state_; + Sync<State> state_(State{0, paths_, 0}); - std::function<void(Path)> doPath; + std::function<void(const Path &)> enqueue; - doPath = [&](const Path & path) { + std::condition_variable done; + + enqueue = [&](const Path & path) -> void { { auto state(state_.lock()); - if (paths.count(path)) return; - paths.insert(path); + if (state->exc) return; + if (state->paths.count(path)) return; + state->paths.insert(path); + state->pending++; } - auto info = queryPathInfo(path); + queryPathInfo(path, + [&, path](ref<ValidPathInfo> info) { + // FIXME: calls to isValidPath() should be async - if (flipDirection) { + if (flipDirection) { - PathSet referrers; - queryReferrers(path, referrers); - for (auto & ref : referrers) - if (ref != path) - pool.enqueue(std::bind(doPath, ref)); + PathSet referrers; + queryReferrers(path, referrers); + for (auto & ref : referrers) + if (ref != path) + enqueue(ref); - if (includeOutputs) { - PathSet derivers = queryValidDerivers(path); - for (auto & i : derivers) - pool.enqueue(std::bind(doPath, i)); - } + if (includeOutputs) + for (auto & i : queryValidDerivers(path)) + enqueue(i); - if (includeDerivers && isDerivation(path)) { - PathSet outputs = queryDerivationOutputs(path); - for (auto & i : outputs) - if (isValidPath(i) && queryPathInfo(i)->deriver == path) - pool.enqueue(std::bind(doPath, i)); - } + if (includeDerivers && isDerivation(path)) + for (auto & i : queryDerivationOutputs(path)) + if (isValidPath(i) && queryPathInfo(i)->deriver == path) + enqueue(i); - } else { + } else { - for (auto & ref : info->references) - if (ref != path) - pool.enqueue(std::bind(doPath, ref)); + for (auto & ref : info->references) + if (ref != path) + enqueue(ref); - if (includeOutputs && isDerivation(path)) { - PathSet outputs = queryDerivationOutputs(path); - for (auto & i : outputs) - if (isValidPath(i)) pool.enqueue(std::bind(doPath, i)); - } + if (includeOutputs && isDerivation(path)) + for (auto & i : queryDerivationOutputs(path)) + if (isValidPath(i)) enqueue(i); - if (includeDerivers && isValidPath(info->deriver)) - pool.enqueue(std::bind(doPath, info->deriver)); + if (includeDerivers && isValidPath(info->deriver)) + enqueue(info->deriver); - } + } + + { + auto state(state_.lock()); + assert(state->pending); + if (!--state->pending) done.notify_one(); + } + + }, + + [&, path](std::exception_ptr exc) { + auto state(state_.lock()); + if (!state->exc) state->exc = exc; + assert(state->pending); + if (!--state->pending) done.notify_one(); + }); }; - pool.enqueue(std::bind(doPath, path)); + enqueue(startPath); - pool.process(); + { + auto state(state_.lock()); + while (state->pending) state.wait(done); + if (state->exc) std::rethrow_exception(state->exc); + } } |