Diffstat (limited to 'src')
-rw-r--r--  src/libexpr/eval.cc                       | 170
-rw-r--r--  src/libexpr/lexer.l                       |   8
-rw-r--r--  src/libexpr/primops.cc                    |  16
-rw-r--r--  src/libexpr/primops/fetchGit.cc           |   2
-rw-r--r--  src/libexpr/primops/fetchMercurial.cc     |   2
-rw-r--r--  src/libexpr/value.hh                      |   2
-rw-r--r--  src/libstore/binary-cache-store.cc        |  23
-rw-r--r--  src/libstore/build.cc                     |  15
-rw-r--r--  src/libstore/builtins/fetchurl.cc         |  22
-rw-r--r--  src/libstore/download.cc                  |  89
-rw-r--r--  src/libstore/download.hh                  |   4
-rw-r--r--  src/libstore/http-binary-cache-store.cc   |  41
-rw-r--r--  src/libstore/legacy-ssh-store.cc          |  62
-rw-r--r--  src/libstore/local-store.cc               |   8
-rw-r--r--  src/libstore/s3-binary-cache-store.cc     | 105
-rw-r--r--  src/libstore/s3.hh                        |   4
-rw-r--r--  src/libstore/serve-protocol.hh            |   3
-rw-r--r--  src/libstore/ssh.cc                       |  22
-rw-r--r--  src/libstore/ssh.hh                       |   1
-rw-r--r--  src/libstore/store-api.cc                 |  22
-rw-r--r--  src/libstore/store-api.hh                 |   1
-rw-r--r--  src/libutil/compression.cc                | 613
-rw-r--r--  src/libutil/compression.hh                |  12
-rw-r--r--  src/libutil/serialise.cc                  |  19
-rw-r--r--  src/libutil/serialise.hh                  |   6
-rw-r--r--  src/libutil/util.cc                       |   4
-rwxr-xr-x  src/nix-build/nix-build.cc                |  11
-rwxr-xr-x  src/nix-channel/nix-channel.cc            |   2
-rw-r--r--  src/nix-daemon/nix-daemon.cc              |   1
-rw-r--r--  src/nix-env/nix-env.cc                    |   8
-rw-r--r--  src/nix-store/nix-store.cc                |  24
-rw-r--r--  src/nix/copy.cc                           |   4
-rw-r--r--  src/nix/installables.cc                   |   2
-rw-r--r--  src/nix/main.cc                           |   1
-rw-r--r--  src/nix/path-info.cc                      |  36
-rw-r--r--  src/nix/progress-bar.cc                   |   5
-rw-r--r--  src/nix/repl.cc                           |  12
-rw-r--r--  src/nix/run.cc                            |  26
-rw-r--r--  src/nix/upgrade-nix.cc                    |  39
39 files changed, 811 insertions, 636 deletions
diff --git a/src/libexpr/eval.cc b/src/libexpr/eval.cc
index 2c8a0eb422fc..ab407e56907c 100644
--- a/src/libexpr/eval.cc
+++ b/src/libexpr/eval.cc
@@ -6,12 +6,15 @@
 #include "globals.hh"
 #include "eval-inline.hh"
 #include "download.hh"
+#include "json.hh"
 
 #include <algorithm>
 #include <cstring>
 #include <unistd.h>
 #include <sys/time.h>
 #include <sys/resource.h>
+#include <iostream>
+#include <fstream>
 
 #include <sys/time.h>
 #include <sys/resource.h>
@@ -23,7 +26,6 @@
 
 #endif
 
-
 namespace nix {
 
 
@@ -349,19 +351,25 @@ Path EvalState::checkSourcePath(const Path & path_)
 
     bool found = false;
 
+    /* First canonicalize the path without resolving symlinks, to make
+     * sure an attacker can't append ../../... to a path that is in
+     * allowedPaths and thereby leak symlink targets.
+     */
+    Path abspath = canonPath(path_);
+
     for (auto & i : *allowedPaths) {
-        if (isDirOrInDir(path_, i)) {
+        if (isDirOrInDir(abspath, i)) {
             found = true;
             break;
         }
     }
 
     if (!found)
-        throw RestrictedPathError("access to path '%1%' is forbidden in restricted mode", path_);
+        throw RestrictedPathError("access to path '%1%' is forbidden in restricted mode", abspath);
 
     /* Resolve symlinks. */
-    debug(format("checking access to '%s'") % path_);
-    Path path = canonPath(path_, true);
+    debug(format("checking access to '%s'") % abspath);
+    Path path = canonPath(abspath, true);
 
     for (auto & i : *allowedPaths) {
         if (isDirOrInDir(path, i)) {
@@ -1076,6 +1084,8 @@ void EvalState::callPrimOp(Value & fun, Value & arg, Value & v, const Pos & pos)
 
 void EvalState::callFunction(Value & fun, Value & arg, Value & v, const Pos & pos)
 {
+    forceValue(fun, pos);
+
     if (fun.type == tPrimOp || fun.type == tPrimOpApp) {
         callPrimOp(fun, arg, v, pos);
         return;
@@ -1091,10 +1101,8 @@ void EvalState::callFunction(Value & fun, Value & arg, Value & v, const Pos & po
         auto & fun2 = *allocValue();
         fun2 = fun;
         /* !!! Should we use the attr pos here? */
-        forceValue(*found->value, pos);
         Value v2;
         callFunction(*found->value, fun2, v2, pos);
-        forceValue(v2, pos);
         return callFunction(v2, arg, v, pos);
       }
     }
@@ -1181,7 +1189,6 @@ void EvalState::autoCallFunction(Bindings & args, Value & fun, Value & res)
     if (fun.type == tAttrs) {
         auto found = fun.attrs->find(sFunctor);
         if (found != fun.attrs->end()) {
-            forceValue(*found->value);
             Value * v = allocValue();
             callFunction(*found->value, fun, *v, noPos);
             forceValue(*v);
@@ -1565,7 +1572,6 @@ string EvalState::coerceToString(const Pos & pos, Value & v, PathSet & context,
     if (v.type == tAttrs) {
         auto i = v.attrs->find(sToString);
         if (i != v.attrs->end()) {
-            forceValue(*i->value, pos);
             Value v1;
             callFunction(*i->value, v, v1, pos);
             return coerceToString(pos, v1, context, coerceMore, copyToStore);
@@ -1719,12 +1725,9 @@ bool EvalState::eqValues(Value & v1, Value & v2)
     }
 }
 
-
 void EvalState::printStats()
 {
     bool showStats = getEnv("NIX_SHOW_STATS", "0") != "0";
-    Verbosity v = showStats ? lvlInfo : lvlDebug;
-    printMsg(v, "evaluation statistics:");
 
     struct rusage buf;
     getrusage(RUSAGE_SELF, &buf);
@@ -1735,62 +1738,101 @@ void EvalState::printStats()
     uint64_t bValues = nrValues * sizeof(Value);
     uint64_t bAttrsets = nrAttrsets * sizeof(Bindings) + nrAttrsInAttrsets * sizeof(Attr);
 
-    printMsg(v, format("  time elapsed: %1%") % cpuTime);
-    printMsg(v, format("  size of a value: %1%") % sizeof(Value));
-    printMsg(v, format("  size of an attr: %1%") % sizeof(Attr));
-    printMsg(v, format("  environments allocated count: %1%") % nrEnvs);
-    printMsg(v, format("  environments allocated bytes: %1%") % bEnvs);
-    printMsg(v, format("  list elements count: %1%") % nrListElems);
-    printMsg(v, format("  list elements bytes: %1%") % bLists);
-    printMsg(v, format("  list concatenations: %1%") % nrListConcats);
-    printMsg(v, format("  values allocated count: %1%") % nrValues);
-    printMsg(v, format("  values allocated bytes: %1%") % bValues);
-    printMsg(v, format("  sets allocated: %1% (%2% bytes)") % nrAttrsets % bAttrsets);
-    printMsg(v, format("  right-biased unions: %1%") % nrOpUpdates);
-    printMsg(v, format("  values copied in right-biased unions: %1%") % nrOpUpdateValuesCopied);
-    printMsg(v, format("  symbols in symbol table: %1%") % symbols.size());
-    printMsg(v, format("  size of symbol table: %1%") % symbols.totalSize());
-    printMsg(v, format("  number of thunks: %1%") % nrThunks);
-    printMsg(v, format("  number of thunks avoided: %1%") % nrAvoided);
-    printMsg(v, format("  number of attr lookups: %1%") % nrLookups);
-    printMsg(v, format("  number of primop calls: %1%") % nrPrimOpCalls);
-    printMsg(v, format("  number of function calls: %1%") % nrFunctionCalls);
-    printMsg(v, format("  total allocations: %1% bytes") % (bEnvs + bLists + bValues + bAttrsets));
-
 #if HAVE_BOEHMGC
     GC_word heapSize, totalBytes;
     GC_get_heap_usage_safe(&heapSize, 0, 0, 0, &totalBytes);
-    printMsg(v, format("  current Boehm heap size: %1% bytes") % heapSize);
-    printMsg(v, format("  total Boehm heap allocations: %1% bytes") % totalBytes);
 #endif
-
-    if (countCalls) {
-        v = lvlInfo;
-
-        printMsg(v, format("calls to %1% primops:") % primOpCalls.size());
-        typedef std::multimap<size_t, Symbol> PrimOpCalls_;
-        PrimOpCalls_ primOpCalls_;
-        for (auto & i : primOpCalls)
-            primOpCalls_.insert(std::pair<size_t, Symbol>(i.second, i.first));
-        for (auto i = primOpCalls_.rbegin(); i != primOpCalls_.rend(); ++i)
-            printMsg(v, format("%1$10d %2%") % i->first % i->second);
-
-        printMsg(v, format("calls to %1% functions:") % functionCalls.size());
-        typedef std::multimap<size_t, ExprLambda *> FunctionCalls_;
-        FunctionCalls_ functionCalls_;
-        for (auto & i : functionCalls)
-            functionCalls_.insert(std::pair<size_t, ExprLambda *>(i.second, i.first));
-        for (auto i = functionCalls_.rbegin(); i != functionCalls_.rend(); ++i)
-            printMsg(v, format("%1$10d %2%") % i->first % i->second->showNamePos());
-
-        printMsg(v, format("evaluations of %1% attributes:") % attrSelects.size());
-        typedef std::multimap<size_t, Pos> AttrSelects_;
-        AttrSelects_ attrSelects_;
-        for (auto & i : attrSelects)
-            attrSelects_.insert(std::pair<size_t, Pos>(i.second, i.first));
-        for (auto i = attrSelects_.rbegin(); i != attrSelects_.rend(); ++i)
-            printMsg(v, format("%1$10d %2%") % i->first % i->second);
-
+    if (showStats) {
+        auto outPath = getEnv("NIX_SHOW_STATS_PATH", "-");
+        std::fstream fs;
+        if (outPath != "-")
+            fs.open(outPath, std::fstream::out);
+        JSONObject topObj(outPath == "-" ? std::cerr : fs, true);
+        topObj.attr("cpuTime", cpuTime);
+        {
+            auto envs = topObj.object("envs");
+            envs.attr("number", nrEnvs);
+            envs.attr("elements", nrValuesInEnvs);
+            envs.attr("bytes", bEnvs);
+        }
+        {
+            auto lists = topObj.object("list");
+            lists.attr("elements", nrListElems);
+            lists.attr("bytes", bLists);
+            lists.attr("concats", nrListConcats);
+        }
+        {
+            auto values = topObj.object("values");
+            values.attr("number", nrValues);
+            values.attr("bytes", bValues);
+        }
+        {
+            auto syms = topObj.object("symbols");
+            syms.attr("number", symbols.size());
+            syms.attr("bytes", symbols.totalSize());
+        }
+        {
+            auto sets = topObj.object("sets");
+            sets.attr("number", nrAttrsets);
+            sets.attr("bytes", bAttrsets);
+            sets.attr("elements", nrAttrsInAttrsets);
+        }
+        {
+            auto sizes = topObj.object("sizes");
+            sizes.attr("Env", sizeof(Env));
+            sizes.attr("Value", sizeof(Value));
+            sizes.attr("Bindings", sizeof(Bindings));
+            sizes.attr("Attr", sizeof(Attr));
+        }
+        topObj.attr("nrOpUpdates", nrOpUpdates);
+        topObj.attr("nrOpUpdateValuesCopied", nrOpUpdateValuesCopied);
+        topObj.attr("nrThunks", nrThunks);
+        topObj.attr("nrAvoided", nrAvoided);
+        topObj.attr("nrLookups", nrLookups);
+        topObj.attr("nrPrimOpCalls", nrPrimOpCalls);
+        topObj.attr("nrFunctionCalls", nrFunctionCalls);
+#if HAVE_BOEHMGC
+        {
+            auto gc = topObj.object("gc");
+            gc.attr("heapSize", heapSize);
+            gc.attr("totalBytes", totalBytes);
+        }
+#endif
+        if (countCalls) {
+            {
+                auto obj = topObj.object("primops");
+                for (auto & i : primOpCalls)
+                    obj.attr(i.first, i.second);
+            }
+            {
+                auto list = topObj.list("functions");
+                for (auto & i : functionCalls) {
+                    auto obj = list.object();
+                    if (i.first->name.set())
+                        obj.attr("name", (const string &) i.first->name);
+                    else
+                        obj.attr("name", nullptr);
+                    if (i.first->pos) {
+                        obj.attr("file", (const string &) i.first->pos.file);
+                        obj.attr("line", i.first->pos.line);
+                        obj.attr("column", i.first->pos.column);
+                    }
+                    obj.attr("count", i.second);
+                }
+            }
+            {
+                auto list = topObj.list("attributes");
+                for (auto & i : attrSelects) {
+                    auto obj = list.object();
+                    if (i.first) {
+                        obj.attr("file", (const string &) i.first.file);
+                        obj.attr("line", i.first.line);
+                        obj.attr("column", i.first.column);
+                    }
+                    obj.attr("count", i.second);
+                }
+            }
+        }
     }
 }
 
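
The checkSourcePath change above splits the check into two passes: a purely lexical canonicalization before the allow-list check, then symlink resolution and a re-check. A minimal sketch of the idea, using std::filesystem as a stand-in for Nix's canonPath (hypothetical names, simplified; assumes the input path is already absolute):

    #include <filesystem>
    #include <stdexcept>
    #include <string>
    #include <vector>

    namespace fs = std::filesystem;

    // True if 'p' equals 'dir' or lies inside it, comparing lexically.
    static bool isDirOrInDir(const fs::path & p, const fs::path & dir)
    {
        auto rel = p.lexically_relative(dir).native();
        return !rel.empty() && rel.rfind("..", 0) != 0;
    }

    fs::path checkSourcePath(const fs::path & path_,
        const std::vector<fs::path> & allowedPaths)
    {
        // Pass 1: normalize purely lexically (no symlink resolution), so
        // that appending "../.." can't escape an allowed prefix.
        fs::path abspath = path_.lexically_normal();
        bool found = false;
        for (auto & i : allowedPaths)
            if (isDirOrInDir(abspath, i)) { found = true; break; }
        if (!found)
            throw std::runtime_error(
                "access to path '" + abspath.string() + "' is forbidden");

        // Pass 2: now resolve symlinks and re-check the real location.
        fs::path path = fs::weakly_canonical(abspath);
        for (auto & i : allowedPaths)
            if (isDirOrInDir(path, i)) return path;
        throw std::runtime_error(
            "access to path '" + path.string() + "' is forbidden");
    }
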
diff --git a/src/libexpr/lexer.l b/src/libexpr/lexer.l
index 29ca327c1e4e..a052447d3dce 100644
--- a/src/libexpr/lexer.l
+++ b/src/libexpr/lexer.l
@@ -12,6 +12,8 @@
 
 
 %{
+#include <boost/lexical_cast.hpp>
+
 #include "nixexpr.hh"
 #include "parser-tab.hh"
 
@@ -124,9 +126,11 @@ or          { return OR_KW; }
 
 {ID}        { yylval->id = strdup(yytext); return ID; }
 {INT}       { errno = 0;
-              yylval->n = strtol(yytext, 0, 10);
-              if (errno != 0)
+              try {
+                  yylval->n = boost::lexical_cast<int64_t>(yytext);
+              } catch (const boost::bad_lexical_cast &) {
                   throw ParseError(format("invalid integer '%1%'") % yytext);
+              }
               return INT;
             }
 {FLOAT}     { errno = 0;
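
This pairs with the NixInt change from long to int64_t in value.hh below: boost::lexical_cast<int64_t> parses at the full 64-bit width on every platform and reports overflow by throwing, while strtol parses to long, whose width varies. A small illustration (assuming Boost is available, as the lexer now requires):

    #include <boost/lexical_cast.hpp>
    #include <cstdint>
    #include <iostream>

    int main()
    {
        // INT64_MAX parses fine at 64-bit width, even where long is 32 bits.
        std::cout << boost::lexical_cast<int64_t>("9223372036854775807") << "\n";

        try {
            // One past INT64_MAX: lexical_cast throws instead of clamping.
            boost::lexical_cast<int64_t>("9223372036854775808");
        } catch (const boost::bad_lexical_cast &) {
            std::cout << "invalid integer\n";   // becomes a ParseError in the lexer
        }
    }
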
diff --git a/src/libexpr/primops.cc b/src/libexpr/primops.cc
index 311a32b0b30f..7372134e2c9b 100644
--- a/src/libexpr/primops.cc
+++ b/src/libexpr/primops.cc
@@ -866,7 +866,7 @@ static void prim_baseNameOf(EvalState & state, const Pos & pos, Value * * args,
 static void prim_dirOf(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
     PathSet context;
-    Path dir = dirOf(state.coerceToPath(pos, *args[0], context));
+    Path dir = dirOf(state.coerceToString(pos, *args[0], context, false, false));
     if (args[0]->type == tPath) mkPath(v, dir.c_str()); else mkString(v, dir, context);
 }
 
@@ -1359,7 +1359,6 @@ static void prim_functionArgs(EvalState & state, const Pos & pos, Value * * args
 /* Apply a function to every element of an attribute set. */
 static void prim_mapAttrs(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
-    state.forceFunction(*args[0], pos);
     state.forceAttrs(*args[1], pos);
 
     state.mkAttrs(v, args[1]->attrs->size());
@@ -1368,7 +1367,7 @@ static void prim_mapAttrs(EvalState & state, const Pos & pos, Value * * args, Va
         Value * vName = state.allocValue();
         Value * vFun2 = state.allocValue();
         mkString(*vName, i.name);
-        state.callFunction(*args[0], *vName, *vFun2, pos);
+        mkApp(*vFun2, *args[0], *vName);
         mkApp(*state.allocAttr(v, i.name), *vFun2, *i.value);
     }
 }
@@ -1429,7 +1428,6 @@ static void prim_tail(EvalState & state, const Pos & pos, Value * * args, Value
 /* Apply a function to every element of a list. */
 static void prim_map(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
-    state.forceFunction(*args[0], pos);
     state.forceList(*args[1], pos);
 
     state.mkList(v, args[1]->listSize());
@@ -1558,7 +1556,6 @@ static void prim_all(EvalState & state, const Pos & pos, Value * * args, Value &
 
 static void prim_genList(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
-    state.forceFunction(*args[0], pos);
     auto len = state.forceInt(*args[1], pos);
 
     if (len < 0)
@@ -1683,6 +1680,8 @@ static void prim_concatMap(EvalState & state, const Pos & pos, Value * * args, V
 
 static void prim_add(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
+    state.forceValue(*args[0], pos);
+    state.forceValue(*args[1], pos);
     if (args[0]->type == tFloat || args[1]->type == tFloat)
         mkFloat(v, state.forceFloat(*args[0], pos) + state.forceFloat(*args[1], pos));
     else
@@ -1692,6 +1691,8 @@ static void prim_add(EvalState & state, const Pos & pos, Value * * args, Value &
 
 static void prim_sub(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
+    state.forceValue(*args[0], pos);
+    state.forceValue(*args[1], pos);
     if (args[0]->type == tFloat || args[1]->type == tFloat)
         mkFloat(v, state.forceFloat(*args[0], pos) - state.forceFloat(*args[1], pos));
     else
@@ -1701,6 +1702,8 @@ static void prim_sub(EvalState & state, const Pos & pos, Value * * args, Value &
 
 static void prim_mul(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
+    state.forceValue(*args[0], pos);
+    state.forceValue(*args[1], pos);
     if (args[0]->type == tFloat || args[1]->type == tFloat)
         mkFloat(v, state.forceFloat(*args[0], pos) * state.forceFloat(*args[1], pos));
     else
@@ -1710,6 +1713,9 @@ static void prim_mul(EvalState & state, const Pos & pos, Value * * args, Value &
 
 static void prim_div(EvalState & state, const Pos & pos, Value * * args, Value & v)
 {
+    state.forceValue(*args[0], pos);
+    state.forceValue(*args[1], pos);
+
     NixFloat f2 = state.forceFloat(*args[1], pos);
     if (f2 == 0) throw EvalError(format("division by zero, at %1%") % pos);
 
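The primops.cc hunks above make several builtins lazier: mapAttrs now wraps each application in a thunk via mkApp instead of calling the function eagerly, and since callFunction now forces its function argument itself (see the eval.cc hunk), the arithmetic primops force their operands explicitly. A toy sketch of the eager-versus-thunked distinction, with a hypothetical Thunk type rather than Nix's real Value machinery:

    #include <functional>
    #include <iostream>
    #include <optional>

    // A deferred function application, standing in for what mkApp builds.
    struct Thunk
    {
        std::function<int()> compute;
        std::optional<int> value;

        int force()
        {
            if (!value) value = compute();   // evaluated at most once
            return *value;
        }
    };

    int main()
    {
        auto f = [](int x) { std::cout << "called\n"; return x + 1; };

        int eager = f(41);   // like the old state.callFunction: runs now

        // Like mkApp: runs only if something later forces the result.
        Thunk lazy{[&] { return f(41); }, std::nullopt};

        std::cout << "before force\n";
        std::cout << lazy.force() << " " << eager << "\n";
    }
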
diff --git a/src/libexpr/primops/fetchGit.cc b/src/libexpr/primops/fetchGit.cc
index 7aa98e0bfab3..0c6539959bf6 100644
--- a/src/libexpr/primops/fetchGit.cc
+++ b/src/libexpr/primops/fetchGit.cc
@@ -219,8 +219,6 @@ static void prim_fetchGit(EvalState & state, const Pos & pos, Value * * args, Va
     } else
         url = state.coerceToString(pos, *args[0], context, false, false);
 
-    if (!isUri(url)) url = absPath(url);
-
     // FIXME: git externals probably can be used to bypass the URI
     // whitelist. Ah well.
     state.checkURI(url);
diff --git a/src/libexpr/primops/fetchMercurial.cc b/src/libexpr/primops/fetchMercurial.cc
index 9d35f6d0d6d7..97cda2458c9b 100644
--- a/src/libexpr/primops/fetchMercurial.cc
+++ b/src/libexpr/primops/fetchMercurial.cc
@@ -184,8 +184,6 @@ static void prim_fetchMercurial(EvalState & state, const Pos & pos, Value * * ar
     } else
         url = state.coerceToString(pos, *args[0], context, false, false);
 
-    if (!isUri(url)) url = absPath(url);
-
     // FIXME: git externals probably can be used to bypass the URI
     // whitelist. Ah well.
     state.checkURI(url);
diff --git a/src/libexpr/value.hh b/src/libexpr/value.hh
index 809772f7c084..e1ec87d3b84c 100644
--- a/src/libexpr/value.hh
+++ b/src/libexpr/value.hh
@@ -43,7 +43,7 @@ class XMLWriter;
 class JSONPlaceholder;
 
 
-typedef long NixInt;
+typedef int64_t NixInt;
 typedef double NixFloat;
 
 /* External values must descend from ExternalValueBase, so that
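
long is only guaranteed to be 32 bits (it is 32-bit on 32-bit Linux and under Windows' LLP64 model), so the typedef makes Nix integers 64-bit everywhere. A quick check:

    #include <cstdint>
    #include <iostream>

    int main()
    {
        std::cout << "sizeof(long)    = " << sizeof(long)      // 4 on ILP32 and LLP64
                  << "\nsizeof(int64_t) = " << sizeof(int64_t) // always 8
                  << "\n";
    }
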
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc
index 76c0a1a891b8..4527ee6ba660 100644
--- a/src/libstore/binary-cache-store.cc
+++ b/src/libstore/binary-cache-store.cc
@@ -217,17 +217,6 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
 {
     auto info = queryPathInfo(storePath).cast<const NarInfo>();
 
-    auto source = sinkToSource([this, url{info->url}](Sink & sink) {
-        try {
-            getFile(url, sink);
-        } catch (NoSuchBinaryCacheFile & e) {
-            throw SubstituteGone(e.what());
-        }
-    });
-
-    stats.narRead++;
-    //stats.narReadCompressedBytes += nar->size(); // FIXME
-
     uint64_t narSize = 0;
 
     LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
@@ -235,8 +224,18 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
         narSize += len;
     });
 
-    decompress(info->compression, *source, wrapperSink);
+    auto decompressor = makeDecompressionSink(info->compression, wrapperSink);
 
+    try {
+        getFile(info->url, *decompressor);
+    } catch (NoSuchBinaryCacheFile & e) {
+        throw SubstituteGone(e.what());
+    }
+
+    decompressor->finish();
+
+    stats.narRead++;
+    //stats.narReadCompressedBytes += nar->size(); // FIXME
     stats.narReadBytes += narSize;
 }
 
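narFromPath now pushes data through a sink chain (download → decompressor → byte-counting wrapper → consumer) instead of pulling it through a sinkToSource coroutine. A minimal sketch of the composition pattern, with simplified function-typed sinks rather than Nix's actual Sink/CompressionSink classes:

    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <iostream>

    using Sink = std::function<void(const unsigned char *, size_t)>;

    int main()
    {
        uint64_t narSize = 0;

        // The ultimate consumer of the uncompressed NAR bytes.
        Sink consumer = [](const unsigned char * data, size_t len) {
            std::cout.write((const char *) data, len);
        };

        // Counts bytes while forwarding, like wrapperSink above.
        Sink wrapperSink = [&](const unsigned char * data, size_t len) {
            narSize += len;
            consumer(data, len);
        };

        // makeDecompressionSink would return a transforming sink here;
        // with "none" compression it simply forwards.
        Sink decompressor = wrapperSink;

        const unsigned char nar[] = "fake NAR bytes\n";
        decompressor(nar, sizeof(nar) - 1);   // getFile() feeds chunks like this
        std::cout << "narSize = " << narSize << "\n";
    }
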
diff --git a/src/libstore/build.cc b/src/libstore/build.cc
index d75ca0be86ef..1402bd097c35 100644
--- a/src/libstore/build.cc
+++ b/src/libstore/build.cc
@@ -2007,7 +2007,7 @@ void DerivationGoal::startBuilder()
 
         /* Create /etc/hosts with localhost entry. */
         if (!fixedOutput)
-            writeFile(chrootRootDir + "/etc/hosts", "127.0.0.1 localhost\n");
+            writeFile(chrootRootDir + "/etc/hosts", "127.0.0.1 localhost\n::1 localhost\n");
 
         /* Make the closure of the inputs available in the chroot,
            rather than the whole Nix store.  This prevents any access
@@ -3682,6 +3682,19 @@ void SubstitutionGoal::tryNext()
     } catch (InvalidPath &) {
         tryNext();
         return;
+    } catch (SubstituterDisabled &) {
+        if (settings.tryFallback) {
+            tryNext();
+            return;
+        }
+        throw;
+    } catch (Error & e) {
+        if (settings.tryFallback) {
+            printError(e.what());
+            tryNext();
+            return;
+        }
+        throw;
     }
 
     /* Update the total expected download size. */
diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc
index 1f4abd374f54..92aec63a0379 100644
--- a/src/libstore/builtins/fetchurl.cc
+++ b/src/libstore/builtins/fetchurl.cc
@@ -24,6 +24,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
 
     Path storePath = getAttr("out");
     auto mainUrl = getAttr("url");
+    bool unpack = get(drv.env, "unpack", "") == "1";
 
     /* Note: have to use a fresh downloader here because we're in
        a forked process. */
@@ -39,21 +40,16 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
             request.verifyTLS = false;
             request.decompress = false;
 
-            downloader->download(std::move(request), sink);
+            auto decompressor = makeDecompressionSink(
+                unpack && hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink);
+            downloader->download(std::move(request), *decompressor);
+            decompressor->finish();
         });
 
-        if (get(drv.env, "unpack", "") == "1") {
-
-            if (hasSuffix(mainUrl, ".xz")) {
-                auto source2 = sinkToSource([&](Sink & sink) {
-                    decompress("xz", *source, sink);
-                });
-                restorePath(storePath, *source2);
-            } else
-                restorePath(storePath, *source);
-
-        } else
-              writeFile(storePath, *source);
+        if (unpack)
+            restorePath(storePath, *source);
+        else
+            writeFile(storePath, *source);
 
         auto executable = drv.env.find("executable");
         if (executable != drv.env.end() && executable->second == "1") {
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 07acd5d0e004..13913d031daa 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -58,16 +58,6 @@ std::string resolveUri(const std::string & uri)
         return uri;
 }
 
-ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data)
-{
-    if (encoding == "")
-        return data;
-    else if (encoding == "br")
-        return decompress(encoding, *data);
-    else
-        throw Error("unsupported Content-Encoding '%s'", encoding);
-}
-
 struct CurlDownloader : public Downloader
 {
     CURLM * curlm = 0;
@@ -106,6 +96,12 @@ struct CurlDownloader : public Downloader
                 fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
                 {request.uri}, request.parentAct)
             , callback(callback)
+            , finalSink([this](const unsigned char * data, size_t len) {
+                if (this->request.dataCallback)
+                    this->request.dataCallback((char *) data, len);
+                else
+                    this->result.data->append((char *) data, len);
+              })
         {
             if (!request.expectedETag.empty())
                 requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
@@ -129,22 +125,40 @@ struct CurlDownloader : public Downloader
             }
         }
 
-        template<class T>
-        void fail(const T & e)
+        void failEx(std::exception_ptr ex)
         {
             assert(!done);
             done = true;
-            callback.rethrow(std::make_exception_ptr(e));
+            callback.rethrow(ex);
         }
 
+        template<class T>
+        void fail(const T & e)
+        {
+            failEx(std::make_exception_ptr(e));
+        }
+
+        LambdaSink finalSink;
+        std::shared_ptr<CompressionSink> decompressionSink;
+
+        std::exception_ptr writeException;
+
         size_t writeCallback(void * contents, size_t size, size_t nmemb)
         {
-            size_t realSize = size * nmemb;
-            if (request.dataCallback)
-                request.dataCallback((char *) contents, realSize);
-            else
-                result.data->append((char *) contents, realSize);
-            return realSize;
+            try {
+                size_t realSize = size * nmemb;
+                result.bodySize += realSize;
+
+                if (!decompressionSink)
+                    decompressionSink = makeDecompressionSink(encoding, finalSink);
+
+                (*decompressionSink)((unsigned char *) contents, realSize);
+
+                return realSize;
+            } catch (...) {
+                writeException = std::current_exception();
+                return 0;
+            }
         }
 
         static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
@@ -162,6 +176,7 @@ struct CurlDownloader : public Downloader
                 auto ss = tokenizeString<vector<string>>(line, " ");
                 status = ss.size() >= 2 ? ss[1] : "";
                 result.data = std::make_shared<std::string>();
+                result.bodySize = 0;
                 encoding = "";
             } else {
                 auto i = line.find(':');
@@ -296,6 +311,7 @@ struct CurlDownloader : public Downloader
             curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);
 
             result.data = std::make_shared<std::string>();
+            result.bodySize = 0;
         }
 
         void finish(CURLcode code)
@@ -309,29 +325,35 @@ struct CurlDownloader : public Downloader
                 result.effectiveUrl = effectiveUrlCStr;
 
             debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes",
-                request.verb(), request.uri, code, httpStatus, result.data ? result.data->size() : 0);
+                request.verb(), request.uri, code, httpStatus, result.bodySize);
+
+            if (decompressionSink)
+                decompressionSink->finish();
 
             if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
                 code = CURLE_OK;
                 httpStatus = 304;
             }
 
-            if (code == CURLE_OK &&
+            if (writeException)
+                failEx(writeException);
+
+            else if (code == CURLE_OK &&
                 (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
             {
                 result.cached = httpStatus == 304;
                 done = true;
 
                 try {
-                    if (request.decompress)
-                        result.data = decodeContent(encoding, ref<std::string>(result.data));
-                    act.progress(result.data->size(), result.data->size());
+                    act.progress(result.bodySize, result.bodySize);
                     callback(std::move(result));
                 } catch (...) {
                     done = true;
                     callback.rethrow();
                 }
-            } else {
+            }
+
+            else {
                 // We treat most errors as transient, but won't retry when hopeless
                 Error err = Transient;
 
@@ -366,6 +388,7 @@ struct CurlDownloader : public Downloader
                         case CURLE_UNKNOWN_OPTION:
                         case CURLE_SSL_CACERT_BADFILE:
                         case CURLE_TOO_MANY_REDIRECTS:
+                        case CURLE_WRITE_ERROR:
                             err = Misc;
                             break;
                         default: // Shut up warnings
@@ -598,7 +621,7 @@ struct CurlDownloader : public Downloader
             // FIXME: do this on a worker thread
             try {
 #ifdef ENABLE_S3
-                S3Helper s3Helper("", Aws::Region::US_EAST_1); // FIXME: make configurable
+                S3Helper s3Helper("", Aws::Region::US_EAST_1, ""); // FIXME: make configurable
                 auto slash = request.uri.find('/', 5);
                 if (slash == std::string::npos)
                     throw nix::Error("bad S3 URI '%s'", request.uri);
@@ -718,15 +741,17 @@ void Downloader::download(DownloadRequest && request, Sink & sink)
     while (true) {
         checkInterrupt();
 
-        if (state->quit) {
-            if (state->exc) std::rethrow_exception(state->exc);
-            break;
-        }
-
         /* If no data is available, then wait for the download thread
            to wake us up. */
-        if (state->data.empty())
+        if (state->data.empty()) {
+
+            if (state->quit) {
+                if (state->exc) std::rethrow_exception(state->exc);
+                break;
+            }
+
             state.wait(state->avail);
+        }
 
         /* If data is available, then flush it to the sink and wake up
            the download thread if it's blocked on a full buffer. */
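
The reworked writeCallback streams each chunk through the decompression sink, and because curl invokes the callback from C code, no C++ exception may propagate out of it: the exception is stashed in writeException and a short write count makes curl abort with CURLE_WRITE_ERROR (now mapped to a non-transient error), after which finish() rethrows. A simplified sketch of that pattern, with the failing decompressor simulated:

    #include <cstddef>
    #include <exception>
    #include <iostream>
    #include <stdexcept>

    // Stand-in for the decompression sink rejecting corrupt input.
    static bool corruptInput(const char *, size_t) { return true; }

    struct Transfer
    {
        std::exception_ptr writeException;

        // Invoked by curl from C code: no C++ exception may escape this frame.
        size_t writeCallback(const char * data, size_t len)
        {
            try {
                if (corruptInput(data, len))
                    throw std::runtime_error("error while decompressing xz file");
                return len;                // normal path: all bytes consumed
            } catch (...) {
                writeException = std::current_exception();
                return 0;                  // short write -> CURLE_WRITE_ERROR
            }
        }

        void finish()
        {
            // Back on the C++ side, the stashed error can safely be rethrown.
            if (writeException) std::rethrow_exception(writeException);
        }
    };

    int main()
    {
        Transfer t;
        t.writeCallback("deadbeef", 8);
        try { t.finish(); }
        catch (const std::exception & e) { std::cout << e.what() << "\n"; }
    }
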
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index da55df7a6e71..f0228f7d053a 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -38,6 +38,7 @@ struct DownloadResult
     std::string etag;
     std::string effectiveUrl;
     std::shared_ptr<std::string> data;
+    uint64_t bodySize = 0;
 };
 
 class Store;
@@ -87,7 +88,4 @@ public:
 
 bool isUri(const string & s);
 
-/* Decode data according to the Content-Encoding header. */
-ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data);
-
 }
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index ab524d523cf2..8da0e2f9d82a 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -13,6 +13,14 @@ private:
 
     Path cacheUri;
 
+    struct State
+    {
+        bool enabled = true;
+        std::chrono::steady_clock::time_point disabledUntil;
+    };
+
+    Sync<State> _state;
+
 public:
 
     HttpBinaryCacheStore(
@@ -46,8 +54,33 @@ public:
 
 protected:
 
+    void maybeDisable()
+    {
+        auto state(_state.lock());
+        if (state->enabled && settings.tryFallback) {
+            int t = 60;
+            printError("disabling binary cache '%s' for %s seconds", getUri(), t);
+            state->enabled = false;
+            state->disabledUntil = std::chrono::steady_clock::now() + std::chrono::seconds(t);
+        }
+    }
+
+    void checkEnabled()
+    {
+        auto state(_state.lock());
+        if (state->enabled) return;
+        if (std::chrono::steady_clock::now() > state->disabledUntil) {
+            state->enabled = true;
+            debug("re-enabling binary cache '%s'", getUri());
+            return;
+        }
+        throw SubstituterDisabled("substituter '%s' is disabled", getUri());
+    }
+
     bool fileExists(const std::string & path) override
     {
+        checkEnabled();
+
         try {
             DownloadRequest request(cacheUri + "/" + path);
             request.head = true;
@@ -59,6 +92,7 @@ protected:
                bucket is unlistable, so treat 403 as 404. */
             if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
                 return false;
+            maybeDisable();
             throw;
         }
     }
@@ -86,12 +120,14 @@ protected:
 
     void getFile(const std::string & path, Sink & sink) override
     {
+        checkEnabled();
         auto request(makeRequest(path));
         try {
             getDownloader()->download(std::move(request), sink);
         } catch (DownloadError & e) {
             if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
                 throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
+            maybeDisable();
             throw;
         }
     }
@@ -99,15 +135,18 @@ protected:
     void getFile(const std::string & path,
         Callback<std::shared_ptr<std::string>> callback) override
     {
+        checkEnabled();
+
         auto request(makeRequest(path));
 
         getDownloader()->enqueueDownload(request,
-            {[callback](std::future<DownloadResult> result) {
+            {[callback, this](std::future<DownloadResult> result) {
                 try {
                     callback(result.get().data);
                 } catch (DownloadError & e) {
                     if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
                         return callback(std::shared_ptr<std::string>());
+                    maybeDisable();
                     callback.rethrow();
                 } catch (...) {
                     callback.rethrow();
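
maybeDisable/checkEnabled implement a simple time-based circuit breaker: after a failure (with fallback enabled) the cache is skipped for 60 seconds, then automatically retried. A single-threaded sketch of the same state machine; the real code keeps this state behind a Sync<> lock:

    #include <chrono>
    #include <iostream>
    #include <stdexcept>

    struct SubstituterBreaker
    {
        bool enabled = true;
        std::chrono::steady_clock::time_point disabledUntil;

        void maybeDisable()
        {
            if (enabled) {
                std::cerr << "disabling binary cache for 60 seconds\n";
                enabled = false;
                disabledUntil = std::chrono::steady_clock::now()
                    + std::chrono::seconds(60);
            }
        }

        void checkEnabled()
        {
            if (enabled) return;
            if (std::chrono::steady_clock::now() > disabledUntil) {
                enabled = true;            // cooldown elapsed, try again
                return;
            }
            throw std::runtime_error("substituter is disabled");
        }
    };

    int main()
    {
        SubstituterBreaker b;
        b.maybeDisable();                  // a download just failed
        try { b.checkEnabled(); }          // subsequent requests fail fast
        catch (const std::exception & e) { std::cout << e.what() << "\n"; }
    }
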
diff --git a/src/libstore/legacy-ssh-store.cc b/src/libstore/legacy-ssh-store.cc
index 02d91ded04cd..88d2574e86ef 100644
--- a/src/libstore/legacy-ssh-store.cc
+++ b/src/libstore/legacy-ssh-store.cc
@@ -17,6 +17,7 @@ struct LegacySSHStore : public Store
     const Setting<Path> sshKey{this, "", "ssh-key", "path to an SSH private key"};
     const Setting<bool> compress{this, false, "compress", "whether to compress the connection"};
     const Setting<Path> remoteProgram{this, "nix-store", "remote-program", "path to the nix-store executable on the remote system"};
+    const Setting<std::string> remoteStore{this, "", "remote-store", "URI of the store on the remote system"};
 
     // Hack for getting remote build log output.
     const Setting<int> logFD{this, -1, "log-fd", "file descriptor to which SSH's stderr is connected"};
@@ -27,6 +28,7 @@ struct LegacySSHStore : public Store
         FdSink to;
         FdSource from;
         int remoteVersion;
+        bool good = true;
     };
 
     std::string host;
@@ -41,7 +43,7 @@ struct LegacySSHStore : public Store
         , connections(make_ref<Pool<Connection>>(
             std::max(1, (int) maxConnections),
             [this]() { return openConnection(); },
-            [](const ref<Connection> & r) { return true; }
+            [](const ref<Connection> & r) { return r->good; }
             ))
         , master(
             host,
@@ -56,7 +58,9 @@ struct LegacySSHStore : public Store
     ref<Connection> openConnection()
     {
         auto conn = make_ref<Connection>();
-        conn->sshConn = master.startCommand(fmt("%s --serve --write", remoteProgram));
+        conn->sshConn = master.startCommand(
+            fmt("%s --serve --write", remoteProgram)
+            + (remoteStore.get() == "" ? "" : " --store " + shellEscape(remoteStore.get())));
         conn->to = FdSink(conn->sshConn->in.get());
         conn->from = FdSource(conn->sshConn->out.get());
 
@@ -127,18 +131,48 @@ struct LegacySSHStore : public Store
 
         auto conn(connections->get());
 
-        conn->to
-            << cmdImportPaths
-            << 1;
-        copyNAR(source, conn->to);
-        conn->to
-            << exportMagic
-            << info.path
-            << info.references
-            << info.deriver
-            << 0
-            << 0;
-        conn->to.flush();
+        if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 5) {
+
+            conn->to
+                << cmdAddToStoreNar
+                << info.path
+                << info.deriver
+                << info.narHash.to_string(Base16, false)
+                << info.references
+                << info.registrationTime
+                << info.narSize
+                << info.ultimate
+                << info.sigs
+                << info.ca;
+            try {
+                copyNAR(source, conn->to);
+            } catch (...) {
+                conn->good = false;
+                throw;
+            }
+            conn->to.flush();
+
+        } else {
+
+            conn->to
+                << cmdImportPaths
+                << 1;
+            try {
+                copyNAR(source, conn->to);
+            } catch (...) {
+                conn->good = false;
+                throw;
+            }
+            conn->to
+                << exportMagic
+                << info.path
+                << info.references
+                << info.deriver
+                << 0
+                << 0;
+            conn->to.flush();
+
+        }
 
         if (readInt(conn->from) != 1)
             throw Error("failed to add path '%s' to remote host '%s', info.path, host");
diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc
index 3b2ba65f3b46..197b9d78995b 100644
--- a/src/libstore/local-store.cc
+++ b/src/libstore/local-store.cc
@@ -450,7 +450,7 @@ static void canonicalisePathMetaData_(const Path & path, uid_t fromUid, InodesSe
     ssize_t eaSize = llistxattr(path.c_str(), nullptr, 0);
 
     if (eaSize < 0) {
-        if (errno != ENOTSUP)
+        if (errno != ENOTSUP && errno != ENODATA)
             throw SysError("querying extended attributes of '%s'", path);
     } else if (eaSize > 0) {
         std::vector<char> eaBuf(eaSize);
@@ -880,6 +880,12 @@ void LocalStore::querySubstitutablePathInfos(const PathSet & paths,
                     narInfo ? narInfo->fileSize : 0,
                     info->narSize};
             } catch (InvalidPath) {
+            } catch (SubstituterDisabled) {
+            } catch (Error & e) {
+                if (settings.tryFallback)
+                    printError(e.what());
+                else
+                    throw;
             }
         }
     }
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index 26144ccb40cc..7711388f05a9 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -84,8 +84,8 @@ static void initAWS()
     });
 }
 
-S3Helper::S3Helper(const std::string & profile, const std::string & region)
-    : config(makeConfig(region))
+S3Helper::S3Helper(const std::string & profile, const std::string & region, const std::string & endpoint)
+    : config(makeConfig(region, endpoint))
     , client(make_ref<Aws::S3::S3Client>(
             profile == ""
             ? std::dynamic_pointer_cast<Aws::Auth::AWSCredentialsProvider>(
@@ -99,7 +99,7 @@ S3Helper::S3Helper(const std::string & profile, const std::string & region)
 #else
             Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
 #endif
-            false))
+            endpoint.empty()))
 {
 }
 
@@ -116,11 +116,14 @@ class RetryStrategy : public Aws::Client::DefaultRetryStrategy
     }
 };
 
-ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region)
+ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region, const string & endpoint)
 {
     initAWS();
     auto res = make_ref<Aws::Client::ClientConfiguration>();
     res->region = region;
+    if (!endpoint.empty()) {
+        res->endpointOverride = endpoint;
+    }
     res->requestTimeoutMs = 600 * 1000;
     res->retryStrategy = std::make_shared<RetryStrategy>();
     res->caFile = settings.caFile;
@@ -150,10 +153,8 @@ S3Helper::DownloadResult S3Helper::getObject(
         auto result = checkAws(fmt("AWS error fetching '%s'", key),
             client->GetObject(request));
 
-        res.data = decodeContent(
-            result.GetContentEncoding(),
-            make_ref<std::string>(
-                dynamic_cast<std::stringstream &>(result.GetBody()).str()));
+        res.data = decompress(result.GetContentEncoding(),
+            dynamic_cast<std::stringstream &>(result.GetBody()).str());
 
     } catch (S3Error & e) {
         if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
@@ -170,6 +171,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
 {
     const Setting<std::string> profile{this, "", "profile", "The name of the AWS configuration profile to use."};
     const Setting<std::string> region{this, Aws::Region::US_EAST_1, "region", {"aws-region"}};
+    const Setting<std::string> endpoint{this, "", "endpoint", "An optional override of the endpoint to use when talking to S3."};
     const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
     const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
     const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
@@ -186,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         const Params & params, const std::string & bucketName)
         : S3BinaryCacheStore(params)
         , bucketName(bucketName)
-        , s3Helper(profile, region)
+        , s3Helper(profile, region, endpoint)
     {
         diskCache = getNarInfoDiskCache();
     }
@@ -273,6 +275,9 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         return true;
     }
 
+    std::shared_ptr<TransferManager> transferManager;
+    std::once_flag transferManagerCreated;
+
     void uploadFile(const std::string & path, const std::string & data,
         const std::string & mimeType,
         const std::string & contentEncoding)
@@ -284,61 +289,49 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
             executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);
 
-        TransferManagerConfiguration transferConfig(executor.get());
-
-        transferConfig.s3Client = s3Helper.client;
-        transferConfig.bufferSize = bufferSize;
-
-        if (contentEncoding != "")
-            transferConfig.createMultipartUploadTemplate.SetContentEncoding(
-                contentEncoding);
-
-        transferConfig.uploadProgressCallback =
-            [&](const TransferManager *transferManager,
-                const std::shared_ptr<const TransferHandle>
-                    &transferHandle) {
-              //FIXME: find a way to properly abort the multipart upload.
-              checkInterrupt();
-              debug("upload progress ('%s'): '%d' of '%d' bytes",
-                             path,
-                             transferHandle->GetBytesTransferred(),
-                             transferHandle->GetBytesTotalSize());
-            };
+        std::call_once(transferManagerCreated, [&]() {
 
-        transferConfig.transferStatusUpdatedCallback =
-            [&](const TransferManager *,
-                const std::shared_ptr<const TransferHandle>
-                    &transferHandle) {
-              switch (transferHandle->GetStatus()) {
-                  case TransferStatus::COMPLETED:
-                      printTalkative("upload of '%s' completed", path);
-                      stats.put++;
-                      stats.putBytes += data.size();
-                      break;
-                  case TransferStatus::IN_PROGRESS:
-                      break;
-                  case TransferStatus::FAILED:
-                      throw Error("AWS error: failed to upload 's3://%s/%s'",
-                                  bucketName, path);
-                      break;
-                  default:
-                      throw Error("AWS error: transfer status of 's3://%s/%s' "
-                                  "in unexpected state",
-                                  bucketName, path);
-              };
-            };
+            TransferManagerConfiguration transferConfig(executor.get());
+
+            transferConfig.s3Client = s3Helper.client;
+            transferConfig.bufferSize = bufferSize;
 
-        std::shared_ptr<TransferManager> transferManager =
-            TransferManager::Create(transferConfig);
+            transferConfig.uploadProgressCallback =
+                [&](const TransferManager *transferManager,
+                    const std::shared_ptr<const TransferHandle>
+                    &transferHandle)
+                {
+                    //FIXME: find a way to properly abort the multipart upload.
+                    //checkInterrupt();
+                    debug("upload progress ('%s'): '%d' of '%d' bytes",
+                        path,
+                        transferHandle->GetBytesTransferred(),
+                        transferHandle->GetBytesTotalSize());
+                };
+
+            transferManager = TransferManager::Create(transferConfig);
+        });
 
         auto now1 = std::chrono::steady_clock::now();
 
         std::shared_ptr<TransferHandle> transferHandle =
-            transferManager->UploadFile(stream, bucketName, path, mimeType,
-                                        Aws::Map<Aws::String, Aws::String>());
+            transferManager->UploadFile(
+                stream, bucketName, path, mimeType,
+                Aws::Map<Aws::String, Aws::String>(),
+                nullptr, contentEncoding);
 
         transferHandle->WaitUntilFinished();
 
+        if (transferHandle->GetStatus() == TransferStatus::FAILED)
+            throw Error("AWS error: failed to upload 's3://%s/%s': %s",
+                bucketName, path, transferHandle->GetLastError().GetMessage());
+
+        if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
+            throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
+                bucketName, path);
+
+        printTalkative("upload of '%s' completed", path);
+
         auto now2 = std::chrono::steady_clock::now();
 
         auto duration =
@@ -349,6 +342,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
                   bucketName % path % data.size() % duration);
 
         stats.putTimeMs += duration;
+        stats.putBytes += data.size();
+        stats.put++;
     }
 
     void upsertFile(const std::string & path, const std::string & data,
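
Creating a TransferManager per upload was wasteful; std::call_once builds it on first use and reuses it afterwards, and is also what makes initializing the shared transferManager member safe under concurrent uploads. A minimal sketch with a stand-in manager type:

    #include <iostream>
    #include <memory>
    #include <mutex>

    struct Uploader
    {
        std::shared_ptr<int> transferManager;   // stand-in for the AWS TransferManager
        std::once_flag transferManagerCreated;

        void uploadFile()
        {
            std::call_once(transferManagerCreated, [&]() {
                std::cout << "creating transfer manager\n";   // runs exactly once
                transferManager = std::make_shared<int>(42);
            });
            std::cout << "uploading via manager " << *transferManager << "\n";
        }
    };

    int main()
    {
        Uploader u;
        u.uploadFile();   // first call creates the manager
        u.uploadFile();   // later calls reuse it
    }
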
diff --git a/src/libstore/s3.hh b/src/libstore/s3.hh
index 4f996400343c..95d612b66335 100644
--- a/src/libstore/s3.hh
+++ b/src/libstore/s3.hh
@@ -14,9 +14,9 @@ struct S3Helper
     ref<Aws::Client::ClientConfiguration> config;
     ref<Aws::S3::S3Client> client;
 
-    S3Helper(const std::string & profile, const std::string & region);
+    S3Helper(const std::string & profile, const std::string & region, const std::string & endpoint);
 
-    ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region);
+    ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region, const std::string & endpoint);
 
     struct DownloadResult
     {
diff --git a/src/libstore/serve-protocol.hh b/src/libstore/serve-protocol.hh
index f67d1e2580a5..9fae6d5349f1 100644
--- a/src/libstore/serve-protocol.hh
+++ b/src/libstore/serve-protocol.hh
@@ -5,7 +5,7 @@ namespace nix {
 #define SERVE_MAGIC_1 0x390c9deb
 #define SERVE_MAGIC_2 0x5452eecb
 
-#define SERVE_PROTOCOL_VERSION 0x204
+#define SERVE_PROTOCOL_VERSION 0x205
 #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
 #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
 
@@ -18,6 +18,7 @@ typedef enum {
     cmdBuildPaths = 6,
     cmdQueryClosure = 7,
     cmdBuildDerivation = 8,
+    cmdAddToStoreNar = 9,
 } ServeCommand;
 
 }
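
A worked example of the version macros: the bumped version 0x205 has major 0x200 and minor 5, so a peer advertising it supports the new cmdAddToStoreNar (which LegacySSHStore above uses when the minor version is at least 5):

    #include <cstdio>

    #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
    #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)

    int main()
    {
        unsigned version = 0x205;          // SERVE_PROTOCOL_VERSION after the bump
        std::printf("major=0x%x minor=%u\n",
            GET_PROTOCOL_MAJOR(version),
            GET_PROTOCOL_MINOR(version));  // prints: major=0x200 minor=5
    }
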
diff --git a/src/libstore/ssh.cc b/src/libstore/ssh.cc
index 033c580936ad..5e0e44935cca 100644
--- a/src/libstore/ssh.cc
+++ b/src/libstore/ssh.cc
@@ -4,8 +4,9 @@ namespace nix {
 
 SSHMaster::SSHMaster(const std::string & host, const std::string & keyFile, bool useMaster, bool compress, int logFD)
     : host(host)
+    , fakeSSH(host == "localhost")
     , keyFile(keyFile)
-    , useMaster(useMaster)
+    , useMaster(useMaster && !fakeSSH)
     , compress(compress)
     , logFD(logFD)
 {
@@ -45,12 +46,19 @@ std::unique_ptr<SSHMaster::Connection> SSHMaster::startCommand(const std::string
         if (logFD != -1 && dup2(logFD, STDERR_FILENO) == -1)
             throw SysError("duping over stderr");
 
-        Strings args = { "ssh", host.c_str(), "-x", "-a" };
-        addCommonSSHOpts(args);
-        if (socketPath != "")
-            args.insert(args.end(), {"-S", socketPath});
-        if (verbosity >= lvlChatty)
-            args.push_back("-v");
+        Strings args;
+
+        if (fakeSSH) {
+            args = { "bash", "-c" };
+        } else {
+            args = { "ssh", host.c_str(), "-x", "-a" };
+            addCommonSSHOpts(args);
+            if (socketPath != "")
+                args.insert(args.end(), {"-S", socketPath});
+            if (verbosity >= lvlChatty)
+                args.push_back("-v");
+        }
+
         args.push_back(command);
         execvp(args.begin()->c_str(), stringsToCharPtrs(args).data());
 
diff --git a/src/libstore/ssh.hh b/src/libstore/ssh.hh
index 1268e6d00054..4f0f0bd29f9f 100644
--- a/src/libstore/ssh.hh
+++ b/src/libstore/ssh.hh
@@ -10,6 +10,7 @@ class SSHMaster
 private:
 
     const std::string host;
+    bool fakeSSH;
     const std::string keyFile;
     const bool useMaster;
     const bool compress;
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index 1854353828df..1f42097fccfb 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -609,6 +609,8 @@ void copyStorePath(ref<Store> srcStore, ref<Store> dstStore,
             act.progress(total, info->narSize);
         });
         srcStore->narFromPath({storePath}, wrapperSink);
+    }, [&]() {
+        throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri());
     });
 
     dstStore->addToStore(*info, *source, repair, checkSigs);
@@ -844,8 +846,24 @@ ref<Store> openStore(const std::string & uri_,
     if (q != std::string::npos) {
         for (auto s : tokenizeString<Strings>(uri.substr(q + 1), "&")) {
             auto e = s.find('=');
-            if (e != std::string::npos)
-                params[s.substr(0, e)] = s.substr(e + 1);
+            if (e != std::string::npos) {
+                auto value = s.substr(e + 1);
+                std::string decoded;
+                for (size_t i = 0; i < value.size(); ) {
+                    if (value[i] == '%') {
+                        if (i + 2 >= value.size())
+                            throw Error("invalid URI parameter '%s'", value);
+                        try {
+                            decoded += std::stoul(std::string(value, i + 1, 2), 0, 16);
+                            i += 3;
+                        } catch (...) {
+                            throw Error("invalid URI parameter '%s'", value);
+                        }
+                    } else
+                        decoded += value[i++];
+                }
+                params[s.substr(0, e)] = decoded;
+            }
         }
         uri = uri_.substr(0, q);
     }
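
The new loop percent-decodes store-URI parameter values, so values containing reserved characters (such as the path in the remote-store setting added to LegacySSHStore above) can be passed, e.g. ssh://host?remote-store=%2Fother%2Fstore. The same loop lifted into a self-contained helper:

    #include <iostream>
    #include <stdexcept>
    #include <string>

    std::string percentDecode(const std::string & value)
    {
        std::string decoded;
        for (size_t i = 0; i < value.size(); ) {
            if (value[i] == '%') {
                if (i + 2 >= value.size())
                    throw std::runtime_error("invalid URI parameter '" + value + "'");
                try {
                    // Two hex digits -> one byte.
                    decoded += (char) std::stoul(value.substr(i + 1, 2), nullptr, 16);
                    i += 3;
                } catch (...) {
                    throw std::runtime_error("invalid URI parameter '" + value + "'");
                }
            } else
                decoded += value[i++];
        }
        return decoded;
    }

    int main()
    {
        std::cout << percentDecode("%2Fother%2Fstore") << "\n";   // prints /other/store
    }
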
diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh
index 7c5b495a4482..099818ed6f69 100644
--- a/src/libstore/store-api.hh
+++ b/src/libstore/store-api.hh
@@ -23,6 +23,7 @@ MakeError(BuildError, Error) /* denotes a permanent build failure */
 MakeError(InvalidPath, Error)
 MakeError(Unsupported, Error)
 MakeError(SubstituteGone, Error)
+MakeError(SubstituterDisabled, Error)
 
 
 struct BasicDerivation;
diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc
index e1782f8c4bd9..0dd84e32034a 100644
--- a/src/libutil/compression.cc
+++ b/src/libutil/compression.cc
@@ -8,246 +8,265 @@
 #include <cstdio>
 #include <cstring>
 
-#if HAVE_BROTLI
 #include <brotli/decode.h>
 #include <brotli/encode.h>
-#endif // HAVE_BROTLI
 
 #include <iostream>
 
 namespace nix {
 
-static const size_t bufSize = 32 * 1024;
-
-static void decompressNone(Source & source, Sink & sink)
+// Don't feed brotli too much at once.
+struct ChunkedCompressionSink : CompressionSink
 {
-    std::vector<unsigned char> buf(bufSize);
-    while (true) {
-        size_t n;
-        try {
-            n = source.read(buf.data(), buf.size());
-        } catch (EndOfFile &) {
-            break;
+    uint8_t outbuf[32 * 1024];
+
+    void write(const unsigned char * data, size_t len) override
+    {
+        const size_t CHUNK_SIZE = sizeof(outbuf) << 2;
+        while (len) {
+            size_t n = std::min(CHUNK_SIZE, len);
+            writeInternal(data, n);
+            data += n;
+            len -= n;
         }
-        sink(buf.data(), n);
     }
-}
 
-static void decompressXZ(Source & source, Sink & sink)
+    virtual void writeInternal(const unsigned char * data, size_t len) = 0;
+};
+
+struct NoneSink : CompressionSink
 {
-    lzma_stream strm(LZMA_STREAM_INIT);
-
-    lzma_ret ret = lzma_stream_decoder(
-        &strm, UINT64_MAX, LZMA_CONCATENATED);
-    if (ret != LZMA_OK)
-        throw CompressionError("unable to initialise lzma decoder");
-
-    Finally free([&]() { lzma_end(&strm); });
-
-    lzma_action action = LZMA_RUN;
-    std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
-    strm.next_in = nullptr;
-    strm.avail_in = 0;
-    strm.next_out = outbuf.data();
-    strm.avail_out = outbuf.size();
-    bool eof = false;
-
-    while (true) {
-        checkInterrupt();
-
-        if (strm.avail_in == 0 && !eof) {
-            strm.next_in = inbuf.data();
-            try {
-                strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
-            } catch (EndOfFile &) {
-                eof = true;
-            }
-        }
+    Sink & nextSink;
+    NoneSink(Sink & nextSink) : nextSink(nextSink) { }
+    void finish() override { flush(); }
+    void write(const unsigned char * data, size_t len) override { nextSink(data, len); }
+};
 
-        if (strm.avail_in == 0)
-            action = LZMA_FINISH;
+struct XzDecompressionSink : CompressionSink
+{
+    Sink & nextSink;
+    uint8_t outbuf[BUFSIZ];
+    lzma_stream strm = LZMA_STREAM_INIT;
+    bool finished = false;
 
-        lzma_ret ret = lzma_code(&strm, action);
+    XzDecompressionSink(Sink & nextSink) : nextSink(nextSink)
+    {
+        lzma_ret ret = lzma_stream_decoder(
+            &strm, UINT64_MAX, LZMA_CONCATENATED);
+        if (ret != LZMA_OK)
+            throw CompressionError("unable to initialise lzma decoder");
 
-        if (strm.avail_out < outbuf.size()) {
-            sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
-            strm.next_out = outbuf.data();
-            strm.avail_out = outbuf.size();
-        }
+        strm.next_out = outbuf;
+        strm.avail_out = sizeof(outbuf);
+    }
 
-        if (ret == LZMA_STREAM_END) return;
+    ~XzDecompressionSink()
+    {
+        lzma_end(&strm);
+    }
 
-        if (ret != LZMA_OK)
-            throw CompressionError("error %d while decompressing xz file", ret);
+    void finish() override
+    {
+        CompressionSink::flush();
+        write(nullptr, 0);
     }
-}
 
-static void decompressBzip2(Source & source, Sink & sink)
-{
-    bz_stream strm;
-    memset(&strm, 0, sizeof(strm));
-
-    int ret = BZ2_bzDecompressInit(&strm, 0, 0);
-    if (ret != BZ_OK)
-        throw CompressionError("unable to initialise bzip2 decoder");
-
-    Finally free([&]() { BZ2_bzDecompressEnd(&strm); });
-
-    std::vector<char> inbuf(bufSize), outbuf(bufSize);
-    strm.next_in = nullptr;
-    strm.avail_in = 0;
-    strm.next_out = outbuf.data();
-    strm.avail_out = outbuf.size();
-    bool eof = false;
-
-    while (true) {
-        checkInterrupt();
-
-        if (strm.avail_in == 0 && !eof) {
-            strm.next_in = inbuf.data();
-            try {
-                strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
-            } catch (EndOfFile &) {
-                eof = true;
-            }
-        }
+    void write(const unsigned char * data, size_t len) override
+    {
+        strm.next_in = data;
+        strm.avail_in = len;
+
+        while (!finished && (!data || strm.avail_in)) {
+            checkInterrupt();
 
-        int ret = BZ2_bzDecompress(&strm);
+            lzma_ret ret = lzma_code(&strm, data ? LZMA_RUN : LZMA_FINISH);
+            if (ret != LZMA_OK && ret != LZMA_STREAM_END)
+                throw CompressionError("error %d while decompressing xz file", ret);
 
-        if (strm.avail_in == 0 && strm.avail_out == outbuf.size() && eof)
-            throw CompressionError("bzip2 data ends prematurely");
+            finished = ret == LZMA_STREAM_END;
 
-        if (strm.avail_out < outbuf.size()) {
-            sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
-            strm.next_out = outbuf.data();
-            strm.avail_out = outbuf.size();
+            if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
+                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
+                strm.next_out = outbuf;
+                strm.avail_out = sizeof(outbuf);
+            }
         }
+    }
+};
 
-        if (ret == BZ_STREAM_END) return;
+struct BzipDecompressionSink : ChunkedCompressionSink
+{
+    Sink & nextSink;
+    bz_stream strm;
+    bool finished = false;
 
+    BzipDecompressionSink(Sink & nextSink) : nextSink(nextSink)
+    {
+        memset(&strm, 0, sizeof(strm));
+        int ret = BZ2_bzDecompressInit(&strm, 0, 0);
         if (ret != BZ_OK)
-            throw CompressionError("error while decompressing bzip2 file");
+            throw CompressionError("unable to initialise bzip2 decoder");
+
+        strm.next_out = (char *) outbuf;
+        strm.avail_out = sizeof(outbuf);
     }
-}
 
-static void decompressBrotli(Source & source, Sink & sink)
-{
-#if !HAVE_BROTLI
-    RunOptions options(BROTLI, {"-d"});
-    options.standardIn = &source;
-    options.standardOut = &sink;
-    runProgram2(options);
-#else
-    auto *s = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
-    if (!s)
-        throw CompressionError("unable to initialize brotli decoder");
-
-    Finally free([s]() { BrotliDecoderDestroyInstance(s); });
-
-    std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
-    const uint8_t * next_in = nullptr;
-    size_t avail_in = 0;
-    bool eof = false;
-
-    while (true) {
-        checkInterrupt();
-
-        if (avail_in == 0 && !eof) {
-            next_in = inbuf.data();
-            try {
-                avail_in = source.read((unsigned char *) next_in, inbuf.size());
-            } catch (EndOfFile &) {
-                eof = true;
+    ~BzipDecompressionSink()
+    {
+        BZ2_bzDecompressEnd(&strm);
+    }
+
+    void finish() override
+    {
+        flush();
+        write(nullptr, 0);
+    }
+
+    void writeInternal(const unsigned char * data, size_t len) override
+    {
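+        // bzip2's 'avail_in' field is only an unsigned int, so
+        // ChunkedCompressionSink::write() feeds us chunks of at most 4 GiB;
+        // the assert below guards that invariant.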
+        assert(len <= std::numeric_limits<decltype(strm.avail_in)>::max());
+
+        strm.next_in = (char *) data;
+        strm.avail_in = len;
+
+        while (strm.avail_in) {
+            checkInterrupt();
+
+            int ret = BZ2_bzDecompress(&strm);
+            if (ret != BZ_OK && ret != BZ_STREAM_END)
+                throw CompressionError("error while decompressing bzip2 file");
+
+            finished = ret == BZ_STREAM_END;
+
+            if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
+                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
+                strm.next_out = (char *) outbuf;
+                strm.avail_out = sizeof(outbuf);
             }
         }
+    }
+};
 
-        uint8_t * next_out = outbuf.data();
-        size_t avail_out = outbuf.size();
-
-        auto ret = BrotliDecoderDecompressStream(s,
-                &avail_in, &next_in,
-                &avail_out, &next_out,
-                nullptr);
-
-        switch (ret) {
-        case BROTLI_DECODER_RESULT_ERROR:
-            throw CompressionError("error while decompressing brotli file");
-        case BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
-            if (eof)
-                throw CompressionError("incomplete or corrupt brotli file");
-            break;
-        case BROTLI_DECODER_RESULT_SUCCESS:
-            if (avail_in != 0)
-                throw CompressionError("unexpected input after brotli decompression");
-            break;
-        case BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT:
-            // I'm not sure if this can happen, but abort if this happens with empty buffer
-            if (avail_out == outbuf.size())
-                throw CompressionError("brotli decompression requires larger buffer");
-            break;
-        }
+struct BrotliDecompressionSink : ChunkedCompressionSink
+{
+    Sink & nextSink;
+    BrotliDecoderState * state;
+    bool finished = false;
 
-        // Always ensure we have full buffer for next invocation
-        if (avail_out < outbuf.size())
-            sink((unsigned char *) outbuf.data(), outbuf.size() - avail_out);
+    BrotliDecompressionSink(Sink & nextSink) : nextSink(nextSink)
+    {
+        state = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
+        if (!state)
+            throw CompressionError("unable to initialize brotli decoder");
+    }
 
-        if (ret == BROTLI_DECODER_RESULT_SUCCESS) return;
+    ~BrotliDecompressionSink()
+    {
+        BrotliDecoderDestroyInstance(state);
     }
-#endif // HAVE_BROTLI
-}
+
+    void finish() override
+    {
+        flush();
+        writeInternal(nullptr, 0);
+    }
+
+    void writeInternal(const unsigned char * data, size_t len) override
+    {
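+        // As with xz, a null 'data' pointer means end of input: keep pumping
+        // the decoder until BrotliDecoderIsFinished() reports completion.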
+        const uint8_t * next_in = data;
+        size_t avail_in = len;
+        uint8_t * next_out = outbuf;
+        size_t avail_out = sizeof(outbuf);
+
+        while (!finished && (!data || avail_in)) {
+            checkInterrupt();
+
+            if (!BrotliDecoderDecompressStream(state,
+                    &avail_in, &next_in,
+                    &avail_out, &next_out,
+                    nullptr))
+                throw CompressionError("error while decompressing brotli file");
+
+            if (avail_out < sizeof(outbuf) || avail_in == 0) {
+                nextSink(outbuf, sizeof(outbuf) - avail_out);
+                next_out = outbuf;
+                avail_out = sizeof(outbuf);
+            }
+
+            finished = BrotliDecoderIsFinished(state);
+        }
+    }
+};
 
 ref<std::string> decompress(const std::string & method, const std::string & in)
 {
-    StringSource source(in);
-    StringSink sink;
-    decompress(method, source, sink);
-    return sink.s;
+    StringSink ssink;
+    auto sink = makeDecompressionSink(method, ssink);
+    (*sink)(in);
+    sink->finish();
+    return ssink.s;
 }
 
-void decompress(const std::string & method, Source & source, Sink & sink)
+ref<CompressionSink> makeDecompressionSink(const std::string & method, Sink & nextSink)
 {
-    if (method == "none")
-        return decompressNone(source, sink);
+    if (method == "none" || method == "")
+        return make_ref<NoneSink>(nextSink);
     else if (method == "xz")
-        return decompressXZ(source, sink);
+        return make_ref<XzDecompressionSink>(nextSink);
     else if (method == "bzip2")
-        return decompressBzip2(source, sink);
+        return make_ref<BzipDecompressionSink>(nextSink);
     else if (method == "br")
-        return decompressBrotli(source, sink);
+        return make_ref<BrotliDecompressionSink>(nextSink);
     else
         throw UnknownCompressionMethod("unknown compression method '%s'", method);
 }
 
-struct NoneSink : CompressionSink
-{
-    Sink & nextSink;
-    NoneSink(Sink & nextSink) : nextSink(nextSink) { }
-    void finish() override { flush(); }
-    void write(const unsigned char * data, size_t len) override { nextSink(data, len); }
-};
-
-struct XzSink : CompressionSink
+struct XzCompressionSink : CompressionSink
 {
     Sink & nextSink;
     uint8_t outbuf[BUFSIZ];
     lzma_stream strm = LZMA_STREAM_INIT;
     bool finished = false;
 
-    template <typename F>
-    XzSink(Sink & nextSink, F&& initEncoder) : nextSink(nextSink) {
-        lzma_ret ret = initEncoder();
+    XzCompressionSink(Sink & nextSink, bool parallel) : nextSink(nextSink)
+    {
+        lzma_ret ret;
+        bool done = false;
+
+        if (parallel) {
+#ifdef HAVE_LZMA_MT
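+            // Multi-threaded encoder: use one worker thread per core, falling
+            // back to a single thread if lzma_cputhreads() cannot detect the
+            // core count.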
+            lzma_mt mt_options = {};
+            mt_options.flags = 0;
+            mt_options.timeout = 300; // same timeout as the xz command-line tool
+            mt_options.preset = LZMA_PRESET_DEFAULT;
+            mt_options.filters = NULL;
+            mt_options.check = LZMA_CHECK_CRC64;
+            mt_options.threads = lzma_cputhreads();
+            mt_options.block_size = 0;
+            if (mt_options.threads == 0)
+                mt_options.threads = 1;
+            // FIXME: maybe use lzma_stream_encoder_mt_memusage() to control the
+            // number of threads.
+            ret = lzma_stream_encoder_mt(&strm, &mt_options);
+            done = true;
+#else
+            printMsg(lvlError, "warning: parallel XZ compression requested but not supported, falling back to single-threaded compression");
+#endif
+        }
+
+        if (!done)
+            ret = lzma_easy_encoder(&strm, 6, LZMA_CHECK_CRC64);
+
         if (ret != LZMA_OK)
             throw CompressionError("unable to initialise lzma encoder");
+
         // FIXME: apply the x86 BCJ filter?
 
         strm.next_out = outbuf;
         strm.avail_out = sizeof(outbuf);
     }
-    XzSink(Sink & nextSink) : XzSink(nextSink, [this]() {
-        return lzma_easy_encoder(&strm, 6, LZMA_CHECK_CRC64);
-    }) {}
 
-    ~XzSink()
+    ~XzCompressionSink()
     {
         lzma_end(&strm);
     }
@@ -255,43 +274,25 @@ struct XzSink : CompressionSink
     void finish() override
     {
         CompressionSink::flush();
-
-        assert(!finished);
-        finished = true;
-
-        while (true) {
-            checkInterrupt();
-
-            lzma_ret ret = lzma_code(&strm, LZMA_FINISH);
-            if (ret != LZMA_OK && ret != LZMA_STREAM_END)
-                throw CompressionError("error while flushing xz file");
-
-            if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
-                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
-                strm.next_out = outbuf;
-                strm.avail_out = sizeof(outbuf);
-            }
-
-            if (ret == LZMA_STREAM_END) break;
-        }
+        write(nullptr, 0);
     }
 
     void write(const unsigned char * data, size_t len) override
     {
-        assert(!finished);
-
         strm.next_in = data;
         strm.avail_in = len;
 
-        while (strm.avail_in) {
+        while (!finished && (!data || strm.avail_in)) {
             checkInterrupt();
 
-            lzma_ret ret = lzma_code(&strm, LZMA_RUN);
-            if (ret != LZMA_OK)
-                throw CompressionError("error while compressing xz file");
+            lzma_ret ret = lzma_code(&strm, data ? LZMA_RUN : LZMA_FINISH);
+            if (ret != LZMA_OK && ret != LZMA_STREAM_END)
+                throw CompressionError("error %d while compressing xz file", ret);
+
+            finished = ret == LZMA_STREAM_END;
 
-            if (strm.avail_out == 0) {
-                nextSink(outbuf, sizeof(outbuf));
+            if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
+                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
                 strm.next_out = outbuf;
                 strm.avail_out = sizeof(outbuf);
             }
@@ -299,46 +300,24 @@ struct XzSink : CompressionSink
     }
 };
 
-#ifdef HAVE_LZMA_MT
-struct ParallelXzSink : public XzSink
-{
-  ParallelXzSink(Sink &nextSink) : XzSink(nextSink, [this]() {
-        lzma_mt mt_options = {};
-        mt_options.flags = 0;
-        mt_options.timeout = 300; // Using the same setting as the xz cmd line
-        mt_options.preset = LZMA_PRESET_DEFAULT;
-        mt_options.filters = NULL;
-        mt_options.check = LZMA_CHECK_CRC64;
-        mt_options.threads = lzma_cputhreads();
-        mt_options.block_size = 0;
-        if (mt_options.threads == 0)
-            mt_options.threads = 1;
-        // FIXME: maybe use lzma_stream_encoder_mt_memusage() to control the
-        // number of threads.
-        return lzma_stream_encoder_mt(&strm, &mt_options);
-  }) {}
-};
-#endif
-
-struct BzipSink : CompressionSink
+struct BzipCompressionSink : ChunkedCompressionSink
 {
     Sink & nextSink;
-    char outbuf[BUFSIZ];
     bz_stream strm;
     bool finished = false;
 
-    BzipSink(Sink & nextSink) : nextSink(nextSink)
+    BzipCompressionSink(Sink & nextSink) : nextSink(nextSink)
     {
         memset(&strm, 0, sizeof(strm));
         int ret = BZ2_bzCompressInit(&strm, 9, 0, 30);
         if (ret != BZ_OK)
             throw CompressionError("unable to initialise bzip2 encoder");
 
-        strm.next_out = outbuf;
+        strm.next_out = (char *) outbuf;
         strm.avail_out = sizeof(outbuf);
     }
 
-    ~BzipSink()
+    ~BzipCompressionSink()
     {
         BZ2_bzCompressEnd(&strm);
     }
@@ -346,114 +325,49 @@ struct BzipSink : CompressionSink
     void finish() override
     {
         flush();
-
-        assert(!finished);
-        finished = true;
-
-        while (true) {
-            checkInterrupt();
-
-            int ret = BZ2_bzCompress(&strm, BZ_FINISH);
-            if (ret != BZ_FINISH_OK && ret != BZ_STREAM_END)
-                throw CompressionError("error while flushing bzip2 file");
-
-            if (strm.avail_out == 0 || ret == BZ_STREAM_END) {
-                nextSink((unsigned char *) outbuf, sizeof(outbuf) - strm.avail_out);
-                strm.next_out = outbuf;
-                strm.avail_out = sizeof(outbuf);
-            }
-
-            if (ret == BZ_STREAM_END) break;
-        }
-    }
-
-    void write(const unsigned char * data, size_t len) override
-    {
-        /* Bzip2's 'avail_in' parameter is an unsigned int, so we need
-           to split the input into chunks of at most 4 GiB. */
-        while (len) {
-            auto n = std::min((size_t) std::numeric_limits<decltype(strm.avail_in)>::max(), len);
-            writeInternal(data, n);
-            data += n;
-            len -= n;
-        }
+        writeInternal(nullptr, 0);
     }
 
-    void writeInternal(const unsigned char * data, size_t len)
+    void writeInternal(const unsigned char * data, size_t len) override
     {
-        assert(!finished);
         assert(len <= std::numeric_limits<decltype(strm.avail_in)>::max());
 
         strm.next_in = (char *) data;
         strm.avail_in = len;
 
-        while (strm.avail_in) {
+        while (!finished && (!data || strm.avail_in)) {
             checkInterrupt();
 
-            int ret = BZ2_bzCompress(&strm, BZ_RUN);
-            if (ret != BZ_OK)
-                CompressionError("error while compressing bzip2 file");
+            int ret = BZ2_bzCompress(&strm, data ? BZ_RUN : BZ_FINISH);
+            if (ret != BZ_RUN_OK && ret != BZ_FINISH_OK && ret != BZ_STREAM_END)
+                throw CompressionError("error %d while compressing bzip2 file", ret);
 
-            if (strm.avail_out == 0) {
-                nextSink((unsigned char *) outbuf, sizeof(outbuf));
-                strm.next_out = outbuf;
+            finished = ret == BZ_STREAM_END;
+
+            if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) {
+                nextSink(outbuf, sizeof(outbuf) - strm.avail_out);
+                strm.next_out = (char *) outbuf;
                 strm.avail_out = sizeof(outbuf);
             }
         }
     }
 };
 
-struct LambdaCompressionSink : CompressionSink
-{
-    Sink & nextSink;
-    std::string data;
-    using CompressFnTy = std::function<std::string(const std::string&)>;
-    CompressFnTy compressFn;
-    LambdaCompressionSink(Sink& nextSink, CompressFnTy compressFn)
-        : nextSink(nextSink)
-        , compressFn(std::move(compressFn))
-    {
-    };
-
-    void finish() override
-    {
-        flush();
-        nextSink(compressFn(data));
-    }
-
-    void write(const unsigned char * data, size_t len) override
-    {
-        checkInterrupt();
-        this->data.append((const char *) data, len);
-    }
-};
-
-struct BrotliCmdSink : LambdaCompressionSink
-{
-    BrotliCmdSink(Sink& nextSink)
-        : LambdaCompressionSink(nextSink, [](const std::string& data) {
-            return runProgram(BROTLI, true, {}, data);
-        })
-    {
-    }
-};
-
-#if HAVE_BROTLI
-struct BrotliSink : CompressionSink
+struct BrotliCompressionSink : ChunkedCompressionSink
 {
     Sink & nextSink;
     uint8_t outbuf[BUFSIZ];
     BrotliEncoderState *state;
     bool finished = false;
 
-    BrotliSink(Sink & nextSink) : nextSink(nextSink)
+    BrotliCompressionSink(Sink & nextSink) : nextSink(nextSink)
     {
         state = BrotliEncoderCreateInstance(nullptr, nullptr, nullptr);
         if (!state)
             throw CompressionError("unable to initialise brotli encoder");
     }
 
-    ~BrotliSink()
+    ~BrotliCompressionSink()
     {
         BrotliEncoderDestroyInstance(state);
     }
@@ -461,94 +375,47 @@ struct BrotliSink : CompressionSink
     void finish() override
     {
         flush();
-        assert(!finished);
-
-        const uint8_t *next_in = nullptr;
-        size_t avail_in = 0;
-        uint8_t *next_out = outbuf;
-        size_t avail_out = sizeof(outbuf);
-        while (!finished) {
-            checkInterrupt();
-
-            if (!BrotliEncoderCompressStream(state,
-                        BROTLI_OPERATION_FINISH,
-                        &avail_in, &next_in,
-                        &avail_out, &next_out,
-                        nullptr))
-                throw CompressionError("error while finishing brotli file");
-
-            finished = BrotliEncoderIsFinished(state);
-            if (avail_out == 0 || finished) {
-                nextSink(outbuf, sizeof(outbuf) - avail_out);
-                next_out = outbuf;
-                avail_out = sizeof(outbuf);
-            }
-        }
+        writeInternal(nullptr, 0);
     }
 
-    void write(const unsigned char * data, size_t len) override
+    void writeInternal(const unsigned char * data, size_t len) override
     {
-        // Don't feed brotli too much at once
-        const size_t CHUNK_SIZE = sizeof(outbuf) << 2;
-        while (len) {
-          size_t n = std::min(CHUNK_SIZE, len);
-          writeInternal(data, n);
-          data += n;
-          len -= n;
-        }
-    }
-
-    void writeInternal(const unsigned char * data, size_t len)
-    {
-        assert(!finished);
-
-        const uint8_t *next_in = data;
+        const uint8_t * next_in = data;
         size_t avail_in = len;
-        uint8_t *next_out = outbuf;
+        uint8_t * next_out = outbuf;
         size_t avail_out = sizeof(outbuf);
 
-        while (avail_in > 0) {
+        while (!finished && (!data || avail_in)) {
             checkInterrupt();
 
             if (!BrotliEncoderCompressStream(state,
-                      BROTLI_OPERATION_PROCESS,
-                      &avail_in, &next_in,
-                      &avail_out, &next_out,
-                      nullptr))
-                throw CompressionError("error while compressing brotli file");
+                    data ? BROTLI_OPERATION_PROCESS : BROTLI_OPERATION_FINISH,
+                    &avail_in, &next_in,
+                    &avail_out, &next_out,
+                    nullptr))
+                throw CompressionError("error while compressing brotli compression");
 
             if (avail_out < sizeof(outbuf) || avail_in == 0) {
                 nextSink(outbuf, sizeof(outbuf) - avail_out);
                 next_out = outbuf;
                 avail_out = sizeof(outbuf);
             }
+
+            finished = BrotliEncoderIsFinished(state);
         }
     }
 };
-#endif // HAVE_BROTLI
 
 ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel)
 {
-    if (parallel) {
-#ifdef HAVE_LZMA_MT
-        if (method == "xz")
-            return make_ref<ParallelXzSink>(nextSink);
-#endif
-        printMsg(lvlError, format("Warning: parallel compression requested but not supported for method '%1%', falling back to single-threaded compression") % method);
-    }
-
     if (method == "none")
         return make_ref<NoneSink>(nextSink);
     else if (method == "xz")
-        return make_ref<XzSink>(nextSink);
+        return make_ref<XzCompressionSink>(nextSink, parallel);
     else if (method == "bzip2")
-        return make_ref<BzipSink>(nextSink);
+        return make_ref<BzipCompressionSink>(nextSink);
     else if (method == "br")
-#if HAVE_BROTLI
-        return make_ref<BrotliSink>(nextSink);
-#else
-        return make_ref<BrotliCmdSink>(nextSink);
-#endif
+        return make_ref<BrotliCompressionSink>(nextSink);
     else
         throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
 }
diff --git a/src/libutil/compression.hh b/src/libutil/compression.hh
index f7a3e3fbd32e..dd666a4e19fd 100644
--- a/src/libutil/compression.hh
+++ b/src/libutil/compression.hh
@@ -8,17 +8,17 @@
 
 namespace nix {
 
-ref<std::string> decompress(const std::string & method, const std::string & in);
-
-void decompress(const std::string & method, Source & source, Sink & sink);
-
-ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
-
 struct CompressionSink : BufferedSink
 {
     virtual void finish() = 0;
 };
 
+ref<std::string> decompress(const std::string & method, const std::string & in);
+
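+/* Example (a sketch; 'out' and 'compressedData' are illustrative):
+
+     StringSink out;
+     auto sink = makeDecompressionSink("xz", out);
+     (*sink)(compressedData); // may be called repeatedly with chunks
+     sink->finish();
+*/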
+ref<CompressionSink> makeDecompressionSink(const std::string & method, Sink & nextSink);
+
+ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
+
 ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel = false);
 
 MakeError(UnknownCompressionMethod, Error);
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index 21803edd056a..17448f70efb6 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -157,16 +157,24 @@ size_t StringSource::read(unsigned char * data, size_t len)
 }
 
 
-std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun)
+#if BOOST_VERSION >= 106300 && BOOST_VERSION < 106600
+#error Coroutines are broken in this version of Boost!
+#endif
+
+std::unique_ptr<Source> sinkToSource(
+    std::function<void(Sink &)> fun,
+    std::function<void()> eof)
 {
     struct SinkToSource : Source
     {
         typedef boost::coroutines2::coroutine<std::string> coro_t;
 
+        std::function<void()> eof;
         coro_t::pull_type coro;
 
-        SinkToSource(std::function<void(Sink &)> fun)
-            : coro([&](coro_t::push_type & yield) {
+        SinkToSource(std::function<void(Sink &)> fun, std::function<void()> eof)
+            : eof(eof)
+            , coro([&](coro_t::push_type & yield) {
                 LambdaSink sink([&](const unsigned char * data, size_t len) {
                     if (len) yield(std::string((const char *) data, len));
                 });
@@ -180,8 +188,7 @@ std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun)
 
         size_t read(unsigned char * data, size_t len) override
         {
-            if (!coro)
-                throw EndOfFile("coroutine has finished");
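+            // 'eof' is invoked once the coroutine has finished; the default
+            // throws EndOfFile. It must not return normally, hence the abort().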
+            if (!coro) { eof(); abort(); }
 
             if (pos == cur.size()) {
                 if (!cur.empty()) coro();
@@ -197,7 +204,7 @@ std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun)
         }
     };
 
-    return std::make_unique<SinkToSource>(fun);
+    return std::make_unique<SinkToSource>(fun, eof);
 }
 
 
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 14b62fdb6774..4b6ad5da5b9c 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -214,7 +214,11 @@ struct LambdaSource : Source
 
 /* Convert a function that feeds data into a Sink into a Source. The
    Source executes the function as a coroutine. */
-std::unique_ptr<Source> sinkToSource(std::function<void(Sink &)> fun);
+std::unique_ptr<Source> sinkToSource(
+    std::function<void(Sink &)> fun,
+    std::function<void()> eof = []() {
+        throw EndOfFile("coroutine has finished");
+    });
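+
+/* Example (a sketch; 'store' and 'storePath' are illustrative): expose a NAR
+   dump as a Source without materialising it in memory:
+
+     auto source = sinkToSource([&](Sink & sink) {
+         store->narFromPath(storePath, sink);
+     });
+*/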
 
 
 void writePadding(size_t len, Sink & sink);
diff --git a/src/libutil/util.cc b/src/libutil/util.cc
index 6bc64ae75a42..03f0be705c1d 100644
--- a/src/libutil/util.cc
+++ b/src/libutil/util.cc
@@ -167,7 +167,7 @@ Path dirOf(const Path & path)
 {
     Path::size_type pos = path.rfind('/');
     if (pos == string::npos)
-        throw Error(format("invalid file name '%1%'") % path);
+        return ".";
     return pos == 0 ? "/" : Path(path, 0, pos);
 }
 
@@ -468,7 +468,7 @@ static Lazy<Path> getHome2([]() {
         std::vector<char> buf(16384);
         struct passwd pwbuf;
         struct passwd * pw;
-        if (getpwuid_r(getuid(), &pwbuf, buf.data(), buf.size(), &pw) != 0
+        if (getpwuid_r(geteuid(), &pwbuf, buf.data(), buf.size(), &pw) != 0
             || !pw || !pw->pw_dir || !pw->pw_dir[0])
             throw Error("cannot determine user's home directory");
         homeDir = pw->pw_dir;
diff --git a/src/nix-build/nix-build.cc b/src/nix-build/nix-build.cc
index de0e9118fd21..94d3a27560fe 100755
--- a/src/nix-build/nix-build.cc
+++ b/src/nix-build/nix-build.cc
@@ -85,7 +85,6 @@ void mainWrapped(int argc, char * * argv)
     BuildMode buildMode = bmNormal;
     bool readStdin = false;
 
-    auto shell = getEnv("SHELL", "/bin/sh");
     std::string envCommand; // interactive shell
     Strings envExclude;
 
@@ -99,6 +98,9 @@ void mainWrapped(int argc, char * * argv)
 
     std::string outLink = "./result";
 
+    // List of environment variables kept for --pure
+    std::set<string> keepVars{"HOME", "USER", "LOGNAME", "DISPLAY", "PATH", "TERM", "IN_NIX_SHELL", "TZ", "PAGER", "NIX_BUILD_SHELL", "SHLVL"};
+
     Strings args;
     for (int i = 1; i < argc; ++i)
         args.push_back(argv[i]);
@@ -218,6 +220,9 @@ void mainWrapped(int argc, char * * argv)
             }
         }
 
+        else if (*arg == "--keep")
+            keepVars.insert(getArg(*arg, arg, end));
+
         else if (*arg == "-")
             readStdin = true;
 
@@ -300,6 +305,8 @@ void mainWrapped(int argc, char * * argv)
         }
     }
 
+    state->printStats();
+
     auto buildPaths = [&](const PathSet & paths) {
         /* Note: we do this even when !printMissing to efficiently
            fetch binary cache data. */
@@ -368,7 +375,6 @@ void mainWrapped(int argc, char * * argv)
         auto tmp = getEnv("TMPDIR", getEnv("XDG_RUNTIME_DIR", "/tmp"));
 
         if (pure) {
-            std::set<string> keepVars{"HOME", "USER", "LOGNAME", "DISPLAY", "PATH", "TERM", "IN_NIX_SHELL", "TZ", "PAGER", "NIX_BUILD_SHELL", "SHLVL"};
             decltype(env) newEnv;
             for (auto & i : env)
                 if (keepVars.count(i.first))
@@ -415,7 +421,6 @@ void mainWrapped(int argc, char * * argv)
                 R"s([ -n "$PS1" ] && PS1='\n\[\033[1;32m\][nix-shell:\w]\$\[\033[0m\] '; )s"
                 "if [ \"$(type -t runHook)\" = function ]; then runHook shellHook; fi; "
                 "unset NIX_ENFORCE_PURITY; "
-                "unset NIX_INDENT_MAKE; "
                 "shopt -u nullglob; "
                 "unset TZ; %4%"
                 "%5%",
diff --git a/src/nix-channel/nix-channel.cc b/src/nix-channel/nix-channel.cc
index 55ebda438965..2083d3df5cab 100755
--- a/src/nix-channel/nix-channel.cc
+++ b/src/nix-channel/nix-channel.cc
@@ -169,7 +169,7 @@ int main(int argc, char ** argv)
 
         // Figure out the name of the channels profile.
         ;
-        auto pw = getpwuid(getuid());
+        auto pw = getpwuid(geteuid());
         std::string name = pw ? pw->pw_name : getEnv("USER", "");
         if (name.empty())
             throw Error("cannot figure out user name");
diff --git a/src/nix-daemon/nix-daemon.cc b/src/nix-daemon/nix-daemon.cc
index a2e54b93c05f..644fa6681de3 100644
--- a/src/nix-daemon/nix-daemon.cc
+++ b/src/nix-daemon/nix-daemon.cc
@@ -707,6 +707,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
 
         logger->startWork();
 
+        // FIXME: race if addToStore doesn't read source?
         store.cast<Store>()->addToStore(info, *source, (RepairFlag) repair,
             dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr);
 
diff --git a/src/nix-env/nix-env.cc b/src/nix-env/nix-env.cc
index a43b103f6ec6..f9c8a8d313e7 100644
--- a/src/nix-env/nix-env.cc
+++ b/src/nix-env/nix-env.cc
@@ -150,10 +150,8 @@ static void loadSourceExpr(EvalState & state, const Path & path, Value & v)
     if (stat(path.c_str(), &st) == -1)
         throw SysError(format("getting information about '%1%'") % path);
 
-    if (isNixExpr(path, st)) {
+    if (isNixExpr(path, st))
         state.evalFile(path, v);
-        return;
-    }
 
     /* The path is a directory.  Put the Nix expressions in the
        directory in a set, with the file name of each expression as
@@ -161,13 +159,15 @@ static void loadSourceExpr(EvalState & state, const Path & path, Value & v)
        set flat, not nested, to make it easier for a user to have a
        ~/.nix-defexpr directory that includes some system-wide
        directory). */
-    if (S_ISDIR(st.st_mode)) {
+    else if (S_ISDIR(st.st_mode)) {
         state.mkAttrs(v, 1024);
         state.mkList(*state.allocAttr(v, state.symbols.create("_combineChannels")), 0);
         StringSet attrs;
         getAllExprs(state, path, attrs, v);
         v.attrs->sort();
     }
+
+    else throw Error("path '%s' is not a directory or a Nix expression", path);
 }
 
 
diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc
index e1e27ceef94d..fe68f681ae28 100644
--- a/src/nix-store/nix-store.cc
+++ b/src/nix-store/nix-store.cc
@@ -860,7 +860,7 @@ static void opServe(Strings opFlags, Strings opArgs)
             }
 
             case cmdDumpStorePath:
-                dumpPath(readStorePath(*store, in), out);
+                store->narFromPath(readStorePath(*store, in), out);
                 break;
 
             case cmdImportPaths: {
@@ -924,6 +924,28 @@ static void opServe(Strings opFlags, Strings opArgs)
                 break;
             }
 
+            case cmdAddToStoreNar: {
+                if (!writeAllowed) throw Error("importing paths is not allowed");
+
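+                // Read the ValidPathInfo fields sent by the client, then
+                // stream the NAR that follows on 'in' directly into the store.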
+                ValidPathInfo info;
+                info.path = readStorePath(*store, in);
+                in >> info.deriver;
+                if (!info.deriver.empty())
+                    store->assertStorePath(info.deriver);
+                info.narHash = Hash(readString(in), htSHA256);
+                info.references = readStorePaths<PathSet>(*store, in);
+                in >> info.registrationTime >> info.narSize >> info.ultimate;
+                info.sigs = readStrings<StringSet>(in);
+                in >> info.ca;
+
+                // FIXME: race if addToStore doesn't read source?
+                store->addToStore(info, in, NoRepair, NoCheckSigs);
+
+                out << 1; // indicate success
+
+                break;
+            }
+
             default:
                 throw Error(format("unknown serve command %1%") % cmd);
         }
diff --git a/src/nix/copy.cc b/src/nix/copy.cc
index e4e6c3e303ed..91711c8b46da 100644
--- a/src/nix/copy.cc
+++ b/src/nix/copy.cc
@@ -72,6 +72,10 @@ struct CmdCopy : StorePathsCommand
                 "To populate the current folder build output to a S3 binary cache:",
                 "nix copy --to s3://my-bucket?region=eu-west-1"
             },
+            Example{
+                "To populate the current folder build output to an S3-compatible binary cache:",
+                "nix copy --to s3://my-bucket?region=eu-west-1&endpoint=example.com"
+            },
 #endif
         };
     }
diff --git a/src/nix/installables.cc b/src/nix/installables.cc
index 0be992b03c5a..0c1ad3ab3db0 100644
--- a/src/nix/installables.cc
+++ b/src/nix/installables.cc
@@ -96,7 +96,7 @@ struct InstallableStorePath : Installable
 
     Buildables toBuildables() override
     {
-        return {{"", {{"out", storePath}}}};
+        return {{isDerivation(storePath) ? storePath : "", {{"out", storePath}}}};
     }
 };
 
diff --git a/src/nix/main.cc b/src/nix/main.cc
index 9cd5d21c84b6..69791e223c22 100644
--- a/src/nix/main.cc
+++ b/src/nix/main.cc
@@ -24,7 +24,6 @@ struct NixArgs : virtual MultiCommand, virtual MixCommonArgs
     {
         mkFlag()
             .longName("help")
-            .shortName('h')
             .description("show usage information")
             .handler([&]() { showHelpAndExit(); });
 
diff --git a/src/nix/path-info.cc b/src/nix/path-info.cc
index 47caa401d3c9..dea5f0557b81 100644
--- a/src/nix/path-info.cc
+++ b/src/nix/path-info.cc
@@ -4,8 +4,8 @@
 #include "json.hh"
 #include "common-args.hh"
 
-#include <iomanip>
 #include <algorithm>
+#include <array>
 
 using namespace nix;
 
@@ -13,12 +13,14 @@ struct CmdPathInfo : StorePathsCommand, MixJSON
 {
     bool showSize = false;
     bool showClosureSize = false;
+    bool humanReadable = false;
     bool showSigs = false;
 
     CmdPathInfo()
     {
         mkFlag('s', "size", "print size of the NAR dump of each path", &showSize);
         mkFlag('S', "closure-size", "print sum size of the NAR dumps of the closure of each path", &showClosureSize);
+        mkFlag('h', "human-readable", "with -s and -S, print sizes like 1K 234M 5.67G etc.", &humanReadable);
         mkFlag(0, "sigs", "show signatures", &showSigs);
     }
 
@@ -40,6 +42,10 @@ struct CmdPathInfo : StorePathsCommand, MixJSON
                 "nix path-info -rS /run/current-system | sort -nk2"
             },
             Example{
+                "To show a package's closure size and all its dependencies with human readable sizes:",
+                "nix path-info -rsSh nixpkgs.rust"
+            },
+            Example{
                 "To check the existence of a path in a binary cache:",
                 "nix path-info -r /nix/store/7qvk5c91...-geeqie-1.1 --store https://cache.nixos.org/"
             },
@@ -58,6 +64,25 @@ struct CmdPathInfo : StorePathsCommand, MixJSON
         };
     }
 
+    void printSize(unsigned long long value)
+    {
+        if (!humanReadable) {
+            std::cout << fmt("\t%11d", value);
+            return;
+        }
+
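+        // Scale by powers of 1024 and print the matching unit suffix
+        // (' ' = bytes, K, M, G, ...).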
+        static const std::array<char, 9> idents{{
+            ' ', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'
+        }};
+        size_t power = 0;
+        double res = value;
+        while (res > 1024 && power < idents.size()) {
+            ++power;
+            res /= 1024;
+        }
+        std::cout << fmt("\t%6.1f%c", res, idents.at(power));
+    }
+
     void run(ref<Store> store, Paths storePaths) override
     {
         size_t pathLen = 0;
@@ -78,13 +103,16 @@ struct CmdPathInfo : StorePathsCommand, MixJSON
                 auto info = store->queryPathInfo(storePath);
                 storePath = info->path; // FIXME: screws up padding
 
-                std::cout << storePath << std::string(std::max(0, (int) pathLen - (int) storePath.size()), ' ');
+                std::cout << storePath;
+
+                if (showSize || showClosureSize || showSigs)
+                    std::cout << std::string(std::max(0, (int) pathLen - (int) storePath.size()), ' ');
 
                 if (showSize)
-                    std::cout << '\t' << std::setw(11) << info->narSize;
+                    printSize(info->narSize);
 
                 if (showClosureSize)
-                    std::cout << '\t' << std::setw(11) << store->getClosureSize(storePath).first;
+                    printSize(store->getClosureSize(storePath).first);
 
                 if (showSigs) {
                     std::cout << '\t';
diff --git a/src/nix/progress-bar.cc b/src/nix/progress-bar.cc
index 8093d8761c4d..40b905ba3243 100644
--- a/src/nix/progress-bar.cc
+++ b/src/nix/progress-bar.cc
@@ -75,10 +75,9 @@ public:
         updateThread = std::thread([&]() {
             auto state(state_.lock());
             while (state->active) {
-                auto r = state.wait_for(updateCV, std::chrono::seconds(1));
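+                // Sleep until there is something to render, then rate-limit
+                // redraws to at most one per 50 ms.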
+                state.wait(updateCV);
                 draw(*state);
-                if (r == std::cv_status::no_timeout)
-                  state.wait_for(quitCV, std::chrono::milliseconds(50));
+                state.wait_for(quitCV, std::chrono::milliseconds(50));
             }
         });
     }
diff --git a/src/nix/repl.cc b/src/nix/repl.cc
index 1eb716006375..1bbe256b2d8b 100644
--- a/src/nix/repl.cc
+++ b/src/nix/repl.cc
@@ -31,6 +31,7 @@ struct NixRepl
 {
     string curDir;
     EvalState state;
+    Bindings * autoArgs;
 
     Strings loadedFiles;
 
@@ -173,9 +174,14 @@ void NixRepl::mainLoop(const std::vector<std::string> & files)
             printMsg(lvlError, format(error + "%1%%2%") % (settings.showTrace ? e.prefix() : "") % e.msg());
         }
 
+        if (input.size() > 0) {
+            // Remove trailing newline before adding to history
+            input.erase(input.size() - 1);
+            linenoiseHistoryAdd(input.c_str());
+        }
+
         // We handled the current input fully, so we should clear it
         // and read brand new input.
-        linenoiseHistoryAdd(input.c_str());
         input.clear();
         std::cout << std::endl;
     }
@@ -441,8 +447,7 @@ void NixRepl::loadFile(const Path & path)
     loadedFiles.push_back(path);
     Value v, v2;
     state.evalFile(lookupFileArg(state, path), v);
-    Bindings & bindings(*state.allocBindings(0));
-    state.autoCallFunction(bindings, v, v2);
+    state.autoCallFunction(*autoArgs, v, v2);
     addAttrsToScope(v2);
 }
 
@@ -694,6 +699,7 @@ struct CmdRepl : StoreCommand, MixEvalArgs
     void run(ref<Store> store) override
     {
         auto repl = std::make_unique<NixRepl>(searchPath, openStore());
+        repl->autoArgs = getAutoArgs(repl->state);
         repl->mainLoop(files);
     }
 };
diff --git a/src/nix/run.cc b/src/nix/run.cc
index d04e106e037b..35b763345872 100644
--- a/src/nix/run.cc
+++ b/src/nix/run.cc
@@ -7,11 +7,14 @@
 #include "finally.hh"
 #include "fs-accessor.hh"
 #include "progress-bar.hh"
+#include "affinity.hh"
 
 #if __linux__
 #include <sys/mount.h>
 #endif
 
+#include <queue>
+
 using namespace nix;
 
 std::string chrootHelperName = "__run_in_chroot";
@@ -121,10 +124,27 @@ struct CmdRun : InstallablesCommand
                 unsetenv(var.c_str());
         }
 
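+        /* Build $PATH from the closure of the installables' bin/ directories,
+           following nix-support/propagated-user-env-packages links
+           breadth-first. */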
+        std::unordered_set<Path> done;
+        std::queue<Path> todo;
+        for (auto & path : outPaths) todo.push(path);
+
         auto unixPath = tokenizeString<Strings>(getEnv("PATH"), ":");
-        for (auto & path : outPaths)
-            if (accessor->stat(path + "/bin").type != FSAccessor::tMissing)
+
+        while (!todo.empty()) {
+            Path path = todo.front();
+            todo.pop();
+            if (!done.insert(path).second) continue;
+
+            if (true)
                 unixPath.push_front(path + "/bin");
+
+            auto propPath = path + "/nix-support/propagated-user-env-packages";
+            if (accessor->stat(propPath).type == FSAccessor::tRegular) {
+                for (auto & p : tokenizeString<Paths>(readFile(propPath)))
+                    todo.push(p);
+            }
+        }
+
         setenv("PATH", concatStringsSep(":", unixPath).c_str(), 1);
 
         std::string cmd = *command.begin();
@@ -135,6 +155,8 @@ struct CmdRun : InstallablesCommand
 
         restoreSignals();
 
+        restoreAffinity();
+
         /* If this is a diverted store (i.e. its "logical" location
            (typically /nix/store) differs from its "physical" location
            (e.g. /home/eelco/nix/store), then run the command in a
diff --git a/src/nix/upgrade-nix.cc b/src/nix/upgrade-nix.cc
index e23ae792369c..35c44a70cf52 100644
--- a/src/nix/upgrade-nix.cc
+++ b/src/nix/upgrade-nix.cc
@@ -1,14 +1,18 @@
 #include "command.hh"
+#include "common-args.hh"
 #include "store-api.hh"
 #include "download.hh"
 #include "eval.hh"
 #include "attr-path.hh"
+#include "names.hh"
+#include "progress-bar.hh"
 
 using namespace nix;
 
-struct CmdUpgradeNix : StoreCommand
+struct CmdUpgradeNix : MixDryRun, StoreCommand
 {
     Path profileDir;
+    std::string storePathsUrl = "https://github.com/NixOS/nixpkgs/raw/master/nixos/modules/installer/tools/nix-fallback-paths.nix";
 
     CmdUpgradeNix()
     {
@@ -18,6 +22,12 @@ struct CmdUpgradeNix : StoreCommand
             .labels({"profile-dir"})
             .description("the Nix profile to upgrade")
             .dest(&profileDir);
+
+        mkFlag()
+            .longName("nix-store-paths-url")
+            .labels({"url"})
+            .description("URL of the file that contains the store paths of the latest Nix release")
+            .dest(&storePathsUrl);
     }
 
     std::string name() override
@@ -59,6 +69,14 @@ struct CmdUpgradeNix : StoreCommand
             storePath = getLatestNix(store);
         }
 
+        auto version = DrvName(storePathToName(storePath)).version;
+
+        if (dryRun) {
+            stopProgressBar();
+            printError("would upgrade to version %s", version);
+            return;
+        }
+
         {
             Activity act(*logger, lvlInfo, actUnknown, fmt("downloading '%s'...", storePath));
             store->ensurePath(storePath);
@@ -72,11 +90,15 @@ struct CmdUpgradeNix : StoreCommand
                 throw Error("could not verify that '%s' works", program);
         }
 
+        stopProgressBar();
+
         {
             Activity act(*logger, lvlInfo, actUnknown, fmt("installing '%s' into profile '%s'...", storePath, profileDir));
             runProgram(settings.nixBinDir + "/nix-env", false,
                 {"--profile", profileDir, "-i", storePath, "--no-sandbox"});
         }
+
+        printError(ANSI_GREEN "upgrade to version %s done" ANSI_NORMAL, version);
     }
 
     /* Return the profile in which Nix is installed. */
@@ -98,11 +120,18 @@ struct CmdUpgradeNix : StoreCommand
         if (hasPrefix(where, "/run/current-system"))
             throw Error("Nix on NixOS must be upgraded via 'nixos-rebuild'");
 
-        Path profileDir;
-        Path userEnv;
+        Path profileDir = dirOf(where);
+
+        // Resolve profile to /nix/var/nix/profiles/<name> link.
+        while (canonPath(profileDir).find("/profiles/") == std::string::npos && isLink(profileDir))
+            profileDir = readLink(profileDir);
+
+        printInfo("found profile '%s'", profileDir);
+
+        Path userEnv = canonPath(profileDir, true);
 
         if (baseNameOf(where) != "bin" ||
-            !hasSuffix(userEnv = canonPath(profileDir = dirOf(where), true), "user-environment"))
+            !hasSuffix(userEnv, "user-environment"))
             throw Error("directory '%s' does not appear to be part of a Nix profile", where);
 
         if (!store->isValidPath(userEnv))
@@ -115,7 +144,7 @@ struct CmdUpgradeNix : StoreCommand
     Path getLatestNix(ref<Store> store)
     {
         // FIXME: use nixos.org?
-        auto req = DownloadRequest("https://github.com/NixOS/nixpkgs/raw/master/nixos/modules/installer/tools/nix-fallback-paths.nix");
+        auto req = DownloadRequest(storePathsUrl);
         auto res = getDownloader()->download(req);
 
         auto state = std::make_unique<EvalState>(Strings(), store);