about summary refs log tree commit diff
path: root/src/libstore/download.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/download.cc')
-rw-r--r--src/libstore/download.cc67
1 files changed, 12 insertions, 55 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 8e94646d49ac..43d231e30365 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -8,6 +8,7 @@
 #include "compression.hh"
 #include "pathlocks.hh"
 #include "finally.hh"
+#include "retry.hh"
 
 #ifdef ENABLE_S3
 #include <aws/core/client/ClientConfiguration.h>
@@ -19,11 +20,9 @@
 #include <curl/curl.h>
 
 #include <algorithm>
-#include <cmath>
 #include <cstring>
 #include <iostream>
 #include <queue>
-#include <random>
 #include <thread>
 
 using namespace std::string_literals;
@@ -62,9 +61,6 @@ struct CurlDownloader : public Downloader
 {
     CURLM * curlm = 0;
 
-    std::random_device rd;
-    std::mt19937 mt19937;
-
     struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
     {
         CurlDownloader & downloader;
@@ -77,12 +73,6 @@ struct CurlDownloader : public Downloader
         bool active = false; // whether the handle has been added to the multi object
         std::string status;
 
-        unsigned int attempt = 0;
-
-        /* Don't start this download until the specified time point
-           has been reached. */
-        std::chrono::steady_clock::time_point embargo;
-
         struct curl_slist * requestHeaders = 0;
 
         std::string encoding;
@@ -401,9 +391,7 @@ struct CurlDownloader : public Downloader
                     }
                 }
 
-                attempt++;
-
-                auto exc =
+                fail(
                     code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
                     ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
                     : httpStatus != 0
@@ -414,31 +402,15 @@ struct CurlDownloader : public Downloader
                         )
                     : DownloadError(err,
                         fmt("unable to %s '%s': %s (%d)",
-                            request.verb(), request.uri, curl_easy_strerror(code), code));
-
-                /* If this is a transient error, then maybe retry the
-                   download after a while. */
-                if (err == Transient && attempt < request.tries) {
-                    int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
-                    printError(format("warning: %s; retrying in %d ms") % exc.what() % ms);
-                    embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
-                    downloader.enqueueItem(shared_from_this());
-                }
-                else
-                    fail(exc);
+                            request.verb(), request.uri, curl_easy_strerror(code), code)));
             }
         }
     };
 
     struct State
     {
-        struct EmbargoComparator {
-            bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
-                return i1->embargo > i2->embargo;
-            }
-        };
         bool quit = false;
-        std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
+        std::vector<std::shared_ptr<DownloadItem>> incoming;
     };
 
     Sync<State> state_;
@@ -451,7 +423,6 @@ struct CurlDownloader : public Downloader
     std::thread workerThread;
 
     CurlDownloader()
-        : mt19937(rd())
     {
         static std::once_flag globalInit;
         std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
@@ -545,9 +516,7 @@ struct CurlDownloader : public Downloader
 
             nextWakeup = std::chrono::steady_clock::time_point();
 
-            /* Add new curl requests from the incoming requests queue,
-               except for requests that are embargoed (waiting for a
-               retry timeout to expire). */
+            /* Add new curl requests from the incoming requests queue. */
             if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
                 char buf[1024];
                 auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@@ -556,22 +525,9 @@ struct CurlDownloader : public Downloader
             }
 
             std::vector<std::shared_ptr<DownloadItem>> incoming;
-            auto now = std::chrono::steady_clock::now();
-
             {
                 auto state(state_.lock());
-                while (!state->incoming.empty()) {
-                    auto item = state->incoming.top();
-                    if (item->embargo <= now) {
-                        incoming.push_back(item);
-                        state->incoming.pop();
-                    } else {
-                        if (nextWakeup == std::chrono::steady_clock::time_point()
-                            || item->embargo < nextWakeup)
-                            nextWakeup = item->embargo;
-                        break;
-                    }
-                }
+                std::swap(state->incoming, incoming);
                 quit = state->quit;
             }
 
@@ -598,7 +554,7 @@ struct CurlDownloader : public Downloader
 
         {
             auto state(state_.lock());
-            while (!state->incoming.empty()) state->incoming.pop();
+            state->incoming.clear();
             state->quit = true;
         }
     }
@@ -614,7 +570,7 @@ struct CurlDownloader : public Downloader
             auto state(state_.lock());
             if (state->quit)
                 throw nix::Error("cannot enqueue download request because the download thread is shutting down");
-            state->incoming.push(item);
+            state->incoming.push_back(item);
         }
         writeFull(wakeupPipe.writeSide.get(), " ");
     }
@@ -697,7 +653,9 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
 
 DownloadResult Downloader::download(const DownloadRequest & request)
 {
-    return enqueueDownload(request).get();
+    return retry<DownloadResult>(request.tries, [&]() {
+        return enqueueDownload(request).get();
+    });
 }
 
 void Downloader::download(DownloadRequest && request, Sink & sink)
@@ -883,7 +841,7 @@ CachedDownloadResult Downloader::downloadCached(ref<Store> store, const string &
             writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
         } catch (DownloadError & e) {
             if (storePath.empty()) throw;
-            printError(format("warning: %1%; using cached result") % e.msg());
+            warn("%s; using cached result", e.msg());
             result.etag = expectedETag;
         }
     }
@@ -933,5 +891,4 @@ bool isUri(const string & s)
     return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
 }
 
-
 }