From 0d4a10e910e1916a6f36cb9af6b68817172b51a9 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 4 May 2016 15:46:25 +0200 Subject: Do compression in a sink --- src/libstore/build.cc | 61 ++++------ src/libstore/remote-store.cc | 2 +- src/libutil/compression.cc | 260 ++++++++++++++++++++++++++++--------------- src/libutil/compression.hh | 8 ++ src/libutil/serialise.hh | 10 ++ src/nix-store/nix-store.cc | 2 - 6 files changed, 206 insertions(+), 137 deletions(-) diff --git a/src/libstore/build.cc b/src/libstore/build.cc index 3b9ecab1c12a..e6ec7886f3cd 100644 --- a/src/libstore/build.cc +++ b/src/libstore/build.cc @@ -9,6 +9,7 @@ #include "affinity.hh" #include "builtins.hh" #include "finally.hh" +#include "compression.hh" #include #include @@ -29,14 +30,11 @@ #include #include #include -#include #include #include #include -#include - /* chroot-like behavior from Apple's sandbox */ #if __APPLE__ #define DEFAULT_ALLOWED_IMPURE_PREFIXES "/System/Library /usr/lib /dev /bin/sh" @@ -741,9 +739,8 @@ private: Path tmpDirInSandbox; /* File descriptor for the log file. */ - FILE * fLogFile = 0; - BZFILE * bzLogFile = 0; AutoCloseFD fdLogFile; + std::shared_ptr logFileSink, logSink; /* Number of bytes received from the builder's stdout/stderr. */ unsigned long logSize; @@ -2854,46 +2851,31 @@ Path DerivationGoal::openLogFile() Path dir = (format("%1%/%2%/%3%/") % settings.nixLogDir % drvsLogDir % string(baseName, 0, 2)).str(); createDirs(dir); - if (settings.compressLog) { - - Path logFileName = (format("%1%/%2%.bz2") % dir % string(baseName, 2)).str(); - AutoCloseFD fd = open(logFileName.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0666); - if (fd == -1) throw SysError(format("creating log file ‘%1%’") % logFileName); - closeOnExec(fd); + Path logFileName = (format("%1%/%2%%3%") + % dir + % string(baseName, 2) + % (settings.compressLog ? ".bz2" : "")).str(); - if (!(fLogFile = fdopen(fd.borrow(), "w"))) - throw SysError(format("opening file ‘%1%’") % logFileName); + fdLogFile = open(logFileName.c_str(), O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC, 0666); + if (fdLogFile == -1) throw SysError(format("creating log file ‘%1%’") % logFileName); - int err; - if (!(bzLogFile = BZ2_bzWriteOpen(&err, fLogFile, 9, 0, 0))) - throw Error(format("cannot open compressed log file ‘%1%’") % logFileName); + logFileSink = std::make_shared(fdLogFile); - return logFileName; + if (settings.compressLog) + logSink = std::shared_ptr(makeCompressionSink("bzip2", *logFileSink)); + else + logSink = logFileSink; - } else { - Path logFileName = (format("%1%/%2%") % dir % string(baseName, 2)).str(); - fdLogFile = open(logFileName.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0666); - if (fdLogFile == -1) throw SysError(format("creating log file ‘%1%’") % logFileName); - closeOnExec(fdLogFile); - return logFileName; - } + return logFileName; } void DerivationGoal::closeLogFile() { - if (bzLogFile) { - int err; - BZ2_bzWriteClose(&err, bzLogFile, 0, 0, 0); - bzLogFile = 0; - if (err != BZ_OK) throw Error(format("cannot close compressed log file (BZip2 error = %1%)") % err); - } - - if (fLogFile) { - fclose(fLogFile); - fLogFile = 0; - } - + auto logSink2 = std::dynamic_pointer_cast(logSink); + if (logSink2) logSink2->finish(); + if (logFileSink) logFileSink->flush(); + logSink = logFileSink = 0; fdLogFile.close(); } @@ -2940,12 +2922,7 @@ void DerivationGoal::handleChildOutput(int fd, const string & data) currentLogLine[currentLogLinePos++] = c; } - if (bzLogFile) { - int err; - BZ2_bzWrite(&err, bzLogFile, (unsigned char *) data.data(), data.size()); - if (err != BZ_OK) throw Error(format("cannot write to compressed log file (BZip2 error = %1%)") % err); - } else if (fdLogFile != -1) - writeFull(fdLogFile, data); + if (logSink) (*logSink)(data); } if (hook && fd == hook->fromHook.readSide) diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 1a6afe46bb01..9a00a6ed9910 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -541,7 +541,7 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source) if (msg == STDERR_WRITE) { string s = readString(from); if (!sink) throw Error("no sink"); - (*sink)((const unsigned char *) s.data(), s.size()); + (*sink)(s); } else if (msg == STDERR_READ) { if (!source) throw Error("no source"); diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc index cd2cc9cc10fa..a3bbb5170d9f 100644 --- a/src/libutil/compression.cc +++ b/src/libutil/compression.cc @@ -7,50 +7,9 @@ #include #include -namespace nix { - -static ref compressXZ(const std::string & in) -{ - lzma_stream strm(LZMA_STREAM_INIT); - - // FIXME: apply the x86 BCJ filter? - - lzma_ret ret = lzma_easy_encoder( - &strm, 6, LZMA_CHECK_CRC64); - if (ret != LZMA_OK) - throw Error("unable to initialise lzma encoder"); - - Finally free([&]() { lzma_end(&strm); }); - - lzma_action action = LZMA_RUN; - uint8_t outbuf[BUFSIZ]; - ref res = make_ref(); - strm.next_in = (uint8_t *) in.c_str(); - strm.avail_in = in.size(); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - - while (true) { - checkInterrupt(); - - if (strm.avail_in == 0) - action = LZMA_FINISH; - - lzma_ret ret = lzma_code(&strm, action); - - if (strm.avail_out == 0 || ret == LZMA_STREAM_END) { - res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - } - - if (ret == LZMA_STREAM_END) - return res; +#include - if (ret != LZMA_OK) - throw Error("error while compressing xz file"); - } -} +namespace nix { static ref decompressXZ(const std::string & in) { @@ -93,49 +52,6 @@ static ref decompressXZ(const std::string & in) } } -static ref compressBzip2(const std::string & in) -{ - bz_stream strm; - memset(&strm, 0, sizeof(strm)); - - int ret = BZ2_bzCompressInit(&strm, 9, 0, 30); - if (ret != BZ_OK) - throw Error("unable to initialise bzip2 encoder"); - - Finally free([&]() { BZ2_bzCompressEnd(&strm); }); - - int action = BZ_RUN; - char outbuf[BUFSIZ]; - ref res = make_ref(); - strm.next_in = (char *) in.c_str(); - strm.avail_in = in.size(); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - - while (true) { - checkInterrupt(); - - if (strm.avail_in == 0) - action = BZ_FINISH; - - int ret = BZ2_bzCompress(&strm, action); - - if (strm.avail_out == 0 || ret == BZ_STREAM_END) { - res->append(outbuf, sizeof(outbuf) - strm.avail_out); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - } - - if (ret == BZ_STREAM_END) - return res; - - if (ret != BZ_OK && ret != BZ_FINISH_OK) - Error("error while compressing bzip2 file"); - } - - return res; -} - static ref decompressBzip2(const std::string & in) { bz_stream strm; @@ -174,25 +90,185 @@ static ref decompressBzip2(const std::string & in) } ref compress(const std::string & method, const std::string & in) +{ + StringSink ssink; + auto sink = makeCompressionSink(method, ssink); + (*sink)(in); + sink->finish(); + return ssink.s; +} + +ref decompress(const std::string & method, const std::string & in) { if (method == "none") return make_ref(in); else if (method == "xz") - return compressXZ(in); + return decompressXZ(in); else if (method == "bzip2") - return compressBzip2(in); + return decompressBzip2(in); else throw UnknownCompressionMethod(format("unknown compression method ‘%s’") % method); } -ref decompress(const std::string & method, const std::string & in) +struct NoneSink : CompressionSink +{ + Sink & nextSink; + NoneSink(Sink & nextSink) : nextSink(nextSink) { } + void finish() override { flush(); } + void write(const unsigned char * data, size_t len) override { nextSink(data, len); } +}; + +struct XzSink : CompressionSink +{ + Sink & nextSink; + uint8_t outbuf[BUFSIZ]; + lzma_stream strm = LZMA_STREAM_INIT; + bool finished = false; + + XzSink(Sink & nextSink) : nextSink(nextSink) + { + lzma_ret ret = lzma_easy_encoder( + &strm, 6, LZMA_CHECK_CRC64); + if (ret != LZMA_OK) + throw Error("unable to initialise lzma encoder"); + // FIXME: apply the x86 BCJ filter? + + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); + } + + ~XzSink() + { + assert(finished); + lzma_end(&strm); + } + + void finish() override + { + CompressionSink::flush(); + + assert(!finished); + finished = true; + + while (true) { + checkInterrupt(); + + lzma_ret ret = lzma_code(&strm, LZMA_FINISH); + if (ret != LZMA_OK && ret != LZMA_STREAM_END) + throw Error("error while flushing xz file"); + + if (strm.avail_out == 0 || ret == LZMA_STREAM_END) { + nextSink(outbuf, sizeof(outbuf) - strm.avail_out); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); + } + + if (ret == LZMA_STREAM_END) break; + } + } + + void write(const unsigned char * data, size_t len) override + { + assert(!finished); + + strm.next_in = data; + strm.avail_in = len; + + while (strm.avail_in) { + checkInterrupt(); + + lzma_ret ret = lzma_code(&strm, LZMA_RUN); + if (ret != LZMA_OK) + throw Error("error while compressing xz file"); + + if (strm.avail_out == 0) { + nextSink(outbuf, sizeof(outbuf)); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); + } + } + } +}; + +struct BzipSink : CompressionSink +{ + Sink & nextSink; + char outbuf[BUFSIZ]; + bz_stream strm; + bool finished = false; + + BzipSink(Sink & nextSink) : nextSink(nextSink) + { + memset(&strm, 0, sizeof(strm)); + int ret = BZ2_bzCompressInit(&strm, 9, 0, 30); + if (ret != BZ_OK) + throw Error("unable to initialise bzip2 encoder"); + + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); + } + + ~BzipSink() + { + assert(finished); + BZ2_bzCompressEnd(&strm); + } + + void finish() override + { + flush(); + + assert(!finished); + finished = true; + + while (true) { + checkInterrupt(); + + int ret = BZ2_bzCompress(&strm, BZ_FINISH); + if (ret != BZ_FINISH_OK && ret != BZ_STREAM_END) + throw Error("error while flushing bzip2 file"); + + if (strm.avail_out == 0 || ret == BZ_STREAM_END) { + nextSink((unsigned char *) outbuf, sizeof(outbuf) - strm.avail_out); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); + } + + if (ret == BZ_STREAM_END) break; + } + } + + void write(const unsigned char * data, size_t len) override + { + assert(!finished); + + strm.next_in = (char *) data; + strm.avail_in = len; + + while (strm.avail_in) { + checkInterrupt(); + + int ret = BZ2_bzCompress(&strm, BZ_RUN); + if (ret != BZ_OK) + Error("error while compressing bzip2 file"); + + if (strm.avail_out == 0) { + nextSink((unsigned char *) outbuf, sizeof(outbuf)); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); + } + } + } +}; + +ref makeCompressionSink(const std::string & method, Sink & nextSink) { if (method == "none") - return make_ref(in); + return make_ref(nextSink); else if (method == "xz") - return decompressXZ(in); + return make_ref(nextSink); else if (method == "bzip2") - return decompressBzip2(in); + return make_ref(nextSink); else throw UnknownCompressionMethod(format("unknown compression method ‘%s’") % method); } diff --git a/src/libutil/compression.hh b/src/libutil/compression.hh index ed3c463865c1..eacf559d65e9 100644 --- a/src/libutil/compression.hh +++ b/src/libutil/compression.hh @@ -2,6 +2,7 @@ #include "ref.hh" #include "types.hh" +#include "serialise.hh" #include @@ -11,6 +12,13 @@ ref compress(const std::string & method, const std::string & in); ref decompress(const std::string & method, const std::string & in); +struct CompressionSink : BufferedSink +{ + virtual void finish() = 0; +}; + +ref makeCompressionSink(const std::string & method, Sink & nextSink); + MakeError(UnknownCompressionMethod, Error); } diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 9ba6391f817a..892ec4aa36de 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -13,6 +13,11 @@ struct Sink virtual ~Sink() { } virtual void operator () (const unsigned char * data, size_t len) = 0; virtual bool good() { return true; } + + void operator () (const std::string & s) + { + (*this)((const unsigned char *) s.data(), s.size()); + } }; @@ -28,6 +33,11 @@ struct BufferedSink : Sink void operator () (const unsigned char * data, size_t len) override; + void operator () (const std::string & s) + { + Sink::operator()(s); + } + void flush(); virtual void write(const unsigned char * data, size_t len) = 0; diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc index 99fee5c65fb7..9bb1ac50a607 100644 --- a/src/nix-store/nix-store.cc +++ b/src/nix-store/nix-store.cc @@ -19,8 +19,6 @@ #include #include -#include - #if HAVE_SODIUM #include #endif -- cgit 1.4.1