about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEelco Dolstra <eelco.dolstra@logicblox.com>2016-05-04T13·46+0200
committerEelco Dolstra <eelco.dolstra@logicblox.com>2016-05-04T14·16+0200
commit0d4a10e910e1916a6f36cb9af6b68817172b51a9 (patch)
treedc2525bab942c3bdddfd22e40fdaacc525c8a937
parentc6a21aed07520e11dfae09cf9eff9e34e60906d8 (diff)
Do compression in a sink
-rw-r--r--src/libstore/build.cc61
-rw-r--r--src/libstore/remote-store.cc2
-rw-r--r--src/libutil/compression.cc260
-rw-r--r--src/libutil/compression.hh8
-rw-r--r--src/libutil/serialise.hh10
-rw-r--r--src/nix-store/nix-store.cc2
6 files changed, 206 insertions, 137 deletions
diff --git a/src/libstore/build.cc b/src/libstore/build.cc
index 3b9ecab1c12a..e6ec7886f3cd 100644
--- a/src/libstore/build.cc
+++ b/src/libstore/build.cc
@@ -9,6 +9,7 @@
 #include "affinity.hh"
 #include "builtins.hh"
 #include "finally.hh"
+#include "compression.hh"
 
 #include <algorithm>
 #include <iostream>
@@ -29,14 +30,11 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include <errno.h>
-#include <stdio.h>
 #include <cstring>
 
 #include <pwd.h>
 #include <grp.h>
 
-#include <bzlib.h>
-
 /* chroot-like behavior from Apple's sandbox */
 #if __APPLE__
     #define DEFAULT_ALLOWED_IMPURE_PREFIXES "/System/Library /usr/lib /dev /bin/sh"
@@ -741,9 +739,8 @@ private:
     Path tmpDirInSandbox;
 
     /* File descriptor for the log file. */
-    FILE * fLogFile = 0;
-    BZFILE * bzLogFile = 0;
     AutoCloseFD fdLogFile;
+    std::shared_ptr<BufferedSink> logFileSink, logSink;
 
     /* Number of bytes received from the builder's stdout/stderr. */
     unsigned long logSize;
@@ -2854,46 +2851,31 @@ Path DerivationGoal::openLogFile()
     Path dir = (format("%1%/%2%/%3%/") % settings.nixLogDir % drvsLogDir % string(baseName, 0, 2)).str();
     createDirs(dir);
 
-    if (settings.compressLog) {
-
-        Path logFileName = (format("%1%/%2%.bz2") % dir % string(baseName, 2)).str();
-        AutoCloseFD fd = open(logFileName.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0666);
-        if (fd == -1) throw SysError(format("creating log file ‘%1%’") % logFileName);
-        closeOnExec(fd);
+    Path logFileName = (format("%1%/%2%%3%")
+        % dir
+        % string(baseName, 2)
+        % (settings.compressLog ? ".bz2" : "")).str();
 
-        if (!(fLogFile = fdopen(fd.borrow(), "w")))
-            throw SysError(format("opening file ‘%1%’") % logFileName);
+    fdLogFile = open(logFileName.c_str(), O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC, 0666);
+    if (fdLogFile == -1) throw SysError(format("creating log file ‘%1%’") % logFileName);
 
-        int err;
-        if (!(bzLogFile = BZ2_bzWriteOpen(&err, fLogFile, 9, 0, 0)))
-            throw Error(format("cannot open compressed log file ‘%1%’") % logFileName);
+    logFileSink = std::make_shared<FdSink>(fdLogFile);
 
-        return logFileName;
+    if (settings.compressLog)
+        logSink = std::shared_ptr<CompressionSink>(makeCompressionSink("bzip2", *logFileSink));
+    else
+        logSink = logFileSink;
 
-    } else {
-        Path logFileName = (format("%1%/%2%") % dir % string(baseName, 2)).str();
-        fdLogFile = open(logFileName.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0666);
-        if (fdLogFile == -1) throw SysError(format("creating log file ‘%1%’") % logFileName);
-        closeOnExec(fdLogFile);
-        return logFileName;
-    }
+    return logFileName;
 }
 
 
 void DerivationGoal::closeLogFile()
 {
-    if (bzLogFile) {
-        int err;
-        BZ2_bzWriteClose(&err, bzLogFile, 0, 0, 0);
-        bzLogFile = 0;
-        if (err != BZ_OK) throw Error(format("cannot close compressed log file (BZip2 error = %1%)") % err);
-    }
-
-    if (fLogFile) {
-        fclose(fLogFile);
-        fLogFile = 0;
-    }
-
+    auto logSink2 = std::dynamic_pointer_cast<CompressionSink>(logSink);
+    if (logSink2) logSink2->finish();
+    if (logFileSink) logFileSink->flush();
+    logSink = logFileSink = 0;
     fdLogFile.close();
 }
 
@@ -2940,12 +2922,7 @@ void DerivationGoal::handleChildOutput(int fd, const string & data)
                 currentLogLine[currentLogLinePos++] = c;
             }
 
-        if (bzLogFile) {
-            int err;
-            BZ2_bzWrite(&err, bzLogFile, (unsigned char *) data.data(), data.size());
-            if (err != BZ_OK) throw Error(format("cannot write to compressed log file (BZip2 error = %1%)") % err);
-        } else if (fdLogFile != -1)
-            writeFull(fdLogFile, data);
+        if (logSink) (*logSink)(data);
     }
 
     if (hook && fd == hook->fromHook.readSide)
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 1a6afe46bb01..9a00a6ed9910 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -541,7 +541,7 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
         if (msg == STDERR_WRITE) {
             string s = readString(from);
             if (!sink) throw Error("no sink");
-            (*sink)((const unsigned char *) s.data(), s.size());
+            (*sink)(s);
         }
         else if (msg == STDERR_READ) {
             if (!source) throw Error("no source");
diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc
index cd2cc9cc10fa..a3bbb5170d9f 100644
--- a/src/libutil/compression.cc
+++ b/src/libutil/compression.cc
@@ -7,50 +7,9 @@
 #include <cstdio>
 #include <cstring>
 
-namespace nix {
-
-static ref<std::string> compressXZ(const std::string & in)
-{
-    lzma_stream strm(LZMA_STREAM_INIT);
-
-    // FIXME: apply the x86 BCJ filter?
-
-    lzma_ret ret = lzma_easy_encoder(
-        &strm, 6, LZMA_CHECK_CRC64);
-    if (ret != LZMA_OK)
-        throw Error("unable to initialise lzma encoder");
-
-    Finally free([&]() { lzma_end(&strm); });
-
-    lzma_action action = LZMA_RUN;
-    uint8_t outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    strm.next_in = (uint8_t *) in.c_str();
-    strm.avail_in = in.size();
-    strm.next_out = outbuf;
-    strm.avail_out = sizeof(outbuf);
-
-    while (true) {
-        checkInterrupt();
-
-        if (strm.avail_in == 0)
-            action = LZMA_FINISH;
-
-        lzma_ret ret = lzma_code(&strm, action);
-
-        if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
-            res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out);
-            strm.next_out = outbuf;
-            strm.avail_out = sizeof(outbuf);
-        }
-
-        if (ret == LZMA_STREAM_END)
-            return res;
+#include <iostream>
 
-        if (ret != LZMA_OK)
-            throw Error("error while compressing xz file");
-    }
-}
+namespace nix {
 
 static ref<std::string> decompressXZ(const std::string & in)
 {
@@ -93,49 +52,6 @@ static ref<std::string> decompressXZ(const std::string & in)
     }
 }
 
-static ref<std::string> compressBzip2(const std::string & in)
-{
-    bz_stream strm;
-    memset(&strm, 0, sizeof(strm));
-
-    int ret = BZ2_bzCompressInit(&strm, 9, 0, 30);
-    if (ret != BZ_OK)
-        throw Error("unable to initialise bzip2 encoder");
-
-    Finally free([&]() { BZ2_bzCompressEnd(&strm); });
-
-    int action = BZ_RUN;
-    char outbuf[BUFSIZ];
-    ref<std::string> res = make_ref<std::string>();
-    strm.next_in = (char *) in.c_str();
-    strm.avail_in = in.size();
-    strm.next_out = outbuf;
-    strm.avail_out = sizeof(outbuf);
-
-    while (true) {
-        checkInterrupt();
-
-        if (strm.avail_in == 0)
-            action = BZ_FINISH;
-
-        int ret = BZ2_bzCompress(&strm, action);
-
-        if (strm.avail_out == 0 || ret == BZ_STREAM_END) {
-            res->append(outbuf, sizeof(outbuf) - strm.avail_out);
-            strm.next_out = outbuf;
-            strm.avail_out = sizeof(outbuf);
-        }
-
-        if (ret == BZ_STREAM_END)
-            return res;
-
-        if (ret != BZ_OK && ret != BZ_FINISH_OK)
-             Error("error while compressing bzip2 file");
-    }
-
-    return res;
-}
-
 static ref<std::string> decompressBzip2(const std::string & in)
 {
     bz_stream strm;
@@ -175,24 +91,184 @@ static ref<std::string> decompressBzip2(const std::string & in)
 
 ref<std::string> compress(const std::string & method, const std::string & in)
 {
+    StringSink ssink;
+    auto sink = makeCompressionSink(method, ssink);
+    (*sink)(in);
+    sink->finish();
+    return ssink.s;
+}
+
+ref<std::string> decompress(const std::string & method, const std::string & in)
+{
     if (method == "none")
         return make_ref<std::string>(in);
     else if (method == "xz")
-        return compressXZ(in);
+        return decompressXZ(in);
     else if (method == "bzip2")
-        return compressBzip2(in);
+        return decompressBzip2(in);
     else
         throw UnknownCompressionMethod(format("unknown compression method ‘%s’") % method);
 }
 
-ref<std::string> decompress(const std::string & method, const std::string & in)
+struct NoneSink : CompressionSink
+{
+    Sink & nextSink;
+    NoneSink(Sink & nextSink) : nextSink(nextSink) { }
+    void finish() override { flush(); }
+    void write(const unsigned char * data, size_t len) override { nextSink(data, len); }
+};
+
+struct XzSink : CompressionSink
+{
+    Sink & nextSink;
+    uint8_t outbuf[BUFSIZ];
+    lzma_stream strm = LZMA_STREAM_INIT;
+    bool finished = false;
+
+    XzSink(Sink & nextSink) : nextSink(nextSink)
+    {
+        lzma_ret ret = lzma_easy_encoder(
+            &strm, 6, LZMA_CHECK_CRC64);
+        if (ret != LZMA_OK)
+            throw Error("unable to initialise lzma encoder");
+        // FIXME: apply the x86 BCJ filter?
+
+        strm.next_out = outbuf;
+        strm.avail_out = sizeof(outbuf);
+    }
+
+    ~XzSink()
+    {
+        assert(finished);
+        lzma_end(&strm);
+    }
+
+    void finish() override
+    {
+        CompressionSink::flush();
+
+        assert(!finished);
+        finished = true;
+
+        while (true) {
+            checkInterrupt();
+
+            lzma_ret ret = lzma_code(&strm, LZMA_FINISH);
+            if (ret != LZMA_OK && ret != LZMA_STREAM_END)
+                throw Error("error while flushing xz file");
+
+            if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
+                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
+                strm.next_out = outbuf;
+                strm.avail_out = sizeof(outbuf);
+            }
+
+            if (ret == LZMA_STREAM_END) break;
+        }
+    }
+
+    void write(const unsigned char * data, size_t len) override
+    {
+        assert(!finished);
+
+        strm.next_in = data;
+        strm.avail_in = len;
+
+        while (strm.avail_in) {
+            checkInterrupt();
+
+            lzma_ret ret = lzma_code(&strm, LZMA_RUN);
+            if (ret != LZMA_OK)
+                throw Error("error while compressing xz file");
+
+            if (strm.avail_out == 0) {
+                nextSink(outbuf, sizeof(outbuf));
+                strm.next_out = outbuf;
+                strm.avail_out = sizeof(outbuf);
+            }
+        }
+    }
+};
+
+struct BzipSink : CompressionSink
+{
+    Sink & nextSink;
+    char outbuf[BUFSIZ];
+    bz_stream strm;
+    bool finished = false;
+
+    BzipSink(Sink & nextSink) : nextSink(nextSink)
+    {
+        memset(&strm, 0, sizeof(strm));
+        int ret = BZ2_bzCompressInit(&strm, 9, 0, 30);
+        if (ret != BZ_OK)
+            throw Error("unable to initialise bzip2 encoder");
+
+        strm.next_out = outbuf;
+        strm.avail_out = sizeof(outbuf);
+    }
+
+    ~BzipSink()
+    {
+        assert(finished);
+        BZ2_bzCompressEnd(&strm);
+    }
+
+    void finish() override
+    {
+        flush();
+
+        assert(!finished);
+        finished = true;
+
+        while (true) {
+            checkInterrupt();
+
+            int ret = BZ2_bzCompress(&strm, BZ_FINISH);
+            if (ret != BZ_FINISH_OK && ret != BZ_STREAM_END)
+                throw Error("error while flushing bzip2 file");
+
+            if (strm.avail_out == 0 || ret == BZ_STREAM_END) {
+                nextSink((unsigned char *) outbuf, sizeof(outbuf) - strm.avail_out);
+                strm.next_out = outbuf;
+                strm.avail_out = sizeof(outbuf);
+            }
+
+            if (ret == BZ_STREAM_END) break;
+        }
+    }
+
+    void write(const unsigned char * data, size_t len) override
+    {
+        assert(!finished);
+
+        strm.next_in = (char *) data;
+        strm.avail_in = len;
+
+        while (strm.avail_in) {
+            checkInterrupt();
+
+            int ret = BZ2_bzCompress(&strm, BZ_RUN);
+            if (ret != BZ_OK)
+                Error("error while compressing bzip2 file");
+
+            if (strm.avail_out == 0) {
+                nextSink((unsigned char *) outbuf, sizeof(outbuf));
+                strm.next_out = outbuf;
+                strm.avail_out = sizeof(outbuf);
+            }
+        }
+    }
+};
+
+ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink)
 {
     if (method == "none")
-        return make_ref<std::string>(in);
+        return make_ref<NoneSink>(nextSink);
     else if (method == "xz")
-        return decompressXZ(in);
+        return make_ref<XzSink>(nextSink);
     else if (method == "bzip2")
-        return decompressBzip2(in);
+        return make_ref<BzipSink>(nextSink);
     else
         throw UnknownCompressionMethod(format("unknown compression method ‘%s’") % method);
 }
diff --git a/src/libutil/compression.hh b/src/libutil/compression.hh
index ed3c463865c1..eacf559d65e9 100644
--- a/src/libutil/compression.hh
+++ b/src/libutil/compression.hh
@@ -2,6 +2,7 @@
 
 #include "ref.hh"
 #include "types.hh"
+#include "serialise.hh"
 
 #include <string>
 
@@ -11,6 +12,13 @@ ref<std::string> compress(const std::string & method, const std::string & in);
 
 ref<std::string> decompress(const std::string & method, const std::string & in);
 
+struct CompressionSink : BufferedSink
+{
+    virtual void finish() = 0;
+};
+
+ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink);
+
 MakeError(UnknownCompressionMethod, Error);
 
 }
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 9ba6391f817a..892ec4aa36de 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -13,6 +13,11 @@ struct Sink
     virtual ~Sink() { }
     virtual void operator () (const unsigned char * data, size_t len) = 0;
     virtual bool good() { return true; }
+
+    void operator () (const std::string & s)
+    {
+        (*this)((const unsigned char *) s.data(), s.size());
+    }
 };
 
 
@@ -28,6 +33,11 @@ struct BufferedSink : Sink
 
     void operator () (const unsigned char * data, size_t len) override;
 
+    void operator () (const std::string & s)
+    {
+        Sink::operator()(s);
+    }
+
     void flush();
 
     virtual void write(const unsigned char * data, size_t len) = 0;
diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc
index 99fee5c65fb7..9bb1ac50a607 100644
--- a/src/nix-store/nix-store.cc
+++ b/src/nix-store/nix-store.cc
@@ -19,8 +19,6 @@
 #include <sys/stat.h>
 #include <fcntl.h>
 
-#include <bzlib.h>
-
 #if HAVE_SODIUM
 #include <sodium.h>
 #endif