about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/binary-cache-store.cc18
-rw-r--r--src/libstore/download.cc67
-rw-r--r--src/libstore/download.hh11
-rw-r--r--src/libstore/http-binary-cache-store.cc57
-rw-r--r--src/libstore/store-api.cc94
-rw-r--r--src/libutil/retry.hh38
-rw-r--r--src/libutil/types.hh2
7 files changed, 125 insertions, 162 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc
index 8b736056e01d..4527ee6ba660 100644
--- a/src/libstore/binary-cache-store.cc
+++ b/src/libstore/binary-cache-store.cc
@@ -10,8 +10,6 @@
 #include "nar-info-disk-cache.hh"
 #include "nar-accessor.hh"
 #include "json.hh"
-#include "retry.hh"
-#include "download.hh"
 
 #include <chrono>
 
@@ -81,15 +79,13 @@ void BinaryCacheStore::getFile(const std::string & path, Sink & sink)
 
 std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
 {
-    return retry<std::shared_ptr<std::string>>(downloadSettings.tries, [&]() -> std::shared_ptr<std::string> {
-        StringSink sink;
-        try {
-            getFile(path, sink);
-        } catch (NoSuchBinaryCacheFile &) {
-            return nullptr;
-        }
-        return sink.s;
-    });
+    StringSink sink;
+    try {
+        getFile(path, sink);
+    } catch (NoSuchBinaryCacheFile &) {
+        return nullptr;
+    }
+    return sink.s;
 }
 
 Path BinaryCacheStore::narInfoFileFor(const Path & storePath)
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 7a2af237ee8f..2267b5d35178 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -8,7 +8,6 @@
 #include "compression.hh"
 #include "pathlocks.hh"
 #include "finally.hh"
-#include "retry.hh"
 
 #ifdef ENABLE_S3
 #include <aws/core/client/ClientConfiguration.h>
@@ -20,9 +19,11 @@
 #include <curl/curl.h>
 
 #include <algorithm>
+#include <cmath>
 #include <cstring>
 #include <iostream>
 #include <queue>
+#include <random>
 #include <thread>
 
 using namespace std::string_literals;
@@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader
 {
     CURLM * curlm = 0;
 
+    std::random_device rd;
+    std::mt19937 mt19937;
+
     struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
     {
         CurlDownloader & downloader;
@@ -57,6 +61,12 @@ struct CurlDownloader : public Downloader
         bool active = false; // whether the handle has been added to the multi object
         std::string status;
 
+        unsigned int attempt = 0;
+
+        /* Don't start this download until the specified time point
+           has been reached. */
+        std::chrono::steady_clock::time_point embargo;
+
         struct curl_slist * requestHeaders = 0;
 
         std::string encoding;
@@ -377,7 +387,9 @@ struct CurlDownloader : public Downloader
                     }
                 }
 
-                fail(
+                attempt++;
+
+                auto exc =
                     code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
                     ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
                     : httpStatus != 0
@@ -388,15 +400,31 @@ struct CurlDownloader : public Downloader
                         )
                     : DownloadError(err,
                         fmt("unable to %s '%s': %s (%d)",
-                            request.verb(), request.uri, curl_easy_strerror(code), code)));
+                            request.verb(), request.uri, curl_easy_strerror(code), code));
+
+                /* If this is a transient error, then maybe retry the
+                   download after a while. */
+                if (err == Transient && attempt < request.tries) {
+                    int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
+                    printError(format("warning: %s; retrying in %d ms") % exc.what() % ms);
+                    embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
+                    downloader.enqueueItem(shared_from_this());
+                }
+                else
+                    fail(exc);
             }
         }
     };
 
     struct State
     {
+        struct EmbargoComparator {
+            bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
+                return i1->embargo > i2->embargo;
+            }
+        };
         bool quit = false;
-        std::vector<std::shared_ptr<DownloadItem>> incoming;
+        std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
     };
 
     Sync<State> state_;
@@ -409,6 +437,7 @@ struct CurlDownloader : public Downloader
     std::thread workerThread;
 
     CurlDownloader()
+        : mt19937(rd())
     {
         static std::once_flag globalInit;
         std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
@@ -502,7 +531,9 @@ struct CurlDownloader : public Downloader
 
             nextWakeup = std::chrono::steady_clock::time_point();
 
-            /* Add new curl requests from the incoming requests queue. */
+            /* Add new curl requests from the incoming requests queue,
+               except for requests that are embargoed (waiting for a
+               retry timeout to expire). */
             if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
                 char buf[1024];
                 auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@@ -511,9 +542,22 @@ struct CurlDownloader : public Downloader
             }
 
             std::vector<std::shared_ptr<DownloadItem>> incoming;
+            auto now = std::chrono::steady_clock::now();
+
             {
                 auto state(state_.lock());
-                std::swap(state->incoming, incoming);
+                while (!state->incoming.empty()) {
+                    auto item = state->incoming.top();
+                    if (item->embargo <= now) {
+                        incoming.push_back(item);
+                        state->incoming.pop();
+                    } else {
+                        if (nextWakeup == std::chrono::steady_clock::time_point()
+                            || item->embargo < nextWakeup)
+                            nextWakeup = item->embargo;
+                        break;
+                    }
+                }
                 quit = state->quit;
             }
 
@@ -540,7 +584,7 @@ struct CurlDownloader : public Downloader
 
         {
             auto state(state_.lock());
-            state->incoming.clear();
+            while (!state->incoming.empty()) state->incoming.pop();
             state->quit = true;
         }
     }
@@ -556,7 +600,7 @@ struct CurlDownloader : public Downloader
             auto state(state_.lock());
             if (state->quit)
                 throw nix::Error("cannot enqueue download request because the download thread is shutting down");
-            state->incoming.push_back(item);
+            state->incoming.push(item);
         }
         writeFull(wakeupPipe.writeSide.get(), " ");
     }
@@ -639,9 +683,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
 
 DownloadResult Downloader::download(const DownloadRequest & request)
 {
-    return retry<DownloadResult>(request.tries, [&]() {
-        return enqueueDownload(request).get();
-    });
+    return enqueueDownload(request).get();
 }
 
 void Downloader::download(DownloadRequest && request, Sink & sink)
@@ -827,7 +869,7 @@ CachedDownloadResult Downloader::downloadCached(
             writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
         } catch (DownloadError & e) {
             if (storePath.empty()) throw;
-            warn("%s; using cached result", e.msg());
+            printError(format("warning: %1%; using cached result") % e.msg());
             result.etag = expectedETag;
         }
     }
@@ -878,4 +920,5 @@ bool isUri(const string & s)
     return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
 }
 
+
 }
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index 9e965b506d0a..3b7fff3ba4cb 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -96,13 +96,11 @@ struct Downloader
 
     std::future<DownloadResult> enqueueDownload(const DownloadRequest & request);
 
-    /* Synchronously download a file. The request will be retried in
-       case of transient failures. */
+    /* Synchronously download a file. */
     DownloadResult download(const DownloadRequest & request);
 
     /* Download a file, writing its data to a sink. The sink will be
-       invoked on the thread of the caller. The request will not be
-       retried in case of transient failures. */
+       invoked on the thread of the caller. */
     void download(DownloadRequest && request, Sink & sink);
 
     /* Check if the specified file is already in ~/.cache/nix/tarballs
@@ -128,11 +126,6 @@ public:
     DownloadError(Downloader::Error error, const FormatOrString & fs)
         : Error(fs), error(error)
     { }
-
-    bool isTransient() override
-    {
-        return error == Downloader::Error::Transient;
-    }
 };
 
 bool isUri(const string & s);
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index 5633b4355d25..3d17a89660e1 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -2,7 +2,6 @@
 #include "download.hh"
 #include "globals.hh"
 #include "nar-info-disk-cache.hh"
-#include "retry.hh"
 
 namespace nix {
 
@@ -114,6 +113,7 @@ protected:
     DownloadRequest makeRequest(const std::string & path)
     {
         DownloadRequest request(cacheUri + "/" + path);
+        request.tries = 8;
         return request;
     }
 
@@ -136,46 +136,21 @@ protected:
     {
         checkEnabled();
 
-        struct State
-        {
-            DownloadRequest request;
-            std::function<void()> tryDownload;
-            unsigned int attempt = 0;
-            State(DownloadRequest && request) : request(request) {}
-        };
-
-        auto state = std::make_shared<State>(makeRequest(path));
-
-        state->tryDownload = [callback, state, this]() {
-            getDownloader()->enqueueDownload(state->request,
-                {[callback, state, this](std::future<DownloadResult> result) {
-                    try {
-                        callback(result.get().data);
-                    } catch (DownloadError & e) {
-                        if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
-                            return callback(std::shared_ptr<std::string>());
-                        ++state->attempt;
-                        if (state->attempt < state->request.tries && e.isTransient()) {
-                            auto ms = retrySleepTime(state->attempt);
-                            warn("%s; retrying in %d ms", e.what(), ms);
-                            /* We can't sleep here because that would
-                               block the download thread. So use a
-                               separate thread for sleeping. */
-                            std::thread([state, ms]() {
-                                std::this_thread::sleep_for(std::chrono::milliseconds(ms));
-                                state->tryDownload();
-                            }).detach();
-                        } else {
-                            maybeDisable();
-                            callback.rethrow();
-                        }
-                    } catch (...) {
-                        callback.rethrow();
-                    }
-                }});
-        };
-
-        state->tryDownload();
+        auto request(makeRequest(path));
+
+        getDownloader()->enqueueDownload(request,
+            {[callback, this](std::future<DownloadResult> result) {
+                try {
+                    callback(result.get().data);
+                } catch (DownloadError & e) {
+                    if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
+                        return callback(std::shared_ptr<std::string>());
+                    maybeDisable();
+                    callback.rethrow();
+                } catch (...) {
+                    callback.rethrow();
+                }
+            }});
     }
 
 };
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index 92f01fd2e0c7..f5608d3849f1 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -6,11 +6,10 @@
 #include "thread-pool.hh"
 #include "json.hh"
 #include "derivations.hh"
-#include "retry.hh"
-#include "download.hh"
 
 #include <future>
 
+
 namespace nix {
 
 
@@ -580,57 +579,54 @@ void Store::buildPaths(const PathSet & paths, BuildMode buildMode)
 void copyStorePath(ref<Store> srcStore, ref<Store> dstStore,
     const Path & storePath, RepairFlag repair, CheckSigsFlag checkSigs)
 {
-    retry<void>(downloadSettings.tries, [&]() {
-
-        auto srcUri = srcStore->getUri();
-        auto dstUri = dstStore->getUri();
-
-        Activity act(*logger, lvlInfo, actCopyPath,
-            srcUri == "local" || srcUri == "daemon"
-              ? fmt("copying path '%s' to '%s'", storePath, dstUri)
-              : dstUri == "local" || dstUri == "daemon"
-                ? fmt("copying path '%s' from '%s'", storePath, srcUri)
-                : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri),
-            {storePath, srcUri, dstUri});
-        PushActivity pact(act.id);
-
-        auto info = srcStore->queryPathInfo(storePath);
-
-        uint64_t total = 0;
-
-        if (!info->narHash) {
-            StringSink sink;
-            srcStore->narFromPath({storePath}, sink);
-            auto info2 = make_ref<ValidPathInfo>(*info);
-            info2->narHash = hashString(htSHA256, *sink.s);
-            if (!info->narSize) info2->narSize = sink.s->size();
-            if (info->ultimate) info2->ultimate = false;
-            info = info2;
-
-            StringSource source(*sink.s);
-            dstStore->addToStore(*info, source, repair, checkSigs);
-            return;
-        }
+    auto srcUri = srcStore->getUri();
+    auto dstUri = dstStore->getUri();
+
+    Activity act(*logger, lvlInfo, actCopyPath,
+        srcUri == "local" || srcUri == "daemon"
+          ? fmt("copying path '%s' to '%s'", storePath, dstUri)
+          : dstUri == "local" || dstUri == "daemon"
+            ? fmt("copying path '%s' from '%s'", storePath, srcUri)
+            : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri),
+        {storePath, srcUri, dstUri});
+    PushActivity pact(act.id);
+
+    auto info = srcStore->queryPathInfo(storePath);
+
+    uint64_t total = 0;
+
+    if (!info->narHash) {
+        StringSink sink;
+        srcStore->narFromPath({storePath}, sink);
+        auto info2 = make_ref<ValidPathInfo>(*info);
+        info2->narHash = hashString(htSHA256, *sink.s);
+        if (!info->narSize) info2->narSize = sink.s->size();
+        if (info->ultimate) info2->ultimate = false;
+        info = info2;
+
+        StringSource source(*sink.s);
+        dstStore->addToStore(*info, source, repair, checkSigs);
+        return;
+    }
 
-        if (info->ultimate) {
-            auto info2 = make_ref<ValidPathInfo>(*info);
-            info2->ultimate = false;
-            info = info2;
-        }
+    if (info->ultimate) {
+        auto info2 = make_ref<ValidPathInfo>(*info);
+        info2->ultimate = false;
+        info = info2;
+    }
 
-        auto source = sinkToSource([&](Sink & sink) {
-            LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
-                sink(data, len);
-                total += len;
-                act.progress(total, info->narSize);
-            });
-            srcStore->narFromPath({storePath}, wrapperSink);
-        }, [&]() {
-            throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri());
+    auto source = sinkToSource([&](Sink & sink) {
+        LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
+            sink(data, len);
+            total += len;
+            act.progress(total, info->narSize);
         });
-
-        dstStore->addToStore(*info, *source, repair, checkSigs);
+        srcStore->narFromPath({storePath}, wrapperSink);
+    }, [&]() {
+        throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri());
     });
+
+    dstStore->addToStore(*info, *source, repair, checkSigs);
 }
 
 
diff --git a/src/libutil/retry.hh b/src/libutil/retry.hh
deleted file mode 100644
index b45cb37f736b..000000000000
--- a/src/libutil/retry.hh
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-
-#include "logging.hh"
-
-#include <functional>
-#include <cmath>
-#include <random>
-#include <thread>
-
-namespace nix {
-
-inline unsigned int retrySleepTime(unsigned int attempt)
-{
-    std::random_device rd;
-    std::mt19937 mt19937;
-    return 250.0 * std::pow(2.0f,
-        attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(mt19937));
-}
-
-template<typename C>
-C retry(unsigned int attempts, std::function<C()> && f)
-{
-    unsigned int attempt = 0;
-    while (true) {
-        try {
-            return f();
-        } catch (BaseError & e) {
-            ++attempt;
-            if (attempt >= attempts || !e.isTransient())
-                throw;
-            auto ms = retrySleepTime(attempt);
-            warn("%s; retrying in %d ms", e.what(), ms);
-            std::this_thread::sleep_for(std::chrono::milliseconds(ms));
-        }
-    }
-}
-
-}
diff --git a/src/libutil/types.hh b/src/libutil/types.hh
index 88e3243f47a5..92bf469b5c6f 100644
--- a/src/libutil/types.hh
+++ b/src/libutil/types.hh
@@ -109,8 +109,6 @@ public:
     const string & msg() const { return err; }
     const string & prefix() const { return prefix_; }
     BaseError & addPrefix(const FormatOrString & fs);
-
-    virtual bool isTransient() { return false; }
 };
 
 #define MakeError(newClass, superClass) \