about summary refs log tree commit diff
path: root/src/libutil/thread-pool.cc
blob: 0a3a407240f7bfc19a8ad0e6890cc24fb07417ef (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#include "thread-pool.hh"
#include "affinity.hh"

namespace nix {

/* Create a thread pool that will use at most `_maxThreads` worker
   threads. Passing 0 means "pick automatically": the limit is then
   taken from std::thread::hardware_concurrency(), or 1 if that
   reports 0. No threads are started here. */
ThreadPool::ThreadPool(size_t _maxThreads)
    : maxThreads(_maxThreads)
{
    restoreAffinity(); // FIXME

    if (maxThreads == 0) {
        const auto detected = std::thread::hardware_concurrency();
        /* hardware_concurrency() may return 0 when the value cannot
           be determined; never end up with a zero-sized pool. */
        maxThreads = detected == 0 ? 1u : detected;
    }

    debug(format("starting pool of %d threads") % maxThreads);
}

/* Shut the pool down: flag `quit`, wake every worker waiting on
   `work`, and join all worker threads. */
ThreadPool::~ThreadPool()
{
    /* Take ownership of the worker threads while holding the lock so
       that they can be joined after it has been released. */
    std::vector<std::thread> reaped;

    {
        auto state(state_.lock());
        state->quit = true;
        state->workers.swap(reaped);
    }

    debug(format("reaping %d worker threads") % reaped.size());

    /* Workers blocked waiting for work must wake up to see `quit`. */
    work.notify_all();

    for (auto & worker : reaped)
        worker.join();
}

/* Add the work item `t` to the queue and wake one waiting worker.
   A new worker thread is started when the queue holds more items
   than there are workers and the `maxThreads` limit has not been
   reached yet. Throws ThreadPoolShutDown if the pool is quitting. */
void ThreadPool::enqueue(const work_t & t)
{
    auto state(state_.lock());

    if (state->quit)
        throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");

    state->left.push(t);

    const auto numWorkers = state->workers.size();
    const bool moreItemsThanWorkers = state->left.size() > numWorkers;
    const bool belowThreadLimit = numWorkers < maxThreads;

    if (moreItemsThanWorkers && belowThreadLimit)
        state->workers.emplace_back(&ThreadPool::workerEntry, this);

    work.notify_one();
}

/* Block until the queue is empty and no work item is pending. If a
   work item has recorded an exception, rethrow it in the calling
   thread instead. */
void ThreadPool::process()
{
    for (;;) {
        auto state(state_.lock());

        /* A recorded failure takes precedence over completion. */
        if (state->exception)
            std::rethrow_exception(state->exception);

        const bool nothingQueued = state->left.empty();
        const bool nothingPending = state->pending == 0;
        if (nothingQueued && nothingPending)
            return;

        /* Sleep until a worker signals `done`, then re-check. */
        state.wait(done);
    }
}

void ThreadPool::workerEntry()
{
    bool didWork = false;

    while (true) {
        work_t w;
        {
            auto state(state_.lock());
            while (true) {
                if (state->quit || state->exception) return;
                if (didWork) {
                    assert(state->pending);
                    state->pending--;
                    didWork = false;
                }
                if (!state->left.empty()) break;
                if (!state->pending)
                    done.notify_all();
                state.wait(work);
            }
            w = state->left.front();
            state->left.pop();
            state->pending++;
        }

        try {
            w();
        } catch (std::exception & e) {
            auto state(state_.lock());
            if (state->exception) {
                if (!dynamic_cast<Interrupted*>(&e) &&
                    !dynamic_cast<ThreadPoolShutDown*>(&e))
                    printError(format("error: %s") % e.what());
            } else {
                state->exception = std::current_exception();
                work.notify_all();
                done.notify_all();
            }
        }

        didWork = true;
    }
}

}