diff options
Diffstat (limited to 'src/libstore/download.cc')
-rw-r--r-- | src/libstore/download.cc | 67 |
1 files changed, 55 insertions, 12 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 7a2af237ee8f..2267b5d35178 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -8,7 +8,6 @@ #include "compression.hh" #include "pathlocks.hh" #include "finally.hh" -#include "retry.hh" #ifdef ENABLE_S3 #include <aws/core/client/ClientConfiguration.h> @@ -20,9 +19,11 @@ #include <curl/curl.h> #include <algorithm> +#include <cmath> #include <cstring> #include <iostream> #include <queue> +#include <random> #include <thread> using namespace std::string_literals; @@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader { CURLM * curlm = 0; + std::random_device rd; + std::mt19937 mt19937; + struct DownloadItem : public std::enable_shared_from_this<DownloadItem> { CurlDownloader & downloader; @@ -57,6 +61,12 @@ struct CurlDownloader : public Downloader bool active = false; // whether the handle has been added to the multi object std::string status; + unsigned int attempt = 0; + + /* Don't start this download until the specified time point + has been reached. */ + std::chrono::steady_clock::time_point embargo; + struct curl_slist * requestHeaders = 0; std::string encoding; @@ -377,7 +387,9 @@ struct CurlDownloader : public Downloader } } - fail( + attempt++; + + auto exc = code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri)) : httpStatus != 0 @@ -388,15 +400,31 @@ struct CurlDownloader : public Downloader ) : DownloadError(err, fmt("unable to %s '%s': %s (%d)", - request.verb(), request.uri, curl_easy_strerror(code), code))); + request.verb(), request.uri, curl_easy_strerror(code), code)); + + /* If this is a transient error, then maybe retry the + download after a while. */ + if (err == Transient && attempt < request.tries) { + int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); + printError(format("warning: %s; retrying in %d ms") % exc.what() % ms); + embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); + downloader.enqueueItem(shared_from_this()); + } + else + fail(exc); } } }; struct State { + struct EmbargoComparator { + bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) { + return i1->embargo > i2->embargo; + } + }; bool quit = false; - std::vector<std::shared_ptr<DownloadItem>> incoming; + std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming; }; Sync<State> state_; @@ -409,6 +437,7 @@ struct CurlDownloader : public Downloader std::thread workerThread; CurlDownloader() + : mt19937(rd()) { static std::once_flag globalInit; std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); @@ -502,7 +531,9 @@ struct CurlDownloader : public Downloader nextWakeup = std::chrono::steady_clock::time_point(); - /* Add new curl requests from the incoming requests queue. */ + /* Add new curl requests from the incoming requests queue, + except for requests that are embargoed (waiting for a + retry timeout to expire). */ if (extraFDs[0].revents & CURL_WAIT_POLLIN) { char buf[1024]; auto res = read(extraFDs[0].fd, buf, sizeof(buf)); @@ -511,9 +542,22 @@ struct CurlDownloader : public Downloader } std::vector<std::shared_ptr<DownloadItem>> incoming; + auto now = std::chrono::steady_clock::now(); + { auto state(state_.lock()); - std::swap(state->incoming, incoming); + while (!state->incoming.empty()) { + auto item = state->incoming.top(); + if (item->embargo <= now) { + incoming.push_back(item); + state->incoming.pop(); + } else { + if (nextWakeup == std::chrono::steady_clock::time_point() + || item->embargo < nextWakeup) + nextWakeup = item->embargo; + break; + } + } quit = state->quit; } @@ -540,7 +584,7 @@ struct CurlDownloader : public Downloader { auto state(state_.lock()); - state->incoming.clear(); + while (!state->incoming.empty()) state->incoming.pop(); state->quit = true; } } @@ -556,7 +600,7 @@ struct CurlDownloader : public Downloader auto state(state_.lock()); if (state->quit) throw nix::Error("cannot enqueue download request because the download thread is shutting down"); - state->incoming.push_back(item); + state->incoming.push(item); } writeFull(wakeupPipe.writeSide.get(), " "); } @@ -639,9 +683,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & DownloadResult Downloader::download(const DownloadRequest & request) { - return retry<DownloadResult>(request.tries, [&]() { - return enqueueDownload(request).get(); - }); + return enqueueDownload(request).get(); } void Downloader::download(DownloadRequest && request, Sink & sink) @@ -827,7 +869,7 @@ CachedDownloadResult Downloader::downloadCached( writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n"); } catch (DownloadError & e) { if (storePath.empty()) throw; - warn("%s; using cached result", e.msg()); + printError(format("warning: %1%; using cached result") % e.msg()); result.etag = expectedETag; } } @@ -878,4 +920,5 @@ bool isUri(const string & s) return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh"; } + } |