about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libexpr/common-opts.cc2
-rw-r--r--src/libexpr/parser.y2
-rw-r--r--src/libexpr/primops.cc2
-rw-r--r--src/libstore/builtins.cc10
-rw-r--r--src/libstore/download.cc550
-rw-r--r--src/libstore/download.hh35
-rw-r--r--src/libstore/http-binary-cache-store.cc26
-rwxr-xr-xsrc/nix-channel/nix-channel.cc8
-rw-r--r--src/nix-prefetch-url/nix-prefetch-url.cc2
9 files changed, 430 insertions, 207 deletions
diff --git a/src/libexpr/common-opts.cc b/src/libexpr/common-opts.cc
index 8a7989aac663..06d6ed87df94 100644
--- a/src/libexpr/common-opts.cc
+++ b/src/libexpr/common-opts.cc
@@ -55,7 +55,7 @@ bool parseSearchPathArg(Strings::iterator & i,
 Path lookupFileArg(EvalState & state, string s)
 {
     if (isUri(s))
-        return makeDownloader()->downloadCached(state.store, s, true);
+        return getDownloader()->downloadCached(state.store, s, true);
     else if (s.size() > 2 && s.at(0) == '<' && s.at(s.size() - 1) == '>') {
         Path p = s.substr(1, s.size() - 2);
         return state.findFile(p);
diff --git a/src/libexpr/parser.y b/src/libexpr/parser.y
index 776e5cb39b81..3f7eed16f0f4 100644
--- a/src/libexpr/parser.y
+++ b/src/libexpr/parser.y
@@ -662,7 +662,7 @@ std::pair<bool, std::string> EvalState::resolveSearchPathElem(const SearchPathEl
                 // FIXME: support specifying revision/branch
                 res = { true, exportGit(store, elem.second, "master") };
             else
-                res = { true, makeDownloader()->downloadCached(store, elem.second, true) };
+                res = { true, getDownloader()->downloadCached(store, elem.second, true) };
         } catch (DownloadError & e) {
             printMsg(lvlError, format("warning: Nix search path entry ‘%1%’ cannot be downloaded, ignoring") % elem.second);
             res = { false, "" };
diff --git a/src/libexpr/primops.cc b/src/libexpr/primops.cc
index 3b965f209bb2..e460000841ac 100644
--- a/src/libexpr/primops.cc
+++ b/src/libexpr/primops.cc
@@ -1769,7 +1769,7 @@ void fetch(EvalState & state, const Pos & pos, Value * * args, Value & v,
     if (state.restricted && !expectedHash)
         throw Error(format("‘%1%’ is not allowed in restricted mode") % who);
 
-    Path res = makeDownloader()->downloadCached(state.store, url, unpack, name, expectedHash);
+    Path res = getDownloader()->downloadCached(state.store, url, unpack, name, expectedHash);
     mkString(v, res, PathSet({res}));
 }
 
diff --git a/src/libstore/builtins.cc b/src/libstore/builtins.cc
index d3194a905733..a30f30906f01 100644
--- a/src/libstore/builtins.cc
+++ b/src/libstore/builtins.cc
@@ -17,13 +17,15 @@ void builtinFetchurl(const BasicDerivation & drv)
     auto fetch = [&](const string & url) {
         /* No need to do TLS verification, because we check the hash of
            the result anyway. */
-        DownloadOptions options;
-        options.verifyTLS = false;
+        DownloadRequest request(url);
+        request.verifyTLS = false;
 
         /* Show a progress indicator, even though stderr is not a tty. */
-        options.showProgress = DownloadOptions::yes;
+        request.showProgress = DownloadRequest::yes;
 
-        auto data = makeDownloader()->download(url, options);
+        /* Note: have to use a fresh downloader here because we're in
+           a forked process. */
+        auto data = makeDownloader()->download(request);
         assert(data.data);
 
         return data.data;
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index ed7e124d25f4..2aca0a975bfe 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -5,10 +5,15 @@
 #include "store-api.hh"
 #include "archive.hh"
 
+#include <unistd.h>
+#include <fcntl.h>
+
 #include <curl/curl.h>
 
 #include <iostream>
 #include <thread>
+#include <cmath>
+#include <random>
 
 
 namespace nix {
@@ -30,225 +35,428 @@ std::string resolveUri(const std::string & uri)
 
 struct CurlDownloader : public Downloader
 {
-    CURL * curl;
-    ref<std::string> data;
-    string etag, status, expectedETag, effectiveUrl;
-
-    struct curl_slist * requestHeaders;
+    CURLM * curlm = 0;
 
-    bool showProgress;
-    double prevProgressTime{0}, startTime{0};
-    unsigned int moveBack{1};
+    std::random_device rd;
+    std::mt19937 mt19937;
 
-    size_t writeCallback(void * contents, size_t size, size_t nmemb)
+    struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
     {
-        size_t realSize = size * nmemb;
-        data->append((char *) contents, realSize);
-        return realSize;
-    }
+        CurlDownloader & downloader;
+        DownloadRequest request;
+        DownloadResult result;
+        bool done = false; // whether the promise has been set
+        std::promise<DownloadResult> promise;
+        CURL * req = 0;
+        bool active = false; // whether the handle has been added to the multi object
+        std::string status;
+
+        bool showProgress = false;
+        double prevProgressTime{0}, startTime{0};
+        unsigned int moveBack{1};
+
+        unsigned int attempt = 0;
+
+        /* Don't start this download until the specified time point
+           has been reached. */
+        std::chrono::steady_clock::time_point embargo;
+
+        struct curl_slist * requestHeaders = 0;
+
+        DownloadItem(CurlDownloader & downloader, const DownloadRequest & request)
+            : downloader(downloader), request(request)
+        {
+            showProgress =
+                request.showProgress == DownloadRequest::yes ||
+                (request.showProgress == DownloadRequest::automatic && isatty(STDERR_FILENO));
+
+            if (!request.expectedETag.empty())
+                requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
+        }
 
-    static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
-    {
-        return ((CurlDownloader *) userp)->writeCallback(contents, size, nmemb);
-    }
+        ~DownloadItem()
+        {
+            if (req) {
+                if (active)
+                    curl_multi_remove_handle(downloader.curlm, req);
+                curl_easy_cleanup(req);
+            }
+            if (requestHeaders) curl_slist_free_all(requestHeaders);
+            try {
+                if (!done)
+                    fail(DownloadError(Transient, format("download of ‘%s’ was interrupted") % request.uri));
+            } catch (...) {
+                ignoreException();
+            }
+        }
 
-    size_t headerCallback(void * contents, size_t size, size_t nmemb)
-    {
-        size_t realSize = size * nmemb;
-        string line = string((char *) contents, realSize);
-        printMsg(lvlVomit, format("got header: %1%") % trim(line));
-        if (line.compare(0, 5, "HTTP/") == 0) { // new response starts
-            etag = "";
-            auto ss = tokenizeString<vector<string>>(line, " ");
-            status = ss.size() >= 2 ? ss[1] : "";
-        } else {
-            auto i = line.find(':');
-            if (i != string::npos) {
-                string name = trim(string(line, 0, i));
-                if (name == "ETag") { // FIXME: case
-                    etag = trim(string(line, i + 1));
-                    /* Hack to work around a GitHub bug: it sends
-                       ETags, but ignores If-None-Match. So if we get
-                       the expected ETag on a 200 response, then shut
-                       down the connection because we already have the
-                       data. */
-                    printMsg(lvlDebug, format("got ETag: %1%") % etag);
-                    if (etag == expectedETag && status == "200") {
-                        printMsg(lvlDebug, format("shutting down on 200 HTTP response with expected ETag"));
-                        return 0;
+        template<class T>
+        void fail(const T & e)
+        {
+            promise.set_exception(std::make_exception_ptr(e));
+            done = true;
+        }
+
+        size_t writeCallback(void * contents, size_t size, size_t nmemb)
+        {
+            size_t realSize = size * nmemb;
+            result.data->append((char *) contents, realSize);
+            return realSize;
+        }
+
+        static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
+        {
+            return ((DownloadItem *) userp)->writeCallback(contents, size, nmemb);
+        }
+
+        size_t headerCallback(void * contents, size_t size, size_t nmemb)
+        {
+            size_t realSize = size * nmemb;
+            std::string line((char *) contents, realSize);
+            printMsg(lvlVomit, format("got header for ‘%s’: %s") % request.uri % trim(line));
+            if (line.compare(0, 5, "HTTP/") == 0) { // new response starts
+                result.etag = "";
+                auto ss = tokenizeString<vector<string>>(line, " ");
+                status = ss.size() >= 2 ? ss[1] : "";
+                result.data = std::make_shared<std::string>();
+            } else {
+                auto i = line.find(':');
+                if (i != string::npos) {
+                    string name = toLower(trim(string(line, 0, i)));
+                    if (name == "etag") {
+                        result.etag = trim(string(line, i + 1));
+                        /* Hack to work around a GitHub bug: it sends
+                           ETags, but ignores If-None-Match. So if we get
+                           the expected ETag on a 200 response, then shut
+                           down the connection because we already have the
+                           data. */
+                        if (result.etag == request.expectedETag && status == "200") {
+                            debug(format("shutting down on 200 HTTP response with expected ETag"));
+                            return 0;
+                        }
                     }
                 }
             }
+            return realSize;
         }
-        return realSize;
-    }
 
-    static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
-    {
-        return ((CurlDownloader *) userp)->headerCallback(contents, size, nmemb);
-    }
+        static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
+        {
+            return ((DownloadItem *) userp)->headerCallback(contents, size, nmemb);
+        }
 
-    int progressCallback(double dltotal, double dlnow)
-    {
-        if (showProgress) {
-            double now = getTime();
-            if (prevProgressTime <= now - 1) {
-                string s = (format(" [%1$.0f/%2$.0f KiB, %3$.1f KiB/s]")
-                    % (dlnow / 1024.0)
-                    % (dltotal / 1024.0)
-                    % (now == startTime ? 0 : dlnow / 1024.0 / (now - startTime))).str();
-                std::cerr << "\e[" << moveBack << "D" << s;
-                moveBack = s.size();
+        int progressCallback(double dltotal, double dlnow)
+        {
+            if (showProgress) {
+                double now = getTime();
+                if (prevProgressTime <= now - 1) {
+                    string s = (format(" [%1$.0f/%2$.0f KiB, %3$.1f KiB/s]")
+                        % (dlnow / 1024.0)
+                        % (dltotal / 1024.0)
+                        % (now == startTime ? 0 : dlnow / 1024.0 / (now - startTime))).str();
+                    std::cerr << "\e[" << moveBack << "D" << s;
+                    moveBack = s.size();
+                    std::cerr.flush();
+                    prevProgressTime = now;
+                }
+            }
+            return _isInterrupted;
+        }
+
+        static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow)
+        {
+            return ((DownloadItem *) userp)->progressCallback(dltotal, dlnow);
+        }
+
+        void init()
+        {
+            // FIXME: handle parallel downloads.
+            if (showProgress) {
+                std::cerr << (format("downloading ‘%1%’... ") % request.uri);
                 std::cerr.flush();
-                prevProgressTime = now;
+                startTime = getTime();
             }
+
+            if (!req) req = curl_easy_init();
+
+            curl_easy_reset(req);
+            curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str());
+            curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
+            curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
+            curl_easy_setopt(req, CURLOPT_USERAGENT, ("Nix/" + nixVersion).c_str());
+            curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
+            curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
+            curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper);
+            curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
+            curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, DownloadItem::headerCallbackWrapper);
+            curl_easy_setopt(req, CURLOPT_HEADERDATA, this);
+
+            curl_easy_setopt(req, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper);
+            curl_easy_setopt(req, CURLOPT_PROGRESSDATA, this);
+            curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0);
+
+            curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders);
+
+            if (request.head)
+                curl_easy_setopt(req, CURLOPT_NOBODY, 1);
+
+            if (request.verifyTLS)
+                curl_easy_setopt(req, CURLOPT_CAINFO, getEnv("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt").c_str());
+            else {
+                curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0);
+                curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
+            }
+
+            result.data = std::make_shared<std::string>();
         }
-        return _isInterrupted;
-    }
 
-    static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow)
-    {
-        return ((CurlDownloader *) userp)->progressCallback(dltotal, dlnow);
-    }
+        void finish(CURLcode code)
+        {
+            if (showProgress)
+                //std::cerr << "\e[" << moveBack << "D\e[K\n";
+                std::cerr << "\n";
 
-    CurlDownloader()
-        : data(make_ref<std::string>())
-    {
-        requestHeaders = 0;
+            long httpStatus = 0;
+            curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);
 
-        curl = curl_easy_init();
-        if (!curl) throw nix::Error("unable to initialize curl");
-    }
+            char * effectiveUrlCStr;
+            curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr);
+            if (effectiveUrlCStr)
+                result.effectiveUrl = effectiveUrlCStr;
 
-    ~CurlDownloader()
-    {
-        if (curl) curl_easy_cleanup(curl);
-        if (requestHeaders) curl_slist_free_all(requestHeaders);
-    }
+            debug(format("finished download of ‘%s’; curl status = %d, HTTP status = %d, body = %d bytes")
+                % request.uri % code % httpStatus % (result.data ? result.data->size() : 0));
 
-    bool fetch(const string & url, const DownloadOptions & options)
-    {
-        showProgress =
-            options.showProgress == DownloadOptions::yes ||
-            (options.showProgress == DownloadOptions::automatic && isatty(STDERR_FILENO));
+            if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
+                code = CURLE_OK;
+                httpStatus = 304;
+            }
+
+            if (code == CURLE_OK &&
+                (httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
+            {
+                result.cached = httpStatus == 304;
+                promise.set_value(result);
+                done = true;
+            } else {
+                Error err =
+                    (httpStatus == 404 || code == CURLE_FILE_COULDNT_READ_FILE) ? NotFound :
+                    httpStatus == 403 ? Forbidden :
+                    (httpStatus == 408 || httpStatus == 500 || httpStatus == 503
+                        || httpStatus == 504  || httpStatus == 522 || httpStatus == 524
+                        || code == CURLE_COULDNT_RESOLVE_HOST) ? Transient :
+                    Misc;
 
-        curl_easy_reset(curl);
+                attempt++;
 
-        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
-        curl_easy_setopt(curl, CURLOPT_USERAGENT, ("Nix/" + nixVersion).c_str());
-        curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1);
+                auto exc =
+                    httpStatus != 0
+                    ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus)
+                    : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code);
+
+                /* If this is a transient error, then maybe retry the
+                   download after a while. */
+                if (err == Transient && attempt < request.tries) {
+                    int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
+                    printMsg(lvlError, format("warning: %s; retrying in %d ms") % exc.what() % ms);
+                    embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
+                    downloader.enqueueItem(shared_from_this());
+                }
+                else
+                    fail(exc);
+            }
+        }
+    };
 
-        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallbackWrapper);
-        curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) this);
+    struct State
+    {
+        bool quit = false;
+        std::vector<std::shared_ptr<DownloadItem>> incoming;
+    };
 
-        curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headerCallbackWrapper);
-        curl_easy_setopt(curl, CURLOPT_HEADERDATA, (void *) this);
+    Sync<State> state_;
 
-        curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper);
-        curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, (void *) this);
-        curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0);
+    /* We can't use a std::condition_variable to wake up the curl
+       thread, because it only monitors file descriptors. So use a
+       pipe instead. */
+    Pipe wakeupPipe;
 
-        curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
+    std::thread workerThread;
 
-        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+    CurlDownloader()
+    {
+        static std::once_flag globalInit;
+        std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
 
-        if (options.verifyTLS)
-            curl_easy_setopt(curl, CURLOPT_CAINFO, getEnv("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt").c_str());
-        else {
-            curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0);
-            curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0);
-        }
+        curlm = curl_multi_init();
 
-        data = make_ref<std::string>();
+        curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
+        curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, 25); // FIXME: configurable
 
-        if (requestHeaders) {
-            curl_slist_free_all(requestHeaders);
-            requestHeaders = 0;
-        }
+        wakeupPipe.create();
+        fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);
+
+        workerThread = std::thread([&]() { workerThreadEntry(); });
+    }
 
-        if (!options.expectedETag.empty()) {
-            this->expectedETag = options.expectedETag;
-            requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + options.expectedETag).c_str());
+    ~CurlDownloader()
+    {
+        /* Signal the worker thread to exit. */
+        {
+            auto state(state_.lock());
+            state->quit = true;
         }
+        writeFull(wakeupPipe.writeSide.get(), " ");
 
-        curl_easy_setopt(curl, CURLOPT_HTTPHEADER, requestHeaders);
+        workerThread.join();
 
-        if (options.head)
-            curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
+        if (curlm) curl_multi_cleanup(curlm);
+    }
 
-        if (showProgress) {
-            std::cerr << (format("downloading ‘%1%’... ") % url);
-            std::cerr.flush();
-            startTime = getTime();
-        }
+    void workerThreadMain()
+    {
+        std::map<CURL *, std::shared_ptr<DownloadItem>> items;
+
+        bool quit;
+
+        std::chrono::steady_clock::time_point nextWakeup;
+
+        while (!quit) {
+            checkInterrupt();
+
+            /* Let curl do its thing. */
+            int running;
+            CURLMcode mc = curl_multi_perform(curlm, &running);
+            if (mc != CURLM_OK)
+                throw nix::Error(format("unexpected error from curl_multi_perform(): %s") % curl_multi_strerror(mc));
+
+            /* Set the promises of any finished requests. */
+            CURLMsg * msg;
+            int left;
+            while ((msg = curl_multi_info_read(curlm, &left))) {
+                if (msg->msg == CURLMSG_DONE) {
+                    auto i = items.find(msg->easy_handle);
+                    assert(i != items.end());
+                    i->second->finish(msg->data.result);
+                    curl_multi_remove_handle(curlm, i->second->req);
+                    i->second->active = false;
+                    items.erase(i);
+                }
+            }
 
-        CURLcode res = curl_easy_perform(curl);
-        if (showProgress)
-            //std::cerr << "\e[" << moveBack << "D\e[K\n";
-            std::cerr << "\n";
-        checkInterrupt();
-        if (res == CURLE_WRITE_ERROR && etag == options.expectedETag) return false;
-
-        long httpStatus = -1;
-        curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpStatus);
-
-        if (res != CURLE_OK) {
-            Error err =
-                httpStatus == 404 ? NotFound :
-                httpStatus == 403 ? Forbidden :
-                (httpStatus == 408 || httpStatus == 500 || httpStatus == 503
-                 || httpStatus == 504  || httpStatus == 522 || httpStatus == 524
-                 || res == CURLE_COULDNT_RESOLVE_HOST) ? Transient :
-                Misc;
-            if (res == CURLE_HTTP_RETURNED_ERROR && httpStatus != -1)
-                throw DownloadError(err, format("unable to download ‘%s’: HTTP error %d")
-                    % url % httpStatus);
-            else
-                throw DownloadError(err, format("unable to download ‘%s’: %s (%d)")
-                    % url % curl_easy_strerror(res) % res);
-        }
+            /* Wait for activity, including wakeup events. */
+            int numfds = 0;
+            struct curl_waitfd extraFDs[1];
+            extraFDs[0].fd = wakeupPipe.readSide.get();
+            extraFDs[0].events = CURL_WAIT_POLLIN;
+            extraFDs[0].revents = 0;
+            auto sleepTimeMs =
+                nextWakeup != std::chrono::steady_clock::time_point()
+                ? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count())
+                : 1000000000;
+            //printMsg(lvlVomit, format("download thread waiting for %d ms") % sleepTimeMs);
+            mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds);
+            if (mc != CURLM_OK)
+                throw nix::Error(format("unexpected error from curl_multi_wait(): %s") % curl_multi_strerror(mc));
+
+            nextWakeup = std::chrono::steady_clock::time_point();
+
+            /* Add new curl requests from the incoming requests queue,
+               except for requests that are embargoed (waiting for a
+               retry timeout to expire). FIXME: should use a priority
+               queue for the embargoed items to prevent repeated O(n)
+               checks. */
+            if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
+                char buf[1024];
+                auto res = read(extraFDs[0].fd, buf, sizeof(buf));
+                if (res == -1 && errno != EINTR)
+                    throw SysError("reading curl wakeup socket");
+            }
 
-        char *effectiveUrlCStr;
-        curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr);
-        if (effectiveUrlCStr)
-            effectiveUrl = effectiveUrlCStr;
+            std::vector<std::shared_ptr<DownloadItem>> incoming, embargoed;
+            auto now = std::chrono::steady_clock::now();
+
+            {
+                auto state(state_.lock());
+                for (auto & item: state->incoming) {
+                    if (item->embargo <= now)
+                        incoming.push_back(item);
+                    else {
+                        embargoed.push_back(item);
+                        if (nextWakeup == std::chrono::steady_clock::time_point()
+                            || item->embargo < nextWakeup)
+                            nextWakeup = item->embargo;
+                    }
+                }
+                state->incoming = embargoed;
+                quit = state->quit;
+            }
 
-        if (httpStatus == 304) return false;
+            for (auto & item : incoming) {
+                debug(format("starting download of %s") % item->request.uri);
+                item->init();
+                curl_multi_add_handle(curlm, item->req);
+                item->active = true;
+                items[item->req] = item;
+            }
+        }
 
-        return true;
+        debug("download thread shutting down");
     }
 
-    DownloadResult download(string url, const DownloadOptions & options) override
+    void workerThreadEntry()
     {
-        size_t attempt = 0;
+        try {
+            workerThreadMain();
+        } catch (Interrupted & e) {
+        } catch (std::exception & e) {
+            printMsg(lvlError, format("unexpected error in download thread: %s") % e.what());
+        }
 
-        while (true) {
-            try {
-                DownloadResult res;
-                if (fetch(resolveUri(url), options)) {
-                    res.cached = false;
-                    res.data = data;
-                } else
-                    res.cached = true;
-                res.effectiveUrl = effectiveUrl;
-                res.etag = etag;
-                return res;
-            } catch (DownloadError & e) {
-                attempt++;
-                if (e.error != Transient || attempt >= options.tries) throw;
-                auto ms = options.baseRetryTimeMs * (1 << (attempt - 1));
-                printMsg(lvlError, format("warning: %s; retrying in %d ms") % e.what() % ms);
-                std::this_thread::sleep_for(std::chrono::milliseconds(ms));
-            }
+        {
+            auto state(state_.lock());
+            state->incoming.clear();
+            state->quit = true;
         }
     }
+
+    void enqueueItem(std::shared_ptr<DownloadItem> item)
+    {
+        {
+            auto state(state_.lock());
+            if (state->quit)
+                throw nix::Error("cannot enqueue download request because the download thread is shutting down");
+            state->incoming.push_back(item);
+        }
+        writeFull(wakeupPipe.writeSide.get(), " ");
+    }
+
+    std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) override
+    {
+        auto item = std::make_shared<DownloadItem>(*this, request);
+        enqueueItem(item);
+        return item->promise.get_future();
+    }
 };
 
+ref<Downloader> getDownloader()
+{
+    static std::shared_ptr<Downloader> downloader;
+    static std::once_flag downloaderCreated;
+    std::call_once(downloaderCreated, [&]() { downloader = makeDownloader(); });
+    return ref<Downloader>(downloader);
+}
+
 ref<Downloader> makeDownloader()
 {
     return make_ref<CurlDownloader>();
 }
 
+DownloadResult Downloader::download(const DownloadRequest & request)
+{
+    return enqueueDownload(request).get();
+}
+
 Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl)
 {
     auto url = resolveUri(url_);
@@ -303,9 +511,9 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa
     if (!skip) {
 
         try {
-            DownloadOptions options;
-            options.expectedETag = expectedETag;
-            auto res = download(url, options);
+            DownloadRequest request(url);
+            request.expectedETag = expectedETag;
+            auto res = download(request);
             if (effectiveUrl)
                 *effectiveUrl = res.effectiveUrl;
 
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index 011b85f4721b..6b90ff20273a 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -4,24 +4,30 @@
 #include "hash.hh"
 
 #include <string>
+#include <future>
 
 namespace nix {
 
-struct DownloadOptions
+struct DownloadRequest
 {
+    std::string uri;
     std::string expectedETag;
     bool verifyTLS = true;
     enum { yes, no, automatic } showProgress = yes;
     bool head = false;
     size_t tries = 1;
-    unsigned int baseRetryTimeMs = 100;
+    unsigned int baseRetryTimeMs = 250;
+
+    DownloadRequest(const std::string & uri) : uri(uri) { }
 };
 
 struct DownloadResult
 {
+    enum Status { Success, NotFound, Forbidden, Misc, Transient };
+    Status status;
     bool cached;
-    string etag;
-    string effectiveUrl;
+    std::string etag;
+    std::string effectiveUrl;
     std::shared_ptr<std::string> data;
 };
 
@@ -29,14 +35,29 @@ class Store;
 
 struct Downloader
 {
-    virtual DownloadResult download(string url, const DownloadOptions & options) = 0;
+    /* Enqueue a download request, returning a future to the result of
+       the download. The future may throw a DownloadError
+       exception. */
+    virtual std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) = 0;
+
+    /* Synchronously download a file. */
+    DownloadResult download(const DownloadRequest & request);
 
-    Path downloadCached(ref<Store> store, const string & url, bool unpack, string name = "",
-        const Hash & expectedHash = Hash(), string * effectiveUrl = nullptr);
+    /* Check if the specified file is already in ~/.cache/nix/tarballs
+       and is more recent than ‘tarball-ttl’ seconds. Otherwise,
+       use the recorded ETag to verify if the server has a more
+       recent version, and if so, download it to the Nix store. */
+    Path downloadCached(ref<Store> store, const string & uri, bool unpack, string name = "",
+        const Hash & expectedHash = Hash(), string * effectiveUri = nullptr);
 
     enum Error { NotFound, Forbidden, Misc, Transient };
 };
 
+/* Return a shared Downloader object. Using this object is preferred
+   because it enables connection reuse and HTTP/2 multiplexing. */
+ref<Downloader> getDownloader();
+
+/* Return a new Downloader object. */
 ref<Downloader> makeDownloader();
 
 class DownloadError : public Error
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index bdcd2fd3998b..91ee6fcb69e2 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -13,17 +13,12 @@ private:
 
     Path cacheUri;
 
-    Pool<Downloader> downloaders;
-
 public:
 
     HttpBinaryCacheStore(
         const Params & params, const Path & _cacheUri)
         : BinaryCacheStore(params)
         , cacheUri(_cacheUri)
-        , downloaders(
-            std::numeric_limits<size_t>::max(),
-            []() { return makeDownloader(); })
     {
         if (cacheUri.back() == '/')
             cacheUri.pop_back();
@@ -54,12 +49,11 @@ protected:
     bool fileExists(const std::string & path) override
     {
         try {
-            auto downloader(downloaders.get());
-            DownloadOptions options;
-            options.showProgress = DownloadOptions::no;
-            options.head = true;
-            options.tries = 5;
-            downloader->download(cacheUri + "/" + path, options);
+            DownloadRequest request(cacheUri + "/" + path);
+            request.showProgress = DownloadRequest::no;
+            request.head = true;
+            request.tries = 5;
+            getDownloader()->download(request);
             return true;
         } catch (DownloadError & e) {
             /* S3 buckets return 403 if a file doesn't exist and the
@@ -77,13 +71,11 @@ protected:
 
     std::shared_ptr<std::string> getFile(const std::string & path) override
     {
-        auto downloader(downloaders.get());
-        DownloadOptions options;
-        options.showProgress = DownloadOptions::no;
-        options.tries = 5;
-        options.baseRetryTimeMs = 1000;
+        DownloadRequest request(cacheUri + "/" + path);
+        request.showProgress = DownloadRequest::no;
+        request.tries = 8;
         try {
-            return downloader->download(cacheUri + "/" + path, options).data;
+            return getDownloader()->download(request).data;
         } catch (DownloadError & e) {
             if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
                 return 0;
diff --git a/src/nix-channel/nix-channel.cc b/src/nix-channel/nix-channel.cc
index 0f7858aa53a5..5b4c2181996c 100755
--- a/src/nix-channel/nix-channel.cc
+++ b/src/nix-channel/nix-channel.cc
@@ -85,7 +85,7 @@ static void update(const StringSet & channelNames)
         // got redirected in the process, so that we can grab the various parts of a nix channel
         // definition from a consistent location if the redirect changes mid-download.
         auto effectiveUrl = string{};
-        auto dl = makeDownloader();
+        auto dl = getDownloader();
         auto filename = dl->downloadCached(store, url, false, "", Hash(), &effectiveUrl);
         url = chomp(std::move(effectiveUrl));
 
@@ -114,10 +114,10 @@ static void update(const StringSet & channelNames)
         if (!unpacked) {
             // The URL doesn't unpack directly, so let's try treating it like a full channel folder with files in it
             // Check if the channel advertises a binary cache.
-            DownloadOptions opts;
-            opts.showProgress = DownloadOptions::no;
+            DownloadRequest request(url + "/binary-cache-url");
+            request.showProgress = DownloadRequest::no;
             try {
-                auto dlRes = dl->download(url + "/binary-cache-url", opts);
+                auto dlRes = dl->download(request);
                 extraAttrs = "binaryCacheURL = \"" + *dlRes.data + "\";";
             } catch (DownloadError & e) {
             }
diff --git a/src/nix-prefetch-url/nix-prefetch-url.cc b/src/nix-prefetch-url/nix-prefetch-url.cc
index 00f5ae28d1dc..2bf2b2e5c3b1 100644
--- a/src/nix-prefetch-url/nix-prefetch-url.cc
+++ b/src/nix-prefetch-url/nix-prefetch-url.cc
@@ -158,7 +158,7 @@ int main(int argc, char * * argv)
             auto actualUri = resolveMirrorUri(state, uri);
 
             /* Download the file. */
-            auto result = makeDownloader()->download(actualUri, DownloadOptions());
+            auto result = getDownloader()->download(DownloadRequest(actualUri));
 
             AutoDelete tmpDir(createTempDir(), true);
             Path tmpFile = (Path) tmpDir + "/tmp";