about summary refs log tree commit diff
path: root/src/libutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil')
-rw-r--r--src/libutil/archive.cc17
-rw-r--r--src/libutil/archive.hh3
-rw-r--r--src/libutil/compression.cc166
-rw-r--r--src/libutil/compression.hh6
-rw-r--r--src/libutil/hash.cc1
-rw-r--r--src/libutil/local.mk2
-rw-r--r--src/libutil/logging.cc21
-rw-r--r--src/libutil/logging.hh14
-rw-r--r--src/libutil/lru-cache.hh8
-rw-r--r--src/libutil/serialise.cc63
-rw-r--r--src/libutil/serialise.hh41
-rw-r--r--src/libutil/util.cc98
-rw-r--r--src/libutil/util.hh20
13 files changed, 363 insertions, 97 deletions
diff --git a/src/libutil/archive.cc b/src/libutil/archive.cc
index 59be91c5cfb8..b2459336a885 100644
--- a/src/libutil/archive.cc
+++ b/src/libutil/archive.cc
@@ -350,4 +350,21 @@ void restorePath(const Path & path, Source & source)
 }
 
 
+void copyNAR(Source & source, Sink & sink)
+{
+    // FIXME: if 'source' is the output of dumpPath() followed by EOF,
+    // we should just forward all data directly without parsing.
+
+    ParseSink parseSink; /* null sink; just parse the NAR */
+
+    LambdaSource wrapper([&](unsigned char * data, size_t len) {
+        auto n = source.read(data, len);
+        sink(data, n);
+        return n;
+    });
+
+    parseDump(parseSink, wrapper);
+}
+
+
 }
diff --git a/src/libutil/archive.hh b/src/libutil/archive.hh
index 8a15e849c7b8..7a0e688e4201 100644
--- a/src/libutil/archive.hh
+++ b/src/libutil/archive.hh
@@ -74,6 +74,9 @@ void parseDump(ParseSink & sink, Source & source);
 
 void restorePath(const Path & path, Source & source);
 
+/* Read a NAR from 'source' and write it to 'sink'. */
+void copyNAR(Source & source, Sink & sink);
+
 
 // FIXME: global variables are bad m'kay.
 extern bool useCaseHack;
diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc
index 470c925ed7a6..e7fedcbdc52e 100644
--- a/src/libutil/compression.cc
+++ b/src/libutil/compression.cc
@@ -17,7 +17,23 @@
 
 namespace nix {
 
-static ref<std::string> decompressXZ(const std::string & in)
+static const size_t bufSize = 32 * 1024;
+
+static void decompressNone(Source & source, Sink & sink)
+{
+    std::vector<unsigned char> buf(bufSize);
+    while (true) {
+        size_t n;
+        try {
+            n = source.read(buf.data(), buf.size());
+        } catch (EndOfFile &) {
+            break;
+        }
+        sink(buf.data(), n);
+    }
+}
+
+static void decompressXZ(Source & source, Sink & sink)
 {
     lzma_stream strm(LZMA_STREAM_INIT);
 
@@ -29,36 +45,44 @@ static ref<std::string> decompressXZ(const std::string & in)
     Finally free([&]() { lzma_end(&strm); });
 
     lzma_action action = LZMA_RUN;
-    uint8_t outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    strm.next_in = (uint8_t *) in.c_str();
-    strm.avail_in = in.size();
-    strm.next_out = outbuf;
-    strm.avail_out = sizeof(outbuf);
+    std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
+    strm.next_in = nullptr;
+    strm.avail_in = 0;
+    strm.next_out = outbuf.data();
+    strm.avail_out = outbuf.size();
+    bool eof = false;
 
     while (true) {
         checkInterrupt();
 
+        if (strm.avail_in == 0 && !eof) {
+            strm.next_in = inbuf.data();
+            try {
+                strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
+            } catch (EndOfFile &) {
+                eof = true;
+            }
+        }
+
         if (strm.avail_in == 0)
             action = LZMA_FINISH;
 
         lzma_ret ret = lzma_code(&strm, action);
 
-        if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
-            res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out);
-            strm.next_out = outbuf;
-            strm.avail_out = sizeof(outbuf);
+        if (strm.avail_out < outbuf.size()) {
+            sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
+            strm.next_out = outbuf.data();
+            strm.avail_out = outbuf.size();
         }
 
-        if (ret == LZMA_STREAM_END)
-            return res;
+        if (ret == LZMA_STREAM_END) return;
 
         if (ret != LZMA_OK)
             throw CompressionError("error %d while decompressing xz file", ret);
     }
 }
 
-static ref<std::string> decompressBzip2(const std::string & in)
+static void decompressBzip2(Source & source, Sink & sink)
 {
     bz_stream strm;
     memset(&strm, 0, sizeof(strm));
@@ -69,39 +93,50 @@ static ref<std::string> decompressBzip2(const std::string & in)
 
     Finally free([&]() { BZ2_bzDecompressEnd(&strm); });
 
-    char outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    strm.next_in = (char *) in.c_str();
-    strm.avail_in = in.size();
-    strm.next_out = outbuf;
-    strm.avail_out = sizeof(outbuf);
+    std::vector<char> inbuf(bufSize), outbuf(bufSize);
+    strm.next_in = nullptr;
+    strm.avail_in = 0;
+    strm.next_out = outbuf.data();
+    strm.avail_out = outbuf.size();
+    bool eof = false;
 
     while (true) {
         checkInterrupt();
 
+        if (strm.avail_in == 0 && !eof) {
+            strm.next_in = inbuf.data();
+            try {
+                strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
+            } catch (EndOfFile &) {
+                eof = true;
+            }
+        }
+
         int ret = BZ2_bzDecompress(&strm);
 
-        if (strm.avail_out == 0 || ret == BZ_STREAM_END) {
-            res->append(outbuf, sizeof(outbuf) - strm.avail_out);
-            strm.next_out = outbuf;
-            strm.avail_out = sizeof(outbuf);
+        if (strm.avail_in == 0 && strm.avail_out == outbuf.size() && eof)
+            throw CompressionError("bzip2 data ends prematurely");
+
+        if (strm.avail_out < outbuf.size()) {
+            sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
+            strm.next_out = outbuf.data();
+            strm.avail_out = outbuf.size();
         }
 
-        if (ret == BZ_STREAM_END)
-            return res;
+        if (ret == BZ_STREAM_END) return;
 
         if (ret != BZ_OK)
             throw CompressionError("error while decompressing bzip2 file");
-
-        if (strm.avail_in == 0)
-            throw CompressionError("bzip2 data ends prematurely");
     }
 }
 
-static ref<std::string> decompressBrotli(const std::string & in)
+static void decompressBrotli(Source & source, Sink & sink)
 {
 #if !HAVE_BROTLI
-    return make_ref<std::string>(runProgram(BROTLI, true, {"-d"}, {in}));
+    RunOptions options(BROTLI, {"-d"});
+    options.stdin = &source;
+    options.stdout = &sink;
+    runProgram2(options);
 #else
     auto *s = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
     if (!s)
@@ -109,16 +144,26 @@ static ref<std::string> decompressBrotli(const std::string & in)
 
     Finally free([s]() { BrotliDecoderDestroyInstance(s); });
 
-    uint8_t outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    const uint8_t *next_in = (uint8_t *)in.c_str();
-    size_t avail_in = in.size();
-    uint8_t *next_out = outbuf;
-    size_t avail_out = sizeof(outbuf);
+    std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
+    const uint8_t * next_in = nullptr;
+    size_t avail_in = 0;
+    bool eof = false;
 
     while (true) {
         checkInterrupt();
 
+        if (avail_in == 0 && !eof) {
+            next_in = inbuf.data();
+            try {
+                avail_in = source.read((unsigned char *) next_in, inbuf.size());
+            } catch (EndOfFile &) {
+                eof = true;
+            }
+        }
+
+        uint8_t * next_out = outbuf.data();
+        size_t avail_out = outbuf.size();
+
         auto ret = BrotliDecoderDecompressStream(s,
                 &avail_in, &next_in,
                 &avail_out, &next_out,
@@ -128,51 +173,49 @@ static ref<std::string> decompressBrotli(const std::string & in)
         case BROTLI_DECODER_RESULT_ERROR:
             throw CompressionError("error while decompressing brotli file");
         case BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
-            throw CompressionError("incomplete or corrupt brotli file");
+            if (eof)
+                throw CompressionError("incomplete or corrupt brotli file");
+            break;
         case BROTLI_DECODER_RESULT_SUCCESS:
             if (avail_in != 0)
                 throw CompressionError("unexpected input after brotli decompression");
             break;
         case BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT:
             // I'm not sure if this can happen, but abort if this happens with empty buffer
-            if (avail_out == sizeof(outbuf))
+            if (avail_out == outbuf.size())
                 throw CompressionError("brotli decompression requires larger buffer");
             break;
         }
 
         // Always ensure we have full buffer for next invocation
-        if (avail_out < sizeof(outbuf)) {
-            res->append((char*)outbuf, sizeof(outbuf) - avail_out);
-            next_out = outbuf;
-            avail_out = sizeof(outbuf);
-        }
+        if (avail_out < outbuf.size())
+            sink((unsigned char *) outbuf.data(), outbuf.size() - avail_out);
 
-        if (ret == BROTLI_DECODER_RESULT_SUCCESS) return res;
+        if (ret == BROTLI_DECODER_RESULT_SUCCESS) return;
     }
 #endif // HAVE_BROTLI
 }
 
-ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
+ref<std::string> decompress(const std::string & method, const std::string & in)
 {
-    StringSink ssink;
-    auto sink = makeCompressionSink(method, ssink, parallel);
-    (*sink)(in);
-    sink->finish();
-    return ssink.s;
+    StringSource source(in);
+    StringSink sink;
+    decompress(method, source, sink);
+    return sink.s;
 }
 
-ref<std::string> decompress(const std::string & method, const std::string & in)
+void decompress(const std::string & method, Source & source, Sink & sink)
 {
     if (method == "none")
-        return make_ref<std::string>(in);
+        return decompressNone(source, sink);
     else if (method == "xz")
-        return decompressXZ(in);
+        return decompressXZ(source, sink);
     else if (method == "bzip2")
-        return decompressBzip2(in);
+        return decompressBzip2(source, sink);
     else if (method == "br")
-        return decompressBrotli(in);
+        return decompressBrotli(source, sink);
     else
-        throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
+        throw UnknownCompressionMethod("unknown compression method '%s'", method);
 }
 
 struct NoneSink : CompressionSink
@@ -499,4 +542,13 @@ ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & next
         throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
 }
 
+ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
+{
+    StringSink ssink;
+    auto sink = makeCompressionSink(method, ssink, parallel);
+    (*sink)(in);
+    sink->finish();
+    return ssink.s;
+}
+
 }
diff --git a/src/libutil/compression.hh b/src/libutil/compression.hh
index a0d7530d74fc..f7a3e3fbd32e 100644
--- a/src/libutil/compression.hh
+++ b/src/libutil/compression.hh
@@ -8,10 +8,12 @@
 
 namespace nix {
 
-ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
-
 ref<std::string> decompress(const std::string & method, const std::string & in);
 
+void decompress(const std::string & method, Source & source, Sink & sink);
+
+ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
+
 struct CompressionSink : BufferedSink
 {
     virtual void finish() = 0;
diff --git a/src/libutil/hash.cc b/src/libutil/hash.cc
index 8267481b8829..a01d651e1ef5 100644
--- a/src/libutil/hash.cc
+++ b/src/libutil/hash.cc
@@ -191,6 +191,7 @@ Hash::Hash(const std::string & s, HashType type)
         auto d = base64Decode(std::string(s, pos));
         if (d.size() != hashSize)
             throw BadHash("invalid base-64 hash '%s'", s);
+        assert(hashSize);
         memcpy(hash, d.data(), hashSize);
     }
 
diff --git a/src/libutil/local.mk b/src/libutil/local.mk
index 5fc2aab569da..824f48fbfc9f 100644
--- a/src/libutil/local.mk
+++ b/src/libutil/local.mk
@@ -6,7 +6,7 @@ libutil_DIR := $(d)
 
 libutil_SOURCES := $(wildcard $(d)/*.cc)
 
-libutil_LDFLAGS = $(LIBLZMA_LIBS) -lbz2 -pthread $(OPENSSL_LIBS) $(LIBBROTLI_LIBS)
+libutil_LDFLAGS = $(LIBLZMA_LIBS) -lbz2 -pthread $(OPENSSL_LIBS) $(LIBBROTLI_LIBS) -lboost_context
 
 libutil_LIBS = libformat
 
diff --git a/src/libutil/logging.cc b/src/libutil/logging.cc
index 27a631a37d10..799c6e1ae441 100644
--- a/src/libutil/logging.cc
+++ b/src/libutil/logging.cc
@@ -6,7 +6,16 @@
 
 namespace nix {
 
-thread_local ActivityId curActivity = 0;
+static thread_local ActivityId curActivity = 0;
+
+ActivityId getCurActivity()
+{
+    return curActivity;
+}
+void setCurActivity(const ActivityId activityId)
+{
+    curActivity = activityId;
+}
 
 Logger * logger = makeDefaultLogger();
 
@@ -44,7 +53,7 @@ public:
             prefix = std::string("<") + c + ">";
         }
 
-        writeToStderr(prefix + filterANSIEscapes(fs.s) + "\n");
+        writeToStderr(prefix + filterANSIEscapes(fs.s, !tty) + "\n");
     }
 
     void startActivity(ActivityId act, Verbosity lvl, ActivityType type,
@@ -221,4 +230,12 @@ bool handleJSONLogMessage(const std::string & msg,
     return true;
 }
 
+Activity::~Activity() {
+    try {
+        logger.stopActivity(id);
+    } catch (...) {
+        ignoreException();
+    }
+}
+
 }
diff --git a/src/libutil/logging.hh b/src/libutil/logging.hh
index 677aa4daec4d..678703102e9b 100644
--- a/src/libutil/logging.hh
+++ b/src/libutil/logging.hh
@@ -77,7 +77,8 @@ public:
     virtual void result(ActivityId act, ResultType type, const Fields & fields) { };
 };
 
-extern thread_local ActivityId curActivity;
+ActivityId getCurActivity();
+void setCurActivity(const ActivityId activityId);
 
 struct Activity
 {
@@ -86,16 +87,15 @@ struct Activity
     const ActivityId id;
 
     Activity(Logger & logger, Verbosity lvl, ActivityType type, const std::string & s = "",
-        const Logger::Fields & fields = {}, ActivityId parent = curActivity);
+        const Logger::Fields & fields = {}, ActivityId parent = getCurActivity());
 
     Activity(Logger & logger, ActivityType type,
-        const Logger::Fields & fields = {}, ActivityId parent = curActivity)
+        const Logger::Fields & fields = {}, ActivityId parent = getCurActivity())
         : Activity(logger, lvlError, type, "", fields, parent) { };
 
     Activity(const Activity & act) = delete;
 
-    ~Activity()
-    { logger.stopActivity(id); }
+    ~Activity();
 
     void progress(uint64_t done = 0, uint64_t expected = 0, uint64_t running = 0, uint64_t failed = 0) const
     { result(resProgress, done, expected, running, failed); }
@@ -122,8 +122,8 @@ struct Activity
 struct PushActivity
 {
     const ActivityId prevAct;
-    PushActivity(ActivityId act) : prevAct(curActivity) { curActivity = act; }
-    ~PushActivity() { curActivity = prevAct; }
+    PushActivity(ActivityId act) : prevAct(getCurActivity()) { setCurActivity(act); }
+    ~PushActivity() { setCurActivity(prevAct); }
 };
 
 extern Logger * logger;
diff --git a/src/libutil/lru-cache.hh b/src/libutil/lru-cache.hh
index 3cb5d50889d9..9b8290e634c9 100644
--- a/src/libutil/lru-cache.hh
+++ b/src/libutil/lru-cache.hh
@@ -2,6 +2,7 @@
 
 #include <map>
 #include <list>
+#include <experimental/optional>
 
 namespace nix {
 
@@ -63,18 +64,17 @@ public:
 
     /* Look up an item in the cache. If it exists, it becomes the most
        recently used item. */
-    // FIXME: use boost::optional?
-    Value * get(const Key & key)
+    std::experimental::optional<Value> get(const Key & key)
     {
         auto i = data.find(key);
-        if (i == data.end()) return 0;
+        if (i == data.end()) return {};
 
         /* Move this item to the back of the LRU list. */
         lru.erase(i->second.first.it);
         auto j = lru.insert(lru.end(), i);
         i->second.first.it = j;
 
-        return &i->second.second;
+        return i->second.second;
     }
 
     size_t size()
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index 9e2a502afaf8..33ae1ea389d7 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -5,6 +5,8 @@
 #include <cerrno>
 #include <memory>
 
+#include <boost/coroutine2/coroutine.hpp>
+
 
 namespace nix {
 
@@ -88,6 +90,23 @@ void Source::operator () (unsigned char * data, size_t len)
 }
 
 
+std::string Source::drain()
+{
+    std::string s;
+    std::vector<unsigned char> buf(8192);
+    while (true) {
+        size_t n;
+        try {
+            n = read(buf.data(), buf.size());
+            s.append((char *) buf.data(), n);
+        } catch (EndOfFile &) {
+            break;
+        }
+    }
+    return s;
+}
+
+
 size_t BufferedSource::read(unsigned char * data, size_t len)
 {
     if (!buffer) buffer = decltype(buffer)(new unsigned char[bufSize]);
@@ -138,6 +157,50 @@ size_t StringSource::read(unsigned char * data, size_t len)
 }
 
 
+std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun)
+{
+    struct SinkToSource : Source
+    {
+        typedef boost::coroutines2::coroutine<std::string> coro_t;
+
+        coro_t::pull_type coro;
+
+        SinkToSource(std::function<void(Sink &)> fun)
+            : coro([&](coro_t::push_type & yield) {
+                LambdaSink sink([&](const unsigned char * data, size_t len) {
+                    if (len) yield(std::string((const char *) data, len));
+                });
+                fun(sink);
+            })
+        {
+        }
+
+        std::string cur;
+        size_t pos = 0;
+
+        size_t read(unsigned char * data, size_t len) override
+        {
+            if (!coro)
+                throw EndOfFile("coroutine has finished");
+
+            if (pos == cur.size()) {
+                if (!cur.empty()) coro();
+                cur = coro.get();
+                pos = 0;
+            }
+
+            auto n = std::min(cur.size() - pos, len);
+            memcpy(data, (unsigned char *) cur.data() + pos, n);
+            pos += n;
+
+            return n;
+        }
+    };
+
+    return std::make_unique<SinkToSource>(fun);
+}
+
+
 void writePadding(size_t len, Sink & sink)
 {
     if (len % 8) {
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 2ea5b6354ee9..d0b4552e3399 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -56,11 +56,13 @@ struct Source
     void operator () (unsigned char * data, size_t len);
 
     /* Store up to ‘len’ in the buffer pointed to by ‘data’, and
-       return the number of bytes stored.  If blocks until at least
+       return the number of bytes stored.  It blocks until at least
        one byte is available. */
     virtual size_t read(unsigned char * data, size_t len) = 0;
 
     virtual bool good() { return true; }
+
+    std::string drain();
 };
 
 
@@ -175,6 +177,43 @@ struct TeeSource : Source
 };
 
 
+/* Convert a function into a sink. */
+struct LambdaSink : Sink
+{
+    typedef std::function<void(const unsigned char *, size_t)> lambda_t;
+
+    lambda_t lambda;
+
+    LambdaSink(const lambda_t & lambda) : lambda(lambda) { }
+
+    virtual void operator () (const unsigned char * data, size_t len)
+    {
+        lambda(data, len);
+    }
+};
+
+
+/* Convert a function into a source. */
+struct LambdaSource : Source
+{
+    typedef std::function<size_t(unsigned char *, size_t)> lambda_t;
+
+    lambda_t lambda;
+
+    LambdaSource(const lambda_t & lambda) : lambda(lambda) { }
+
+    size_t read(unsigned char * data, size_t len) override
+    {
+        return lambda(data, len);
+    }
+};
+
+
+/* Convert a function that feeds data into a Sink into a Source. The
+   Source executes the function as a coroutine. */
+std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun);
+
+
 void writePadding(size_t len, Sink & sink);
 void writeString(const unsigned char * buf, size_t len, Sink & sink);
 
diff --git a/src/libutil/util.cc b/src/libutil/util.cc
index 2c4705f6ba77..15962236ec65 100644
--- a/src/libutil/util.cc
+++ b/src/libutil/util.cc
@@ -3,6 +3,7 @@
 #include "affinity.hh"
 #include "sync.hh"
 #include "finally.hh"
+#include "serialise.hh"
 
 #include <cctype>
 #include <cerrno>
@@ -567,21 +568,44 @@ void writeFull(int fd, const string & s, bool allowInterrupts)
 }
 
 
-string drainFD(int fd)
+string drainFD(int fd, bool block)
 {
-    string result;
-    std::vector<unsigned char> buffer(4096);
+    StringSink sink;
+    drainFD(fd, sink, block);
+    return std::move(*sink.s);
+}
+
+
+void drainFD(int fd, Sink & sink, bool block)
+{
+    int saved;
+
+    Finally finally([&]() {
+        if (!block) {
+            if (fcntl(fd, F_SETFL, saved) == -1)
+                throw SysError("making file descriptor blocking");
+        }
+    });
+
+    if (!block) {
+        saved = fcntl(fd, F_GETFL);
+        if (fcntl(fd, F_SETFL, saved | O_NONBLOCK) == -1)
+            throw SysError("making file descriptor non-blocking");
+    }
+
+    std::vector<unsigned char> buf(4096);
     while (1) {
         checkInterrupt();
-        ssize_t rd = read(fd, buffer.data(), buffer.size());
+        ssize_t rd = read(fd, buf.data(), buf.size());
         if (rd == -1) {
+            if (!block && (errno == EAGAIN || errno == EWOULDBLOCK))
+                break;
             if (errno != EINTR)
                 throw SysError("reading from file");
         }
         else if (rd == 0) break;
-        else result.append((char *) buffer.data(), rd);
+        else sink(buf.data(), rd);
     }
-    return result;
 }
 
 
@@ -921,20 +945,47 @@ string runProgram(Path program, bool searchPath, const Strings & args,
     return res.second;
 }
 
-std::pair<int, std::string> runProgram(const RunOptions & options)
+std::pair<int, std::string> runProgram(const RunOptions & options_)
+{
+    RunOptions options(options_);
+    StringSink sink;
+    options.standardOut = &sink;
+
+    int status = 0;
+
+    try {
+        runProgram2(options);
+    } catch (ExecError & e) {
+        status = e.status;
+    }
+
+    return {status, std::move(*sink.s)};
+}
+
+void runProgram2(const RunOptions & options)
 {
     checkInterrupt();
 
+    assert(!(options.standardIn && options.input));
+
+    std::unique_ptr<Source> source_;
+    Source * source = options.standardIn;
+
+    if (options.input) {
+        source_ = std::make_unique<StringSource>(*options.input);
+        source = source_.get();
+    }
+
     /* Create a pipe. */
     Pipe out, in;
-    out.create();
-    if (options.input) in.create();
+    if (options.standardOut) out.create();
+    if (source) in.create();
 
     /* Fork. */
     Pid pid = startProcess([&]() {
-        if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
+        if (options.standardOut && dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
             throw SysError("dupping stdout");
-        if (options.input && dup2(in.readSide.get(), STDIN_FILENO) == -1)
+        if (source && dup2(in.readSide.get(), STDIN_FILENO) == -1)
             throw SysError("dupping stdin");
 
         Strings args_(options.args);
@@ -962,11 +1013,20 @@ std::pair<int, std::string> runProgram(const RunOptions & options)
     });
 
 
-    if (options.input) {
+    if (source) {
         in.readSide = -1;
         writerThread = std::thread([&]() {
             try {
-                writeFull(in.writeSide.get(), *options.input);
+                std::vector<unsigned char> buf(8 * 1024);
+                while (true) {
+                    size_t n;
+                    try {
+                        n = source->read(buf.data(), buf.size());
+                    } catch (EndOfFile &) {
+                        break;
+                    }
+                    writeFull(in.writeSide.get(), buf.data(), n);
+                }
                 promise.set_value();
             } catch (...) {
                 promise.set_exception(std::current_exception());
@@ -975,15 +1035,17 @@ std::pair<int, std::string> runProgram(const RunOptions & options)
         });
     }
 
-    string result = drainFD(out.readSide.get());
+    if (options.standardOut)
+        drainFD(out.readSide.get(), *options.standardOut);
 
     /* Wait for the child to finish. */
     int status = pid.wait();
 
     /* Wait for the writer thread to finish. */
-    if (options.input) promise.get_future().get();
+    if (source) promise.get_future().get();
 
-    return {status, result};
+    if (status)
+        throw ExecError(status, fmt("program '%1%' %2%", options.program, statusToString(status)));
 }
 
 
@@ -1186,7 +1248,7 @@ void ignoreException()
 }
 
 
-std::string filterANSIEscapes(const std::string & s, unsigned int width)
+std::string filterANSIEscapes(const std::string & s, bool filterAll, unsigned int width)
 {
     std::string t, e;
     size_t w = 0;
@@ -1211,7 +1273,7 @@ std::string filterANSIEscapes(const std::string & s, unsigned int width)
                 if (i != s.end() && *i >= 0x40 && *i <= 0x5f) e += *i++;
             }
 
-            if (last == 'm')
+            if (!filterAll && last == 'm')
                 t += e;
         }
 
diff --git a/src/libutil/util.hh b/src/libutil/util.hh
index c5c537ee63d8..743d238611fc 100644
--- a/src/libutil/util.hh
+++ b/src/libutil/util.hh
@@ -25,6 +25,9 @@
 
 namespace nix {
 
+struct Sink;
+struct Source;
+
 
 /* Return an environment variable. */
 string getEnv(const string & key, const string & def = "");
@@ -148,8 +151,9 @@ MakeError(EndOfFile, Error)
 
 
 /* Read a file descriptor until EOF occurs. */
-string drainFD(int fd);
+string drainFD(int fd, bool block = true);
 
+void drainFD(int fd, Sink & sink, bool block = true);
 
 
 /* Automatic cleanup of resources. */
@@ -256,6 +260,8 @@ struct RunOptions
     bool searchPath = true;
     Strings args;
     std::experimental::optional<std::string> input;
+    Source * standardIn = nullptr;
+    Sink * standardOut = nullptr;
     bool _killStderr = false;
 
     RunOptions(const Path & program, const Strings & args)
@@ -266,6 +272,8 @@ struct RunOptions
 
 std::pair<int, std::string> runProgram(const RunOptions & options);
 
+void runProgram2(const RunOptions & options);
+
 
 class ExecError : public Error
 {
@@ -391,11 +399,13 @@ void ignoreException();
 #define ANSI_BLUE "\e[34;1m"
 
 
-/* Truncate a string to 'width' printable characters. Certain ANSI
-   escape sequences (such as colour setting) are copied but not
-   included in the character count. Other ANSI escape sequences are
-   filtered. Also, tabs are expanded to spaces. */
+/* Truncate a string to 'width' printable characters. If 'filterAll'
+   is true, all ANSI escape sequences are filtered out. Otherwise,
+   some escape sequences (such as colour setting) are copied but not
+   included in the character count. Also, tabs are expanded to
+   spaces. */
 std::string filterANSIEscapes(const std::string & s,
+    bool filterAll = false,
     unsigned int width = std::numeric_limits<unsigned int>::max());