From 3e6b194d78024373c2320f31f4ba0de3d0658b83 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Fri, 16 Mar 2018 16:59:31 +0100 Subject: decompress(): Use a Source and Sink This allows decompression to happen in O(1) memory. --- src/libstore/binary-cache-store.cc | 20 ++--- src/libutil/compression.cc | 166 ++++++++++++++++++++++++------------- src/libutil/compression.hh | 6 +- src/libutil/serialise.hh | 18 +++- src/libutil/util.cc | 75 +++++++++++++---- src/libutil/util.hh | 8 ++ 6 files changed, 206 insertions(+), 87 deletions(-) diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index d1b278b8ef..2e9a13e564 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -203,22 +203,18 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) stats.narRead++; stats.narReadCompressedBytes += nar->size(); - /* Decompress the NAR. FIXME: would be nice to have the remote - side do this. */ - try { - nar = decompress(info->compression, *nar); - } catch (UnknownCompressionMethod &) { - throw Error(format("binary cache path '%s' uses unknown compression method '%s'") - % storePath % info->compression); - } + uint64_t narSize = 0; - stats.narReadBytes += nar->size(); + StringSource source(*nar); - printMsg(lvlTalkative, format("exporting path '%1%' (%2% bytes)") % storePath % nar->size()); + LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { + sink(data, len); + narSize += len; + }); - assert(nar->size() % 8 == 0); + decompress(info->compression, source, wrapperSink); - sink((unsigned char *) nar->c_str(), nar->size()); + stats.narReadBytes += narSize; } void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc index 470c925ed7..e7fedcbdc5 100644 --- a/src/libutil/compression.cc +++ b/src/libutil/compression.cc @@ -17,7 +17,23 @@ namespace nix { -static ref decompressXZ(const std::string & in) +static const size_t bufSize = 32 * 1024; + +static void decompressNone(Source & source, Sink & sink) +{ + std::vector buf(bufSize); + while (true) { + size_t n; + try { + n = source.read(buf.data(), buf.size()); + } catch (EndOfFile &) { + break; + } + sink(buf.data(), n); + } +} + +static void decompressXZ(Source & source, Sink & sink) { lzma_stream strm(LZMA_STREAM_INIT); @@ -29,36 +45,44 @@ static ref decompressXZ(const std::string & in) Finally free([&]() { lzma_end(&strm); }); lzma_action action = LZMA_RUN; - uint8_t outbuf[BUFSIZ]; - ref res = make_ref(); - strm.next_in = (uint8_t *) in.c_str(); - strm.avail_in = in.size(); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); + std::vector inbuf(bufSize), outbuf(bufSize); + strm.next_in = nullptr; + strm.avail_in = 0; + strm.next_out = outbuf.data(); + strm.avail_out = outbuf.size(); + bool eof = false; while (true) { checkInterrupt(); + if (strm.avail_in == 0 && !eof) { + strm.next_in = inbuf.data(); + try { + strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size()); + } catch (EndOfFile &) { + eof = true; + } + } + if (strm.avail_in == 0) action = LZMA_FINISH; lzma_ret ret = lzma_code(&strm, action); - if (strm.avail_out == 0 || ret == LZMA_STREAM_END) { - res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); + if (strm.avail_out < outbuf.size()) { + sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out); + strm.next_out = outbuf.data(); + strm.avail_out = outbuf.size(); } - if (ret == LZMA_STREAM_END) - return res; + if (ret == LZMA_STREAM_END) return; if (ret != LZMA_OK) throw CompressionError("error %d while decompressing xz file", ret); } } -static ref decompressBzip2(const std::string & in) +static void decompressBzip2(Source & source, Sink & sink) { bz_stream strm; memset(&strm, 0, sizeof(strm)); @@ -69,39 +93,50 @@ static ref decompressBzip2(const std::string & in) Finally free([&]() { BZ2_bzDecompressEnd(&strm); }); - char outbuf[BUFSIZ]; - ref res = make_ref(); - strm.next_in = (char *) in.c_str(); - strm.avail_in = in.size(); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); + std::vector inbuf(bufSize), outbuf(bufSize); + strm.next_in = nullptr; + strm.avail_in = 0; + strm.next_out = outbuf.data(); + strm.avail_out = outbuf.size(); + bool eof = false; while (true) { checkInterrupt(); + if (strm.avail_in == 0 && !eof) { + strm.next_in = inbuf.data(); + try { + strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size()); + } catch (EndOfFile &) { + eof = true; + } + } + int ret = BZ2_bzDecompress(&strm); - if (strm.avail_out == 0 || ret == BZ_STREAM_END) { - res->append(outbuf, sizeof(outbuf) - strm.avail_out); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); + if (strm.avail_in == 0 && strm.avail_out == outbuf.size() && eof) + throw CompressionError("bzip2 data ends prematurely"); + + if (strm.avail_out < outbuf.size()) { + sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out); + strm.next_out = outbuf.data(); + strm.avail_out = outbuf.size(); } - if (ret == BZ_STREAM_END) - return res; + if (ret == BZ_STREAM_END) return; if (ret != BZ_OK) throw CompressionError("error while decompressing bzip2 file"); - - if (strm.avail_in == 0) - throw CompressionError("bzip2 data ends prematurely"); } } -static ref decompressBrotli(const std::string & in) +static void decompressBrotli(Source & source, Sink & sink) { #if !HAVE_BROTLI - return make_ref(runProgram(BROTLI, true, {"-d"}, {in})); + RunOptions options(BROTLI, {"-d"}); + options.stdin = &source; + options.stdout = &sink; + runProgram2(options); #else auto *s = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr); if (!s) @@ -109,16 +144,26 @@ static ref decompressBrotli(const std::string & in) Finally free([s]() { BrotliDecoderDestroyInstance(s); }); - uint8_t outbuf[BUFSIZ]; - ref res = make_ref(); - const uint8_t *next_in = (uint8_t *)in.c_str(); - size_t avail_in = in.size(); - uint8_t *next_out = outbuf; - size_t avail_out = sizeof(outbuf); + std::vector inbuf(bufSize), outbuf(bufSize); + const uint8_t * next_in = nullptr; + size_t avail_in = 0; + bool eof = false; while (true) { checkInterrupt(); + if (avail_in == 0 && !eof) { + next_in = inbuf.data(); + try { + avail_in = source.read((unsigned char *) next_in, inbuf.size()); + } catch (EndOfFile &) { + eof = true; + } + } + + uint8_t * next_out = outbuf.data(); + size_t avail_out = outbuf.size(); + auto ret = BrotliDecoderDecompressStream(s, &avail_in, &next_in, &avail_out, &next_out, @@ -128,51 +173,49 @@ static ref decompressBrotli(const std::string & in) case BROTLI_DECODER_RESULT_ERROR: throw CompressionError("error while decompressing brotli file"); case BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT: - throw CompressionError("incomplete or corrupt brotli file"); + if (eof) + throw CompressionError("incomplete or corrupt brotli file"); + break; case BROTLI_DECODER_RESULT_SUCCESS: if (avail_in != 0) throw CompressionError("unexpected input after brotli decompression"); break; case BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT: // I'm not sure if this can happen, but abort if this happens with empty buffer - if (avail_out == sizeof(outbuf)) + if (avail_out == outbuf.size()) throw CompressionError("brotli decompression requires larger buffer"); break; } // Always ensure we have full buffer for next invocation - if (avail_out < sizeof(outbuf)) { - res->append((char*)outbuf, sizeof(outbuf) - avail_out); - next_out = outbuf; - avail_out = sizeof(outbuf); - } + if (avail_out < outbuf.size()) + sink((unsigned char *) outbuf.data(), outbuf.size() - avail_out); - if (ret == BROTLI_DECODER_RESULT_SUCCESS) return res; + if (ret == BROTLI_DECODER_RESULT_SUCCESS) return; } #endif // HAVE_BROTLI } -ref compress(const std::string & method, const std::string & in, const bool parallel) +ref decompress(const std::string & method, const std::string & in) { - StringSink ssink; - auto sink = makeCompressionSink(method, ssink, parallel); - (*sink)(in); - sink->finish(); - return ssink.s; + StringSource source(in); + StringSink sink; + decompress(method, source, sink); + return sink.s; } -ref decompress(const std::string & method, const std::string & in) +void decompress(const std::string & method, Source & source, Sink & sink) { if (method == "none") - return make_ref(in); + return decompressNone(source, sink); else if (method == "xz") - return decompressXZ(in); + return decompressXZ(source, sink); else if (method == "bzip2") - return decompressBzip2(in); + return decompressBzip2(source, sink); else if (method == "br") - return decompressBrotli(in); + return decompressBrotli(source, sink); else - throw UnknownCompressionMethod(format("unknown compression method '%s'") % method); + throw UnknownCompressionMethod("unknown compression method '%s'", method); } struct NoneSink : CompressionSink @@ -499,4 +542,13 @@ ref makeCompressionSink(const std::string & method, Sink & next throw UnknownCompressionMethod(format("unknown compression method '%s'") % method); } +ref compress(const std::string & method, const std::string & in, const bool parallel) +{ + StringSink ssink; + auto sink = makeCompressionSink(method, ssink, parallel); + (*sink)(in); + sink->finish(); + return ssink.s; +} + } diff --git a/src/libutil/compression.hh b/src/libutil/compression.hh index a0d7530d74..f7a3e3fbd3 100644 --- a/src/libutil/compression.hh +++ b/src/libutil/compression.hh @@ -8,10 +8,12 @@ namespace nix { -ref compress(const std::string & method, const std::string & in, const bool parallel = false); - ref decompress(const std::string & method, const std::string & in); +void decompress(const std::string & method, Source & source, Sink & sink); + +ref compress(const std::string & method, const std::string & in, const bool parallel = false); + struct CompressionSink : BufferedSink { virtual void finish() = 0; diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 2ea5b6354e..103b057673 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -56,7 +56,7 @@ struct Source void operator () (unsigned char * data, size_t len); /* Store up to ‘len’ in the buffer pointed to by ‘data’, and - return the number of bytes stored. If blocks until at least + return the number of bytes stored. It blocks until at least one byte is available. */ virtual size_t read(unsigned char * data, size_t len) = 0; @@ -175,6 +175,22 @@ struct TeeSource : Source }; +/* Convert a function into a sink. */ +struct LambdaSink : Sink +{ + typedef std::function lambda_t; + + lambda_t lambda; + + LambdaSink(const lambda_t & lambda) : lambda(lambda) { } + + virtual void operator () (const unsigned char * data, size_t len) + { + lambda(data, len); + } +}; + + void writePadding(size_t len, Sink & sink); void writeString(const unsigned char * buf, size_t len, Sink & sink); diff --git a/src/libutil/util.cc b/src/libutil/util.cc index a60ba8508e..37a35ab233 100644 --- a/src/libutil/util.cc +++ b/src/libutil/util.cc @@ -3,6 +3,7 @@ #include "affinity.hh" #include "sync.hh" #include "finally.hh" +#include "serialise.hh" #include #include @@ -568,19 +569,25 @@ void writeFull(int fd, const string & s, bool allowInterrupts) string drainFD(int fd) { - string result; - unsigned char buffer[4096]; + StringSink sink; + drainFD(fd, sink); + return std::move(*sink.s); +} + + +void drainFD(int fd, Sink & sink) +{ + std::vector buf(4096); while (1) { checkInterrupt(); - ssize_t rd = read(fd, buffer, sizeof buffer); + ssize_t rd = read(fd, buf.data(), buf.size()); if (rd == -1) { if (errno != EINTR) throw SysError("reading from file"); } else if (rd == 0) break; - else result.append((char *) buffer, rd); + else sink(buf.data(), rd); } - return result; } @@ -920,20 +927,47 @@ string runProgram(Path program, bool searchPath, const Strings & args, return res.second; } -std::pair runProgram(const RunOptions & options) +std::pair runProgram(const RunOptions & options_) +{ + RunOptions options(options_); + StringSink sink; + options.stdout = &sink; + + int status = 0; + + try { + runProgram2(options); + } catch (ExecError & e) { + status = e.status; + } + + return {status, std::move(*sink.s)}; +} + +void runProgram2(const RunOptions & options) { checkInterrupt(); + assert(!(options.stdin && options.input)); + + std::unique_ptr source_; + Source * source = options.stdin; + + if (options.input) { + source_ = std::make_unique(*options.input); + source = source_.get(); + } + /* Create a pipe. */ Pipe out, in; - out.create(); - if (options.input) in.create(); + if (options.stdout) out.create(); + if (source) in.create(); /* Fork. */ Pid pid = startProcess([&]() { - if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1) + if (options.stdout && dup2(out.writeSide.get(), STDOUT_FILENO) == -1) throw SysError("dupping stdout"); - if (options.input && dup2(in.readSide.get(), STDIN_FILENO) == -1) + if (source && dup2(in.readSide.get(), STDIN_FILENO) == -1) throw SysError("dupping stdin"); Strings args_(options.args); @@ -961,11 +995,20 @@ std::pair runProgram(const RunOptions & options) }); - if (options.input) { + if (source) { in.readSide = -1; writerThread = std::thread([&]() { try { - writeFull(in.writeSide.get(), *options.input); + std::vector buf(8 * 1024); + while (true) { + size_t n; + try { + n = source->read(buf.data(), buf.size()); + } catch (EndOfFile &) { + break; + } + writeFull(in.writeSide.get(), buf.data(), n); + } promise.set_value(); } catch (...) { promise.set_exception(std::current_exception()); @@ -974,15 +1017,17 @@ std::pair runProgram(const RunOptions & options) }); } - string result = drainFD(out.readSide.get()); + if (options.stdout) + drainFD(out.readSide.get(), *options.stdout); /* Wait for the child to finish. */ int status = pid.wait(); /* Wait for the writer thread to finish. */ - if (options.input) promise.get_future().get(); + if (source) promise.get_future().get(); - return {status, result}; + if (status) + throw ExecError(status, fmt("program '%1%' %2%", options.program, statusToString(status))); } diff --git a/src/libutil/util.hh b/src/libutil/util.hh index 500ab7811b..1ea1027ac0 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -25,6 +25,9 @@ namespace nix { +struct Sink; +struct Source; + /* Return an environment variable. */ string getEnv(const string & key, const string & def = ""); @@ -150,6 +153,7 @@ MakeError(EndOfFile, Error) /* Read a file descriptor until EOF occurs. */ string drainFD(int fd); +void drainFD(int fd, Sink & sink); /* Automatic cleanup of resources. */ @@ -256,6 +260,8 @@ struct RunOptions bool searchPath = true; Strings args; std::experimental::optional input; + Source * stdin = nullptr; + Sink * stdout = nullptr; bool _killStderr = false; RunOptions(const Path & program, const Strings & args) @@ -266,6 +272,8 @@ struct RunOptions std::pair runProgram(const RunOptions & options); +void runProgram2(const RunOptions & options); + class ExecError : public Error { -- cgit 1.4.1