about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/remote-store.cc4
-rw-r--r--src/libutil/serialise.cc25
-rw-r--r--src/libutil/serialise.hh19
-rw-r--r--src/nix-worker/nix-worker.cc16
4 files changed, 50 insertions, 14 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 0c6a1c37d1ec..8269b6a83168 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -65,6 +65,7 @@ void RemoteStore::openConnection()
     /* Send the magic greeting, check for the reply. */
     try {
         writeInt(WORKER_MAGIC_1, to);
+        to.flush();
         unsigned int magic = readInt(from);
         if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
 
@@ -166,6 +167,7 @@ void RemoteStore::connectToDaemon()
 RemoteStore::~RemoteStore()
 {
     try {
+        to.flush();
         fdSocket.close();
         if (child != -1)
             child.wait(true);
@@ -488,6 +490,7 @@ void RemoteStore::clearFailedPaths(const PathSet & paths)
 
 void RemoteStore::processStderr(Sink * sink, Source * source)
 {
+    to.flush();
     unsigned int msg;
     while ((msg = readInt(from)) == STDERR_NEXT
         || msg == STDERR_READ || msg == STDERR_WRITE) {
@@ -503,6 +506,7 @@ void RemoteStore::processStderr(Sink * sink, Source * source)
             AutoDeleteArray<unsigned char> d(buf);
             (*source)(buf, len);
             writeString(string((const char *) buf, len), to);
+            to.flush();
         }
         else {
             string s = readString(from);
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index 9b422271323f..66a64a6be4c8 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -9,7 +9,30 @@ namespace nix {
 
 void FdSink::operator () (const unsigned char * data, unsigned int len)
 {
-    writeFull(fd, data, len);
+    if (!buffer) buffer = new unsigned char[bufSize];
+    
+    while (len) {
+        /* Optimisation: bypass the buffer if the data exceeds the
+           buffer size and there is no unflushed data. */
+        if (bufPos == 0 && len >= bufSize) {
+            writeFull(fd, data, len);
+            break;
+        }
+        /* Otherwise, copy the bytes to the buffer.  Flush the buffer
+           when it's full. */
+        size_t n = bufPos + len > bufSize ? bufSize - bufPos : len;
+        memcpy(buffer + bufPos, data, n);
+        data += n; bufPos += n; len -= n;
+        if (bufPos == bufSize) flush();
+    }
+}
+
+
+void FdSink::flush()
+{
+    if (fd == -1 || bufPos == 0) return;
+    writeFull(fd, buffer, bufPos);
+    bufPos = 0;
 }
 
 
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 0e797d63bca9..711bd5e6c7ce 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -28,22 +28,29 @@ struct Source
 };
 
 
-/* A sink that writes data to a file descriptor. */
+/* A sink that writes data to a file descriptor (using a buffer). */
 struct FdSink : Sink
 {
     int fd;
+    unsigned int bufSize, bufPos;
+    unsigned char * buffer;
 
-    FdSink()
+    FdSink() : fd(-1), bufSize(32 * 1024), bufPos(0), buffer(0) { }
+    
+    FdSink(int fd, unsigned int bufSize = 32 * 1024)
+        : fd(fd), bufSize(bufSize), bufPos(0), buffer(0)
     {
-        fd = -1;
     }
-    
-    FdSink(int fd) 
+
+    ~FdSink()
     {
-        this->fd = fd;
+        flush();
+        if (buffer) delete[] buffer;
     }
     
     void operator () (const unsigned char * data, unsigned int len);
+
+    void flush();
 };
 
 
diff --git a/src/nix-worker/nix-worker.cc b/src/nix-worker/nix-worker.cc
index 8950f73ef25f..6c222420e0ce 100644
--- a/src/nix-worker/nix-worker.cc
+++ b/src/nix-worker/nix-worker.cc
@@ -57,6 +57,7 @@ static void tunnelStderr(const unsigned char * buf, size_t count)
         try {
             writeInt(STDERR_NEXT, to);
             writeString(string((char *) buf, count), to);
+            to.flush();
         } catch (...) {
             /* Write failed; that means that the other side is
                gone. */
@@ -200,9 +201,7 @@ static void stopWork(bool success = true, const string & msg = "", unsigned int
 struct TunnelSink : Sink
 {
     Sink & to;
-    TunnelSink(Sink & to) : to(to)
-    {
-    }
+    TunnelSink(Sink & to) : to(to) { }
     virtual void operator ()
         (const unsigned char * data, unsigned int len)
     {
@@ -215,9 +214,7 @@ struct TunnelSink : Sink
 struct TunnelSource : Source
 {
     Source & from;
-    TunnelSource(Source & from) : from(from)
-    {
-    }
+    TunnelSource(Source & from) : from(from) { }
     virtual void operator ()
         (unsigned char * data, unsigned int len)
     {
@@ -228,6 +225,7 @@ struct TunnelSource : Source
         
         writeInt(STDERR_READ, to);
         writeInt(len, to);
+        to.flush();
         string s = readString(from);
         if (s.size() != len) throw Error("not enough data");
         memcpy(data, (const unsigned char *) s.c_str(), len);
@@ -596,8 +594,8 @@ static void processConnection()
     unsigned int magic = readInt(from);
     if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
     writeInt(WORKER_MAGIC_2, to);
-
     writeInt(PROTOCOL_VERSION, to);
+    to.flush();
     unsigned int clientVersion = readInt(from);
 
     /* Send startup error messages to the client. */
@@ -619,9 +617,11 @@ static void processConnection()
         store = boost::shared_ptr<StoreAPI>(new LocalStore());
 
         stopWork();
+        to.flush();
         
     } catch (Error & e) {
         stopWork(false, e.msg());
+        to.flush();
         return;
     }
 
@@ -652,6 +652,8 @@ static void processConnection()
             if (!errorAllowed) break;
         }
 
+        to.flush();
+
         assert(!canSendStderr);
     };