about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEelco Dolstra <e.dolstra@tudelft.nl>2011-12-16T19·44+0000
committerEelco Dolstra <e.dolstra@tudelft.nl>2011-12-16T19·44+0000
commite0bd307802d13476055f8ba99ab7808de0fd71e5 (patch)
tree83be8fedec92ebb15f05120e2e49451841699482
parent78598d06f0240a15b74720d8f987daeb702318d7 (diff)
* Make the import operation through the daemon much more efficient
  (way fewer roundtrips) by allowing the client to send data in bigger
  chunks.
* Some refactoring.

-rw-r--r--src/libstore/local-store.cc7
-rw-r--r--src/libstore/remote-store.cc6
-rw-r--r--src/libstore/worker-protocol.hh2
-rw-r--r--src/libutil/serialise.cc42
-rw-r--r--src/libutil/serialise.hh31
-rw-r--r--src/nix-worker/nix-worker.cc24
6 files changed, 68 insertions, 44 deletions
diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc
index 525e5fc7bbe3..65b1cdbc87a3 100644
--- a/src/libstore/local-store.cc
+++ b/src/libstore/local-store.cc
@@ -1199,10 +1199,11 @@ struct HashAndReadSource : Source
     {
         hashing = true;
     }
-    virtual void operator () (unsigned char * data, size_t len)
+    size_t read(unsigned char * data, size_t len)
     {
-        readSource(data, len);
-        if (hashing) hashSink(data, len);
+        size_t n = readSource.read(data, len);
+        if (hashing) hashSink(data, n);
+        return n;
     }
 };
 
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 7bf0ad7bd4c7..e976e8fa57ae 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -501,11 +501,11 @@ void RemoteStore::processStderr(Sink * sink, Source * source)
         }
         else if (msg == STDERR_READ) {
             if (!source) throw Error("no source");
-            unsigned int len = readInt(from);
+            size_t len = readInt(from);
             unsigned char * buf = new unsigned char[len];
             AutoDeleteArray<unsigned char> d(buf);
-            (*source)(buf, len);
-            writeString(string((const char *) buf, len), to);
+            size_t n = source->read(buf, len);
+            writeString(string((const char *) buf, n), to); // !!! inefficient
             to.flush();
         }
         else {
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index acb8bc8b2948..acabd6ca30d2 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -8,7 +8,7 @@ namespace nix {
 #define WORKER_MAGIC_1 0x6e697863
 #define WORKER_MAGIC_2 0x6478696f
 
-#define PROTOCOL_VERSION 0x108
+#define PROTOCOL_VERSION 0x109
 #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
 #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
 
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index 76f2e721a535..640267a131bf 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -23,8 +23,9 @@ void BufferedSink::operator () (const unsigned char * data, size_t len)
     
     while (len) {
         /* Optimisation: bypass the buffer if the data exceeds the
-           buffer size and there is no unflushed data. */
-        if (bufPos == 0 && len >= bufSize) {
+           buffer size. */
+        if (bufPos + len >= bufSize) {
+            flush();
             write(data, len);
             break;
         }
@@ -59,29 +60,37 @@ void FdSink::write(const unsigned char * data, size_t len)
 }
 
 
+void Source::operator () (unsigned char * data, size_t len)
+{
+    while (len) {
+        size_t n = read(data, len);
+        data += n; len -= n;
+    }
+}
+
+
 BufferedSource::~BufferedSource()
 {
     if (buffer) delete[] buffer;
 }
 
 
-void BufferedSource::operator () (unsigned char * data, size_t len)
+size_t BufferedSource::read(unsigned char * data, size_t len)
 {
     if (!buffer) buffer = new unsigned char[bufSize];
 
-    while (len) {
-        if (!bufPosIn) bufPosIn = read(buffer, bufSize);
+    if (!bufPosIn) bufPosIn = readUnbuffered(buffer, bufSize);
             
-        /* Copy out the data in the buffer. */
-        size_t n = len > bufPosIn - bufPosOut ? bufPosIn - bufPosOut : len;
-        memcpy(data, buffer + bufPosOut, n);
-        data += n; bufPosOut += n; len -= n;
-        if (bufPosIn == bufPosOut) bufPosIn = bufPosOut = 0;
-    }
+    /* Copy out the data in the buffer. */
+    size_t n = len > bufPosIn - bufPosOut ? bufPosIn - bufPosOut : len;
+    memcpy(data, buffer + bufPosOut, n);
+    bufPosOut += n;
+    if (bufPosIn == bufPosOut) bufPosIn = bufPosOut = 0;
+    return n;
 }
 
 
-size_t FdSource::read(unsigned char * data, size_t len)
+size_t FdSource::readUnbuffered(unsigned char * data, size_t len)
 {
     ssize_t n;
     do {
@@ -94,6 +103,15 @@ size_t FdSource::read(unsigned char * data, size_t len)
 }
 
 
+size_t StringSource::read(unsigned char * data, size_t len)
+{
+    if (pos == s.size()) throw EndOfFile("end of string reached");
+    size_t n = s.copy((char *) data, len, pos);
+    pos += n;
+    return n;
+}
+
+
 void writePadding(size_t len, Sink & sink)
 {
     if (len % 8) {
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index a155f6681e2b..25398b09d7e6 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -24,7 +24,7 @@ struct BufferedSink : Sink
     BufferedSink(size_t bufSize = 32 * 1024)
         : bufSize(bufSize), bufPos(0), buffer(0) { }
     ~BufferedSink();
-    
+
     void operator () (const unsigned char * data, size_t len);
     
     void flush();
@@ -39,9 +39,14 @@ struct Source
     virtual ~Source() { }
     
     /* Store exactly ‘len’ bytes in the buffer pointed to by ‘data’.
-       It blocks if that much data is not yet available, or throws an
-       error if it is not going to be available. */
-    virtual void operator () (unsigned char * data, size_t len) = 0;
+       It blocks until all the requested data is available, or throws
+       an error if it is not going to be available.   */
+    void operator () (unsigned char * data, size_t len);
+
+    /* Store up to ‘len’ in the buffer pointed to by ‘data’, and
+       return the number of bytes stored.  If blocks until at least
+       one byte is available. */
+    virtual size_t read(unsigned char * data, size_t len) = 0;
 };
 
 
@@ -55,12 +60,10 @@ struct BufferedSource : Source
         : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { }
     ~BufferedSource();
     
-    void operator () (unsigned char * data, size_t len);
+    size_t read(unsigned char * data, size_t len);
     
-    /* Store up to ‘len’ in the buffer pointed to by ‘data’, and
-       return the number of bytes stored.  If should block until at
-       least one byte is available. */
-    virtual size_t read(unsigned char * data, size_t len) = 0;
+    /* Underlying read call, to be overriden. */
+    virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0;
 };
 
 
@@ -83,7 +86,7 @@ struct FdSource : BufferedSource
     int fd;
     FdSource() : fd(-1) { }
     FdSource(int fd) : fd(fd) { }
-    size_t read(unsigned char * data, size_t len);
+    size_t readUnbuffered(unsigned char * data, size_t len);
 };
 
 
@@ -104,13 +107,7 @@ struct StringSource : Source
     const string & s;
     size_t pos;
     StringSource(const string & _s) : s(_s), pos(0) { }
-    virtual void operator () (unsigned char * data, size_t len)
-    {
-        s.copy((char *) data, len, pos);
-        pos += len;
-        if (pos > s.size())
-            throw Error("end of string reached");
-    }
+    size_t read(unsigned char * data, size_t len);    
 };
 
 
diff --git a/src/nix-worker/nix-worker.cc b/src/nix-worker/nix-worker.cc
index a89852638214..695e4c38d5d1 100644
--- a/src/nix-worker/nix-worker.cc
+++ b/src/nix-worker/nix-worker.cc
@@ -210,11 +210,11 @@ struct TunnelSink : Sink
 };
 
 
-struct TunnelSource : Source
+struct TunnelSource : BufferedSource
 {
     Source & from;
     TunnelSource(Source & from) : from(from) { }
-    virtual void operator () (unsigned char * data, size_t len)
+    size_t readUnbuffered(unsigned char * data, size_t len)
     {
         /* Careful: we're going to receive data from the client now,
            so we have to disable the SIGPOLL handler. */
@@ -224,11 +224,16 @@ struct TunnelSource : Source
         writeInt(STDERR_READ, to);
         writeInt(len, to);
         to.flush();
-        string s = readString(from);
-        if (s.size() != len) throw Error("not enough data");
-        memcpy(data, (const unsigned char *) s.c_str(), len);
+        string s = readString(from); // !!! inefficient
 
         startWork();
+
+        if (s.empty()) throw EndOfFile("unexpected end-of-file");
+        if (s.size() > len) throw Error("client sent too much data");
+
+        memcpy(data, (const unsigned char *) s.c_str(), s.size());
+
+        return s.size();
     }
 };
 
@@ -265,10 +270,11 @@ struct SavingSourceAdapter : Source
     Source & orig;
     string s;
     SavingSourceAdapter(Source & orig) : orig(orig) { }
-    void operator () (unsigned char * data, size_t len)
+    size_t read(unsigned char * data, size_t len)
     {
-        orig(data, len);
-        s.append((const char *) data, len);
+        size_t n = orig.read(data, len);
+        s.append((const char *) data, n);
+        return n;
     }
 };
 
@@ -397,6 +403,8 @@ static void performOp(unsigned int clientVersion,
 
     case wopImportPath: {
         startWork();
+        if (GET_PROTOCOL_MINOR(clientVersion) < 9)
+            throw Error("import not supported; upgrade your client");
         TunnelSource source(from);
         Path path = store->importPath(true, source);
         stopWork();