Diffstat (limited to 'src/libstore/download.cc')
-rw-r--r--  src/libstore/download.cc | 103
1 file changed, 85 insertions(+), 18 deletions(-)
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 6ce9525c38de..91087eebcfcb 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -8,7 +8,6 @@
 #include "compression.hh"
 #include "pathlocks.hh"
 #include "finally.hh"
-#include "retry.hh"
 
 #ifdef ENABLE_S3
 #include <aws/core/client/ClientConfiguration.h>
@@ -20,9 +19,11 @@
 #include <curl/curl.h>
 
 #include <algorithm>
+#include <cmath>
 #include <cstring>
 #include <iostream>
 #include <queue>
+#include <random>
 #include <thread>
 
 using namespace std::string_literals;
@@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader
 {
     CURLM * curlm = 0;
 
+    std::random_device rd;
+    std::mt19937 mt19937;
+
     struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
     {
         CurlDownloader & downloader;
@@ -57,10 +61,20 @@ struct CurlDownloader : public Downloader
         bool active = false; // whether the handle has been added to the multi object
         std::string status;
 
+        unsigned int attempt = 0;
+
+        /* Don't start this download until the specified time point
+           has been reached. */
+        std::chrono::steady_clock::time_point embargo;
+
         struct curl_slist * requestHeaders = 0;
 
         std::string encoding;
 
+        bool acceptRanges = false;
+
+        curl_off_t writtenToSink = 0;
+
         DownloadItem(CurlDownloader & downloader,
             const DownloadRequest & request,
             Callback<DownloadResult> callback)
@@ -71,9 +85,10 @@ struct CurlDownloader : public Downloader
                 {request.uri}, request.parentAct)
             , callback(callback)
             , finalSink([this](const unsigned char * data, size_t len) {
-                if (this->request.dataCallback)
+                if (this->request.dataCallback) {
+                    writtenToSink += len;
                     this->request.dataCallback((char *) data, len);
-                else
+                } else
                     this->result.data->append((char *) data, len);
               })
         {
@@ -151,6 +166,7 @@ struct CurlDownloader : public Downloader
                 status = ss.size() >= 2 ? ss[1] : "";
                 result.data = std::make_shared<std::string>();
                 result.bodySize = 0;
+                acceptRanges = false;
                 encoding = "";
             } else {
                 auto i = line.find(':');
@@ -168,7 +184,9 @@ struct CurlDownloader : public Downloader
                             return 0;
                         }
                     } else if (name == "content-encoding")
-                        encoding = trim(string(line, i + 1));;
+                        encoding = trim(string(line, i + 1));
+                    else if (name == "accept-ranges" && toLower(trim(std::string(line, i + 1))) == "bytes")
+                        acceptRanges = true;
                 }
             }
             return realSize;
@@ -244,6 +262,8 @@ struct CurlDownloader : public Downloader
             #if LIBCURL_VERSION_NUM >= 0x072f00
             if (downloadSettings.enableHttp2)
                 curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
+            else
+                curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
             #endif
             curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper);
             curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
@@ -284,6 +304,9 @@ struct CurlDownloader : public Downloader
             curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str());
             curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);
 
+            if (writtenToSink)
+                curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink);
+
             result.data = std::make_shared<std::string>();
             result.bodySize = 0;
         }
@@ -318,7 +341,7 @@ struct CurlDownloader : public Downloader
                 failEx(writeException);
 
             else if (code == CURLE_OK &&
-                (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
+                (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 206 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
             {
                 result.cached = httpStatus == 304;
                 done = true;
@@ -375,7 +398,9 @@ struct CurlDownloader : public Downloader
                     }
                 }
 
-                fail(
+                attempt++;
+
+                auto exc =
                     code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
                     ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
                     : httpStatus != 0
@@ -386,15 +411,41 @@ struct CurlDownloader : public Downloader
                         )
                     : DownloadError(err,
                         fmt("unable to %s '%s': %s (%d)",
-                            request.verb(), request.uri, curl_easy_strerror(code), code)));
+                            request.verb(), request.uri, curl_easy_strerror(code), code));
+
+                /* If this is a transient error, then maybe retry the
+                   download after a while. If we're writing to a
+                   sink, we can only retry if the server supports
+                   ranged requests. */
+                if (err == Transient
+                    && attempt < request.tries
+                    && (!this->request.dataCallback
+                        || writtenToSink == 0
+                        || (acceptRanges && encoding.empty())))
+                {
+                    int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
+                    if (writtenToSink)
+                        warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms);
+                    else
+                        warn("%s; retrying in %d ms", exc.what(), ms);
+                    embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
+                    downloader.enqueueItem(shared_from_this());
+                }
+                else
+                    fail(exc);
             }
         }
     };
 
     struct State
     {
+        struct EmbargoComparator {
+            bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
+                return i1->embargo > i2->embargo;
+            }
+        };
         bool quit = false;
-        std::vector<std::shared_ptr<DownloadItem>> incoming;
+        std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
     };
 
     Sync<State> state_;
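
For reference, the retry delay computed in the hunk above grows as baseRetryTimeMs * 2^(attempt - 1 + jitter), with the jitter drawn uniformly from [0, 0.5) so that concurrent retries do not fire in lockstep. A minimal standalone sketch of the same formula, assuming a 250 ms base purely for illustration (the real value comes from request.baseRetryTimeMs):

// Sketch of the backoff above: delay = base * 2^(attempt - 1 + jitter), jitter in [0, 0.5).
// The 250 ms base and the five attempts are example values only.
#include <cmath>
#include <cstdio>
#include <random>

int main()
{
    std::random_device rd;
    std::mt19937 mt19937(rd());
    const int baseRetryTimeMs = 250;

    for (unsigned int attempt = 1; attempt <= 5; ++attempt) {
        int ms = baseRetryTimeMs
            * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(mt19937));
        std::printf("attempt %u: retry in %d ms\n", attempt, ms);
    }
}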
@@ -407,6 +458,7 @@ struct CurlDownloader : public Downloader
     std::thread workerThread;
 
     CurlDownloader()
+        : mt19937(rd())
     {
         static std::once_flag globalInit;
         std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
@@ -500,7 +552,9 @@ struct CurlDownloader : public Downloader
 
             nextWakeup = std::chrono::steady_clock::time_point();
 
-            /* Add new curl requests from the incoming requests queue. */
+            /* Add new curl requests from the incoming requests queue,
+               except for requests that are embargoed (waiting for a
+               retry timeout to expire). */
             if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
                 char buf[1024];
                 auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@@ -509,9 +563,22 @@ struct CurlDownloader : public Downloader
             }
 
             std::vector<std::shared_ptr<DownloadItem>> incoming;
+            auto now = std::chrono::steady_clock::now();
+
             {
                 auto state(state_.lock());
-                std::swap(state->incoming, incoming);
+                while (!state->incoming.empty()) {
+                    auto item = state->incoming.top();
+                    if (item->embargo <= now) {
+                        incoming.push_back(item);
+                        state->incoming.pop();
+                    } else {
+                        if (nextWakeup == std::chrono::steady_clock::time_point()
+                            || item->embargo < nextWakeup)
+                            nextWakeup = item->embargo;
+                        break;
+                    }
+                }
                 quit = state->quit;
             }
 
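
The EmbargoComparator defined earlier inverts the usual priority_queue ordering, so top() always yields the item whose embargo expires soonest; the loop above starts only those items whose time has come and turns the earliest remaining embargo into the next wake-up deadline. A small self-contained illustration of that pattern, with an invented Item struct and example delays:

// Drain a time-ordered queue: ready items are started, the earliest remaining
// embargo becomes the wake-up time. Item and the delays are made up for the example.
#include <chrono>
#include <cstdio>
#include <memory>
#include <queue>
#include <vector>

struct Item { std::chrono::steady_clock::time_point embargo; int id; };

struct EmbargoComparator {
    bool operator() (const std::shared_ptr<Item> & a, const std::shared_ptr<Item> & b) {
        return a->embargo > b->embargo; // inverted so that top() is the earliest deadline
    }
};

int main()
{
    using namespace std::chrono;
    std::priority_queue<std::shared_ptr<Item>, std::vector<std::shared_ptr<Item>>, EmbargoComparator> incoming;
    auto now = steady_clock::now();
    incoming.push(std::make_shared<Item>(Item{now + milliseconds(500), 1}));
    incoming.push(std::make_shared<Item>(Item{now, 2}));

    while (!incoming.empty() && incoming.top()->embargo <= now) {
        std::printf("starting item %d\n", incoming.top()->id);
        incoming.pop();
    }
    if (!incoming.empty())
        std::printf("next wake-up in %lld ms\n",
            (long long) duration_cast<milliseconds>(incoming.top()->embargo - now).count());
}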
@@ -533,12 +600,12 @@ struct CurlDownloader : public Downloader
             workerThreadMain();
         } catch (nix::Interrupted & e) {
         } catch (std::exception & e) {
-            printError(format("unexpected error in download thread: %s") % e.what());
+            printError("unexpected error in download thread: %s", e.what());
         }
 
         {
             auto state(state_.lock());
-            state->incoming.clear();
+            while (!state->incoming.empty()) state->incoming.pop();
             state->quit = true;
         }
     }
@@ -554,7 +621,7 @@ struct CurlDownloader : public Downloader
             auto state(state_.lock());
             if (state->quit)
                 throw nix::Error("cannot enqueue download request because the download thread is shutting down");
-            state->incoming.push_back(item);
+            state->incoming.push(item);
         }
         writeFull(wakeupPipe.writeSide.get(), " ");
     }
@@ -637,9 +704,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
 
 DownloadResult Downloader::download(const DownloadRequest & request)
 {
-    return retry<DownloadResult>(request.tries, [&]() {
-        return enqueueDownload(request).get();
-    });
+    return enqueueDownload(request).get();
 }
 
 void Downloader::download(DownloadRequest && request, Sink & sink)
@@ -825,7 +890,7 @@ CachedDownloadResult Downloader::downloadCached(
             writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
         } catch (DownloadError & e) {
             if (storePath.empty()) throw;
-            warn("%s; using cached result", e.msg());
+            warn("warning: %s; using cached result", e.msg());
             result.etag = expectedETag;
         }
     }
@@ -853,10 +918,11 @@ CachedDownloadResult Downloader::downloadCached(
     }
 
     if (expectedStorePath != "" && storePath != expectedStorePath) {
+        unsigned int statusCode = 102;
         Hash gotHash = request.unpack
             ? hashPath(request.expectedHash.type, store->toRealPath(storePath)).first
             : hashFile(request.expectedHash.type, store->toRealPath(storePath));
-        throw nix::Error("hash mismatch in file downloaded from '%s':\n  wanted: %s\n  got:    %s",
+        throw nix::Error(statusCode, "hash mismatch in file downloaded from '%s':\n  wanted: %s\n  got:    %s",
             url, request.expectedHash.to_string(), gotHash.to_string());
     }
 
@@ -875,4 +941,5 @@ bool isUri(const string & s)
     return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
 }
 
+
 }
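
For context on the resume logic in this patch: CURLOPT_RESUME_FROM_LARGE makes libcurl ask for the remaining bytes with a Range header, and a server that honours it replies with HTTP 206 Partial Content, which is why 206 was added to the list of accepted status codes and why a mid-stream retry is only attempted when the server advertised "Accept-Ranges: bytes". A minimal sketch of a resumed transfer; the URL and the offset are placeholders, and the body simply goes to stdout because no write callback is set:

// Resume a transfer from a byte offset. The server must support byte ranges;
// a successful range request comes back as HTTP 206 rather than 200.
#include <cstdio>
#include <curl/curl.h>

int main()
{
    curl_global_init(CURL_GLOBAL_ALL);
    CURL * req = curl_easy_init();

    curl_easy_setopt(req, CURLOPT_URL, "https://example.org/big-file"); // placeholder URL
    curl_off_t writtenToSink = 1024 * 1024; // bytes already received (example value)
    curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink);

    CURLcode code = curl_easy_perform(req);
    long httpStatus = 0;
    curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);
    std::printf("result: %s (HTTP %ld)\n", curl_easy_strerror(code), httpStatus);

    curl_easy_cleanup(req);
    curl_global_cleanup();
}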