about summary refs log tree commit diff
path: root/src/libutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil')
-rw-r--r--src/libutil/local.mk2
-rw-r--r--src/libutil/sync.hh6
-rw-r--r--src/libutil/thread-pool.cc82
-rw-r--r--src/libutil/thread-pool.hh52
-rw-r--r--src/libutil/util.cc8
-rw-r--r--src/libutil/util.hh4
6 files changed, 146 insertions, 8 deletions
diff --git a/src/libutil/local.mk b/src/libutil/local.mk
index 4dae3305433f..2e5d2672e5f0 100644
--- a/src/libutil/local.mk
+++ b/src/libutil/local.mk
@@ -6,6 +6,6 @@ libutil_DIR := $(d)
 
 libutil_SOURCES := $(wildcard $(d)/*.cc)
 
-libutil_LDFLAGS = -llzma $(OPENSSL_LIBS)
+libutil_LDFLAGS = -llzma -pthread $(OPENSSL_LIBS)
 
 libutil_LIBS = libformat
diff --git a/src/libutil/sync.hh b/src/libutil/sync.hh
index c99c098ac9c6..ebe64ffbdab7 100644
--- a/src/libutil/sync.hh
+++ b/src/libutil/sync.hh
@@ -22,11 +22,11 @@ namespace nix {
    scope.
 */
 
-template<class T>
+template<class T, class M = std::mutex>
 class Sync
 {
 private:
-    std::mutex mutex;
+    M mutex;
     T data;
 
 public:
@@ -38,7 +38,7 @@ public:
     {
     private:
         Sync * s;
-        std::unique_lock<std::mutex> lk;
+        std::unique_lock<M> lk;
         friend Sync;
         Lock(Sync * s) : s(s), lk(s->mutex) { }
     public:
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
new file mode 100644
index 000000000000..743038b588a7
--- /dev/null
+++ b/src/libutil/thread-pool.cc
@@ -0,0 +1,82 @@
+#include "thread-pool.hh"
+
+namespace nix {
+
+ThreadPool::ThreadPool(size_t _nrThreads)
+    : nrThreads(_nrThreads)
+{
+    if (!nrThreads) {
+        nrThreads = std::thread::hardware_concurrency();
+        if (!nrThreads) nrThreads = 1;
+    }
+}
+
+void ThreadPool::enqueue(const work_t & t)
+{
+    auto state_(state.lock());
+    state_->left.push(t);
+    wakeup.notify_one();
+}
+
+void ThreadPool::process()
+{
+    printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads);
+
+    std::vector<std::thread> workers;
+
+    for (size_t n = 0; n < nrThreads; n++)
+        workers.push_back(std::thread([&]() {
+            bool first = true;
+
+            while (true) {
+                work_t work;
+                {
+                    auto state_(state.lock());
+                    if (state_->exception) return;
+                    if (!first) {
+                        assert(state_->pending);
+                        state_->pending--;
+                    }
+                    first = false;
+                    while (state_->left.empty()) {
+                        if (!state_->pending) {
+                            wakeup.notify_all();
+                            return;
+                        }
+                        if (state_->exception) return;
+                        state_.wait(wakeup);
+                    }
+                    work = state_->left.front();
+                    state_->left.pop();
+                    state_->pending++;
+                }
+
+                try {
+                    work();
+                } catch (std::exception & e) {
+                    auto state_(state.lock());
+                    if (state_->exception) {
+                        if (!dynamic_cast<Interrupted*>(&e))
+                            printMsg(lvlError, format("error: %s") % e.what());
+                    } else {
+                        state_->exception = std::current_exception();
+                        wakeup.notify_all();
+                    }
+                }
+            }
+
+        }));
+
+    for (auto & thr : workers)
+        thr.join();
+
+    {
+        auto state_(state.lock());
+        if (state_->exception)
+            std::rethrow_exception(state_->exception);
+    }
+}
+
+}
+
+
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
new file mode 100644
index 000000000000..77641d88ba4e
--- /dev/null
+++ b/src/libutil/thread-pool.hh
@@ -0,0 +1,52 @@
+#pragma once
+
+#include "sync.hh"
+#include "util.hh"
+
+#include <queue>
+#include <functional>
+#include <thread>
+
+namespace nix {
+
+/* A simple thread pool that executes a queue of work items
+   (lambdas). */
+class ThreadPool
+{
+public:
+
+    ThreadPool(size_t nrThreads = 0);
+
+    // FIXME: use std::packaged_task?
+    typedef std::function<void()> work_t;
+
+    /* Enqueue a function to be executed by the thread pool. */
+    void enqueue(const work_t & t);
+
+    /* Execute work items until the queue is empty. Note that work
+       items are allowed to add new items to the queue; this is
+       handled correctly. Queue processing stops prematurely if any
+       work item throws an exception. This exception is propagated to
+       the calling thread. If multiple work items throw an exception
+       concurrently, only one item is propagated; the others are
+       printed on stderr and otherwise ignored. */
+    void process();
+
+private:
+
+    size_t nrThreads;
+
+    struct State
+    {
+        std::queue<work_t> left;
+        size_t pending = 0;
+        std::exception_ptr exception;
+    };
+
+    Sync<State> state;
+
+    std::condition_variable wakeup;
+
+};
+
+}
diff --git a/src/libutil/util.cc b/src/libutil/util.cc
index 25246a3e89a9..55d490992108 100644
--- a/src/libutil/util.cc
+++ b/src/libutil/util.cc
@@ -548,7 +548,7 @@ void writeToStderr(const string & s)
 }
 
 
-void (*_writeToStderr) (const unsigned char * buf, size_t count) = 0;
+std::function<void(const unsigned char * buf, size_t count)> _writeToStderr;
 
 
 void readFull(int fd, unsigned char * buf, size_t count)
@@ -1062,13 +1062,15 @@ void restoreSIGPIPE()
 
 volatile sig_atomic_t _isInterrupted = 0;
 
+thread_local bool interruptThrown = false;
+
 void _interrupted()
 {
     /* Block user interrupts while an exception is being handled.
        Throwing an exception while another exception is being handled
        kills the program! */
-    if (!std::uncaught_exception()) {
-        _isInterrupted = 0;
+    if (!interruptThrown && !std::uncaught_exception()) {
+        interruptThrown = true;
         throw Interrupted("interrupted by the user");
     }
 }
diff --git a/src/libutil/util.hh b/src/libutil/util.hh
index 3606f6ec9eb2..20bd62a0e752 100644
--- a/src/libutil/util.hh
+++ b/src/libutil/util.hh
@@ -167,7 +167,7 @@ void warnOnce(bool & haveWarned, const FormatOrString & fs);
 
 void writeToStderr(const string & s);
 
-extern void (*_writeToStderr) (const unsigned char * buf, size_t count);
+extern std::function<void(const unsigned char * buf, size_t count)> _writeToStderr;
 
 
 /* Wrappers arount read()/write() that read/write exactly the
@@ -316,6 +316,8 @@ void restoreSIGPIPE();
 
 extern volatile sig_atomic_t _isInterrupted;
 
+extern thread_local bool interruptThrown;
+
 void _interrupted();
 
 void inline checkInterrupt()