about summary refs log tree commit diff
path: root/src/libutil/compression.cc
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2018-03-16T15·59+0100
committerEelco Dolstra <edolstra@gmail.com>2018-03-16T19·35+0100
commit3e6b194d78024373c2320f31f4ba0de3d0658b83 (patch)
tree76576868d1a0b728f1905e9df3ba3429f3deafe8 /src/libutil/compression.cc
parent64441f055194e9fd9e924a58e67cadb030ba918f (diff)
decompress(): Use a Source and Sink
This allows decompression to happen in O(1) memory.
Diffstat (limited to 'src/libutil/compression.cc')
-rw-r--r--src/libutil/compression.cc166
1 files changed, 109 insertions, 57 deletions
diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc
index 470c925ed7..e7fedcbdc5 100644
--- a/src/libutil/compression.cc
+++ b/src/libutil/compression.cc
@@ -17,7 +17,23 @@
 
 namespace nix {
 
-static ref<std::string> decompressXZ(const std::string & in)
+static const size_t bufSize = 32 * 1024;
+
+static void decompressNone(Source & source, Sink & sink)
+{
+    std::vector<unsigned char> buf(bufSize);
+    while (true) {
+        size_t n;
+        try {
+            n = source.read(buf.data(), buf.size());
+        } catch (EndOfFile &) {
+            break;
+        }
+        sink(buf.data(), n);
+    }
+}
+
+static void decompressXZ(Source & source, Sink & sink)
 {
     lzma_stream strm(LZMA_STREAM_INIT);
 
@@ -29,36 +45,44 @@ static ref<std::string> decompressXZ(const std::string & in)
     Finally free([&]() { lzma_end(&strm); });
 
     lzma_action action = LZMA_RUN;
-    uint8_t outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    strm.next_in = (uint8_t *) in.c_str();
-    strm.avail_in = in.size();
-    strm.next_out = outbuf;
-    strm.avail_out = sizeof(outbuf);
+    std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
+    strm.next_in = nullptr;
+    strm.avail_in = 0;
+    strm.next_out = outbuf.data();
+    strm.avail_out = outbuf.size();
+    bool eof = false;
 
     while (true) {
         checkInterrupt();
 
+        if (strm.avail_in == 0 && !eof) {
+            strm.next_in = inbuf.data();
+            try {
+                strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
+            } catch (EndOfFile &) {
+                eof = true;
+            }
+        }
+
         if (strm.avail_in == 0)
             action = LZMA_FINISH;
 
         lzma_ret ret = lzma_code(&strm, action);
 
-        if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
-            res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out);
-            strm.next_out = outbuf;
-            strm.avail_out = sizeof(outbuf);
+        if (strm.avail_out < outbuf.size()) {
+            sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
+            strm.next_out = outbuf.data();
+            strm.avail_out = outbuf.size();
         }
 
-        if (ret == LZMA_STREAM_END)
-            return res;
+        if (ret == LZMA_STREAM_END) return;
 
         if (ret != LZMA_OK)
             throw CompressionError("error %d while decompressing xz file", ret);
     }
 }
 
-static ref<std::string> decompressBzip2(const std::string & in)
+static void decompressBzip2(Source & source, Sink & sink)
 {
     bz_stream strm;
     memset(&strm, 0, sizeof(strm));
@@ -69,39 +93,50 @@ static ref<std::string> decompressBzip2(const std::string & in)
 
     Finally free([&]() { BZ2_bzDecompressEnd(&strm); });
 
-    char outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    strm.next_in = (char *) in.c_str();
-    strm.avail_in = in.size();
-    strm.next_out = outbuf;
-    strm.avail_out = sizeof(outbuf);
+    std::vector<char> inbuf(bufSize), outbuf(bufSize);
+    strm.next_in = nullptr;
+    strm.avail_in = 0;
+    strm.next_out = outbuf.data();
+    strm.avail_out = outbuf.size();
+    bool eof = false;
 
     while (true) {
         checkInterrupt();
 
+        if (strm.avail_in == 0 && !eof) {
+            strm.next_in = inbuf.data();
+            try {
+                strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
+            } catch (EndOfFile &) {
+                eof = true;
+            }
+        }
+
         int ret = BZ2_bzDecompress(&strm);
 
-        if (strm.avail_out == 0 || ret == BZ_STREAM_END) {
-            res->append(outbuf, sizeof(outbuf) - strm.avail_out);
-            strm.next_out = outbuf;
-            strm.avail_out = sizeof(outbuf);
+        if (strm.avail_in == 0 && strm.avail_out == outbuf.size() && eof)
+            throw CompressionError("bzip2 data ends prematurely");
+
+        if (strm.avail_out < outbuf.size()) {
+            sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
+            strm.next_out = outbuf.data();
+            strm.avail_out = outbuf.size();
         }
 
-        if (ret == BZ_STREAM_END)
-            return res;
+        if (ret == BZ_STREAM_END) return;
 
         if (ret != BZ_OK)
             throw CompressionError("error while decompressing bzip2 file");
-
-        if (strm.avail_in == 0)
-            throw CompressionError("bzip2 data ends prematurely");
     }
 }
 
-static ref<std::string> decompressBrotli(const std::string & in)
+static void decompressBrotli(Source & source, Sink & sink)
 {
 #if !HAVE_BROTLI
-    return make_ref<std::string>(runProgram(BROTLI, true, {"-d"}, {in}));
+    RunOptions options(BROTLI, {"-d"});
+    options.stdin = &source;
+    options.stdout = &sink;
+    runProgram2(options);
 #else
     auto *s = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
     if (!s)
@@ -109,16 +144,26 @@ static ref<std::string> decompressBrotli(const std::string & in)
 
     Finally free([s]() { BrotliDecoderDestroyInstance(s); });
 
-    uint8_t outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    const uint8_t *next_in = (uint8_t *)in.c_str();
-    size_t avail_in = in.size();
-    uint8_t *next_out = outbuf;
-    size_t avail_out = sizeof(outbuf);
+    std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
+    const uint8_t * next_in = nullptr;
+    size_t avail_in = 0;
+    bool eof = false;
 
     while (true) {
         checkInterrupt();
 
+        if (avail_in == 0 && !eof) {
+            next_in = inbuf.data();
+            try {
+                avail_in = source.read((unsigned char *) next_in, inbuf.size());
+            } catch (EndOfFile &) {
+                eof = true;
+            }
+        }
+
+        uint8_t * next_out = outbuf.data();
+        size_t avail_out = outbuf.size();
+
         auto ret = BrotliDecoderDecompressStream(s,
                 &avail_in, &next_in,
                 &avail_out, &next_out,
@@ -128,51 +173,49 @@ static ref<std::string> decompressBrotli(const std::string & in)
         case BROTLI_DECODER_RESULT_ERROR:
             throw CompressionError("error while decompressing brotli file");
         case BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
-            throw CompressionError("incomplete or corrupt brotli file");
+            if (eof)
+                throw CompressionError("incomplete or corrupt brotli file");
+            break;
         case BROTLI_DECODER_RESULT_SUCCESS:
             if (avail_in != 0)
                 throw CompressionError("unexpected input after brotli decompression");
             break;
         case BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT:
             // I'm not sure if this can happen, but abort if this happens with empty buffer
-            if (avail_out == sizeof(outbuf))
+            if (avail_out == outbuf.size())
                 throw CompressionError("brotli decompression requires larger buffer");
             break;
         }
 
         // Always ensure we have full buffer for next invocation
-        if (avail_out < sizeof(outbuf)) {
-            res->append((char*)outbuf, sizeof(outbuf) - avail_out);
-            next_out = outbuf;
-            avail_out = sizeof(outbuf);
-        }
+        if (avail_out < outbuf.size())
+            sink((unsigned char *) outbuf.data(), outbuf.size() - avail_out);
 
-        if (ret == BROTLI_DECODER_RESULT_SUCCESS) return res;
+        if (ret == BROTLI_DECODER_RESULT_SUCCESS) return;
     }
 #endif // HAVE_BROTLI
 }
 
-ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
+ref<std::string> decompress(const std::string & method, const std::string & in)
 {
-    StringSink ssink;
-    auto sink = makeCompressionSink(method, ssink, parallel);
-    (*sink)(in);
-    sink->finish();
-    return ssink.s;
+    StringSource source(in);
+    StringSink sink;
+    decompress(method, source, sink);
+    return sink.s;
 }
 
-ref<std::string> decompress(const std::string & method, const std::string & in)
+void decompress(const std::string & method, Source & source, Sink & sink)
 {
     if (method == "none")
-        return make_ref<std::string>(in);
+        return decompressNone(source, sink);
     else if (method == "xz")
-        return decompressXZ(in);
+        return decompressXZ(source, sink);
     else if (method == "bzip2")
-        return decompressBzip2(in);
+        return decompressBzip2(source, sink);
     else if (method == "br")
-        return decompressBrotli(in);
+        return decompressBrotli(source, sink);
     else
-        throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
+        throw UnknownCompressionMethod("unknown compression method '%s'", method);
 }
 
 struct NoneSink : CompressionSink
@@ -499,4 +542,13 @@ ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & next
         throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
 }
 
+ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
+{
+    StringSink ssink;
+    auto sink = makeCompressionSink(method, ssink, parallel);
+    (*sink)(in);
+    sink->finish();
+    return ssink.s;
+}
+
 }