about summary refs log tree commit diff
path: root/src/nix-worker
diff options
context:
space:
mode:
Diffstat (limited to 'src/nix-worker')
-rw-r--r--src/nix-worker/nix-worker.cc86
1 files changed, 49 insertions, 37 deletions
diff --git a/src/nix-worker/nix-worker.cc b/src/nix-worker/nix-worker.cc
index 3587bd7fde2e..68567f341e05 100644
--- a/src/nix-worker/nix-worker.cc
+++ b/src/nix-worker/nix-worker.cc
@@ -56,7 +56,8 @@ static void tunnelStderr(const unsigned char * buf, size_t count)
     if (canSendStderr && myPid == getpid()) {
         try {
             writeInt(STDERR_NEXT, to);
-            writeString(string((char *) buf, count), to);
+            writeString(buf, count, to);
+            to.flush();
         } catch (...) {
             /* Write failed; that means that the other side is
                gone. */
@@ -200,26 +201,20 @@ static void stopWork(bool success = true, const string & msg = "", unsigned int
 struct TunnelSink : Sink
 {
     Sink & to;
-    TunnelSink(Sink & to) : to(to)
-    {
-    }
-    virtual void operator ()
-        (const unsigned char * data, unsigned int len)
+    TunnelSink(Sink & to) : to(to) { }
+    virtual void operator () (const unsigned char * data, size_t len)
     {
         writeInt(STDERR_WRITE, to);
-        writeString(string((const char *) data, len), to);
+        writeString(data, len, to);
     }
 };
 
 
-struct TunnelSource : Source
+struct TunnelSource : BufferedSource
 {
     Source & from;
-    TunnelSource(Source & from) : from(from)
-    {
-    }
-    virtual void operator ()
-        (unsigned char * data, unsigned int len)
+    TunnelSource(Source & from) : from(from) { }
+    size_t readUnbuffered(unsigned char * data, size_t len)
     {
         /* Careful: we're going to receive data from the client now,
            so we have to disable the SIGPOLL handler. */
@@ -228,11 +223,12 @@ struct TunnelSource : Source
         
         writeInt(STDERR_READ, to);
         writeInt(len, to);
-        string s = readString(from);
-        if (s.size() != len) throw Error("not enough data");
-        memcpy(data, (const unsigned char *) s.c_str(), len);
+        to.flush();
+        size_t n = readString(data, len, from);
 
         startWork();
+        if (n == 0) throw EndOfFile("unexpected end-of-file");
+        return n;
     }
 };
 
@@ -241,11 +237,14 @@ struct TunnelSource : Source
    the contents of the file to `s'.  Otherwise barf. */
 struct RetrieveRegularNARSink : ParseSink
 {
+    bool regular;
     string s;
 
+    RetrieveRegularNARSink() : regular(true) { }
+
     void createDirectory(const Path & path)
     {
-        throw Error("regular file expected");
+        regular = false;
     }
 
     void receiveContents(unsigned char * data, unsigned int len)
@@ -255,7 +254,7 @@ struct RetrieveRegularNARSink : ParseSink
 
     void createSymlink(const Path & path, const string & target)
     {
-        throw Error("regular file expected");
+        regular = false;
     }
 };
 
@@ -266,10 +265,11 @@ struct SavingSourceAdapter : Source
     Source & orig;
     string s;
     SavingSourceAdapter(Source & orig) : orig(orig) { }
-    void operator () (unsigned char * data, unsigned int len)
+    size_t read(unsigned char * data, size_t len)
     {
-        orig(data, len);
-        s.append((const char *) data, len);
+        size_t n = orig.read(data, len);
+        s.append((const char *) data, n);
+        return n;
     }
 };
 
@@ -327,7 +327,7 @@ static void performOp(unsigned int clientVersion,
             store->queryReferrers(path, paths);
         else paths = store->queryDerivationOutputs(path);
         stopWork();
-        writeStringSet(paths, to);
+        writeStrings(paths, to);
         break;
     }
 
@@ -371,11 +371,11 @@ static void performOp(unsigned int clientVersion,
                addToStoreFromDump(). */
             ParseSink sink; /* null sink; just parse the NAR */
             parseDump(sink, savedNAR);
-        } else {
+        } else
             parseDump(savedRegular, from);
-        }
             
         startWork();
+        if (!savedRegular.regular) throw Error("regular file expected");
         Path path = dynamic_cast<LocalStore *>(store.get())
             ->addToStoreFromDump(recursive ? savedNAR.s : savedRegular.s, baseName, recursive, hashAlgo);
         stopWork();
@@ -387,7 +387,7 @@ static void performOp(unsigned int clientVersion,
     case wopAddTextToStore: {
         string suffix = readString(from);
         string s = readString(from);
-        PathSet refs = readStorePaths(from);
+        PathSet refs = readStorePaths<PathSet>(from);
         startWork();
         Path path = store->addTextToStore(suffix, s, refs);
         stopWork();
@@ -406,17 +406,17 @@ static void performOp(unsigned int clientVersion,
         break;
     }
 
-    case wopImportPath: {
+    case wopImportPaths: {
         startWork();
         TunnelSource source(from);
-        Path path = store->importPath(true, source);
+        Paths paths = store->importPaths(true, source);
         stopWork();
-        writeString(path, to);
+        writeStrings(paths, to);
         break;
     }
 
     case wopBuildDerivations: {
-        PathSet drvs = readStorePaths(from);
+        PathSet drvs = readStorePaths<PathSet>(from);
         startWork();
         store->buildDerivations(drvs);
         stopWork();
@@ -474,7 +474,7 @@ static void performOp(unsigned int clientVersion,
     case wopCollectGarbage: {
         GCOptions options;
         options.action = (GCOptions::GCAction) readInt(from);
-        options.pathsToDelete = readStorePaths(from);
+        options.pathsToDelete = readStorePaths<PathSet>(from);
         options.ignoreLiveness = readInt(from);
         options.maxFreed = readLongLong(from);
         options.maxLinks = readInt(from);
@@ -492,7 +492,7 @@ static void performOp(unsigned int clientVersion,
         store->collectGarbage(options, results);
         stopWork();
         
-        writeStringSet(results.paths, to);
+        writeStrings(results.paths, to);
         writeLongLong(results.bytesFreed, to);
         writeLongLong(results.blocksFreed, to);
         
@@ -530,7 +530,7 @@ static void performOp(unsigned int clientVersion,
         writeInt(res ? 1 : 0, to);
         if (res) {
             writeString(info.deriver, to);
-            writeStringSet(info.references, to);
+            writeStrings(info.references, to);
             writeLongLong(info.downloadSize, to);
             if (GET_PROTOCOL_MINOR(clientVersion) >= 7)
                 writeLongLong(info.narSize, to);
@@ -542,7 +542,7 @@ static void performOp(unsigned int clientVersion,
         startWork();
         PathSet paths = store->queryValidPaths();
         stopWork();
-        writeStringSet(paths, to);
+        writeStrings(paths, to);
         break;
     }
 
@@ -550,12 +550,12 @@ static void performOp(unsigned int clientVersion,
         startWork();
         PathSet paths = store->queryFailedPaths();
         stopWork();
-        writeStringSet(paths, to);
+        writeStrings(paths, to);
         break;
     }
 
     case wopClearFailedPaths: {
-        PathSet paths = readStringSet(from);
+        PathSet paths = readStrings<PathSet>(from);
         startWork();
         store->clearFailedPaths(paths);
         stopWork();
@@ -570,7 +570,7 @@ static void performOp(unsigned int clientVersion,
         stopWork();
         writeString(info.deriver, to);
         writeString(printHash(info.hash), to);
-        writeStringSet(info.references, to);
+        writeStrings(info.references, to);
         writeInt(info.registrationTime, to);
         writeLongLong(info.narSize, to);
         break;
@@ -603,8 +603,8 @@ static void processConnection()
     unsigned int magic = readInt(from);
     if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
     writeInt(WORKER_MAGIC_2, to);
-
     writeInt(PROTOCOL_VERSION, to);
+    to.flush();
     unsigned int clientVersion = readInt(from);
 
     /* Send startup error messages to the client. */
@@ -626,9 +626,11 @@ static void processConnection()
         store = boost::shared_ptr<StoreAPI>(new LocalStore());
 
         stopWork();
+        to.flush();
         
     } catch (Error & e) {
         stopWork(false, e.msg());
+        to.flush();
         return;
     }
 
@@ -648,9 +650,19 @@ static void processConnection()
         try {
             performOp(clientVersion, from, to, op);
         } catch (Error & e) {
+            /* If we're not in a state were we can send replies, then
+               something went wrong processing the input of the
+               client.  This can happen especially if I/O errors occur
+               during addTextToStore() / importPath().  If that
+               happens, just send the error message and exit. */
+            bool errorAllowed = canSendStderr;
+            if (!errorAllowed) printMsg(lvlError, format("error processing client input: %1%") % e.msg());
             stopWork(false, e.msg(), GET_PROTOCOL_MINOR(clientVersion) >= 8 ? e.status : 0);
+            if (!errorAllowed) break;
         }
 
+        to.flush();
+
         assert(!canSendStderr);
     };