diff options
-rw-r--r-- | src/libstore/remote-store.cc | 4 | ||||
-rw-r--r-- | src/libutil/serialise.cc | 25 | ||||
-rw-r--r-- | src/libutil/serialise.hh | 19 | ||||
-rw-r--r-- | src/nix-worker/nix-worker.cc | 16 |
4 files changed, 50 insertions, 14 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 0c6a1c37d1ec..8269b6a83168 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -65,6 +65,7 @@ void RemoteStore::openConnection() /* Send the magic greeting, check for the reply. */ try { writeInt(WORKER_MAGIC_1, to); + to.flush(); unsigned int magic = readInt(from); if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch"); @@ -166,6 +167,7 @@ void RemoteStore::connectToDaemon() RemoteStore::~RemoteStore() { try { + to.flush(); fdSocket.close(); if (child != -1) child.wait(true); @@ -488,6 +490,7 @@ void RemoteStore::clearFailedPaths(const PathSet & paths) void RemoteStore::processStderr(Sink * sink, Source * source) { + to.flush(); unsigned int msg; while ((msg = readInt(from)) == STDERR_NEXT || msg == STDERR_READ || msg == STDERR_WRITE) { @@ -503,6 +506,7 @@ void RemoteStore::processStderr(Sink * sink, Source * source) AutoDeleteArray<unsigned char> d(buf); (*source)(buf, len); writeString(string((const char *) buf, len), to); + to.flush(); } else { string s = readString(from); diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index 9b422271323f..66a64a6be4c8 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -9,7 +9,30 @@ namespace nix { void FdSink::operator () (const unsigned char * data, unsigned int len) { - writeFull(fd, data, len); + if (!buffer) buffer = new unsigned char[bufSize]; + + while (len) { + /* Optimisation: bypass the buffer if the data exceeds the + buffer size and there is no unflushed data. */ + if (bufPos == 0 && len >= bufSize) { + writeFull(fd, data, len); + break; + } + /* Otherwise, copy the bytes to the buffer. Flush the buffer + when it's full. */ + size_t n = bufPos + len > bufSize ? bufSize - bufPos : len; + memcpy(buffer + bufPos, data, n); + data += n; bufPos += n; len -= n; + if (bufPos == bufSize) flush(); + } +} + + +void FdSink::flush() +{ + if (fd == -1 || bufPos == 0) return; + writeFull(fd, buffer, bufPos); + bufPos = 0; } diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 0e797d63bca9..711bd5e6c7ce 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -28,22 +28,29 @@ struct Source }; -/* A sink that writes data to a file descriptor. */ +/* A sink that writes data to a file descriptor (using a buffer). */ struct FdSink : Sink { int fd; + unsigned int bufSize, bufPos; + unsigned char * buffer; - FdSink() + FdSink() : fd(-1), bufSize(32 * 1024), bufPos(0), buffer(0) { } + + FdSink(int fd, unsigned int bufSize = 32 * 1024) + : fd(fd), bufSize(bufSize), bufPos(0), buffer(0) { - fd = -1; } - - FdSink(int fd) + + ~FdSink() { - this->fd = fd; + flush(); + if (buffer) delete[] buffer; } void operator () (const unsigned char * data, unsigned int len); + + void flush(); }; diff --git a/src/nix-worker/nix-worker.cc b/src/nix-worker/nix-worker.cc index 8950f73ef25f..6c222420e0ce 100644 --- a/src/nix-worker/nix-worker.cc +++ b/src/nix-worker/nix-worker.cc @@ -57,6 +57,7 @@ static void tunnelStderr(const unsigned char * buf, size_t count) try { writeInt(STDERR_NEXT, to); writeString(string((char *) buf, count), to); + to.flush(); } catch (...) { /* Write failed; that means that the other side is gone. */ @@ -200,9 +201,7 @@ static void stopWork(bool success = true, const string & msg = "", unsigned int struct TunnelSink : Sink { Sink & to; - TunnelSink(Sink & to) : to(to) - { - } + TunnelSink(Sink & to) : to(to) { } virtual void operator () (const unsigned char * data, unsigned int len) { @@ -215,9 +214,7 @@ struct TunnelSink : Sink struct TunnelSource : Source { Source & from; - TunnelSource(Source & from) : from(from) - { - } + TunnelSource(Source & from) : from(from) { } virtual void operator () (unsigned char * data, unsigned int len) { @@ -228,6 +225,7 @@ struct TunnelSource : Source writeInt(STDERR_READ, to); writeInt(len, to); + to.flush(); string s = readString(from); if (s.size() != len) throw Error("not enough data"); memcpy(data, (const unsigned char *) s.c_str(), len); @@ -596,8 +594,8 @@ static void processConnection() unsigned int magic = readInt(from); if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch"); writeInt(WORKER_MAGIC_2, to); - writeInt(PROTOCOL_VERSION, to); + to.flush(); unsigned int clientVersion = readInt(from); /* Send startup error messages to the client. */ @@ -619,9 +617,11 @@ static void processConnection() store = boost::shared_ptr<StoreAPI>(new LocalStore()); stopWork(); + to.flush(); } catch (Error & e) { stopWork(false, e.msg()); + to.flush(); return; } @@ -652,6 +652,8 @@ static void processConnection() if (!errorAllowed) break; } + to.flush(); + assert(!canSendStderr); }; |