about summary refs log tree commit diff
path: root/src/libutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil')
-rw-r--r--src/libutil/local.mk2
-rw-r--r--src/libutil/serialise.cc63
-rw-r--r--src/libutil/serialise.hh23
3 files changed, 87 insertions, 1 deletions
diff --git a/src/libutil/local.mk b/src/libutil/local.mk
index 5fc2aab569da..824f48fbfc9f 100644
--- a/src/libutil/local.mk
+++ b/src/libutil/local.mk
@@ -6,7 +6,7 @@ libutil_DIR := $(d)
 
 libutil_SOURCES := $(wildcard $(d)/*.cc)
 
-libutil_LDFLAGS = $(LIBLZMA_LIBS) -lbz2 -pthread $(OPENSSL_LIBS) $(LIBBROTLI_LIBS)
+libutil_LDFLAGS = $(LIBLZMA_LIBS) -lbz2 -pthread $(OPENSSL_LIBS) $(LIBBROTLI_LIBS) -lboost_context
 
 libutil_LIBS = libformat
 
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index 9e2a502afaf8..6b793922511e 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -5,6 +5,8 @@
 #include <cerrno>
 #include <memory>
 
+#include <boost/coroutine2/coroutine.hpp>
+
 
 namespace nix {
 
@@ -88,6 +90,23 @@ void Source::operator () (unsigned char * data, size_t len)
 }
 
 
+std::string Source::drain()
+{
+    std::string s;
+    std::vector<unsigned char> buf(8192);
+    while (true) {
+        size_t n;
+        try {
+            n = read(buf.data(), buf.size());
+            s.append((char *) buf.data(), n);
+        } catch (EndOfFile &) {
+            break;
+        }
+    }
+    return s;
+}
+
+
 size_t BufferedSource::read(unsigned char * data, size_t len)
 {
     if (!buffer) buffer = decltype(buffer)(new unsigned char[bufSize]);
@@ -138,6 +157,50 @@ size_t StringSource::read(unsigned char * data, size_t len)
 }
 
 
+std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun)
+{
+    struct SinkToSource : Source
+    {
+        typedef boost::coroutines2::coroutine<std::string> coro_t;
+
+        coro_t::pull_type coro;
+
+        SinkToSource(std::function<void(Sink &)> fun)
+            : coro([&](coro_t::push_type & yield) {
+                LambdaSink sink([&](const unsigned char * data, size_t len) {
+                    if (len) yield(std::string((const char *) data, len));
+                });
+                fun(sink);
+            })
+        {
+        }
+
+        std::string cur;
+        size_t pos = 0;
+
+        size_t read(unsigned char * data, size_t len) override
+        {
+            if (!coro)
+                throw EndOfFile("coroutine has finished");
+
+            if (pos == cur.size()) {
+                if (!cur.empty()) coro();
+                cur = std::move(coro.get());
+                pos = 0;
+            }
+
+            auto n = std::min(cur.size() - pos, len);
+            memcpy(data, (unsigned char *) cur.data() + pos, n);
+            pos += n;
+
+            return n;
+        }
+    };
+
+    return std::make_unique<SinkToSource>(fun);
+}
+
+
 void writePadding(size_t len, Sink & sink)
 {
     if (len % 8) {
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 103b05767363..d0b4552e3399 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -61,6 +61,8 @@ struct Source
     virtual size_t read(unsigned char * data, size_t len) = 0;
 
     virtual bool good() { return true; }
+
+    std::string drain();
 };
 
 
@@ -191,6 +193,27 @@ struct LambdaSink : Sink
 };
 
 
+/* Convert a function into a source. */
+struct LambdaSource : Source
+{
+    typedef std::function<size_t(unsigned char *, size_t)> lambda_t;
+
+    lambda_t lambda;
+
+    LambdaSource(const lambda_t & lambda) : lambda(lambda) { }
+
+    size_t read(unsigned char * data, size_t len) override
+    {
+        return lambda(data, len);
+    }
+};
+
+
+/* Convert a function that feeds data into a Sink into a Source. The
+   Source executes the function as a coroutine. */
+std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun);
+
+
 void writePadding(size_t len, Sink & sink);
 void writeString(const unsigned char * buf, size_t len, Sink & sink);