diff options
author | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-02-23T15·40+0100 |
---|---|---|
committer | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-02-23T15·40+0100 |
commit | d5626bf4c14f725136f2c5b6ac8bf818627352f0 (patch) | |
tree | a8fde81ecedfd2f8ba640e07bf6ea1bb0564c438 /src | |
parent | e292144d46e3fbb24ee9ee67f1682b268373921b (diff) |
Pool<T>: Allow a maximum pool size
Diffstat (limited to 'src')
-rw-r--r-- | src/libmain/shared.cc | 1 | ||||
-rw-r--r-- | src/libstore/remote-store.cc | 30 | ||||
-rw-r--r-- | src/libstore/remote-store.hh | 7 | ||||
-rw-r--r-- | src/libutil/pool.hh | 69 |
4 files changed, 74 insertions, 33 deletions
diff --git a/src/libmain/shared.cc b/src/libmain/shared.cc index 8f2aa842036a..c27302227304 100644 --- a/src/libmain/shared.cc +++ b/src/libmain/shared.cc @@ -126,6 +126,7 @@ void initNix() std::cerr.rdbuf()->pubsetbuf(buf, sizeof(buf)); #endif + // FIXME: do we need this? It's not thread-safe. std::ios::sync_with_stdio(false); if (getEnv("IN_SYSTEMD") == "1") diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 847da107a111..f6ec3fffb614 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -14,8 +14,8 @@ #include <sys/un.h> #include <errno.h> #include <fcntl.h> - #include <unistd.h> + #include <cstring> namespace nix { @@ -39,8 +39,8 @@ template<class T> T readStorePaths(Source & from) template PathSet readStorePaths(Source & from); -RemoteStore::RemoteStore() - : connections(make_ref<Pool<Connection>>([this]() { return openConnection(); })) +RemoteStore::RemoteStore(size_t maxConnections) + : connections(make_ref<Pool<Connection>>(maxConnections, [this]() { return openConnection(); })) { } @@ -116,18 +116,6 @@ ref<RemoteStore::Connection> RemoteStore::openConnection(bool reserveSpace) } -RemoteStore::~RemoteStore() -{ - try { - //to.flush(); - //fdSocket.close(); - // FIXME: close pool - } catch (...) { - ignoreException(); - } -} - - void RemoteStore::setOptions(ref<Connection> conn) { conn->to << wopSetOptions @@ -568,6 +556,18 @@ bool RemoteStore::verifyStore(bool checkContents, bool repair) return readInt(conn->from) != 0; } + +RemoteStore::Connection::~Connection() +{ + try { + to.flush(); + fd.close(); + } catch (...) { + ignoreException(); + } +} + + void RemoteStore::Connection::processStderr(Sink * sink, Source * source) { to.flush(); diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index b16a6b51db0f..af417b84ff9c 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -1,5 +1,6 @@ #pragma once +#include <limits> #include <string> #include "store-api.hh" @@ -19,9 +20,7 @@ class RemoteStore : public Store { public: - RemoteStore(); - - ~RemoteStore(); + RemoteStore(size_t maxConnections = std::numeric_limits<size_t>::max()); /* Implementations of abstract store API methods. */ @@ -100,6 +99,8 @@ private: FdSource from; unsigned int daemonVersion; + ~Connection(); + void processStderr(Sink * sink = 0, Source * source = 0); }; diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh index d63912e28aa9..4b865a19389e 100644 --- a/src/libutil/pool.hh +++ b/src/libutil/pool.hh @@ -1,8 +1,10 @@ #pragma once -#include <memory> -#include <list> #include <functional> +#include <limits> +#include <list> +#include <memory> +#include <cassert> #include "sync.hh" #include "ref.hh" @@ -39,37 +41,58 @@ private: struct State { - unsigned int count = 0; - std::list<ref<R>> idle; + size_t inUse = 0; + size_t max; + std::vector<ref<R>> idle; }; Sync<State> state; + std::condition_variable_any wakeup; + public: - Pool(const Factory & factory = []() { return make_ref<R>(); }) + Pool(size_t max = std::numeric_limits<size_t>::max, + const Factory & factory = []() { return make_ref<R>(); }) : factory(factory) - { } + { + auto state_(state.lock()); + state_->max = max; + } + + ~Pool() + { + auto state_(state.lock()); + assert(!state_->inUse); + state_->max = 0; + state_->idle.clear(); + } class Handle { private: Pool & pool; - ref<R> r; + std::shared_ptr<R> r; friend Pool; Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { } public: - Handle(Handle && h) : pool(h.pool), r(h.r) { abort(); } + Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); } Handle(const Handle & l) = delete; ~Handle() { - auto state_(pool.state.lock()); - state_->idle.push_back(r); + if (!r) return; + { + auto state_(pool.state.lock()); + state_->idle.push_back(ref<R>(r)); + assert(state_->inUse); + state_->inUse--; + } + pool.wakeup.notify_one(); } R * operator -> () { return &*r; } @@ -80,22 +103,38 @@ public: { { auto state_(state.lock()); + + /* If we're over the maximum number of instance, we need + to wait until a slot becomes available. */ + while (state_->idle.empty() && state_->inUse >= state_->max) + state_.wait(wakeup); + if (!state_->idle.empty()) { auto p = state_->idle.back(); state_->idle.pop_back(); + state_->inUse++; return Handle(*this, p); } - state_->count++; + + state_->inUse++; + } + + /* We need to create a new instance. Because that might take a + while, we don't hold the lock in the meantime. */ + try { + Handle h(*this, factory()); + return h; + } catch (...) { + auto state_(state.lock()); + state_->inUse--; + throw; } - /* Note: we don't hold the lock while creating a new instance, - because creation might take a long time. */ - return Handle(*this, factory()); } unsigned int count() { auto state_(state.lock()); - return state_->count; + return state_->count + state_->inUse; } }; |