diff options
Diffstat (limited to 'src/libutil')
-rw-r--r-- | src/libutil/local.mk | 2 | ||||
-rw-r--r-- | src/libutil/sync.hh | 6 | ||||
-rw-r--r-- | src/libutil/thread-pool.cc | 82 | ||||
-rw-r--r-- | src/libutil/thread-pool.hh | 52 | ||||
-rw-r--r-- | src/libutil/util.cc | 8 | ||||
-rw-r--r-- | src/libutil/util.hh | 4 |
6 files changed, 146 insertions, 8 deletions
diff --git a/src/libutil/local.mk b/src/libutil/local.mk index 4dae3305433f..2e5d2672e5f0 100644 --- a/src/libutil/local.mk +++ b/src/libutil/local.mk @@ -6,6 +6,6 @@ libutil_DIR := $(d) libutil_SOURCES := $(wildcard $(d)/*.cc) -libutil_LDFLAGS = -llzma $(OPENSSL_LIBS) +libutil_LDFLAGS = -llzma -pthread $(OPENSSL_LIBS) libutil_LIBS = libformat diff --git a/src/libutil/sync.hh b/src/libutil/sync.hh index c99c098ac9c6..ebe64ffbdab7 100644 --- a/src/libutil/sync.hh +++ b/src/libutil/sync.hh @@ -22,11 +22,11 @@ namespace nix { scope. */ -template<class T> +template<class T, class M = std::mutex> class Sync { private: - std::mutex mutex; + M mutex; T data; public: @@ -38,7 +38,7 @@ public: { private: Sync * s; - std::unique_lock<std::mutex> lk; + std::unique_lock<M> lk; friend Sync; Lock(Sync * s) : s(s), lk(s->mutex) { } public: diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc new file mode 100644 index 000000000000..743038b588a7 --- /dev/null +++ b/src/libutil/thread-pool.cc @@ -0,0 +1,82 @@ +#include "thread-pool.hh" + +namespace nix { + +ThreadPool::ThreadPool(size_t _nrThreads) + : nrThreads(_nrThreads) +{ + if (!nrThreads) { + nrThreads = std::thread::hardware_concurrency(); + if (!nrThreads) nrThreads = 1; + } +} + +void ThreadPool::enqueue(const work_t & t) +{ + auto state_(state.lock()); + state_->left.push(t); + wakeup.notify_one(); +} + +void ThreadPool::process() +{ + printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads); + + std::vector<std::thread> workers; + + for (size_t n = 0; n < nrThreads; n++) + workers.push_back(std::thread([&]() { + bool first = true; + + while (true) { + work_t work; + { + auto state_(state.lock()); + if (state_->exception) return; + if (!first) { + assert(state_->pending); + state_->pending--; + } + first = false; + while (state_->left.empty()) { + if (!state_->pending) { + wakeup.notify_all(); + return; + } + if (state_->exception) return; + state_.wait(wakeup); + } + work = state_->left.front(); + state_->left.pop(); + state_->pending++; + } + + try { + work(); + } catch (std::exception & e) { + auto state_(state.lock()); + if (state_->exception) { + if (!dynamic_cast<Interrupted*>(&e)) + printMsg(lvlError, format("error: %s") % e.what()); + } else { + state_->exception = std::current_exception(); + wakeup.notify_all(); + } + } + } + + })); + + for (auto & thr : workers) + thr.join(); + + { + auto state_(state.lock()); + if (state_->exception) + std::rethrow_exception(state_->exception); + } +} + +} + + diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh new file mode 100644 index 000000000000..77641d88ba4e --- /dev/null +++ b/src/libutil/thread-pool.hh @@ -0,0 +1,52 @@ +#pragma once + +#include "sync.hh" +#include "util.hh" + +#include <queue> +#include <functional> +#include <thread> + +namespace nix { + +/* A simple thread pool that executes a queue of work items + (lambdas). */ +class ThreadPool +{ +public: + + ThreadPool(size_t nrThreads = 0); + + // FIXME: use std::packaged_task? + typedef std::function<void()> work_t; + + /* Enqueue a function to be executed by the thread pool. */ + void enqueue(const work_t & t); + + /* Execute work items until the queue is empty. Note that work + items are allowed to add new items to the queue; this is + handled correctly. Queue processing stops prematurely if any + work item throws an exception. This exception is propagated to + the calling thread. If multiple work items throw an exception + concurrently, only one item is propagated; the others are + printed on stderr and otherwise ignored. */ + void process(); + +private: + + size_t nrThreads; + + struct State + { + std::queue<work_t> left; + size_t pending = 0; + std::exception_ptr exception; + }; + + Sync<State> state; + + std::condition_variable wakeup; + +}; + +} diff --git a/src/libutil/util.cc b/src/libutil/util.cc index 25246a3e89a9..55d490992108 100644 --- a/src/libutil/util.cc +++ b/src/libutil/util.cc @@ -548,7 +548,7 @@ void writeToStderr(const string & s) } -void (*_writeToStderr) (const unsigned char * buf, size_t count) = 0; +std::function<void(const unsigned char * buf, size_t count)> _writeToStderr; void readFull(int fd, unsigned char * buf, size_t count) @@ -1062,13 +1062,15 @@ void restoreSIGPIPE() volatile sig_atomic_t _isInterrupted = 0; +thread_local bool interruptThrown = false; + void _interrupted() { /* Block user interrupts while an exception is being handled. Throwing an exception while another exception is being handled kills the program! */ - if (!std::uncaught_exception()) { - _isInterrupted = 0; + if (!interruptThrown && !std::uncaught_exception()) { + interruptThrown = true; throw Interrupted("interrupted by the user"); } } diff --git a/src/libutil/util.hh b/src/libutil/util.hh index 3606f6ec9eb2..20bd62a0e752 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -167,7 +167,7 @@ void warnOnce(bool & haveWarned, const FormatOrString & fs); void writeToStderr(const string & s); -extern void (*_writeToStderr) (const unsigned char * buf, size_t count); +extern std::function<void(const unsigned char * buf, size_t count)> _writeToStderr; /* Wrappers arount read()/write() that read/write exactly the @@ -316,6 +316,8 @@ void restoreSIGPIPE(); extern volatile sig_atomic_t _isInterrupted; +extern thread_local bool interruptThrown; + void _interrupted(); void inline checkInterrupt() |