about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libmain/shared.cc1
-rw-r--r--src/libstore/remote-store.cc30
-rw-r--r--src/libstore/remote-store.hh7
-rw-r--r--src/libutil/pool.hh69
4 files changed, 74 insertions, 33 deletions
diff --git a/src/libmain/shared.cc b/src/libmain/shared.cc
index 8f2aa842036a..c27302227304 100644
--- a/src/libmain/shared.cc
+++ b/src/libmain/shared.cc
@@ -126,6 +126,7 @@ void initNix()
     std::cerr.rdbuf()->pubsetbuf(buf, sizeof(buf));
 #endif
 
+    // FIXME: do we need this? It's not thread-safe.
     std::ios::sync_with_stdio(false);
 
     if (getEnv("IN_SYSTEMD") == "1")
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 847da107a111..f6ec3fffb614 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -14,8 +14,8 @@
 #include <sys/un.h>
 #include <errno.h>
 #include <fcntl.h>
-
 #include <unistd.h>
+
 #include <cstring>
 
 namespace nix {
@@ -39,8 +39,8 @@ template<class T> T readStorePaths(Source & from)
 template PathSet readStorePaths(Source & from);
 
 
-RemoteStore::RemoteStore()
-    : connections(make_ref<Pool<Connection>>([this]() { return openConnection(); }))
+RemoteStore::RemoteStore(size_t maxConnections)
+    : connections(make_ref<Pool<Connection>>(maxConnections, [this]() { return openConnection(); }))
 {
 }
 
@@ -116,18 +116,6 @@ ref<RemoteStore::Connection> RemoteStore::openConnection(bool reserveSpace)
 }
 
 
-RemoteStore::~RemoteStore()
-{
-    try {
-        //to.flush();
-        //fdSocket.close();
-        // FIXME: close pool
-    } catch (...) {
-        ignoreException();
-    }
-}
-
-
 void RemoteStore::setOptions(ref<Connection> conn)
 {
     conn->to << wopSetOptions
@@ -568,6 +556,18 @@ bool RemoteStore::verifyStore(bool checkContents, bool repair)
     return readInt(conn->from) != 0;
 }
 
+
+RemoteStore::Connection::~Connection()
+{
+    try {
+        to.flush();
+        fd.close();
+    } catch (...) {
+        ignoreException();
+    }
+}
+
+
 void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
 {
     to.flush();
diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh
index b16a6b51db0f..af417b84ff9c 100644
--- a/src/libstore/remote-store.hh
+++ b/src/libstore/remote-store.hh
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <limits>
 #include <string>
 
 #include "store-api.hh"
@@ -19,9 +20,7 @@ class RemoteStore : public Store
 {
 public:
 
-    RemoteStore();
-
-    ~RemoteStore();
+    RemoteStore(size_t maxConnections = std::numeric_limits<size_t>::max());
 
     /* Implementations of abstract store API methods. */
 
@@ -100,6 +99,8 @@ private:
         FdSource from;
         unsigned int daemonVersion;
 
+        ~Connection();
+
         void processStderr(Sink * sink = 0, Source * source = 0);
     };
 
diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh
index d63912e28aa9..4b865a19389e 100644
--- a/src/libutil/pool.hh
+++ b/src/libutil/pool.hh
@@ -1,8 +1,10 @@
 #pragma once
 
-#include <memory>
-#include <list>
 #include <functional>
+#include <limits>
+#include <list>
+#include <memory>
+#include <cassert>
 
 #include "sync.hh"
 #include "ref.hh"
@@ -39,37 +41,58 @@ private:
 
     struct State
     {
-        unsigned int count = 0;
-        std::list<ref<R>> idle;
+        size_t inUse = 0;
+        size_t max;
+        std::vector<ref<R>> idle;
     };
 
     Sync<State> state;
 
+    std::condition_variable_any wakeup;
+
 public:
 
-    Pool(const Factory & factory = []() { return make_ref<R>(); })
+    Pool(size_t max = std::numeric_limits<size_t>::max,
+        const Factory & factory = []() { return make_ref<R>(); })
         : factory(factory)
-    { }
+    {
+        auto state_(state.lock());
+        state_->max = max;
+    }
+
+    ~Pool()
+    {
+        auto state_(state.lock());
+        assert(!state_->inUse);
+        state_->max = 0;
+        state_->idle.clear();
+    }
 
     class Handle
     {
     private:
         Pool & pool;
-        ref<R> r;
+        std::shared_ptr<R> r;
 
         friend Pool;
 
         Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { }
 
     public:
-        Handle(Handle && h) : pool(h.pool), r(h.r) { abort(); }
+        Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); }
 
         Handle(const Handle & l) = delete;
 
         ~Handle()
         {
-            auto state_(pool.state.lock());
-            state_->idle.push_back(r);
+            if (!r) return;
+            {
+                auto state_(pool.state.lock());
+                state_->idle.push_back(ref<R>(r));
+                assert(state_->inUse);
+                state_->inUse--;
+            }
+            pool.wakeup.notify_one();
         }
 
         R * operator -> () { return &*r; }
@@ -80,22 +103,38 @@ public:
     {
         {
             auto state_(state.lock());
+
+            /* If we're over the maximum number of instance, we need
+               to wait until a slot becomes available. */
+            while (state_->idle.empty() && state_->inUse >= state_->max)
+                state_.wait(wakeup);
+
             if (!state_->idle.empty()) {
                 auto p = state_->idle.back();
                 state_->idle.pop_back();
+                state_->inUse++;
                 return Handle(*this, p);
             }
-            state_->count++;
+
+            state_->inUse++;
+        }
+
+        /* We need to create a new instance. Because that might take a
+           while, we don't hold the lock in the meantime. */
+        try {
+            Handle h(*this, factory());
+            return h;
+        } catch (...) {
+            auto state_(state.lock());
+            state_->inUse--;
+            throw;
         }
-        /* Note: we don't hold the lock while creating a new instance,
-           because creation might take a long time. */
-        return Handle(*this, factory());
     }
 
     unsigned int count()
     {
         auto state_(state.lock());
-        return state_->count;
+        return state_->count + state_->inUse;
     }
 };