about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/build.cc6
-rw-r--r--src/libstore/remote-store.cc39
-rw-r--r--src/libstore/remote-store.hh2
-rw-r--r--src/libstore/worker-protocol.hh5
-rw-r--r--src/libutil/util.cc11
-rw-r--r--src/libutil/util.hh2
-rw-r--r--src/nix-worker/main.cc245
7 files changed, 206 insertions, 104 deletions
diff --git a/src/libstore/build.cc b/src/libstore/build.cc
index 71560b2d0c..d8b90252b4 100644
--- a/src/libstore/build.cc
+++ b/src/libstore/build.cc
@@ -872,7 +872,7 @@ static void drain(int fd)
             if (errno != EINTR)
                 throw SysError("draining");
         } else if (rd == 0) break;
-        else writeFull(STDERR_FILENO, buffer, rd);
+        else writeToStderr(buffer, rd);
     }
 }
 
@@ -1610,7 +1610,7 @@ void DerivationGoal::handleChildOutput(int fd, const string & data)
 {
     if (fd == logPipe.readSide) {
         if (verbosity >= buildVerbosity)
-            writeFull(STDERR_FILENO, (unsigned char *) data.c_str(), data.size());
+            writeToStderr((unsigned char *) data.c_str(), data.size());
         writeFull(fdLogFile, (unsigned char *) data.c_str(), data.size());
     }
 
@@ -1923,7 +1923,7 @@ void SubstitutionGoal::handleChildOutput(int fd, const string & data)
 {
     assert(fd == logPipe.readSide);
     if (verbosity >= buildVerbosity)
-        writeFull(STDERR_FILENO, (unsigned char *) data.c_str(), data.size());
+        writeToStderr((unsigned char *) data.c_str(), data.size());
     /* Don't write substitution output to a log file for now.  We
        probably should, though. */
 }
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 9b9d74f7e8..87547ce912 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -4,6 +4,10 @@
 #include "worker-protocol.hh"
 #include "archive.hh"
 
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
 #include <iostream>
 #include <unistd.h>
 
@@ -38,9 +42,15 @@ RemoteStore::RemoteStore()
             if (dup2(toChild.readSide, STDIN_FILENO) == -1)
                 throw SysError("dupping read side");
 
-            execlp(worker.c_str(), worker.c_str(),
-                "--slave", NULL);
+            int fdDebug = open("/tmp/worker-log", O_WRONLY | O_CREAT | O_TRUNC, 0644);
+            assert(fdDebug != -1);
+            if (dup2(fdDebug, STDERR_FILENO) == -1)
+                throw SysError("dupping stderr");
+            close(fdDebug);
             
+            execlp(worker.c_str(), worker.c_str(),
+                "-vvv", "--slave", NULL);
+
             throw SysError(format("executing `%1%'") % worker);
             
         } catch (std::exception & e) {
@@ -66,9 +76,13 @@ RemoteStore::RemoteStore()
 
 RemoteStore::~RemoteStore()
 {
-    writeInt(wopQuit, to);
-    readInt(from);
-    child.wait(true);
+    try {
+        fromChild.readSide.close();
+        toChild.writeSide.close();
+        child.wait(true);
+    } catch (Error & e) {
+        printMsg(lvlError, format("error (ignored): %1%") % e.msg());
+    }
 }
 
 
@@ -158,6 +172,7 @@ void RemoteStore::buildDerivations(const PathSet & drvPaths)
 {
     writeInt(wopBuildDerivations, to);
     writeStringSet(drvPaths, to);
+    processStderr();
     readInt(from);
 }
 
@@ -185,4 +200,18 @@ void RemoteStore::syncWithGC()
 }
 
 
+void RemoteStore::processStderr()
+{
+    unsigned int msg;
+    while ((msg = readInt(from)) == STDERR_NEXT) {
+        string s = readString(from);
+        writeToStderr((unsigned char *) s.c_str(), s.size());
+    }
+    if (msg == STDERR_ERROR)
+        throw Error(readString(from));
+    else if (msg != STDERR_LAST)
+        throw Error("protocol error processing standard error");
+}
+
+
 }
diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh
index b11191c09d..05d2a21ecb 100644
--- a/src/libstore/remote-store.hh
+++ b/src/libstore/remote-store.hh
@@ -57,6 +57,8 @@ private:
     FdSink to;
     FdSource from;
     Pid child;
+
+    void processStderr();
 };
 
 
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index 2700b67197..284477483a 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -23,4 +23,9 @@ typedef enum {
 } WorkerOp;
 
 
+#define STDERR_NEXT  0x6f6c6d67
+#define STDERR_LAST  0x616c7473
+#define STDERR_ERROR 0x63787470
+
+
 #endif /* !__WORKER_PROTOCOL_H */
diff --git a/src/libutil/util.cc b/src/libutil/util.cc
index 6d96310dad..4460d95b8e 100644
--- a/src/libutil/util.cc
+++ b/src/libutil/util.cc
@@ -437,7 +437,7 @@ void printMsg_(Verbosity level, const format & f)
     else if (logType == ltEscapes && level != lvlInfo)
         prefix = "\033[" + escVerbosity(level) + "s";
     string s = (format("%1%%2%\n") % prefix % f.str()).str();
-    writeFull(STDERR_FILENO, (const unsigned char *) s.c_str(), s.size());
+    writeToStderr((const unsigned char *) s.c_str(), s.size());
 }
 
 
@@ -450,6 +450,15 @@ void warnOnce(bool & haveWarned, const format & f)
 }
 
 
+static void defaultWriteToStderr(const unsigned char * buf, size_t count)
+{
+    writeFull(STDERR_FILENO, buf, count);
+}
+
+
+void (*writeToStderr) (const unsigned char * buf, size_t count) = defaultWriteToStderr;
+
+
 void readFull(int fd, unsigned char * buf, size_t count)
 {
     while (count) {
diff --git a/src/libutil/util.hh b/src/libutil/util.hh
index d49067dfe2..0d39ffee9e 100644
--- a/src/libutil/util.hh
+++ b/src/libutil/util.hh
@@ -131,6 +131,8 @@ void printMsg_(Verbosity level, const format & f);
 
 void warnOnce(bool & haveWarned, const format & f);
 
+extern void (*writeToStderr) (const unsigned char * buf, size_t count);
+
 
 /* Wrappers arount read()/write() that read/write exactly the
    requested number of bytes. */
diff --git a/src/nix-worker/main.cc b/src/nix-worker/main.cc
index cf550895e4..17e892c648 100644
--- a/src/nix-worker/main.cc
+++ b/src/nix-worker/main.cc
@@ -10,7 +10,7 @@
 using namespace nix;
 
 
-Path readStorePath(Source & from)
+static Path readStorePath(Source & from)
 {
     Path path = readString(from);
     assertStorePath(path);
@@ -18,7 +18,7 @@ Path readStorePath(Source & from)
 }
 
 
-PathSet readStorePaths(Source & from)
+static PathSet readStorePaths(Source & from)
 {
     PathSet paths = readStringSet(from);
     for (PathSet::iterator i = paths.begin(); i != paths.end(); ++i)
@@ -27,123 +27,178 @@ PathSet readStorePaths(Source & from)
 }
 
 
-void processConnection(Source & from, Sink & to)
+static Sink * _to; /* !!! should make writeToStderr an object */
+bool canSendStderr;
+
+
+static void tunnelStderr(const unsigned char * buf, size_t count)
 {
-    store = boost::shared_ptr<StoreAPI>(new LocalStore(true));
+    writeFull(STDERR_FILENO, buf, count);
+    if (canSendStderr) {
+        try {
+            writeInt(STDERR_NEXT, *_to);
+            writeString(string((char *) buf, count), *_to);
+        } catch (...) {
+            /* Write failed; that means that the other side is
+               gone. */
+            canSendStderr = false;
+            throw;
+        }
+    }
+}
 
-    unsigned int magic = readInt(from);
-    if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
 
-    writeInt(WORKER_MAGIC_2, to);
+/* startWork() means that we're starting an operation for which we
+   want to send out stderr to the client. */
+static void startWork()
+{
+    canSendStderr = true;
+}
 
-    debug("greeting exchanged");
 
-    bool quit = false;
+/* stopWork() means that we're done; stop sending stderr to the
+   client. */
+static void stopWork()
+{
+    canSendStderr = false;
+    writeInt(STDERR_LAST, *_to);
+}
 
-    unsigned int opCount = 0;
-    
-    do {
+
+static void performOp(Source & from, Sink & to, unsigned int op)
+{
+    switch (op) {
+
+#if 0        
+    case wopQuit: {
+        /* Close the database. */
+        store.reset((StoreAPI *) 0);
+        writeInt(1, to);
+        break;
+    }
+#endif
+
+    case wopIsValidPath: {
+        Path path = readStorePath(from);
+        writeInt(store->isValidPath(path), to);
+        break;
+    }
+
+    case wopHasSubstitutes: {
+        Path path = readStorePath(from);
+        writeInt(store->hasSubstitutes(path), to);
+        break;
+    }
+
+    case wopQueryPathHash: {
+        Path path = readStorePath(from);
+        writeString(printHash(store->queryPathHash(path)), to);
+        break;
+    }
+
+    case wopQueryReferences:
+    case wopQueryReferrers: {
+        Path path = readStorePath(from);
+        PathSet paths;
+        if (op == wopQueryReferences)
+            store->queryReferences(path, paths);
+        else
+            store->queryReferrers(path, paths);
+        writeStringSet(paths, to);
+        break;
+    }
+
+    case wopAddToStore: {
+        /* !!! uberquick hack */
+        string baseName = readString(from);
+        bool fixed = readInt(from) == 1;
+        bool recursive = readInt(from) == 1;
+        string hashAlgo = readString(from);
         
-        WorkerOp op = (WorkerOp) readInt(from);
+        Path tmp = createTempDir();
+        Path tmp2 = tmp + "/" + baseName;
+        restorePath(tmp2, from);
 
-        opCount++;
+        writeString(store->addToStore(tmp2, fixed, recursive, hashAlgo), to);
+            
+        deletePath(tmp);
+        break;
+    }
 
-        switch (op) {
+    case wopAddTextToStore: {
+        string suffix = readString(from);
+        string s = readString(from);
+        PathSet refs = readStorePaths(from);
+        writeString(store->addTextToStore(suffix, s, refs), to);
+        break;
+    }
 
-        case wopQuit: {
-            /* Close the database. */
-            store.reset((StoreAPI *) 0);
-            writeInt(1, to);
-            quit = true;
-            break;
-        }
+    case wopBuildDerivations: {
+        PathSet drvs = readStorePaths(from);
+        startWork();
+        store->buildDerivations(drvs);
+        stopWork();
+        writeInt(1, to);
+        break;
+    }
 
-        case wopIsValidPath: {
-            Path path = readStorePath(from);
-            writeInt(store->isValidPath(path), to);
-            break;
-        }
+    case wopEnsurePath: {
+        Path path = readStorePath(from);
+        store->ensurePath(path);
+        writeInt(1, to);
+        break;
+    }
 
-        case wopHasSubstitutes: {
-            Path path = readStorePath(from);
-            writeInt(store->hasSubstitutes(path), to);
-            break;
-        }
+    case wopAddTempRoot: {
+        Path path = readStorePath(from);
+        store->addTempRoot(path);
+        writeInt(1, to);
+        break;
+    }
 
-        case wopQueryPathHash: {
-            Path path = readStorePath(from);
-            writeString(printHash(store->queryPathHash(path)), to);
-            break;
-        }
+    case wopSyncWithGC: {
+        store->syncWithGC();
+        writeInt(1, to);
+        break;
+    }
 
-        case wopQueryReferences:
-        case wopQueryReferrers: {
-            Path path = readStorePath(from);
-            PathSet paths;
-            if (op == wopQueryReferences)
-                store->queryReferences(path, paths);
-            else
-                store->queryReferrers(path, paths);
-            writeStringSet(paths, to);
-            break;
-        }
+    default:
+        throw Error(format("invalid operation %1%") % op);
+    }
+}
 
-        case wopAddToStore: {
-            /* !!! uberquick hack */
-            string baseName = readString(from);
-            bool fixed = readInt(from) == 1;
-            bool recursive = readInt(from) == 1;
-            string hashAlgo = readString(from);
 
-            Path tmp = createTempDir();
-            Path tmp2 = tmp + "/" + baseName;
-            restorePath(tmp2, from);
+static void processConnection(Source & from, Sink & to)
+{
+    store = boost::shared_ptr<StoreAPI>(new LocalStore(true));
 
-            writeString(store->addToStore(tmp2, fixed, recursive, hashAlgo), to);
-            
-            deletePath(tmp);
-            break;
-        }
+    unsigned int magic = readInt(from);
+    if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
 
-        case wopAddTextToStore: {
-            string suffix = readString(from);
-            string s = readString(from);
-            PathSet refs = readStorePaths(from);
-            writeString(store->addTextToStore(suffix, s, refs), to);
-            break;
-        }
+    writeInt(WORKER_MAGIC_2, to);
 
-        case wopBuildDerivations: {
-            PathSet drvs = readStorePaths(from);
-            store->buildDerivations(drvs);
-            writeInt(1, to);
-            break;
-        }
+    debug("greeting exchanged");
 
-        case wopEnsurePath: {
-            Path path = readStorePath(from);
-            store->ensurePath(path);
-            writeInt(1, to);
-            break;
-        }
+    _to = &to;
+    canSendStderr = false;
+    writeToStderr = tunnelStderr;
 
-        case wopAddTempRoot: {
-            Path path = readStorePath(from);
-            store->addTempRoot(path);
-            writeInt(1, to);
-            break;
-        }
+    bool quit = false;
 
-        case wopSyncWithGC: {
-            store->syncWithGC();
-            writeInt(1, to);
-            break;
-        }
+    unsigned int opCount = 0;
+    
+    do {
+        WorkerOp op = (WorkerOp) readInt(from);
 
-        default:
-            throw Error(format("invalid operation %1%") % op);
+        opCount++;
+
+        try {
+            performOp(from, to, op);
+        } catch (Error & e) {
+            writeInt(STDERR_ERROR, *_to);
+            writeString(e.msg(), to);
         }
-        
+
     } while (!quit);
 
     printMsg(lvlError, format("%1% worker operations") % opCount);