From 3e6b194d78024373c2320f31f4ba0de3d0658b83 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Fri, 16 Mar 2018 16:59:31 +0100 Subject: decompress(): Use a Source and Sink This allows decompression to happen in O(1) memory. --- src/libutil/compression.cc | 166 +++++++++++++++++++++++++++++---------------- 1 file changed, 109 insertions(+), 57 deletions(-) (limited to 'src/libutil/compression.cc') diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc index 470c925ed7a6..e7fedcbdc52e 100644 --- a/src/libutil/compression.cc +++ b/src/libutil/compression.cc @@ -17,7 +17,23 @@ namespace nix { -static ref decompressXZ(const std::string & in) +static const size_t bufSize = 32 * 1024; + +static void decompressNone(Source & source, Sink & sink) +{ + std::vector buf(bufSize); + while (true) { + size_t n; + try { + n = source.read(buf.data(), buf.size()); + } catch (EndOfFile &) { + break; + } + sink(buf.data(), n); + } +} + +static void decompressXZ(Source & source, Sink & sink) { lzma_stream strm(LZMA_STREAM_INIT); @@ -29,36 +45,44 @@ static ref decompressXZ(const std::string & in) Finally free([&]() { lzma_end(&strm); }); lzma_action action = LZMA_RUN; - uint8_t outbuf[BUFSIZ]; - ref res = make_ref(); - strm.next_in = (uint8_t *) in.c_str(); - strm.avail_in = in.size(); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); + std::vector inbuf(bufSize), outbuf(bufSize); + strm.next_in = nullptr; + strm.avail_in = 0; + strm.next_out = outbuf.data(); + strm.avail_out = outbuf.size(); + bool eof = false; while (true) { checkInterrupt(); + if (strm.avail_in == 0 && !eof) { + strm.next_in = inbuf.data(); + try { + strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size()); + } catch (EndOfFile &) { + eof = true; + } + } + if (strm.avail_in == 0) action = LZMA_FINISH; lzma_ret ret = lzma_code(&strm, action); - if (strm.avail_out == 0 || ret == LZMA_STREAM_END) { - res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); + if (strm.avail_out < outbuf.size()) { + sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out); + strm.next_out = outbuf.data(); + strm.avail_out = outbuf.size(); } - if (ret == LZMA_STREAM_END) - return res; + if (ret == LZMA_STREAM_END) return; if (ret != LZMA_OK) throw CompressionError("error %d while decompressing xz file", ret); } } -static ref decompressBzip2(const std::string & in) +static void decompressBzip2(Source & source, Sink & sink) { bz_stream strm; memset(&strm, 0, sizeof(strm)); @@ -69,39 +93,50 @@ static ref decompressBzip2(const std::string & in) Finally free([&]() { BZ2_bzDecompressEnd(&strm); }); - char outbuf[BUFSIZ]; - ref res = make_ref(); - strm.next_in = (char *) in.c_str(); - strm.avail_in = in.size(); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); + std::vector inbuf(bufSize), outbuf(bufSize); + strm.next_in = nullptr; + strm.avail_in = 0; + strm.next_out = outbuf.data(); + strm.avail_out = outbuf.size(); + bool eof = false; while (true) { checkInterrupt(); + if (strm.avail_in == 0 && !eof) { + strm.next_in = inbuf.data(); + try { + strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size()); + } catch (EndOfFile &) { + eof = true; + } + } + int ret = BZ2_bzDecompress(&strm); - if (strm.avail_out == 0 || ret == BZ_STREAM_END) { - res->append(outbuf, sizeof(outbuf) - strm.avail_out); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); + if (strm.avail_in == 0 && strm.avail_out == outbuf.size() && eof) + throw CompressionError("bzip2 data ends prematurely"); + + if (strm.avail_out < outbuf.size()) { + sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out); + strm.next_out = outbuf.data(); + strm.avail_out = outbuf.size(); } - if (ret == BZ_STREAM_END) - return res; + if (ret == BZ_STREAM_END) return; if (ret != BZ_OK) throw CompressionError("error while decompressing bzip2 file"); - - if (strm.avail_in == 0) - throw CompressionError("bzip2 data ends prematurely"); } } -static ref decompressBrotli(const std::string & in) +static void decompressBrotli(Source & source, Sink & sink) { #if !HAVE_BROTLI - return make_ref(runProgram(BROTLI, true, {"-d"}, {in})); + RunOptions options(BROTLI, {"-d"}); + options.stdin = &source; + options.stdout = &sink; + runProgram2(options); #else auto *s = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr); if (!s) @@ -109,16 +144,26 @@ static ref decompressBrotli(const std::string & in) Finally free([s]() { BrotliDecoderDestroyInstance(s); }); - uint8_t outbuf[BUFSIZ]; - ref res = make_ref(); - const uint8_t *next_in = (uint8_t *)in.c_str(); - size_t avail_in = in.size(); - uint8_t *next_out = outbuf; - size_t avail_out = sizeof(outbuf); + std::vector inbuf(bufSize), outbuf(bufSize); + const uint8_t * next_in = nullptr; + size_t avail_in = 0; + bool eof = false; while (true) { checkInterrupt(); + if (avail_in == 0 && !eof) { + next_in = inbuf.data(); + try { + avail_in = source.read((unsigned char *) next_in, inbuf.size()); + } catch (EndOfFile &) { + eof = true; + } + } + + uint8_t * next_out = outbuf.data(); + size_t avail_out = outbuf.size(); + auto ret = BrotliDecoderDecompressStream(s, &avail_in, &next_in, &avail_out, &next_out, @@ -128,51 +173,49 @@ static ref decompressBrotli(const std::string & in) case BROTLI_DECODER_RESULT_ERROR: throw CompressionError("error while decompressing brotli file"); case BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT: - throw CompressionError("incomplete or corrupt brotli file"); + if (eof) + throw CompressionError("incomplete or corrupt brotli file"); + break; case BROTLI_DECODER_RESULT_SUCCESS: if (avail_in != 0) throw CompressionError("unexpected input after brotli decompression"); break; case BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT: // I'm not sure if this can happen, but abort if this happens with empty buffer - if (avail_out == sizeof(outbuf)) + if (avail_out == outbuf.size()) throw CompressionError("brotli decompression requires larger buffer"); break; } // Always ensure we have full buffer for next invocation - if (avail_out < sizeof(outbuf)) { - res->append((char*)outbuf, sizeof(outbuf) - avail_out); - next_out = outbuf; - avail_out = sizeof(outbuf); - } + if (avail_out < outbuf.size()) + sink((unsigned char *) outbuf.data(), outbuf.size() - avail_out); - if (ret == BROTLI_DECODER_RESULT_SUCCESS) return res; + if (ret == BROTLI_DECODER_RESULT_SUCCESS) return; } #endif // HAVE_BROTLI } -ref compress(const std::string & method, const std::string & in, const bool parallel) +ref decompress(const std::string & method, const std::string & in) { - StringSink ssink; - auto sink = makeCompressionSink(method, ssink, parallel); - (*sink)(in); - sink->finish(); - return ssink.s; + StringSource source(in); + StringSink sink; + decompress(method, source, sink); + return sink.s; } -ref decompress(const std::string & method, const std::string & in) +void decompress(const std::string & method, Source & source, Sink & sink) { if (method == "none") - return make_ref(in); + return decompressNone(source, sink); else if (method == "xz") - return decompressXZ(in); + return decompressXZ(source, sink); else if (method == "bzip2") - return decompressBzip2(in); + return decompressBzip2(source, sink); else if (method == "br") - return decompressBrotli(in); + return decompressBrotli(source, sink); else - throw UnknownCompressionMethod(format("unknown compression method '%s'") % method); + throw UnknownCompressionMethod("unknown compression method '%s'", method); } struct NoneSink : CompressionSink @@ -499,4 +542,13 @@ ref makeCompressionSink(const std::string & method, Sink & next throw UnknownCompressionMethod(format("unknown compression method '%s'") % method); } +ref compress(const std::string & method, const std::string & in, const bool parallel) +{ + StringSink ssink; + auto sink = makeCompressionSink(method, ssink, parallel); + (*sink)(in); + sink->finish(); + return ssink.s; +} + } -- cgit 1.4.1