Diffstat (limited to 'src')
-rw-r--r--  src/libexpr/eval.cc                        |  20
-rw-r--r--  src/libexpr/parser.y                       |   4
-rw-r--r--  src/libexpr/primops.cc                     |  18
-rw-r--r--  src/libstore/binary-cache-store.cc         |  23
-rw-r--r--  src/libstore/builtins/fetchurl.cc          |  21
-rw-r--r--  src/libstore/download.cc                   |  87
-rw-r--r--  src/libstore/download.hh                   |   4
-rw-r--r--  src/libstore/legacy-ssh-store.cc           |  62
-rw-r--r--  src/libstore/s3-binary-cache-store.cc      |  30
-rw-r--r--  src/libstore/s3.hh                         |   4
-rw-r--r--  src/libstore/serve-protocol.hh             |   3
-rw-r--r--  src/libstore/ssh.cc                        |  22
-rw-r--r--  src/libstore/ssh.hh                        |   1
-rw-r--r--  src/libstore/store-api.cc                  |  34
-rw-r--r--  src/libutil/compression.cc                 | 607
-rw-r--r--  src/libutil/compression.hh                 |  12
-rw-r--r--  src/libutil/serialise.cc                   |   4
-rwxr-xr-x  src/nix-build/nix-build.cc                 |   8
-rw-r--r--  src/nix-daemon/nix-daemon.cc               |   1
-rw-r--r--  src/nix-prefetch-url/nix-prefetch-url.cc   |  24
-rw-r--r--  src/nix-store/nix-store.cc                 |  24
-rw-r--r--  src/nix/copy.cc                            |   4
22 files changed, 510 insertions(+), 507 deletions(-)
diff --git a/src/libexpr/eval.cc b/src/libexpr/eval.cc
index 2c8a0eb422fc..f41905787f9e 100644
--- a/src/libexpr/eval.cc
+++ b/src/libexpr/eval.cc
@@ -349,19 +349,25 @@ Path EvalState::checkSourcePath(const Path & path_)
 
     bool found = false;
 
+    /* First canonicalize the path without symlinks, so we make sure an
+     * attacker can't append ../../... to a path that would be in allowedPaths
+     * and thus leak symlink targets.
+     */
+    Path abspath = canonPath(path_);
+
     for (auto & i : *allowedPaths) {
-        if (isDirOrInDir(path_, i)) {
+        if (isDirOrInDir(abspath, i)) {
             found = true;
             break;
         }
     }
 
     if (!found)
-        throw RestrictedPathError("access to path '%1%' is forbidden in restricted mode", path_);
+        throw RestrictedPathError("access to path '%1%' is forbidden in restricted mode", abspath);
 
     /* Resolve symlinks. */
-    debug(format("checking access to '%s'") % path_);
-    Path path = canonPath(path_, true);
+    debug(format("checking access to '%s'") % abspath);
+    Path path = canonPath(abspath, true);
 
     for (auto & i : *allowedPaths) {
         if (isDirOrInDir(path, i)) {
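
The ordering above is the point of the fix: the lexical canonicalization (canonPath without symlink resolution) and the allowedPaths check now run before the symlink-resolving pass, so a crafted path is rejected before any symlink information can leak. An illustrative case:

    // canonPath("/allowed/dir/../../etc/shadow") == "/etc/shadow"  (lexical only)
    // -> fails the allowedPaths check immediately; canonPath(abspath, true),
    //    which follows symlinks, is only ever reached for whitelisted paths.
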
@@ -1076,6 +1082,8 @@ void EvalState::callPrimOp(Value & fun, Value & arg, Value & v, const Pos & pos)
 
 void EvalState::callFunction(Value & fun, Value & arg, Value & v, const Pos & pos)
 {
+    forceValue(fun, pos);
+
     if (fun.type == tPrimOp || fun.type == tPrimOpApp) {
         callPrimOp(fun, arg, v, pos);
         return;
@@ -1091,10 +1099,8 @@ void EvalState::callFunction(Value & fun, Value & arg, Value & v, const Pos & po
         auto & fun2 = *allocValue();
         fun2 = fun;
         /* !!! Should we use the attr pos here? */
-        forceValue(*found->value, pos);
         Value v2;
         callFunction(*found->value, fun2, v2, pos);
-        forceValue(v2, pos);
         return callFunction(v2, arg, v, pos);
       }
     }
@@ -1181,7 +1187,6 @@ void EvalState::autoCallFunction(Bindings & args, Value & fun, Value & res)
     if (fun.type == tAttrs) {
         auto found = fun.attrs->find(sFunctor);
         if (found != fun.attrs->end()) {
-            forceValue(*found->value);
             Value * v = allocValue();
             callFunction(*found->value, fun, *v, noPos);
             forceValue(*v);
@@ -1565,7 +1570,6 @@ string EvalState::coerceToString(const Pos & pos, Value & v, PathSet & context,
     if (v.type == tAttrs) {
         auto i = v.attrs->find(sToString);
         if (i != v.attrs->end()) {
-            forceValue(*i->value, pos);
             Value v1;
             callFunction(*i->value, v, v1, pos);
             return coerceToString(pos, v1, context, coerceMore, copyToStore);
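
The remaining eval.cc hunks move the forcing of the callee into callFunction itself: it now begins with forceValue(fun, pos), so call sites (the __functor chain, the __toString handler, and several primops below) no longer force the function value themselves, and a thunk is forced exactly once, at the point of application. A sketch of the resulting invariant:

    // callFunction(fun, arg, v, pos):
    //   forceValue(fun, pos);                  // fun may still be a thunk
    //   tPrimOp / tPrimOpApp  -> callPrimOp
    //   tAttrs with __functor -> recursive callFunction, which forces again
    //   lambda                -> ordinary beta reduction
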
diff --git a/src/libexpr/parser.y b/src/libexpr/parser.y
index eee48887dc22..cbd576d7d126 100644
--- a/src/libexpr/parser.y
+++ b/src/libexpr/parser.y
@@ -273,11 +273,11 @@ void yyerror(YYLTYPE * loc, yyscan_t scanner, ParseData * data, const char * err
 %token IND_STRING_OPEN IND_STRING_CLOSE
 %token ELLIPSIS
 
-%nonassoc IMPL
+%right IMPL
 %left OR
 %left AND
 %nonassoc EQ NEQ
-%left '<' '>' LEQ GEQ
+%nonassoc '<' '>' LEQ GEQ
 %right UPDATE
 %left NOT
 %left '+' '-'
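
Both precedence changes alter how chained operators parse; as a sketch:

    // %right IMPL:          a -> b -> c  now parses as  a -> (b -> c)
    //                       (previously %nonassoc made the chain a parse error)
    // %nonassoc '<' '>' ...: a < b < c   is now a parse error
    //                       (previously %left parsed it as (a < b) < c)
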
diff --git a/src/libexpr/primops.cc b/src/libexpr/primops.cc
index e71e3a6d46cb..8ace6db4d11d 100644
--- a/src/libexpr/primops.cc
+++ b/src/libexpr/primops.cc
@@ -1359,7 +1359,6 @@ static void prim_functionArgs(EvalState & state, const Pos & pos, Value * * args
 /* Apply a function to every element of an attribute set. */
 static void prim_mapAttrs(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
-    state.forceFunction(*args[0], pos);
     state.forceAttrs(*args[1], pos);
 
     state.mkAttrs(v, args[1]->attrs->size());
@@ -1368,7 +1367,7 @@ static void prim_mapAttrs(EvalState & state, const Pos & pos, Value * * args, Va
         Value * vName = state.allocValue();
         Value * vFun2 = state.allocValue();
         mkString(*vName, i.name);
-        state.callFunction(*args[0], *vName, *vFun2, pos);
+        mkApp(*vFun2, *args[0], *vName);
         mkApp(*state.allocAttr(v, i.name), *vFun2, *i.value);
     }
 }
@@ -1429,7 +1428,6 @@ static void prim_tail(EvalState & state, const Pos & pos, Value * * args, Value
 /* Apply a function to every element of a list. */
 static void prim_map(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
-    state.forceFunction(*args[0], pos);
     state.forceList(*args[1], pos);
 
     state.mkList(v, args[1]->listSize());
@@ -1508,19 +1506,20 @@ static void prim_foldlStrict(EvalState & state, const Pos & pos, Value * * args,
     state.forceFunction(*args[0], pos);
     state.forceList(*args[2], pos);
 
-    Value * vCur = args[1];
+    if (args[2]->listSize()) {
+        Value * vCur = args[1];
 
-    if (args[2]->listSize())
         for (unsigned int n = 0; n < args[2]->listSize(); ++n) {
             Value vTmp;
             state.callFunction(*args[0], *vCur, vTmp, pos);
             vCur = n == args[2]->listSize() - 1 ? &v : state.allocValue();
             state.callFunction(vTmp, *args[2]->listElems()[n], *vCur, pos);
         }
-    else
-        v = *vCur;
-
-    state.forceValue(v);
+        state.forceValue(v);
+    } else {
+        state.forceValue(*args[1]);
+        v = *args[1];
+    }
 }
 
 
@@ -1557,7 +1556,6 @@ static void prim_all(EvalState & state, const Pos & pos, Value * * args, Value &
 
 static void prim_genList(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
-    state.forceFunction(*args[0], pos);
     auto len = state.forceInt(*args[1], pos);
 
     if (len < 0)
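
The primops hunks follow the callFunction change: mapAttrs now builds lazy applications with mkApp, map and genList simply stop forcing the function up front, and foldl' forces its result consistently in both branches. The observable foldl' semantics, as a sketch:

    // builtins.foldl' op nul [x1 x2]  evaluates  op (op nul x1) x2,
    //   forcing each application as it is made (strict left fold)
    // builtins.foldl' op nul []       returns nul, now forced first,
    //   matching the strictness of the non-empty case
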
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc
index 76c0a1a891b8..9c75c85993f9 100644
--- a/src/libstore/binary-cache-store.cc
+++ b/src/libstore/binary-cache-store.cc
@@ -217,17 +217,6 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
 {
     auto info = queryPathInfo(storePath).cast<const NarInfo>();
 
-    auto source = sinkToSource([this, url{info->url}](Sink & sink) {
-        try {
-            getFile(url, sink);
-        } catch (NoSuchBinaryCacheFile & e) {
-            throw SubstituteGone(e.what());
-        }
-    });
-
-    stats.narRead++;
-    //stats.narReadCompressedBytes += nar->size(); // FIXME
-
     uint64_t narSize = 0;
 
     LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
@@ -235,8 +224,18 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
         narSize += len;
     });
 
-    decompress(info->compression, *source, wrapperSink);
+    auto decompressor = makeDecompressionSink(info->compression, wrapperSink);
 
+    try {
+        getFile(info->url, *decompressor);
+    } catch (NoSuchBinaryCacheFile & e) {
+        throw SubstituteGone(e.what());
+    }
+
+    decompressor->finish();
+
+    stats.narRead++;
+    //stats.narReadCompressedBytes += nar->size(); // FIXME
     stats.narReadBytes += narSize;
 }
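
narFromPath now streams the NAR instead of buffering it whole: getFile writes compressed bytes into the decompression sink as they arrive, and the size-counting wrapper forwards the plaintext to the caller. The shape of the pipeline, using the names from the hunk:

    // getFile(info->url, *decompressor)            compressed bytes in
    //   -> makeDecompressionSink(compression, _)   streaming decompression
    //     -> wrapperSink                           accumulates narSize
    //       -> sink                                caller-supplied destination
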
 
diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc
index 1f4abd374f54..b4dcb35f951a 100644
--- a/src/libstore/builtins/fetchurl.cc
+++ b/src/libstore/builtins/fetchurl.cc
@@ -39,21 +39,16 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
             request.verifyTLS = false;
             request.decompress = false;
 
-            downloader->download(std::move(request), sink);
+            auto decompressor = makeDecompressionSink(
+                hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink);
+            downloader->download(std::move(request), *decompressor);
+            decompressor->finish();
         });
 
-        if (get(drv.env, "unpack", "") == "1") {
-
-            if (hasSuffix(mainUrl, ".xz")) {
-                auto source2 = sinkToSource([&](Sink & sink) {
-                    decompress("xz", *source, sink);
-                });
-                restorePath(storePath, *source2);
-            } else
-                restorePath(storePath, *source);
-
-        } else
-              writeFile(storePath, *source);
+        if (get(drv.env, "unpack", "") == "1")
+            restorePath(storePath, *source);
+        else
+            writeFile(storePath, *source);
 
         auto executable = drv.env.find("executable");
         if (executable != drv.env.end() && executable->second == "1") {
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 07acd5d0e004..973fca0b130f 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -58,16 +58,6 @@ std::string resolveUri(const std::string & uri)
         return uri;
 }
 
-ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data)
-{
-    if (encoding == "")
-        return data;
-    else if (encoding == "br")
-        return decompress(encoding, *data);
-    else
-        throw Error("unsupported Content-Encoding '%s'", encoding);
-}
-
 struct CurlDownloader : public Downloader
 {
     CURLM * curlm = 0;
@@ -106,6 +96,12 @@ struct CurlDownloader : public Downloader
                 fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
                 {request.uri}, request.parentAct)
             , callback(callback)
+            , finalSink([this](const unsigned char * data, size_t len) {
+                if (this->request.dataCallback)
+                    this->request.dataCallback((char *) data, len);
+                else
+                    this->result.data->append((char *) data, len);
+              })
         {
             if (!request.expectedETag.empty())
                 requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
@@ -129,22 +125,40 @@ struct CurlDownloader : public Downloader
             }
         }
 
-        template<class T>
-        void fail(const T & e)
+        void failEx(std::exception_ptr ex)
         {
             assert(!done);
             done = true;
-            callback.rethrow(std::make_exception_ptr(e));
+            callback.rethrow(ex);
         }
 
+        template<class T>
+        void fail(const T & e)
+        {
+            failEx(std::make_exception_ptr(e));
+        }
+
+        LambdaSink finalSink;
+        std::shared_ptr<CompressionSink> decompressionSink;
+
+        std::exception_ptr writeException;
+
         size_t writeCallback(void * contents, size_t size, size_t nmemb)
         {
-            size_t realSize = size * nmemb;
-            if (request.dataCallback)
-                request.dataCallback((char *) contents, realSize);
-            else
-                result.data->append((char *) contents, realSize);
-            return realSize;
+            try {
+                size_t realSize = size * nmemb;
+                result.bodySize += realSize;
+
+                if (!decompressionSink)
+                    decompressionSink = makeDecompressionSink(encoding, finalSink);
+
+                (*decompressionSink)((unsigned char *) contents, realSize);
+
+                return realSize;
+            } catch (...) {
+                writeException = std::current_exception();
+                return 0;
+            }
         }
 
         static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
@@ -162,6 +176,7 @@ struct CurlDownloader : public Downloader
                 auto ss = tokenizeString<vector<string>>(line, " ");
                 status = ss.size() >= 2 ? ss[1] : "";
                 result.data = std::make_shared<std::string>();
+                result.bodySize = 0;
                 encoding = "";
             } else {
                 auto i = line.find(':');
@@ -296,6 +311,7 @@ struct CurlDownloader : public Downloader
             curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);
 
             result.data = std::make_shared<std::string>();
+            result.bodySize = 0;
         }
 
         void finish(CURLcode code)
@@ -309,29 +325,35 @@ struct CurlDownloader : public Downloader
                 result.effectiveUrl = effectiveUrlCStr;
 
             debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes",
-                request.verb(), request.uri, code, httpStatus, result.data ? result.data->size() : 0);
+                request.verb(), request.uri, code, httpStatus, result.bodySize);
+
+            if (decompressionSink)
+                decompressionSink->finish();
 
             if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
                 code = CURLE_OK;
                 httpStatus = 304;
             }
 
-            if (code == CURLE_OK &&
+            if (writeException)
+                failEx(writeException);
+
+            else if (code == CURLE_OK &&
                 (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
             {
                 result.cached = httpStatus == 304;
                 done = true;
 
                 try {
-                    if (request.decompress)
-                        result.data = decodeContent(encoding, ref<std::string>(result.data));
                     act.progress(result.data->size(), result.data->size());
                     callback(std::move(result));
                 } catch (...) {
                     done = true;
                     callback.rethrow();
                 }
-            } else {
+            }
+
+            else {
                 // We treat most errors as transient, but won't retry when hopeless
                 Error err = Transient;
 
@@ -366,6 +388,7 @@ struct CurlDownloader : public Downloader
                         case CURLE_UNKNOWN_OPTION:
                         case CURLE_SSL_CACERT_BADFILE:
                         case CURLE_TOO_MANY_REDIRECTS:
+                        case CURLE_WRITE_ERROR:
                             err = Misc;
                             break;
                         default: // Shut up warnings
@@ -598,7 +621,7 @@ struct CurlDownloader : public Downloader
             // FIXME: do this on a worker thread
             try {
 #ifdef ENABLE_S3
-                S3Helper s3Helper("", Aws::Region::US_EAST_1); // FIXME: make configurable
+                S3Helper s3Helper("", Aws::Region::US_EAST_1, ""); // FIXME: make configurable
                 auto slash = request.uri.find('/', 5);
                 if (slash == std::string::npos)
                     throw nix::Error("bad S3 URI '%s'", request.uri);
@@ -718,15 +741,17 @@ void Downloader::download(DownloadRequest && request, Sink & sink)
     while (true) {
         checkInterrupt();
 
-        if (state->quit) {
-            if (state->exc) std::rethrow_exception(state->exc);
-            break;
-        }
-
         /* If no data is available, then wait for the download thread
            to wake us up. */
-        if (state->data.empty())
+        if (state->data.empty()) {
+
+            if (state->quit) {
+                if (state->exc) std::rethrow_exception(state->exc);
+                break;
+            }
+
             state.wait(state->avail);
+        }
 
         /* If data is available, then flush it to the sink and wake up
            the download thread if it's blocked on a full buffer. */
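
Two details in the download.cc changes are worth noting: data is now decompressed on the fly according to the Content-Encoding header (decompressionSink is created lazily on the first write), and exceptions thrown while writing are stashed in writeException. Returning 0 from the write callback makes libcurl abort the transfer with CURLE_WRITE_ERROR, which is why that code joins the non-transient list and why finish() rethrows the stashed exception in preference to a generic curl error:

    // writeCallback: catch (...) { writeException = std::current_exception();
    //                              return 0; }  // curl -> CURLE_WRITE_ERROR
    // finish():      if (writeException) failEx(writeException);
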
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index da55df7a6e71..f0228f7d053a 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -38,6 +38,7 @@ struct DownloadResult
     std::string etag;
     std::string effectiveUrl;
     std::shared_ptr<std::string> data;
+    uint64_t bodySize = 0;
 };
 
 class Store;
@@ -87,7 +88,4 @@ public:
 
 bool isUri(const string & s);
 
-/* Decode data according to the Content-Encoding header. */
-ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data);
-
 }
diff --git a/src/libstore/legacy-ssh-store.cc b/src/libstore/legacy-ssh-store.cc
index 02d91ded04cd..7c214f09d6fb 100644
--- a/src/libstore/legacy-ssh-store.cc
+++ b/src/libstore/legacy-ssh-store.cc
@@ -17,6 +17,7 @@ struct LegacySSHStore : public Store
     const Setting<Path> sshKey{this, "", "ssh-key", "path to an SSH private key"};
     const Setting<bool> compress{this, false, "compress", "whether to compress the connection"};
     const Setting<Path> remoteProgram{this, "nix-store", "remote-program", "path to the nix-store executable on the remote system"};
+    const Setting<std::string> remoteStore{this, "", "remote-store", "URI of the store on the remote system"};
 
     // Hack for getting remote build log output.
     const Setting<int> logFD{this, -1, "log-fd", "file descriptor to which SSH's stderr is connected"};
@@ -27,6 +28,7 @@ struct LegacySSHStore : public Store
         FdSink to;
         FdSource from;
         int remoteVersion;
+        bool good = true;
     };
 
     std::string host;
@@ -41,7 +43,7 @@ struct LegacySSHStore : public Store
         , connections(make_ref<Pool<Connection>>(
             std::max(1, (int) maxConnections),
             [this]() { return openConnection(); },
-            [](const ref<Connection> & r) { return true; }
+            [](const ref<Connection> & r) { return r->good; }
             ))
         , master(
             host,
@@ -56,7 +58,9 @@ struct LegacySSHStore : public Store
     ref<Connection> openConnection()
     {
         auto conn = make_ref<Connection>();
-        conn->sshConn = master.startCommand(fmt("%s --serve --write", remoteProgram));
+        conn->sshConn = master.startCommand(
+            fmt("%s --serve --write", remoteProgram)
+            + (remoteStore.get() == "" ? "" : " --store " + shellEscape(remoteStore.get())));
         conn->to = FdSink(conn->sshConn->in.get());
         conn->from = FdSource(conn->sshConn->out.get());
 
@@ -127,18 +131,48 @@ struct LegacySSHStore : public Store
 
         auto conn(connections->get());
 
-        conn->to
-            << cmdImportPaths
-            << 1;
-        copyNAR(source, conn->to);
-        conn->to
-            << exportMagic
-            << info.path
-            << info.references
-            << info.deriver
-            << 0
-            << 0;
-        conn->to.flush();
+        if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 5) {
+
+            conn->to
+                << cmdAddToStoreNar
+                << info.path
+                << info.deriver
+                << info.narHash.to_string(Base16, false)
+                << info.references
+                << info.registrationTime
+                << info.narSize
+                << info.ultimate
+                << info.sigs
+                << info.ca;
+            try {
+                copyNAR(source, conn->to);
+            } catch (...) {
+                conn->good = false;
+                throw;
+            }
+            conn->to.flush();
+
+        } else {
+
+            conn->to
+                << cmdImportPaths
+                << 1;
+            try {
+                copyNAR(source, conn->to);
+            } catch (...) {
+                conn->good = false;
+                throw;
+            }
+            conn->to
+                << exportMagic
+                << info.path
+                << info.references
+                << info.deriver
+                << 0
+                << 0;
+            conn->to.flush();
+
+        }
 
         if (readInt(conn->from) != 1)
            throw Error("failed to add path '%s' to remote host '%s'", info.path, host);
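
The cmdAddToStoreNar branch (serve protocol >= 2.5) sends the full path metadata up front; the field order must match what the serving side reads back (see the nix-store.cc hunk below). The wire layout after the command word, as a sketch:

    // path, deriver, narHash (Base16, no prefix), references,
    // registrationTime, narSize, ultimate, sigs, ca,
    // followed by the NAR stream itself (copyNAR)
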
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index 26144ccb40cc..6d95c1fa8c65 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -84,8 +84,8 @@ static void initAWS()
     });
 }
 
-S3Helper::S3Helper(const std::string & profile, const std::string & region)
-    : config(makeConfig(region))
+S3Helper::S3Helper(const std::string & profile, const std::string & region, const std::string & endpoint)
+    : config(makeConfig(region, endpoint))
     , client(make_ref<Aws::S3::S3Client>(
             profile == ""
             ? std::dynamic_pointer_cast<Aws::Auth::AWSCredentialsProvider>(
@@ -99,7 +99,7 @@ S3Helper::S3Helper(const std::string & profile, const std::string & region)
 #else
             Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
 #endif
-            false))
+            endpoint.empty()))
 {
 }
 
@@ -116,11 +116,14 @@ class RetryStrategy : public Aws::Client::DefaultRetryStrategy
     }
 };
 
-ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region)
+ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region, const string & endpoint)
 {
     initAWS();
     auto res = make_ref<Aws::Client::ClientConfiguration>();
     res->region = region;
+    if (!endpoint.empty()) {
+        res->endpointOverride = endpoint;
+    }
     res->requestTimeoutMs = 600 * 1000;
     res->retryStrategy = std::make_shared<RetryStrategy>();
     res->caFile = settings.caFile;
@@ -150,10 +153,8 @@ S3Helper::DownloadResult S3Helper::getObject(
         auto result = checkAws(fmt("AWS error fetching '%s'", key),
             client->GetObject(request));
 
-        res.data = decodeContent(
-            result.GetContentEncoding(),
-            make_ref<std::string>(
-                dynamic_cast<std::stringstream &>(result.GetBody()).str()));
+        res.data = decompress(result.GetContentEncoding(),
+            dynamic_cast<std::stringstream &>(result.GetBody()).str());
 
     } catch (S3Error & e) {
         if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
@@ -170,6 +171,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
 {
     const Setting<std::string> profile{this, "", "profile", "The name of the AWS configuration profile to use."};
     const Setting<std::string> region{this, Aws::Region::US_EAST_1, "region", {"aws-region"}};
+    const Setting<std::string> endpoint{this, "", "endpoint", "An optional override of the endpoint to use when talking to S3."};
     const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
     const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
     const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
@@ -186,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         const Params & params, const std::string & bucketName)
         : S3BinaryCacheStore(params)
         , bucketName(bucketName)
-        , s3Helper(profile, region)
+        , s3Helper(profile, region, endpoint)
     {
         diskCache = getNarInfoDiskCache();
     }
@@ -289,10 +291,6 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         transferConfig.s3Client = s3Helper.client;
         transferConfig.bufferSize = bufferSize;
 
-        if (contentEncoding != "")
-            transferConfig.createMultipartUploadTemplate.SetContentEncoding(
-                contentEncoding);
-
         transferConfig.uploadProgressCallback =
             [&](const TransferManager *transferManager,
                 const std::shared_ptr<const TransferHandle>
@@ -334,8 +332,10 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         auto now1 = std::chrono::steady_clock::now();
 
         std::shared_ptr<TransferHandle> transferHandle =
-            transferManager->UploadFile(stream, bucketName, path, mimeType,
-                                        Aws::Map<Aws::String, Aws::String>());
+            transferManager->UploadFile(
+                stream, bucketName, path, mimeType,
+                Aws::Map<Aws::String, Aws::String>(),
+                nullptr, contentEncoding);
 
         transferHandle->WaitUntilFinished();
 
diff --git a/src/libstore/s3.hh b/src/libstore/s3.hh
index 4f996400343c..95d612b66335 100644
--- a/src/libstore/s3.hh
+++ b/src/libstore/s3.hh
@@ -14,9 +14,9 @@ struct S3Helper
     ref<Aws::Client::ClientConfiguration> config;
     ref<Aws::S3::S3Client> client;
 
-    S3Helper(const std::string & profile, const std::string & region);
+    S3Helper(const std::string & profile, const std::string & region, const std::string & endpoint);
 
-    ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region);
+    ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region, const std::string & endpoint);
 
     struct DownloadResult
     {
diff --git a/src/libstore/serve-protocol.hh b/src/libstore/serve-protocol.hh
index f67d1e2580a5..9fae6d5349f1 100644
--- a/src/libstore/serve-protocol.hh
+++ b/src/libstore/serve-protocol.hh
@@ -5,7 +5,7 @@ namespace nix {
 #define SERVE_MAGIC_1 0x390c9deb
 #define SERVE_MAGIC_2 0x5452eecb
 
-#define SERVE_PROTOCOL_VERSION 0x204
+#define SERVE_PROTOCOL_VERSION 0x205
 #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
 #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
 
@@ -18,6 +18,7 @@ typedef enum {
     cmdBuildPaths = 6,
     cmdQueryClosure = 7,
     cmdBuildDerivation = 8,
+    cmdAddToStoreNar = 9,
 } ServeCommand;
 
 }
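
Version 0x205 encodes serve protocol 2.5: the high byte is the major version and the low byte the minor, per the macros above:

    // GET_PROTOCOL_MAJOR(0x205) == 0x200   (major 2)
    // GET_PROTOCOL_MINOR(0x205) == 0x05    (minor 5)
    // cmdAddToStoreNar (9) is only valid once the remote reports minor >= 5
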
diff --git a/src/libstore/ssh.cc b/src/libstore/ssh.cc
index 033c580936ad..5e0e44935cca 100644
--- a/src/libstore/ssh.cc
+++ b/src/libstore/ssh.cc
@@ -4,8 +4,9 @@ namespace nix {
 
 SSHMaster::SSHMaster(const std::string & host, const std::string & keyFile, bool useMaster, bool compress, int logFD)
     : host(host)
+    , fakeSSH(host == "localhost")
     , keyFile(keyFile)
-    , useMaster(useMaster)
+    , useMaster(useMaster && !fakeSSH)
     , compress(compress)
     , logFD(logFD)
 {
@@ -45,12 +46,19 @@ std::unique_ptr<SSHMaster::Connection> SSHMaster::startCommand(const std::string
         if (logFD != -1 && dup2(logFD, STDERR_FILENO) == -1)
             throw SysError("duping over stderr");
 
-        Strings args = { "ssh", host.c_str(), "-x", "-a" };
-        addCommonSSHOpts(args);
-        if (socketPath != "")
-            args.insert(args.end(), {"-S", socketPath});
-        if (verbosity >= lvlChatty)
-            args.push_back("-v");
+        Strings args;
+
+        if (fakeSSH) {
+            args = { "bash", "-c" };
+        } else {
+            args = { "ssh", host.c_str(), "-x", "-a" };
+            addCommonSSHOpts(args);
+            if (socketPath != "")
+                args.insert(args.end(), {"-S", socketPath});
+            if (verbosity >= lvlChatty)
+                args.push_back("-v");
+        }
+
         args.push_back(command);
         execvp(args.begin()->c_str(), stringsToCharPtrs(args).data());
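
With fakeSSH set for host "localhost", the command runs through a local shell and the SSH master is disabled; a sketch of the two exec paths:

    // fakeSSH: execvp("bash", {"bash", "-c", command})
    // remote:  execvp("ssh",  {"ssh", host, "-x", "-a", <common opts>, command})
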
 
diff --git a/src/libstore/ssh.hh b/src/libstore/ssh.hh
index 1268e6d00054..4f0f0bd29f9f 100644
--- a/src/libstore/ssh.hh
+++ b/src/libstore/ssh.hh
@@ -10,6 +10,7 @@ class SSHMaster
 private:
 
     const std::string host;
+    bool fakeSSH;
     const std::string keyFile;
     const bool useMaster;
     const bool compress;
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index 9b0b7d6327e0..7a4a5f5eb85d 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -629,11 +629,12 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const PathSet & storePa
     Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));
 
     std::atomic<size_t> nrDone{0};
+    std::atomic<size_t> nrFailed{0};
     std::atomic<uint64_t> bytesExpected{0};
     std::atomic<uint64_t> nrRunning{0};
 
     auto showProgress = [&]() {
-        act.progress(nrDone, missing.size(), nrRunning);
+        act.progress(nrDone, missing.size(), nrRunning, nrFailed);
     };
 
     ThreadPool pool;
@@ -662,7 +663,16 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const PathSet & storePa
             if (!dstStore->isValidPath(storePath)) {
                 MaintainCount<decltype(nrRunning)> mc(nrRunning);
                 showProgress();
-                copyStorePath(srcStore, dstStore, storePath, repair, checkSigs);
+                try {
+                    copyStorePath(srcStore, dstStore, storePath, repair, checkSigs);
+                } catch (Error &e) {
+                    nrFailed++;
+                    if (!settings.keepGoing)
+                        throw e;
+                    logger->log(lvlError, format("could not copy %s: %s") % storePath % e.what());
+                    showProgress();
+                    return;
+                }
             }
 
             nrDone++;
@@ -834,8 +844,24 @@ ref<Store> openStore(const std::string & uri_,
     if (q != std::string::npos) {
         for (auto s : tokenizeString<Strings>(uri.substr(q + 1), "&")) {
             auto e = s.find('=');
-            if (e != std::string::npos)
-                params[s.substr(0, e)] = s.substr(e + 1);
+            if (e != std::string::npos) {
+                auto value = s.substr(e + 1);
+                std::string decoded;
+                for (size_t i = 0; i < value.size(); ) {
+                    if (value[i] == '%') {
+                        if (i + 2 >= value.size())
+                            throw Error("invalid URI parameter '%s'", value);
+                        try {
+                            decoded += std::stoul(std::string(value, i + 1, 2), 0, 16);
+                            i += 3;
+                        } catch (...) {
+                            throw Error("invalid URI parameter '%s'", value);
+                        }
+                    } else
+                        decoded += value[i++];
+                }
+                params[s.substr(0, e)] = decoded;
+            }
         }
         uri = uri_.substr(0, q);
     }
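
The percent-decoding loop lets store-URI parameters carry characters that would otherwise terminate or split the query string. A hedged example of what now round-trips (the endpoint value is illustrative):

    // "s3://bucket?endpoint=http%3A%2F%2Fexample.com%3A9000"
    //   -> params["endpoint"] == "http://example.com:9000"
    // %3A -> ':', %2F -> '/'; a '%' not followed by two hex digits is
    // rejected with "invalid URI parameter".
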
diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc
index e1782f8c4bd9..53b62f62a76f 100644
--- a/src/libutil/compression.cc
+++ b/src/libutil/compression.cc
@@ -8,10 +8,8 @@
 #include <cstdio>
 #include <cstring>
 
-#if HAVE_BROTLI
 #include <brotli/decode.h>
 #include <brotli/encode.h>
-#endif // HAVE_BROTLI
 
 #include <iostream>
 
@@ -19,235 +17,258 @@ namespace nix {
 
 static const size_t bufSize = 32 * 1024;
 
-static void decompressNone(Source & source, Sink & sink)
+// Split large writes into chunks: brotli shouldn't be fed too much at once,
+// and bzip2's 32-bit avail_in caps how much may be passed per call.
+struct ChunkedCompressionSink : CompressionSink
 {
-    std::vector<unsigned char> buf(bufSize);
-    while (true) {
-        size_t n;
-        try {
-            n = source.read(buf.data(), buf.size());
-        } catch (EndOfFile &) {
-            break;
+    uint8_t outbuf[BUFSIZ];
+
+    void write(const unsigned char * data, size_t len) override
+    {
+        const size_t CHUNK_SIZE = sizeof(outbuf) << 2;
+        while (len) {
+            size_t n = std::min(CHUNK_SIZE, len);
+            writeInternal(data, n);
+            data += n;
+            len -= n;
         }
-        sink(buf.data(), n);
     }
-}
 
-static void decompressXZ(Source & source, Sink & sink)
+    virtual void writeInternal(const unsigned char * data, size_t len) = 0;
+};
+
+struct NoneSink : CompressionSink
 {
-    lzma_stream strm(LZMA_STREAM_INIT);
-
-    lzma_ret ret = lzma_stream_decoder(
-        &strm, UINT64_MAX, LZMA_CONCATENATED);
-    if (ret != LZMA_OK)
-        throw CompressionError("unable to initialise lzma decoder");
-
-    Finally free([&]() { lzma_end(&strm); });
-
-    lzma_action action = LZMA_RUN;
-    std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
-    strm.next_in = nullptr;
-    strm.avail_in = 0;
-    strm.next_out = outbuf.data();
-    strm.avail_out = outbuf.size();
-    bool eof = false;
-
-    while (true) {
-        checkInterrupt();
-
-        if (strm.avail_in == 0 && !eof) {
-            strm.next_in = inbuf.data();
-            try {
-                strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
-            } catch (EndOfFile &) {
-                eof = true;
-            }
-        }
+    Sink & nextSink;
+    NoneSink(Sink & nextSink) : nextSink(nextSink) { }
+    void finish() override { flush(); }
+    void write(const unsigned char * data, size_t len) override { nextSink(data, len); }
+};
 
-        if (strm.avail_in == 0)
-            action = LZMA_FINISH;
+struct XzDecompressionSink : CompressionSink
+{
+    Sink & nextSink;
+    uint8_t outbuf[BUFSIZ];
+    lzma_stream strm = LZMA_STREAM_INIT;
+    bool finished = false;
 
-        lzma_ret ret = lzma_code(&strm, action);
+    XzDecompressionSink(Sink & nextSink) : nextSink(nextSink)
+    {
+        lzma_ret ret = lzma_stream_decoder(
+            &strm, UINT64_MAX, LZMA_CONCATENATED);
+        if (ret != LZMA_OK)
+            throw CompressionError("unable to initialise lzma decoder");
 
-        if (strm.avail_out < outbuf.size()) {
-            sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
-            strm.next_out = outbuf.data();
-            strm.avail_out = outbuf.size();
-        }
+        strm.next_out = outbuf;
+        strm.avail_out = sizeof(outbuf);
+    }
 
-        if (ret == LZMA_STREAM_END) return;
+    ~XzDecompressionSink()
+    {
+        lzma_end(&strm);
+    }
 
-        if (ret != LZMA_OK)
-            throw CompressionError("error %d while decompressing xz file", ret);
+    void finish() override
+    {
+        CompressionSink::flush();
+        write(nullptr, 0);
     }
-}
 
-static void decompressBzip2(Source & source, Sink & sink)
-{
-    bz_stream strm;
-    memset(&strm, 0, sizeof(strm));
-
-    int ret = BZ2_bzDecompressInit(&strm, 0, 0);
-    if (ret != BZ_OK)
-        throw CompressionError("unable to initialise bzip2 decoder");
-
-    Finally free([&]() { BZ2_bzDecompressEnd(&strm); });
-
-    std::vector<char> inbuf(bufSize), outbuf(bufSize);
-    strm.next_in = nullptr;
-    strm.avail_in = 0;
-    strm.next_out = outbuf.data();
-    strm.avail_out = outbuf.size();
-    bool eof = false;
-
-    while (true) {
-        checkInterrupt();
-
-        if (strm.avail_in == 0 && !eof) {
-            strm.next_in = inbuf.data();
-            try {
-                strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
-            } catch (EndOfFile &) {
-                eof = true;
-            }
-        }
+    void write(const unsigned char * data, size_t len) override
+    {
+        strm.next_in = data;
+        strm.avail_in = len;
+
+        while (!finished && (!data || strm.avail_in)) {
+            checkInterrupt();
 
-        int ret = BZ2_bzDecompress(&strm);
+            lzma_ret ret = lzma_code(&strm, data ? LZMA_RUN : LZMA_FINISH);
+            if (ret != LZMA_OK && ret != LZMA_STREAM_END)
+                throw CompressionError("error %d while decompressing xz file", ret);
 
-        if (strm.avail_in == 0 && strm.avail_out == outbuf.size() && eof)
-            throw CompressionError("bzip2 data ends prematurely");
+            finished = ret == LZMA_STREAM_END;
 
-        if (strm.avail_out < outbuf.size()) {
-            sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
-            strm.next_out = outbuf.data();
-            strm.avail_out = outbuf.size();
+            if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
+                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
+                strm.next_out = outbuf;
+                strm.avail_out = sizeof(outbuf);
+            }
         }
+    }
+};
 
-        if (ret == BZ_STREAM_END) return;
+struct BzipDecompressionSink : ChunkedCompressionSink
+{
+    Sink & nextSink;
+    bz_stream strm;
+    bool finished = false;
 
+    BzipDecompressionSink(Sink & nextSink) : nextSink(nextSink)
+    {
+        memset(&strm, 0, sizeof(strm));
+        int ret = BZ2_bzDecompressInit(&strm, 0, 0);
         if (ret != BZ_OK)
-            throw CompressionError("error while decompressing bzip2 file");
+            throw CompressionError("unable to initialise bzip2 decoder");
+
+        strm.next_out = (char *) outbuf;
+        strm.avail_out = sizeof(outbuf);
     }
-}
 
-static void decompressBrotli(Source & source, Sink & sink)
-{
-#if !HAVE_BROTLI
-    RunOptions options(BROTLI, {"-d"});
-    options.standardIn = &source;
-    options.standardOut = &sink;
-    runProgram2(options);
-#else
-    auto *s = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
-    if (!s)
-        throw CompressionError("unable to initialize brotli decoder");
-
-    Finally free([s]() { BrotliDecoderDestroyInstance(s); });
-
-    std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
-    const uint8_t * next_in = nullptr;
-    size_t avail_in = 0;
-    bool eof = false;
-
-    while (true) {
-        checkInterrupt();
-
-        if (avail_in == 0 && !eof) {
-            next_in = inbuf.data();
-            try {
-                avail_in = source.read((unsigned char *) next_in, inbuf.size());
-            } catch (EndOfFile &) {
-                eof = true;
+    ~BzipDecompressionSink()
+    {
+        BZ2_bzDecompressEnd(&strm);
+    }
+
+    void finish() override
+    {
+        flush();
+        write(nullptr, 0);
+    }
+
+    void writeInternal(const unsigned char * data, size_t len)
+    {
+        assert(len <= std::numeric_limits<decltype(strm.avail_in)>::max());
+
+        strm.next_in = (char *) data;
+        strm.avail_in = len;
+
+        while (strm.avail_in) {
+            checkInterrupt();
+
+            int ret = BZ2_bzDecompress(&strm);
+            if (ret != BZ_OK && ret != BZ_STREAM_END)
+                throw CompressionError("error while decompressing bzip2 file");
+
+            finished = ret == BZ_STREAM_END;
+
+            if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
+                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
+                strm.next_out = (char *) outbuf;
+                strm.avail_out = sizeof(outbuf);
             }
         }
+    }
+};
 
-        uint8_t * next_out = outbuf.data();
-        size_t avail_out = outbuf.size();
-
-        auto ret = BrotliDecoderDecompressStream(s,
-                &avail_in, &next_in,
-                &avail_out, &next_out,
-                nullptr);
-
-        switch (ret) {
-        case BROTLI_DECODER_RESULT_ERROR:
-            throw CompressionError("error while decompressing brotli file");
-        case BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
-            if (eof)
-                throw CompressionError("incomplete or corrupt brotli file");
-            break;
-        case BROTLI_DECODER_RESULT_SUCCESS:
-            if (avail_in != 0)
-                throw CompressionError("unexpected input after brotli decompression");
-            break;
-        case BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT:
-            // I'm not sure if this can happen, but abort if this happens with empty buffer
-            if (avail_out == outbuf.size())
-                throw CompressionError("brotli decompression requires larger buffer");
-            break;
-        }
+struct BrotliDecompressionSink : ChunkedCompressionSink
+{
+    Sink & nextSink;
+    BrotliDecoderState * state;
+    bool finished = false;
 
-        // Always ensure we have full buffer for next invocation
-        if (avail_out < outbuf.size())
-            sink((unsigned char *) outbuf.data(), outbuf.size() - avail_out);
+    BrotliDecompressionSink(Sink & nextSink) : nextSink(nextSink)
+    {
+        state = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
+        if (!state)
+            throw CompressionError("unable to initialize brotli decoder");
+    }
 
-        if (ret == BROTLI_DECODER_RESULT_SUCCESS) return;
+    ~BrotliDecompressionSink()
+    {
+        BrotliDecoderDestroyInstance(state);
+    }
+
+    void finish() override
+    {
+        flush();
+        writeInternal(nullptr, 0);
     }
-#endif // HAVE_BROTLI
-}
+
+    void writeInternal(const unsigned char * data, size_t len)
+    {
+        const uint8_t * next_in = data;
+        size_t avail_in = len;
+        uint8_t * next_out = outbuf;
+        size_t avail_out = sizeof(outbuf);
+
+        while (!finished && (!data || avail_in)) {
+            checkInterrupt();
+
+            if (!BrotliDecoderDecompressStream(state,
+                    &avail_in, &next_in,
+                    &avail_out, &next_out,
+                    nullptr))
+                throw CompressionError("error while decompressing brotli file");
+
+            if (avail_out < sizeof(outbuf) || avail_in == 0) {
+                nextSink(outbuf, sizeof(outbuf) - avail_out);
+                next_out = outbuf;
+                avail_out = sizeof(outbuf);
+            }
+
+            finished = BrotliDecoderIsFinished(state);
+        }
+    }
+};
 
 ref<std::string> decompress(const std::string & method, const std::string & in)
 {
-    StringSource source(in);
-    StringSink sink;
-    decompress(method, source, sink);
-    return sink.s;
+    StringSink ssink;
+    auto sink = makeDecompressionSink(method, ssink);
+    (*sink)(in);
+    sink->finish();
+    return ssink.s;
 }
 
-void decompress(const std::string & method, Source & source, Sink & sink)
+ref<CompressionSink> makeDecompressionSink(const std::string & method, Sink & nextSink)
 {
-    if (method == "none")
-        return decompressNone(source, sink);
+    if (method == "none" || method == "")
+        return make_ref<NoneSink>(nextSink);
     else if (method == "xz")
-        return decompressXZ(source, sink);
+        return make_ref<XzDecompressionSink>(nextSink);
     else if (method == "bzip2")
-        return decompressBzip2(source, sink);
+        return make_ref<BzipDecompressionSink>(nextSink);
     else if (method == "br")
-        return decompressBrotli(source, sink);
+        return make_ref<BrotliDecompressionSink>(nextSink);
     else
         throw UnknownCompressionMethod("unknown compression method '%s'", method);
 }
 
-struct NoneSink : CompressionSink
-{
-    Sink & nextSink;
-    NoneSink(Sink & nextSink) : nextSink(nextSink) { }
-    void finish() override { flush(); }
-    void write(const unsigned char * data, size_t len) override { nextSink(data, len); }
-};
-
-struct XzSink : CompressionSink
+struct XzCompressionSink : CompressionSink
 {
     Sink & nextSink;
     uint8_t outbuf[BUFSIZ];
     lzma_stream strm = LZMA_STREAM_INIT;
     bool finished = false;
 
-    template <typename F>
-    XzSink(Sink & nextSink, F&& initEncoder) : nextSink(nextSink) {
-        lzma_ret ret = initEncoder();
+    XzCompressionSink(Sink & nextSink, bool parallel) : nextSink(nextSink)
+    {
+        lzma_ret ret;
+        bool done = false;
+
+        if (parallel) {
+#ifdef HAVE_LZMA_MT
+            lzma_mt mt_options = {};
+            mt_options.flags = 0;
+            mt_options.timeout = 300; // Using the same setting as the xz cmd line
+            mt_options.preset = LZMA_PRESET_DEFAULT;
+            mt_options.filters = NULL;
+            mt_options.check = LZMA_CHECK_CRC64;
+            mt_options.threads = lzma_cputhreads();
+            mt_options.block_size = 0;
+            if (mt_options.threads == 0)
+                mt_options.threads = 1;
+            // FIXME: maybe use lzma_stream_encoder_mt_memusage() to control the
+            // number of threads.
+            ret = lzma_stream_encoder_mt(&strm, &mt_options);
+            done = true;
+#else
+            printMsg(lvlError, "warning: parallel XZ compression requested but not supported, falling back to single-threaded compression");
+#endif
+        }
+
+        if (!done)
+            ret = lzma_easy_encoder(&strm, 6, LZMA_CHECK_CRC64);
+
         if (ret != LZMA_OK)
             throw CompressionError("unable to initialise lzma encoder");
+
         // FIXME: apply the x86 BCJ filter?
 
         strm.next_out = outbuf;
         strm.avail_out = sizeof(outbuf);
     }
-    XzSink(Sink & nextSink) : XzSink(nextSink, [this]() {
-        return lzma_easy_encoder(&strm, 6, LZMA_CHECK_CRC64);
-    }) {}
 
-    ~XzSink()
+    ~XzCompressionSink()
     {
         lzma_end(&strm);
     }
@@ -255,43 +276,25 @@ struct XzSink : CompressionSink
     void finish() override
     {
         CompressionSink::flush();
-
-        assert(!finished);
-        finished = true;
-
-        while (true) {
-            checkInterrupt();
-
-            lzma_ret ret = lzma_code(&strm, LZMA_FINISH);
-            if (ret != LZMA_OK && ret != LZMA_STREAM_END)
-                throw CompressionError("error while flushing xz file");
-
-            if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
-                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
-                strm.next_out = outbuf;
-                strm.avail_out = sizeof(outbuf);
-            }
-
-            if (ret == LZMA_STREAM_END) break;
-        }
+        write(nullptr, 0);
     }
 
     void write(const unsigned char * data, size_t len) override
     {
-        assert(!finished);
-
         strm.next_in = data;
         strm.avail_in = len;
 
-        while (strm.avail_in) {
+        while (!finished && (!data || strm.avail_in)) {
             checkInterrupt();
 
-            lzma_ret ret = lzma_code(&strm, LZMA_RUN);
-            if (ret != LZMA_OK)
-                throw CompressionError("error while compressing xz file");
+            lzma_ret ret = lzma_code(&strm, data ? LZMA_RUN : LZMA_FINISH);
+            if (ret != LZMA_OK && ret != LZMA_STREAM_END)
+                throw CompressionError("error %d while compressing xz file", ret);
+
+            finished = ret == LZMA_STREAM_END;
 
-            if (strm.avail_out == 0) {
-                nextSink(outbuf, sizeof(outbuf));
+            if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
+                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
                 strm.next_out = outbuf;
                 strm.avail_out = sizeof(outbuf);
             }
@@ -299,46 +302,24 @@ struct XzSink : CompressionSink
     }
 };
 
-#ifdef HAVE_LZMA_MT
-struct ParallelXzSink : public XzSink
-{
-  ParallelXzSink(Sink &nextSink) : XzSink(nextSink, [this]() {
-        lzma_mt mt_options = {};
-        mt_options.flags = 0;
-        mt_options.timeout = 300; // Using the same setting as the xz cmd line
-        mt_options.preset = LZMA_PRESET_DEFAULT;
-        mt_options.filters = NULL;
-        mt_options.check = LZMA_CHECK_CRC64;
-        mt_options.threads = lzma_cputhreads();
-        mt_options.block_size = 0;
-        if (mt_options.threads == 0)
-            mt_options.threads = 1;
-        // FIXME: maybe use lzma_stream_encoder_mt_memusage() to control the
-        // number of threads.
-        return lzma_stream_encoder_mt(&strm, &mt_options);
-  }) {}
-};
-#endif
-
-struct BzipSink : CompressionSink
+struct BzipCompressionSink : ChunkedCompressionSink
 {
     Sink & nextSink;
-    char outbuf[BUFSIZ];
     bz_stream strm;
     bool finished = false;
 
-    BzipSink(Sink & nextSink) : nextSink(nextSink)
+    BzipCompressionSink(Sink & nextSink) : nextSink(nextSink)
     {
         memset(&strm, 0, sizeof(strm));
         int ret = BZ2_bzCompressInit(&strm, 9, 0, 30);
         if (ret != BZ_OK)
             throw CompressionError("unable to initialise bzip2 encoder");
 
-        strm.next_out = outbuf;
+        strm.next_out = (char *) outbuf;
         strm.avail_out = sizeof(outbuf);
     }
 
-    ~BzipSink()
+    ~BzipCompressionSink()
     {
         BZ2_bzCompressEnd(&strm);
     }
@@ -346,114 +327,49 @@ struct BzipSink : CompressionSink
     void finish() override
     {
         flush();
-
-        assert(!finished);
-        finished = true;
-
-        while (true) {
-            checkInterrupt();
-
-            int ret = BZ2_bzCompress(&strm, BZ_FINISH);
-            if (ret != BZ_FINISH_OK && ret != BZ_STREAM_END)
-                throw CompressionError("error while flushing bzip2 file");
-
-            if (strm.avail_out == 0 || ret == BZ_STREAM_END) {
-                nextSink((unsigned char *) outbuf, sizeof(outbuf) - strm.avail_out);
-                strm.next_out = outbuf;
-                strm.avail_out = sizeof(outbuf);
-            }
-
-            if (ret == BZ_STREAM_END) break;
-        }
-    }
-
-    void write(const unsigned char * data, size_t len) override
-    {
-        /* Bzip2's 'avail_in' parameter is an unsigned int, so we need
-           to split the input into chunks of at most 4 GiB. */
-        while (len) {
-            auto n = std::min((size_t) std::numeric_limits<decltype(strm.avail_in)>::max(), len);
-            writeInternal(data, n);
-            data += n;
-            len -= n;
-        }
+        writeInternal(nullptr, 0);
     }
 
     void writeInternal(const unsigned char * data, size_t len)
     {
-        assert(!finished);
         assert(len <= std::numeric_limits<decltype(strm.avail_in)>::max());
 
         strm.next_in = (char *) data;
         strm.avail_in = len;
 
-        while (strm.avail_in) {
+        while (!finished && (!data || strm.avail_in)) {
             checkInterrupt();
 
-            int ret = BZ2_bzCompress(&strm, BZ_RUN);
-            if (ret != BZ_OK)
-                CompressionError("error while compressing bzip2 file");
+            int ret = BZ2_bzCompress(&strm, data ? BZ_RUN : BZ_FINISH);
+            if (ret != BZ_RUN_OK && ret != BZ_FINISH_OK && ret != BZ_STREAM_END)
+                throw CompressionError("error %d while compressing bzip2 file", ret);
 
-            if (strm.avail_out == 0) {
-                nextSink((unsigned char *) outbuf, sizeof(outbuf));
-                strm.next_out = outbuf;
+            finished = ret == BZ_STREAM_END;
+
+            if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
+                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
+                strm.next_out = (char *) outbuf;
                 strm.avail_out = sizeof(outbuf);
             }
         }
     }
 };
 
-struct LambdaCompressionSink : CompressionSink
-{
-    Sink & nextSink;
-    std::string data;
-    using CompressFnTy = std::function<std::string(const std::string&)>;
-    CompressFnTy compressFn;
-    LambdaCompressionSink(Sink& nextSink, CompressFnTy compressFn)
-        : nextSink(nextSink)
-        , compressFn(std::move(compressFn))
-    {
-    };
-
-    void finish() override
-    {
-        flush();
-        nextSink(compressFn(data));
-    }
-
-    void write(const unsigned char * data, size_t len) override
-    {
-        checkInterrupt();
-        this->data.append((const char *) data, len);
-    }
-};
-
-struct BrotliCmdSink : LambdaCompressionSink
-{
-    BrotliCmdSink(Sink& nextSink)
-        : LambdaCompressionSink(nextSink, [](const std::string& data) {
-            return runProgram(BROTLI, true, {}, data);
-        })
-    {
-    }
-};
-
-#if HAVE_BROTLI
-struct BrotliSink : CompressionSink
+struct BrotliCompressionSink : ChunkedCompressionSink
 {
     Sink & nextSink;
     uint8_t outbuf[BUFSIZ];
     BrotliEncoderState *state;
     bool finished = false;
 
-    BrotliSink(Sink & nextSink) : nextSink(nextSink)
+    BrotliCompressionSink(Sink & nextSink) : nextSink(nextSink)
     {
         state = BrotliEncoderCreateInstance(nullptr, nullptr, nullptr);
         if (!state)
             throw CompressionError("unable to initialise brotli encoder");
     }
 
-    ~BrotliSink()
+    ~BrotliCompressionSink()
     {
         BrotliEncoderDestroyInstance(state);
     }
@@ -461,94 +377,47 @@ struct BrotliSink : CompressionSink
     void finish() override
     {
         flush();
-        assert(!finished);
-
-        const uint8_t *next_in = nullptr;
-        size_t avail_in = 0;
-        uint8_t *next_out = outbuf;
-        size_t avail_out = sizeof(outbuf);
-        while (!finished) {
-            checkInterrupt();
-
-            if (!BrotliEncoderCompressStream(state,
-                        BROTLI_OPERATION_FINISH,
-                        &avail_in, &next_in,
-                        &avail_out, &next_out,
-                        nullptr))
-                throw CompressionError("error while finishing brotli file");
-
-            finished = BrotliEncoderIsFinished(state);
-            if (avail_out == 0 || finished) {
-                nextSink(outbuf, sizeof(outbuf) - avail_out);
-                next_out = outbuf;
-                avail_out = sizeof(outbuf);
-            }
-        }
-    }
-
-    void write(const unsigned char * data, size_t len) override
-    {
-        // Don't feed brotli too much at once
-        const size_t CHUNK_SIZE = sizeof(outbuf) << 2;
-        while (len) {
-          size_t n = std::min(CHUNK_SIZE, len);
-          writeInternal(data, n);
-          data += n;
-          len -= n;
-        }
+        writeInternal(nullptr, 0);
     }
 
     void writeInternal(const unsigned char * data, size_t len)
     {
-        assert(!finished);
-
-        const uint8_t *next_in = data;
+        const uint8_t * next_in = data;
         size_t avail_in = len;
-        uint8_t *next_out = outbuf;
+        uint8_t * next_out = outbuf;
         size_t avail_out = sizeof(outbuf);
 
-        while (avail_in > 0) {
+        while (!finished && (!data || avail_in)) {
             checkInterrupt();
 
             if (!BrotliEncoderCompressStream(state,
-                      BROTLI_OPERATION_PROCESS,
-                      &avail_in, &next_in,
-                      &avail_out, &next_out,
-                      nullptr))
-                throw CompressionError("error while compressing brotli file");
+                    data ? BROTLI_OPERATION_PROCESS : BROTLI_OPERATION_FINISH,
+                    &avail_in, &next_in,
+                    &avail_out, &next_out,
+                    nullptr))
+                throw CompressionError("error while compressing brotli file");
 
             if (avail_out < sizeof(outbuf) || avail_in == 0) {
                 nextSink(outbuf, sizeof(outbuf) - avail_out);
                 next_out = outbuf;
                 avail_out = sizeof(outbuf);
             }
+
+            finished = BrotliEncoderIsFinished(state);
         }
     }
 };
-#endif // HAVE_BROTLI
 
 ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel)
 {
-    if (parallel) {
-#ifdef HAVE_LZMA_MT
-        if (method == "xz")
-            return make_ref<ParallelXzSink>(nextSink);
-#endif
-        printMsg(lvlError, format("Warning: parallel compression requested but not supported for method '%1%', falling back to single-threaded compression") % method);
-    }
-
     if (method == "none")
         return make_ref<NoneSink>(nextSink);
     else if (method == "xz")
-        return make_ref<XzSink>(nextSink);
+        return make_ref<XzCompressionSink>(nextSink, parallel);
     else if (method == "bzip2")
-        return make_ref<BzipSink>(nextSink);
+        return make_ref<BzipCompressionSink>(nextSink);
     else if (method == "br")
-#if HAVE_BROTLI
-        return make_ref<BrotliSink>(nextSink);
-#else
-        return make_ref<BrotliCmdSink>(nextSink);
-#endif
+        return make_ref<BrotliCompressionSink>(nextSink);
     else
         throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
 }
diff --git a/src/libutil/compression.hh b/src/libutil/compression.hh
index f7a3e3fbd32e..dd666a4e19fd 100644
--- a/src/libutil/compression.hh
+++ b/src/libutil/compression.hh
@@ -8,17 +8,17 @@
 
 namespace nix {
 
-ref<std::string> decompress(const std::string & method, const std::string & in);
-
-void decompress(const std::string & method, Source & source, Sink & sink);
-
-ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
-
 struct CompressionSink : BufferedSink
 {
     virtual void finish() = 0;
 };
 
+ref<std::string> decompress(const std::string & method, const std::string & in);
+
+ref<CompressionSink> makeDecompressionSink(const std::string & method, Sink & nextSink);
+
+ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
+
 ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel = false);
 
 MakeError(UnknownCompressionMethod, Error);
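
A minimal usage sketch of the sink-based decompression API, mirroring the decompress() helper in compression.cc above ('compressed' is an illustrative input string):

    StringSink out;
    auto sink = makeDecompressionSink("xz", out);  // or "bzip2", "br", "none"
    (*sink)(compressed);                           // may be fed incrementally
    sink->finish();                                // drain any trailing output
    // *out.s now holds the decompressed data
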
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index 21803edd056a..b2c49d911b34 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -157,6 +157,10 @@ size_t StringSource::read(unsigned char * data, size_t len)
 }
 
 
+#if BOOST_VERSION >= 106300 && BOOST_VERSION < 106600
+#error Coroutines are broken in this version of Boost!
+#endif
+
 std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun)
 {
     struct SinkToSource : Source
diff --git a/src/nix-build/nix-build.cc b/src/nix-build/nix-build.cc
index de0e9118fd21..34f1cba9ddac 100755
--- a/src/nix-build/nix-build.cc
+++ b/src/nix-build/nix-build.cc
@@ -85,7 +85,6 @@ void mainWrapped(int argc, char * * argv)
     BuildMode buildMode = bmNormal;
     bool readStdin = false;
 
-    auto shell = getEnv("SHELL", "/bin/sh");
     std::string envCommand; // interactive shell
     Strings envExclude;
 
@@ -99,6 +98,9 @@ void mainWrapped(int argc, char * * argv)
 
     std::string outLink = "./result";
 
+    // List of environment variables kept for --pure
+    std::set<string> keepVars{"HOME", "USER", "LOGNAME", "DISPLAY", "PATH", "TERM", "IN_NIX_SHELL", "TZ", "PAGER", "NIX_BUILD_SHELL", "SHLVL"};
+
     Strings args;
     for (int i = 1; i < argc; ++i)
         args.push_back(argv[i]);
@@ -218,6 +220,9 @@ void mainWrapped(int argc, char * * argv)
             }
         }
 
+        else if (*arg == "--keep")
+            keepVars.insert(getArg(*arg, arg, end));
+
         else if (*arg == "-")
             readStdin = true;
 
@@ -368,7 +373,6 @@ void mainWrapped(int argc, char * * argv)
         auto tmp = getEnv("TMPDIR", getEnv("XDG_RUNTIME_DIR", "/tmp"));
 
         if (pure) {
-            std::set<string> keepVars{"HOME", "USER", "LOGNAME", "DISPLAY", "PATH", "TERM", "IN_NIX_SHELL", "TZ", "PAGER", "NIX_BUILD_SHELL", "SHLVL"};
             decltype(env) newEnv;
             for (auto & i : env)
                 if (keepVars.count(i.first))
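
With keepVars hoisted out of the --pure branch, the new --keep flag can extend the whitelist from the command line; an illustrative invocation:

    # keep an extra variable in an otherwise pure shell
    nix-shell --pure --keep SSH_AUTH_SOCK -p openssh
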
diff --git a/src/nix-daemon/nix-daemon.cc b/src/nix-daemon/nix-daemon.cc
index a2e54b93c05f..644fa6681de3 100644
--- a/src/nix-daemon/nix-daemon.cc
+++ b/src/nix-daemon/nix-daemon.cc
@@ -707,6 +707,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
 
         logger->startWork();
 
+        // FIXME: race if addToStore doesn't read source?
         store.cast<Store>()->addToStore(info, *source, (RepairFlag) repair,
             dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr);
 
diff --git a/src/nix-prefetch-url/nix-prefetch-url.cc b/src/nix-prefetch-url/nix-prefetch-url.cc
index 50b2c2803ec9..a3b025723cf1 100644
--- a/src/nix-prefetch-url/nix-prefetch-url.cc
+++ b/src/nix-prefetch-url/nix-prefetch-url.cc
@@ -9,6 +9,10 @@
 
 #include <iostream>
 
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
 using namespace nix;
 
 
@@ -160,14 +164,20 @@ int main(int argc, char * * argv)
 
             auto actualUri = resolveMirrorUri(*state, uri);
 
-            /* Download the file. */
-            DownloadRequest req(actualUri);
-            req.decompress = false;
-            auto result = getDownloader()->download(req);
-
             AutoDelete tmpDir(createTempDir(), true);
             Path tmpFile = (Path) tmpDir + "/tmp";
-            writeFile(tmpFile, *result.data);
+
+            /* Download the file. */
+            {
+                AutoCloseFD fd = open(tmpFile.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0600);
+                if (!fd) throw SysError("creating temporary file '%s'", tmpFile);
+
+                FdSink sink(fd.get());
+
+                DownloadRequest req(actualUri);
+                req.decompress = false;
+                getDownloader()->download(std::move(req), sink);
+            }
 
             /* Optionally unpack the file. */
             if (unpack) {
@@ -191,7 +201,7 @@ int main(int argc, char * * argv)
 
             /* FIXME: inefficient; addToStore() will also hash
                this. */
-            hash = unpack ? hashPath(ht, tmpFile).first : hashString(ht, *result.data);
+            hash = unpack ? hashPath(ht, tmpFile).first : hashFile(ht, tmpFile);
 
             if (expectedHash != Hash(ht) && expectedHash != hash)
                 throw Error(format("hash mismatch for '%1%'") % uri);
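
The download now streams to a temporary file through an FdSink instead of accumulating in memory; the hash is then computed from the file on disk. A sketch of the flow using the names from the hunk:

    // open(tmpFile, O_WRONLY | O_CREAT | O_EXCL, 0600) -> FdSink sink
    // getDownloader()->download(std::move(req), sink)  // streamed to disk
    // hash = unpack ? hashPath(ht, tmpFile).first : hashFile(ht, tmpFile)
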
diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc
index e1e27ceef94d..fe68f681ae28 100644
--- a/src/nix-store/nix-store.cc
+++ b/src/nix-store/nix-store.cc
@@ -860,7 +860,7 @@ static void opServe(Strings opFlags, Strings opArgs)
             }
 
             case cmdDumpStorePath:
-                dumpPath(readStorePath(*store, in), out);
+                store->narFromPath(readStorePath(*store, in), out);
                 break;
 
             case cmdImportPaths: {
@@ -924,6 +924,28 @@ static void opServe(Strings opFlags, Strings opArgs)
                 break;
             }
 
+            case cmdAddToStoreNar: {
+                if (!writeAllowed) throw Error("importing paths is not allowed");
+
+                ValidPathInfo info;
+                info.path = readStorePath(*store, in);
+                in >> info.deriver;
+                if (!info.deriver.empty())
+                    store->assertStorePath(info.deriver);
+                info.narHash = Hash(readString(in), htSHA256);
+                info.references = readStorePaths<PathSet>(*store, in);
+                in >> info.registrationTime >> info.narSize >> info.ultimate;
+                info.sigs = readStrings<StringSet>(in);
+                in >> info.ca;
+
+                // FIXME: race if addToStore doesn't read source?
+                store->addToStore(info, in, NoRepair, NoCheckSigs);
+
+                out << 1; // indicate success
+
+                break;
+            }
+
             default:
                 throw Error(format("unknown serve command %1%") % cmd);
         }
diff --git a/src/nix/copy.cc b/src/nix/copy.cc
index e4e6c3e303ed..91711c8b46da 100644
--- a/src/nix/copy.cc
+++ b/src/nix/copy.cc
@@ -72,6 +72,10 @@ struct CmdCopy : StorePathsCommand
                 "To populate the current folder build output to a S3 binary cache:",
                 "nix copy --to s3://my-bucket?region=eu-west-1"
             },
+            Example{
+                "To copy the build output of the current folder to an S3-compatible binary cache:",
+                "nix copy --to 's3://my-bucket?region=eu-west-1&endpoint=example.com'"
+            },
 #endif
         };
     }