about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEelco Dolstra <eelco.dolstra@logicblox.com>2016-02-24T10·39+0100
committerEelco Dolstra <eelco.dolstra@logicblox.com>2016-02-24T10·39+0100
commit5f862658c3f8e518fb631d0536f2b38f107970e1 (patch)
tree34a38a2405fda244f547b06c6db264f3986e1aab
parentd5626bf4c14f725136f2c5b6ac8bf818627352f0 (diff)
Remove bad daemon connections from the pool
This is necessary for long-running processes like hydra-queue-runner:
if a nix-daemon worker is killed, we need to stop reusing that
connection.
-rw-r--r--src/libstore/remote-store.cc6
-rw-r--r--src/libutil/pool.hh18
-rw-r--r--src/libutil/serialise.cc22
-rw-r--r--src/libutil/serialise.hh23
4 files changed, 55 insertions, 14 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index f6ec3fffb6..2f540c640e 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -40,7 +40,11 @@ template PathSet readStorePaths(Source & from);
 
 
 RemoteStore::RemoteStore(size_t maxConnections)
-    : connections(make_ref<Pool<Connection>>(maxConnections, [this]() { return openConnection(); }))
+    : connections(make_ref<Pool<Connection>>(
+            maxConnections,
+            [this]() { return openConnection(); },
+            [](const ref<Connection> & r) { return r->to.good() && r->from.good(); }
+            ))
 {
 }
 
diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh
index 4b865a1938..b75a3cbf85 100644
--- a/src/libutil/pool.hh
+++ b/src/libutil/pool.hh
@@ -33,11 +33,17 @@ class Pool
 {
 public:
 
+    /* A function that produces new instances of R on demand. */
     typedef std::function<ref<R>()> Factory;
 
+    /* A function that checks whether an instance of R is still
+       usable. Unusable instances are removed from the pool. */
+    typedef std::function<bool(const ref<R> &)> Validator;
+
 private:
 
     Factory factory;
+    Validator validator;
 
     struct State
     {
@@ -53,8 +59,10 @@ private:
 public:
 
     Pool(size_t max = std::numeric_limits<size_t>::max,
-        const Factory & factory = []() { return make_ref<R>(); })
+        const Factory & factory = []() { return make_ref<R>(); },
+        const Validator & validator = [](ref<R> r) { return true; })
         : factory(factory)
+        , validator(validator)
     {
         auto state_(state.lock());
         state_->max = max;
@@ -109,11 +117,13 @@ public:
             while (state_->idle.empty() && state_->inUse >= state_->max)
                 state_.wait(wakeup);
 
-            if (!state_->idle.empty()) {
+            while (!state_->idle.empty()) {
                 auto p = state_->idle.back();
                 state_->idle.pop_back();
-                state_->inUse++;
-                return Handle(*this, p);
+                if (validator(p)) {
+                    state_->inUse++;
+                    return Handle(*this, p);
+                }
             }
 
             state_->inUse++;
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index f136a13248..c9620e2bf3 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -72,7 +72,17 @@ void FdSink::write(const unsigned char * data, size_t len)
             warned = true;
         }
     }
-    writeFull(fd, data, len);
+    try {
+        writeFull(fd, data, len);
+    } catch (SysError & e) {
+        _good = true;
+    }
+}
+
+
+bool FdSink::good()
+{
+    return _good;
 }
 
 
@@ -119,12 +129,18 @@ size_t FdSource::readUnbuffered(unsigned char * data, size_t len)
         checkInterrupt();
         n = ::read(fd, (char *) data, bufSize);
     } while (n == -1 && errno == EINTR);
-    if (n == -1) throw SysError("reading from file");
-    if (n == 0) throw EndOfFile("unexpected end-of-file");
+    if (n == -1) { _good = false; throw SysError("reading from file"); }
+    if (n == 0) { _good = false; throw EndOfFile("unexpected end-of-file"); }
     return n;
 }
 
 
+bool FdSource::good()
+{
+    return _good;
+}
+
+
 size_t StringSource::read(unsigned char * data, size_t len)
 {
     if (pos == s.size()) throw EndOfFile("end of string reached");
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 979ff849fc..9e269f3923 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -12,6 +12,7 @@ struct Sink
 {
     virtual ~Sink() { }
     virtual void operator () (const unsigned char * data, size_t len) = 0;
+    virtual bool good() { return true; }
 };
 
 
@@ -25,7 +26,7 @@ struct BufferedSink : Sink
         : bufSize(bufSize), bufPos(0), buffer(0) { }
     ~BufferedSink();
 
-    void operator () (const unsigned char * data, size_t len);
+    void operator () (const unsigned char * data, size_t len) override;
 
     void flush();
 
@@ -47,6 +48,8 @@ struct Source
        return the number of bytes stored.  If blocks until at least
        one byte is available. */
     virtual size_t read(unsigned char * data, size_t len) = 0;
+
+    virtual bool good() { return true; }
 };
 
 
@@ -60,7 +63,7 @@ struct BufferedSource : Source
         : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { }
     ~BufferedSource();
 
-    size_t read(unsigned char * data, size_t len);
+    size_t read(unsigned char * data, size_t len) override;
 
     /* Underlying read call, to be overridden. */
     virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0;
@@ -80,7 +83,12 @@ struct FdSink : BufferedSink
     FdSink(int fd) : fd(fd), warn(false), written(0) { }
     ~FdSink();
 
-    void write(const unsigned char * data, size_t len);
+    void write(const unsigned char * data, size_t len) override;
+
+    bool good() override;
+
+private:
+    bool _good = true;
 };
 
 
@@ -90,7 +98,10 @@ struct FdSource : BufferedSource
     int fd;
     FdSource() : fd(-1) { }
     FdSource(int fd) : fd(fd) { }
-    size_t readUnbuffered(unsigned char * data, size_t len);
+    size_t readUnbuffered(unsigned char * data, size_t len) override;
+    bool good() override;
+private:
+    bool _good = true;
 };
 
 
@@ -98,7 +109,7 @@ struct FdSource : BufferedSource
 struct StringSink : Sink
 {
     string s;
-    void operator () (const unsigned char * data, size_t len);
+    void operator () (const unsigned char * data, size_t len) override;
 };
 
 
@@ -108,7 +119,7 @@ struct StringSource : Source
     const string & s;
     size_t pos;
     StringSource(const string & _s) : s(_s), pos(0) { }
-    size_t read(unsigned char * data, size_t len);
+    size_t read(unsigned char * data, size_t len) override;
 };