about summary refs log tree commit diff
path: root/src/libstore/download.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/download.cc')
-rw-r--r--src/libstore/download.cc67
1 files changed, 55 insertions, 12 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 7a2af237ee8f..2267b5d35178 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -8,7 +8,6 @@
 #include "compression.hh"
 #include "pathlocks.hh"
 #include "finally.hh"
-#include "retry.hh"
 
 #ifdef ENABLE_S3
 #include <aws/core/client/ClientConfiguration.h>
@@ -20,9 +19,11 @@
 #include <curl/curl.h>
 
 #include <algorithm>
+#include <cmath>
 #include <cstring>
 #include <iostream>
 #include <queue>
+#include <random>
 #include <thread>
 
 using namespace std::string_literals;
@@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader
 {
     CURLM * curlm = 0;
 
+    std::random_device rd;
+    std::mt19937 mt19937;
+
     struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
     {
         CurlDownloader & downloader;
@@ -57,6 +61,12 @@ struct CurlDownloader : public Downloader
         bool active = false; // whether the handle has been added to the multi object
         std::string status;
 
+        unsigned int attempt = 0;
+
+        /* Don't start this download until the specified time point
+           has been reached. */
+        std::chrono::steady_clock::time_point embargo;
+
         struct curl_slist * requestHeaders = 0;
 
         std::string encoding;
@@ -377,7 +387,9 @@ struct CurlDownloader : public Downloader
                     }
                 }
 
-                fail(
+                attempt++;
+
+                auto exc =
                     code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
                     ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
                     : httpStatus != 0
@@ -388,15 +400,31 @@ struct CurlDownloader : public Downloader
                         )
                     : DownloadError(err,
                         fmt("unable to %s '%s': %s (%d)",
-                            request.verb(), request.uri, curl_easy_strerror(code), code)));
+                            request.verb(), request.uri, curl_easy_strerror(code), code));
+
+                /* If this is a transient error, then maybe retry the
+                   download after a while. */
+                if (err == Transient && attempt < request.tries) {
+                    int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
+                    printError(format("warning: %s; retrying in %d ms") % exc.what() % ms);
+                    embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
+                    downloader.enqueueItem(shared_from_this());
+                }
+                else
+                    fail(exc);
             }
         }
     };
 
     struct State
     {
+        struct EmbargoComparator {
+            bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
+                return i1->embargo > i2->embargo;
+            }
+        };
         bool quit = false;
-        std::vector<std::shared_ptr<DownloadItem>> incoming;
+        std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
     };
 
     Sync<State> state_;
@@ -409,6 +437,7 @@ struct CurlDownloader : public Downloader
     std::thread workerThread;
 
     CurlDownloader()
+        : mt19937(rd())
     {
         static std::once_flag globalInit;
         std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
@@ -502,7 +531,9 @@ struct CurlDownloader : public Downloader
 
             nextWakeup = std::chrono::steady_clock::time_point();
 
-            /* Add new curl requests from the incoming requests queue. */
+            /* Add new curl requests from the incoming requests queue,
+               except for requests that are embargoed (waiting for a
+               retry timeout to expire). */
             if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
                 char buf[1024];
                 auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@@ -511,9 +542,22 @@ struct CurlDownloader : public Downloader
             }
 
             std::vector<std::shared_ptr<DownloadItem>> incoming;
+            auto now = std::chrono::steady_clock::now();
+
             {
                 auto state(state_.lock());
-                std::swap(state->incoming, incoming);
+                while (!state->incoming.empty()) {
+                    auto item = state->incoming.top();
+                    if (item->embargo <= now) {
+                        incoming.push_back(item);
+                        state->incoming.pop();
+                    } else {
+                        if (nextWakeup == std::chrono::steady_clock::time_point()
+                            || item->embargo < nextWakeup)
+                            nextWakeup = item->embargo;
+                        break;
+                    }
+                }
                 quit = state->quit;
             }
 
@@ -540,7 +584,7 @@ struct CurlDownloader : public Downloader
 
         {
             auto state(state_.lock());
-            state->incoming.clear();
+            while (!state->incoming.empty()) state->incoming.pop();
             state->quit = true;
         }
     }
@@ -556,7 +600,7 @@ struct CurlDownloader : public Downloader
             auto state(state_.lock());
             if (state->quit)
                 throw nix::Error("cannot enqueue download request because the download thread is shutting down");
-            state->incoming.push_back(item);
+            state->incoming.push(item);
         }
         writeFull(wakeupPipe.writeSide.get(), " ");
     }
@@ -639,9 +683,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
 
 DownloadResult Downloader::download(const DownloadRequest & request)
 {
-    return retry<DownloadResult>(request.tries, [&]() {
-        return enqueueDownload(request).get();
-    });
+    return enqueueDownload(request).get();
 }
 
 void Downloader::download(DownloadRequest && request, Sink & sink)
@@ -827,7 +869,7 @@ CachedDownloadResult Downloader::downloadCached(
             writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
         } catch (DownloadError & e) {
             if (storePath.empty()) throw;
-            warn("%s; using cached result", e.msg());
+            printError(format("warning: %1%; using cached result") % e.msg());
             result.etag = expectedETag;
         }
     }
@@ -878,4 +920,5 @@ bool isUri(const string & s)
     return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
 }
 
+
 }