about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2018-03-16T15·59+0100
committerEelco Dolstra <edolstra@gmail.com>2018-03-16T19·35+0100
commit3e6b194d78024373c2320f31f4ba0de3d0658b83 (patch)
tree76576868d1a0b728f1905e9df3ba3429f3deafe8 /src
parent64441f055194e9fd9e924a58e67cadb030ba918f (diff)
decompress(): Use a Source and Sink
This allows decompression to happen in O(1) memory.
Diffstat (limited to 'src')
-rw-r--r--src/libstore/binary-cache-store.cc20
-rw-r--r--src/libutil/compression.cc166
-rw-r--r--src/libutil/compression.hh6
-rw-r--r--src/libutil/serialise.hh18
-rw-r--r--src/libutil/util.cc75
-rw-r--r--src/libutil/util.hh8
6 files changed, 206 insertions, 87 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc
index d1b278b8ef..2e9a13e564 100644
--- a/src/libstore/binary-cache-store.cc
+++ b/src/libstore/binary-cache-store.cc
@@ -203,22 +203,18 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
     stats.narRead++;
     stats.narReadCompressedBytes += nar->size();
 
-    /* Decompress the NAR. FIXME: would be nice to have the remote
-       side do this. */
-    try {
-        nar = decompress(info->compression, *nar);
-    } catch (UnknownCompressionMethod &) {
-        throw Error(format("binary cache path '%s' uses unknown compression method '%s'")
-            % storePath % info->compression);
-    }
+    uint64_t narSize = 0;
 
-    stats.narReadBytes += nar->size();
+    StringSource source(*nar);
 
-    printMsg(lvlTalkative, format("exporting path '%1%' (%2% bytes)") % storePath % nar->size());
+    LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
+        sink(data, len);
+        narSize += len;
+    });
 
-    assert(nar->size() % 8 == 0);
+    decompress(info->compression, source, wrapperSink);
 
-    sink((unsigned char *) nar->c_str(), nar->size());
+    stats.narReadBytes += narSize;
 }
 
 void BinaryCacheStore::queryPathInfoUncached(const Path & storePath,
diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc
index 470c925ed7..e7fedcbdc5 100644
--- a/src/libutil/compression.cc
+++ b/src/libutil/compression.cc
@@ -17,7 +17,23 @@
 
 namespace nix {
 
-static ref<std::string> decompressXZ(const std::string & in)
+static const size_t bufSize = 32 * 1024;
+
+static void decompressNone(Source & source, Sink & sink)
+{
+    std::vector<unsigned char> buf(bufSize);
+    while (true) {
+        size_t n;
+        try {
+            n = source.read(buf.data(), buf.size());
+        } catch (EndOfFile &) {
+            break;
+        }
+        sink(buf.data(), n);
+    }
+}
+
+static void decompressXZ(Source & source, Sink & sink)
 {
     lzma_stream strm(LZMA_STREAM_INIT);
 
@@ -29,36 +45,44 @@ static ref<std::string> decompressXZ(const std::string & in)
     Finally free([&]() { lzma_end(&strm); });
 
     lzma_action action = LZMA_RUN;
-    uint8_t outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    strm.next_in = (uint8_t *) in.c_str();
-    strm.avail_in = in.size();
-    strm.next_out = outbuf;
-    strm.avail_out = sizeof(outbuf);
+    std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
+    strm.next_in = nullptr;
+    strm.avail_in = 0;
+    strm.next_out = outbuf.data();
+    strm.avail_out = outbuf.size();
+    bool eof = false;
 
     while (true) {
         checkInterrupt();
 
+        if (strm.avail_in == 0 && !eof) {
+            strm.next_in = inbuf.data();
+            try {
+                strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
+            } catch (EndOfFile &) {
+                eof = true;
+            }
+        }
+
         if (strm.avail_in == 0)
             action = LZMA_FINISH;
 
         lzma_ret ret = lzma_code(&strm, action);
 
-        if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
-            res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out);
-            strm.next_out = outbuf;
-            strm.avail_out = sizeof(outbuf);
+        if (strm.avail_out < outbuf.size()) {
+            sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
+            strm.next_out = outbuf.data();
+            strm.avail_out = outbuf.size();
         }
 
-        if (ret == LZMA_STREAM_END)
-            return res;
+        if (ret == LZMA_STREAM_END) return;
 
         if (ret != LZMA_OK)
             throw CompressionError("error %d while decompressing xz file", ret);
     }
 }
 
-static ref<std::string> decompressBzip2(const std::string & in)
+static void decompressBzip2(Source & source, Sink & sink)
 {
     bz_stream strm;
     memset(&strm, 0, sizeof(strm));
@@ -69,39 +93,50 @@ static ref<std::string> decompressBzip2(const std::string & in)
 
     Finally free([&]() { BZ2_bzDecompressEnd(&strm); });
 
-    char outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    strm.next_in = (char *) in.c_str();
-    strm.avail_in = in.size();
-    strm.next_out = outbuf;
-    strm.avail_out = sizeof(outbuf);
+    std::vector<char> inbuf(bufSize), outbuf(bufSize);
+    strm.next_in = nullptr;
+    strm.avail_in = 0;
+    strm.next_out = outbuf.data();
+    strm.avail_out = outbuf.size();
+    bool eof = false;
 
     while (true) {
         checkInterrupt();
 
+        if (strm.avail_in == 0 && !eof) {
+            strm.next_in = inbuf.data();
+            try {
+                strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
+            } catch (EndOfFile &) {
+                eof = true;
+            }
+        }
+
         int ret = BZ2_bzDecompress(&strm);
 
-        if (strm.avail_out == 0 || ret == BZ_STREAM_END) {
-            res->append(outbuf, sizeof(outbuf) - strm.avail_out);
-            strm.next_out = outbuf;
-            strm.avail_out = sizeof(outbuf);
+        if (strm.avail_in == 0 && strm.avail_out == outbuf.size() && eof)
+            throw CompressionError("bzip2 data ends prematurely");
+
+        if (strm.avail_out < outbuf.size()) {
+            sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
+            strm.next_out = outbuf.data();
+            strm.avail_out = outbuf.size();
         }
 
-        if (ret == BZ_STREAM_END)
-            return res;
+        if (ret == BZ_STREAM_END) return;
 
         if (ret != BZ_OK)
             throw CompressionError("error while decompressing bzip2 file");
-
-        if (strm.avail_in == 0)
-            throw CompressionError("bzip2 data ends prematurely");
     }
 }
 
-static ref<std::string> decompressBrotli(const std::string & in)
+static void decompressBrotli(Source & source, Sink & sink)
 {
 #if !HAVE_BROTLI
-    return make_ref<std::string>(runProgram(BROTLI, true, {"-d"}, {in}));
+    RunOptions options(BROTLI, {"-d"});
+    options.stdin = &source;
+    options.stdout = &sink;
+    runProgram2(options);
 #else
     auto *s = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
     if (!s)
@@ -109,16 +144,26 @@ static ref<std::string> decompressBrotli(const std::string & in)
 
     Finally free([s]() { BrotliDecoderDestroyInstance(s); });
 
-    uint8_t outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    const uint8_t *next_in = (uint8_t *)in.c_str();
-    size_t avail_in = in.size();
-    uint8_t *next_out = outbuf;
-    size_t avail_out = sizeof(outbuf);
+    std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
+    const uint8_t * next_in = nullptr;
+    size_t avail_in = 0;
+    bool eof = false;
 
     while (true) {
         checkInterrupt();
 
+        if (avail_in == 0 && !eof) {
+            next_in = inbuf.data();
+            try {
+                avail_in = source.read((unsigned char *) next_in, inbuf.size());
+            } catch (EndOfFile &) {
+                eof = true;
+            }
+        }
+
+        uint8_t * next_out = outbuf.data();
+        size_t avail_out = outbuf.size();
+
         auto ret = BrotliDecoderDecompressStream(s,
                 &avail_in, &next_in,
                 &avail_out, &next_out,
@@ -128,51 +173,49 @@ static ref<std::string> decompressBrotli(const std::string & in)
         case BROTLI_DECODER_RESULT_ERROR:
             throw CompressionError("error while decompressing brotli file");
         case BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
-            throw CompressionError("incomplete or corrupt brotli file");
+            if (eof)
+                throw CompressionError("incomplete or corrupt brotli file");
+            break;
         case BROTLI_DECODER_RESULT_SUCCESS:
             if (avail_in != 0)
                 throw CompressionError("unexpected input after brotli decompression");
             break;
         case BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT:
             // I'm not sure if this can happen, but abort if this happens with empty buffer
-            if (avail_out == sizeof(outbuf))
+            if (avail_out == outbuf.size())
                 throw CompressionError("brotli decompression requires larger buffer");
             break;
         }
 
         // Always ensure we have full buffer for next invocation
-        if (avail_out < sizeof(outbuf)) {
-            res->append((char*)outbuf, sizeof(outbuf) - avail_out);
-            next_out = outbuf;
-            avail_out = sizeof(outbuf);
-        }
+        if (avail_out < outbuf.size())
+            sink((unsigned char *) outbuf.data(), outbuf.size() - avail_out);
 
-        if (ret == BROTLI_DECODER_RESULT_SUCCESS) return res;
+        if (ret == BROTLI_DECODER_RESULT_SUCCESS) return;
     }
 #endif // HAVE_BROTLI
 }
 
-ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
+ref<std::string> decompress(const std::string & method, const std::string & in)
 {
-    StringSink ssink;
-    auto sink = makeCompressionSink(method, ssink, parallel);
-    (*sink)(in);
-    sink->finish();
-    return ssink.s;
+    StringSource source(in);
+    StringSink sink;
+    decompress(method, source, sink);
+    return sink.s;
 }
 
-ref<std::string> decompress(const std::string & method, const std::string & in)
+void decompress(const std::string & method, Source & source, Sink & sink)
 {
     if (method == "none")
-        return make_ref<std::string>(in);
+        return decompressNone(source, sink);
     else if (method == "xz")
-        return decompressXZ(in);
+        return decompressXZ(source, sink);
     else if (method == "bzip2")
-        return decompressBzip2(in);
+        return decompressBzip2(source, sink);
     else if (method == "br")
-        return decompressBrotli(in);
+        return decompressBrotli(source, sink);
     else
-        throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
+        throw UnknownCompressionMethod("unknown compression method '%s'", method);
 }
 
 struct NoneSink : CompressionSink
@@ -499,4 +542,13 @@ ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & next
         throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
 }
 
+ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
+{
+    StringSink ssink;
+    auto sink = makeCompressionSink(method, ssink, parallel);
+    (*sink)(in);
+    sink->finish();
+    return ssink.s;
+}
+
 }
diff --git a/src/libutil/compression.hh b/src/libutil/compression.hh
index a0d7530d74..f7a3e3fbd3 100644
--- a/src/libutil/compression.hh
+++ b/src/libutil/compression.hh
@@ -8,10 +8,12 @@
 
 namespace nix {
 
-ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
-
 ref<std::string> decompress(const std::string & method, const std::string & in);
 
+void decompress(const std::string & method, Source & source, Sink & sink);
+
+ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
+
 struct CompressionSink : BufferedSink
 {
     virtual void finish() = 0;
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 2ea5b6354e..103b057673 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -56,7 +56,7 @@ struct Source
     void operator () (unsigned char * data, size_t len);
 
     /* Store up to ‘len’ in the buffer pointed to by ‘data’, and
-       return the number of bytes stored.  If blocks until at least
+       return the number of bytes stored.  It blocks until at least
        one byte is available. */
     virtual size_t read(unsigned char * data, size_t len) = 0;
 
@@ -175,6 +175,22 @@ struct TeeSource : Source
 };
 
 
+/* Convert a function into a sink. */
+struct LambdaSink : Sink
+{
+    typedef std::function<void(const unsigned char *, size_t)> lambda_t;
+
+    lambda_t lambda;
+
+    LambdaSink(const lambda_t & lambda) : lambda(lambda) { }
+
+    virtual void operator () (const unsigned char * data, size_t len)
+    {
+        lambda(data, len);
+    }
+};
+
+
 void writePadding(size_t len, Sink & sink);
 void writeString(const unsigned char * buf, size_t len, Sink & sink);
 
diff --git a/src/libutil/util.cc b/src/libutil/util.cc
index a60ba8508e..37a35ab233 100644
--- a/src/libutil/util.cc
+++ b/src/libutil/util.cc
@@ -3,6 +3,7 @@
 #include "affinity.hh"
 #include "sync.hh"
 #include "finally.hh"
+#include "serialise.hh"
 
 #include <cctype>
 #include <cerrno>
@@ -568,19 +569,25 @@ void writeFull(int fd, const string & s, bool allowInterrupts)
 
 string drainFD(int fd)
 {
-    string result;
-    unsigned char buffer[4096];
+    StringSink sink;
+    drainFD(fd, sink);
+    return std::move(*sink.s);
+}
+
+
+void drainFD(int fd, Sink & sink)
+{
+    std::vector<unsigned char> buf(4096);
     while (1) {
         checkInterrupt();
-        ssize_t rd = read(fd, buffer, sizeof buffer);
+        ssize_t rd = read(fd, buf.data(), buf.size());
         if (rd == -1) {
             if (errno != EINTR)
                 throw SysError("reading from file");
         }
         else if (rd == 0) break;
-        else result.append((char *) buffer, rd);
+        else sink(buf.data(), rd);
     }
-    return result;
 }
 
 
@@ -920,20 +927,47 @@ string runProgram(Path program, bool searchPath, const Strings & args,
     return res.second;
 }
 
-std::pair<int, std::string> runProgram(const RunOptions & options)
+std::pair<int, std::string> runProgram(const RunOptions & options_)
+{
+    RunOptions options(options_);
+    StringSink sink;
+    options.stdout = &sink;
+
+    int status = 0;
+
+    try {
+        runProgram2(options);
+    } catch (ExecError & e) {
+        status = e.status;
+    }
+
+    return {status, std::move(*sink.s)};
+}
+
+void runProgram2(const RunOptions & options)
 {
     checkInterrupt();
 
+    assert(!(options.stdin && options.input));
+
+    std::unique_ptr<Source> source_;
+    Source * source = options.stdin;
+
+    if (options.input) {
+        source_ = std::make_unique<StringSource>(*options.input);
+        source = source_.get();
+    }
+
     /* Create a pipe. */
     Pipe out, in;
-    out.create();
-    if (options.input) in.create();
+    if (options.stdout) out.create();
+    if (source) in.create();
 
     /* Fork. */
     Pid pid = startProcess([&]() {
-        if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
+        if (options.stdout && dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
             throw SysError("dupping stdout");
-        if (options.input && dup2(in.readSide.get(), STDIN_FILENO) == -1)
+        if (source && dup2(in.readSide.get(), STDIN_FILENO) == -1)
             throw SysError("dupping stdin");
 
         Strings args_(options.args);
@@ -961,11 +995,20 @@ std::pair<int, std::string> runProgram(const RunOptions & options)
     });
 
 
-    if (options.input) {
+    if (source) {
         in.readSide = -1;
         writerThread = std::thread([&]() {
             try {
-                writeFull(in.writeSide.get(), *options.input);
+                std::vector<unsigned char> buf(8 * 1024);
+                while (true) {
+                    size_t n;
+                    try {
+                        n = source->read(buf.data(), buf.size());
+                    } catch (EndOfFile &) {
+                        break;
+                    }
+                    writeFull(in.writeSide.get(), buf.data(), n);
+                }
                 promise.set_value();
             } catch (...) {
                 promise.set_exception(std::current_exception());
@@ -974,15 +1017,17 @@ std::pair<int, std::string> runProgram(const RunOptions & options)
         });
     }
 
-    string result = drainFD(out.readSide.get());
+    if (options.stdout)
+        drainFD(out.readSide.get(), *options.stdout);
 
     /* Wait for the child to finish. */
     int status = pid.wait();
 
     /* Wait for the writer thread to finish. */
-    if (options.input) promise.get_future().get();
+    if (source) promise.get_future().get();
 
-    return {status, result};
+    if (status)
+        throw ExecError(status, fmt("program '%1%' %2%", options.program, statusToString(status)));
 }
 
 
diff --git a/src/libutil/util.hh b/src/libutil/util.hh
index 500ab7811b..1ea1027ac0 100644
--- a/src/libutil/util.hh
+++ b/src/libutil/util.hh
@@ -25,6 +25,9 @@
 
 namespace nix {
 
+struct Sink;
+struct Source;
+
 
 /* Return an environment variable. */
 string getEnv(const string & key, const string & def = "");
@@ -150,6 +153,7 @@ MakeError(EndOfFile, Error)
 /* Read a file descriptor until EOF occurs. */
 string drainFD(int fd);
 
+void drainFD(int fd, Sink & sink);
 
 
 /* Automatic cleanup of resources. */
@@ -256,6 +260,8 @@ struct RunOptions
     bool searchPath = true;
     Strings args;
     std::experimental::optional<std::string> input;
+    Source * stdin = nullptr;
+    Sink * stdout = nullptr;
     bool _killStderr = false;
 
     RunOptions(const Path & program, const Strings & args)
@@ -266,6 +272,8 @@ struct RunOptions
 
 std::pair<int, std::string> runProgram(const RunOptions & options);
 
+void runProgram2(const RunOptions & options);
+
 
 class ExecError : public Error
 {