about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libstore/remote-store.cc69
-rw-r--r--src/libstore/worker-protocol.hh3
-rw-r--r--src/libutil/logging.hh2
-rw-r--r--src/nix-daemon/nix-daemon.cc80
4 files changed, 124 insertions, 30 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index e9f2cee80db0..f0e3502bf79a 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -629,17 +629,37 @@ RemoteStore::Connection::~Connection()
 }
 
 
+static Logger::Fields readFields(Source & from)
+{
+    Logger::Fields fields;
+    size_t size = readInt(from);
+    for (size_t n = 0; n < size; n++) {
+        auto type = (decltype(Logger::Field::type)) readInt(from);
+        if (type == Logger::Field::tInt)
+            fields.push_back(readNum<uint64_t>(from));
+        else if (type == Logger::Field::tString)
+            fields.push_back(readString(from));
+        else
+            throw Error("got unsupported field type %x from Nix daemon", (int) type);
+    }
+    return fields;
+}
+
+
 void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
 {
     to.flush();
-    unsigned int msg;
-    while ((msg = readInt(from)) == STDERR_NEXT
-        || msg == STDERR_READ || msg == STDERR_WRITE) {
+
+    while (true) {
+
+        auto msg = readNum<uint64_t>(from);
+
         if (msg == STDERR_WRITE) {
             string s = readString(from);
             if (!sink) throw Error("no sink");
             (*sink)(s);
         }
+
         else if (msg == STDERR_READ) {
             if (!source) throw Error("no source");
             size_t len = readNum<size_t>(from);
@@ -647,16 +667,43 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
             writeString(buf.get(), source->read(buf.get(), len), to);
             to.flush();
         }
-        else
+
+        else if (msg == STDERR_ERROR) {
+            string error = readString(from);
+            unsigned int status = readInt(from);
+            throw Error(status, error);
+        }
+
+        else if (msg == STDERR_NEXT)
             printError(chomp(readString(from)));
+
+        else if (msg == STDERR_START_ACTIVITY) {
+            auto act = readNum<ActivityId>(from);
+            auto type = (ActivityType) readInt(from);
+            auto s = readString(from);
+            auto fields = readFields(from);
+            auto parent = readNum<ActivityId>(from);
+            logger->startActivity(act, type, s, fields, parent);
+        }
+
+        else if (msg == STDERR_STOP_ACTIVITY) {
+            auto act = readNum<ActivityId>(from);
+            logger->stopActivity(act);
+        }
+
+        else if (msg == STDERR_RESULT) {
+            auto act = readNum<ActivityId>(from);
+            auto type = (ResultType) readInt(from);
+            auto fields = readFields(from);
+            logger->result(act, type, fields);
+        }
+
+        else if (msg == STDERR_LAST)
+            break;
+
+        else
+            throw Error("got unknown message type %x from Nix daemon", msg);
     }
-    if (msg == STDERR_ERROR) {
-        string error = readString(from);
-        unsigned int status = readInt(from);
-        throw Error(status, error);
-    }
-    else if (msg != STDERR_LAST)
-        throw Error("protocol error processing standard error");
 }
 
 
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index 6c6766b36124..9daeb46add1a 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -57,6 +57,9 @@ typedef enum {
 #define STDERR_WRITE 0x64617416 // data for sink
 #define STDERR_LAST  0x616c7473
 #define STDERR_ERROR 0x63787470
+#define STDERR_START_ACTIVITY 0x53545254
+#define STDERR_STOP_ACTIVITY  0x53544f50
+#define STDERR_RESULT         0x52534c54
 
 
 Path readStorePath(Store & store, Source & from);
diff --git a/src/libutil/logging.hh b/src/libutil/logging.hh
index 2ec15cb68d25..e3e7c8e6f330 100644
--- a/src/libutil/logging.hh
+++ b/src/libutil/logging.hh
@@ -47,7 +47,7 @@ public:
     struct Field
     {
         // FIXME: use std::variant.
-        enum { tInt, tString } type;
+        enum { tInt = 0, tString = 1 } type;
         uint64_t i = 0;
         std::string s;
         Field(const std::string & s) : type(tString), s(s) { }
diff --git a/src/nix-daemon/nix-daemon.cc b/src/nix-daemon/nix-daemon.cc
index 3237333f4d06..65c88562cf09 100644
--- a/src/nix-daemon/nix-daemon.cc
+++ b/src/nix-daemon/nix-daemon.cc
@@ -56,6 +56,21 @@ static FdSource from(STDIN_FILENO);
 static FdSink to(STDOUT_FILENO);
 
 
+Sink & operator << (Sink & sink, const Logger::Fields & fields)
+{
+    sink << fields.size();
+    for (auto & f : fields) {
+        sink << f.type;
+        if (f.type == Logger::Field::tInt)
+            sink << f.i;
+        else if (f.type == Logger::Field::tString)
+            sink << f.s;
+        else abort();
+    }
+    return sink;
+}
+
+
 /* Logger that forwards log messages to the client, *if* we're in a
    state where the protocol allows it (i.e., when canSendStderr is
    true). */
@@ -69,15 +84,14 @@ struct TunnelLogger : public Logger
 
     Sync<State> state_;
 
-    void log(Verbosity lvl, const FormatOrString & fs) override
+    void enqueueMsg(const std::string & s)
     {
-        if (lvl > verbosity) return;
-
         auto state(state_.lock());
 
         if (state->canSendStderr) {
+            assert(state->pendingMsgs.empty());
             try {
-                to << STDERR_NEXT << (fs.s + "\n");
+                to(s);
                 to.flush();
             } catch (...) {
                 /* Write failed; that means that the other side is
@@ -86,7 +100,16 @@ struct TunnelLogger : public Logger
                 throw;
             }
         } else
-            state->pendingMsgs.push_back(fs.s);
+            state->pendingMsgs.push_back(s);
+    }
+
+    void log(Verbosity lvl, const FormatOrString & fs) override
+    {
+        if (lvl > verbosity) return;
+
+        StringSink buf;
+        buf << STDERR_NEXT << (fs.s + "\n");
+        enqueueMsg(*buf.s);
     }
 
     /* startWork() means that we're starting an operation for which we
@@ -99,7 +122,7 @@ struct TunnelLogger : public Logger
         state->canSendStderr = true;
 
         for (auto & msg : state->pendingMsgs)
-            to << STDERR_NEXT << (msg + "\n");
+            to(msg);
 
         state->pendingMsgs.clear();
 
@@ -121,6 +144,28 @@ struct TunnelLogger : public Logger
             if (status != 0) to << status;
         }
     }
+
+    void startActivity(ActivityId act, ActivityType type,
+        const std::string & s, const Fields & fields, ActivityId parent) override
+    {
+        StringSink buf;
+        buf << STDERR_START_ACTIVITY << act << type << s << fields << parent;
+        enqueueMsg(*buf.s);
+    }
+
+    void stopActivity(ActivityId act) override
+    {
+        StringSink buf;
+        buf << STDERR_STOP_ACTIVITY << act;
+        enqueueMsg(*buf.s);
+    }
+
+    void result(ActivityId act, ResultType type, const Fields & fields) override
+    {
+        StringSink buf;
+        buf << STDERR_RESULT << act << type << fields;
+        enqueueMsg(*buf.s);
+    }
 };
 
 
@@ -665,16 +710,15 @@ static void processConnection(bool trusted)
 {
     MonitorFdHup monitor(from.fd);
 
-    TunnelLogger tunnelLogger;
+    auto tunnelLogger = new TunnelLogger();
     auto prevLogger = nix::logger;
-    logger = &tunnelLogger;
+    logger = tunnelLogger;
 
     unsigned int opCount = 0;
 
     Finally finally([&]() {
-        logger = prevLogger;
         _isInterrupted = false;
-        debug("%d operations", opCount);
+        prevLogger->log(lvlDebug, fmt("%d operations", opCount));
     });
 
     /* Exchange the greeting. */
@@ -693,7 +737,7 @@ static void processConnection(bool trusted)
     readInt(from); // obsolete reserveSpace
 
     /* Send startup error messages to the client. */
-    tunnelLogger.startWork();
+    tunnelLogger->startWork();
 
     try {
 
@@ -713,7 +757,7 @@ static void processConnection(bool trusted)
         params["path-info-cache-size"] = "0";
         auto store = make_ref<LocalStore>(params);
 
-        tunnelLogger.stopWork();
+        tunnelLogger->stopWork();
         to.flush();
 
         /* Process client requests. */
@@ -730,28 +774,28 @@ static void processConnection(bool trusted)
             opCount++;
 
             try {
-                performOp(&tunnelLogger, store, trusted, clientVersion, from, to, op);
+                performOp(tunnelLogger, store, trusted, clientVersion, from, to, op);
             } catch (Error & e) {
                 /* If we're not in a state where we can send replies, then
                    something went wrong processing the input of the
                    client.  This can happen especially if I/O errors occur
                    during addTextToStore() / importPath().  If that
                    happens, just send the error message and exit. */
-                bool errorAllowed = tunnelLogger.state_.lock()->canSendStderr;
-                tunnelLogger.stopWork(false, e.msg(), e.status);
+                bool errorAllowed = tunnelLogger->state_.lock()->canSendStderr;
+                tunnelLogger->stopWork(false, e.msg(), e.status);
                 if (!errorAllowed) throw;
             } catch (std::bad_alloc & e) {
-                tunnelLogger.stopWork(false, "Nix daemon out of memory", 1);
+                tunnelLogger->stopWork(false, "Nix daemon out of memory", 1);
                 throw;
             }
 
             to.flush();
 
-            assert(!tunnelLogger.state_.lock()->canSendStderr);
+            assert(!tunnelLogger->state_.lock()->canSendStderr);
         };
 
     } catch (std::exception & e) {
-        tunnelLogger.stopWork(false, e.what(), 1);
+        tunnelLogger->stopWork(false, e.what(), 1);
         to.flush();
         return;
     }