From 5f862658c3f8e518fb631d0536f2b38f107970e1 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 24 Feb 2016 11:39:56 +0100 Subject: Remove bad daemon connections from the pool This is necessary for long-running processes like hydra-queue-runner: if a nix-daemon worker is killed, we need to stop reusing that connection. --- src/libstore/remote-store.cc | 6 +++++- src/libutil/pool.hh | 18 ++++++++++++++---- src/libutil/serialise.cc | 22 +++++++++++++++++++--- src/libutil/serialise.hh | 23 +++++++++++++++++------ 4 files changed, 55 insertions(+), 14 deletions(-) diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index f6ec3fffb614..2f540c640e0b 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -40,7 +40,11 @@ template PathSet readStorePaths(Source & from); RemoteStore::RemoteStore(size_t maxConnections) - : connections(make_ref>(maxConnections, [this]() { return openConnection(); })) + : connections(make_ref>( + maxConnections, + [this]() { return openConnection(); }, + [](const ref & r) { return r->to.good() && r->from.good(); } + )) { } diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh index 4b865a19389e..b75a3cbf854f 100644 --- a/src/libutil/pool.hh +++ b/src/libutil/pool.hh @@ -33,11 +33,17 @@ class Pool { public: + /* A function that produces new instances of R on demand. */ typedef std::function()> Factory; + /* A function that checks whether an instance of R is still + usable. Unusable instances are removed from the pool. */ + typedef std::function &)> Validator; + private: Factory factory; + Validator validator; struct State { @@ -53,8 +59,10 @@ private: public: Pool(size_t max = std::numeric_limits::max, - const Factory & factory = []() { return make_ref(); }) + const Factory & factory = []() { return make_ref(); }, + const Validator & validator = [](ref r) { return true; }) : factory(factory) + , validator(validator) { auto state_(state.lock()); state_->max = max; @@ -109,11 +117,13 @@ public: while (state_->idle.empty() && state_->inUse >= state_->max) state_.wait(wakeup); - if (!state_->idle.empty()) { + while (!state_->idle.empty()) { auto p = state_->idle.back(); state_->idle.pop_back(); - state_->inUse++; - return Handle(*this, p); + if (validator(p)) { + state_->inUse++; + return Handle(*this, p); + } } state_->inUse++; diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index f136a13248ba..c9620e2bf32a 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -72,7 +72,17 @@ void FdSink::write(const unsigned char * data, size_t len) warned = true; } } - writeFull(fd, data, len); + try { + writeFull(fd, data, len); + } catch (SysError & e) { + _good = true; + } +} + + +bool FdSink::good() +{ + return _good; } @@ -119,12 +129,18 @@ size_t FdSource::readUnbuffered(unsigned char * data, size_t len) checkInterrupt(); n = ::read(fd, (char *) data, bufSize); } while (n == -1 && errno == EINTR); - if (n == -1) throw SysError("reading from file"); - if (n == 0) throw EndOfFile("unexpected end-of-file"); + if (n == -1) { _good = false; throw SysError("reading from file"); } + if (n == 0) { _good = false; throw EndOfFile("unexpected end-of-file"); } return n; } +bool FdSource::good() +{ + return _good; +} + + size_t StringSource::read(unsigned char * data, size_t len) { if (pos == s.size()) throw EndOfFile("end of string reached"); diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 979ff849fcaf..9e269f3923ea 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -12,6 +12,7 @@ struct Sink { virtual ~Sink() { } virtual void operator () (const unsigned char * data, size_t len) = 0; + virtual bool good() { return true; } }; @@ -25,7 +26,7 @@ struct BufferedSink : Sink : bufSize(bufSize), bufPos(0), buffer(0) { } ~BufferedSink(); - void operator () (const unsigned char * data, size_t len); + void operator () (const unsigned char * data, size_t len) override; void flush(); @@ -47,6 +48,8 @@ struct Source return the number of bytes stored. If blocks until at least one byte is available. */ virtual size_t read(unsigned char * data, size_t len) = 0; + + virtual bool good() { return true; } }; @@ -60,7 +63,7 @@ struct BufferedSource : Source : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { } ~BufferedSource(); - size_t read(unsigned char * data, size_t len); + size_t read(unsigned char * data, size_t len) override; /* Underlying read call, to be overridden. */ virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0; @@ -80,7 +83,12 @@ struct FdSink : BufferedSink FdSink(int fd) : fd(fd), warn(false), written(0) { } ~FdSink(); - void write(const unsigned char * data, size_t len); + void write(const unsigned char * data, size_t len) override; + + bool good() override; + +private: + bool _good = true; }; @@ -90,7 +98,10 @@ struct FdSource : BufferedSource int fd; FdSource() : fd(-1) { } FdSource(int fd) : fd(fd) { } - size_t readUnbuffered(unsigned char * data, size_t len); + size_t readUnbuffered(unsigned char * data, size_t len) override; + bool good() override; +private: + bool _good = true; }; @@ -98,7 +109,7 @@ struct FdSource : BufferedSource struct StringSink : Sink { string s; - void operator () (const unsigned char * data, size_t len); + void operator () (const unsigned char * data, size_t len) override; }; @@ -108,7 +119,7 @@ struct StringSource : Source const string & s; size_t pos; StringSource(const string & _s) : s(_s), pos(0) { } - size_t read(unsigned char * data, size_t len); + size_t read(unsigned char * data, size_t len) override; }; -- cgit 1.4.1