about summary refs log tree commit diff
path: root/src/libstore/download.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/download.cc')
-rw-r--r--src/libstore/download.cc188
1 files changed, 83 insertions, 105 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 467f570bbf05..0c5a73ea3c51 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -8,6 +8,7 @@
 #include "compression.hh"
 #include "pathlocks.hh"
 #include "finally.hh"
+#include "retry.hh"
 
 #ifdef ENABLE_S3
 #include <aws/core/client/ClientConfiguration.h>
@@ -19,34 +20,16 @@
 #include <curl/curl.h>
 
 #include <algorithm>
-#include <cmath>
 #include <cstring>
 #include <iostream>
 #include <queue>
-#include <random>
 #include <thread>
 
 using namespace std::string_literals;
 
 namespace nix {
 
-struct DownloadSettings : Config
-{
-    Setting<bool> enableHttp2{this, true, "http2",
-        "Whether to enable HTTP/2 support."};
-
-    Setting<std::string> userAgentSuffix{this, "", "user-agent-suffix",
-        "String appended to the user agent in HTTP requests."};
-
-    Setting<size_t> httpConnections{this, 25, "http-connections",
-        "Number of parallel HTTP connections.",
-        {"binary-caches-parallel-connections"}};
-
-    Setting<unsigned long> connectTimeout{this, 0, "connect-timeout",
-        "Timeout for connecting to servers during downloads. 0 means use curl's builtin default."};
-};
-
-static DownloadSettings downloadSettings;
+DownloadSettings downloadSettings;
 
 static GlobalConfig::Register r1(&downloadSettings);
 
@@ -62,9 +45,6 @@ struct CurlDownloader : public Downloader
 {
     CURLM * curlm = 0;
 
-    std::random_device rd;
-    std::mt19937 mt19937;
-
     struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
     {
         CurlDownloader & downloader;
@@ -77,12 +57,6 @@ struct CurlDownloader : public Downloader
         bool active = false; // whether the handle has been added to the multi object
         std::string status;
 
-        unsigned int attempt = 0;
-
-        /* Don't start this download until the specified time point
-           has been reached. */
-        std::chrono::steady_clock::time_point embargo;
-
         struct curl_slist * requestHeaders = 0;
 
         std::string encoding;
@@ -270,6 +244,8 @@ struct CurlDownloader : public Downloader
             #if LIBCURL_VERSION_NUM >= 0x072f00
             if (downloadSettings.enableHttp2)
                 curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
+            else
+                curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
             #endif
             curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper);
             curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
@@ -319,16 +295,21 @@ struct CurlDownloader : public Downloader
             long httpStatus = 0;
             curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);
 
-            char * effectiveUrlCStr;
-            curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr);
-            if (effectiveUrlCStr)
-                result.effectiveUrl = effectiveUrlCStr;
+            char * effectiveUriCStr;
+            curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr);
+            if (effectiveUriCStr)
+                result.effectiveUri = effectiveUriCStr;
 
             debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes",
                 request.verb(), request.uri, code, httpStatus, result.bodySize);
 
-            if (decompressionSink)
-                decompressionSink->finish();
+            if (decompressionSink) {
+                try {
+                    decompressionSink->finish();
+                } catch (...) {
+                    writeException = std::current_exception();
+                }
+            }
 
             if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
                 code = CURLE_OK;
@@ -396,9 +377,7 @@ struct CurlDownloader : public Downloader
                     }
                 }
 
-                attempt++;
-
-                auto exc =
+                fail(
                     code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
                     ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
                     : httpStatus != 0
@@ -409,31 +388,15 @@ struct CurlDownloader : public Downloader
                         )
                     : DownloadError(err,
                         fmt("unable to %s '%s': %s (%d)",
-                            request.verb(), request.uri, curl_easy_strerror(code), code));
-
-                /* If this is a transient error, then maybe retry the
-                   download after a while. */
-                if (err == Transient && attempt < request.tries) {
-                    int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
-                    printError(format("warning: %s; retrying in %d ms") % exc.what() % ms);
-                    embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
-                    downloader.enqueueItem(shared_from_this());
-                }
-                else
-                    fail(exc);
+                            request.verb(), request.uri, curl_easy_strerror(code), code)));
             }
         }
     };
 
     struct State
     {
-        struct EmbargoComparator {
-            bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
-                return i1->embargo > i2->embargo;
-            }
-        };
         bool quit = false;
-        std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
+        std::vector<std::shared_ptr<DownloadItem>> incoming;
     };
 
     Sync<State> state_;
@@ -446,7 +409,6 @@ struct CurlDownloader : public Downloader
     std::thread workerThread;
 
     CurlDownloader()
-        : mt19937(rd())
     {
         static std::once_flag globalInit;
         std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
@@ -540,9 +502,7 @@ struct CurlDownloader : public Downloader
 
             nextWakeup = std::chrono::steady_clock::time_point();
 
-            /* Add new curl requests from the incoming requests queue,
-               except for requests that are embargoed (waiting for a
-               retry timeout to expire). */
+            /* Add new curl requests from the incoming requests queue. */
             if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
                 char buf[1024];
                 auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@@ -551,22 +511,9 @@ struct CurlDownloader : public Downloader
             }
 
             std::vector<std::shared_ptr<DownloadItem>> incoming;
-            auto now = std::chrono::steady_clock::now();
-
             {
                 auto state(state_.lock());
-                while (!state->incoming.empty()) {
-                    auto item = state->incoming.top();
-                    if (item->embargo <= now) {
-                        incoming.push_back(item);
-                        state->incoming.pop();
-                    } else {
-                        if (nextWakeup == std::chrono::steady_clock::time_point()
-                            || item->embargo < nextWakeup)
-                            nextWakeup = item->embargo;
-                        break;
-                    }
-                }
+                std::swap(state->incoming, incoming);
                 quit = state->quit;
             }
 
@@ -593,7 +540,7 @@ struct CurlDownloader : public Downloader
 
         {
             auto state(state_.lock());
-            while (!state->incoming.empty()) state->incoming.pop();
+            state->incoming.clear();
             state->quit = true;
         }
     }
@@ -609,11 +556,27 @@ struct CurlDownloader : public Downloader
             auto state(state_.lock());
             if (state->quit)
                 throw nix::Error("cannot enqueue download request because the download thread is shutting down");
-            state->incoming.push(item);
+            state->incoming.push_back(item);
         }
         writeFull(wakeupPipe.writeSide.get(), " ");
     }
 
+#ifdef ENABLE_S3
+    std::tuple<std::string, std::string, Store::Params> parseS3Uri(std::string uri)
+    {
+        auto [path, params] = splitUriAndParams(uri);
+
+        auto slash = path.find('/', 5); // 5 is the length of "s3://" prefix
+            if (slash == std::string::npos)
+                throw nix::Error("bad S3 URI '%s'", path);
+
+        std::string bucketName(path, 5, slash - 5);
+        std::string key(path, slash + 1);
+
+        return {bucketName, key, params};
+    }
+#endif
+
     void enqueueDownload(const DownloadRequest & request,
         Callback<DownloadResult> callback) override
     {
@@ -622,12 +585,15 @@ struct CurlDownloader : public Downloader
             // FIXME: do this on a worker thread
             try {
 #ifdef ENABLE_S3
-                S3Helper s3Helper("", Aws::Region::US_EAST_1, "", ""); // FIXME: make configurable
-                auto slash = request.uri.find('/', 5);
-                if (slash == std::string::npos)
-                    throw nix::Error("bad S3 URI '%s'", request.uri);
-                std::string bucketName(request.uri, 5, slash - 5);
-                std::string key(request.uri, slash + 1);
+                auto [bucketName, key, params] = parseS3Uri(request.uri);
+
+                std::string profile = get(params, "profile", "");
+                std::string region = get(params, "region", Aws::Region::US_EAST_1);
+                std::string scheme = get(params, "scheme", "");
+                std::string endpoint = get(params, "endpoint", "");
+
+                S3Helper s3Helper(profile, region, scheme, endpoint);
+
                 // FIXME: implement ETag
                 auto s3Res = s3Helper.getObject(bucketName, key);
                 DownloadResult res;
@@ -673,7 +639,9 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
 
 DownloadResult Downloader::download(const DownloadRequest & request)
 {
-    return enqueueDownload(request).get();
+    return retry<DownloadResult>(request.tries, [&]() {
+        return enqueueDownload(request).get();
+    });
 }
 
 void Downloader::download(DownloadRequest && request, Sink & sink)
@@ -771,20 +739,26 @@ void Downloader::download(DownloadRequest && request, Sink & sink)
     }
 }
 
-Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl, int ttl)
+CachedDownloadResult Downloader::downloadCached(
+    ref<Store> store, const CachedDownloadRequest & request)
 {
-    auto url = resolveUri(url_);
+    auto url = resolveUri(request.uri);
 
+    auto name = request.name;
     if (name == "") {
         auto p = url.rfind('/');
         if (p != string::npos) name = string(url, p + 1);
     }
 
     Path expectedStorePath;
-    if (expectedHash) {
-        expectedStorePath = store->makeFixedOutputPath(unpack, expectedHash, name);
-        if (store->isValidPath(expectedStorePath))
-            return store->toRealPath(expectedStorePath);
+    if (request.expectedHash) {
+        expectedStorePath = store->makeFixedOutputPath(request.unpack, request.expectedHash, name);
+        if (store->isValidPath(expectedStorePath)) {
+            CachedDownloadResult result;
+            result.storePath = expectedStorePath;
+            result.path = store->toRealPath(expectedStorePath);
+            return result;
+        }
     }
 
     Path cacheDir = getCacheDir() + "/nix/tarballs";
@@ -803,6 +777,8 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa
 
     bool skip = false;
 
+    CachedDownloadResult result;
+
     if (pathExists(fileLink) && pathExists(dataFile)) {
         storePath = readLink(fileLink);
         store->addTempRoot(storePath);
@@ -810,10 +786,10 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa
             auto ss = tokenizeString<vector<string>>(readFile(dataFile), "\n");
             if (ss.size() >= 3 && ss[0] == url) {
                 time_t lastChecked;
-                if (string2Int(ss[2], lastChecked) && lastChecked + ttl >= time(0)) {
+                if (string2Int(ss[2], lastChecked) && (uint64_t) lastChecked + request.ttl >= (uint64_t) time(0)) {
                     skip = true;
-                    if (effectiveUrl)
-                        *effectiveUrl = url_;
+                    result.effectiveUri = request.uri;
+                    result.etag = ss[1];
                 } else if (!ss[1].empty()) {
                     debug(format("verifying previous ETag '%1%'") % ss[1]);
                     expectedETag = ss[1];
@@ -826,17 +802,17 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa
     if (!skip) {
 
         try {
-            DownloadRequest request(url);
-            request.expectedETag = expectedETag;
-            auto res = download(request);
-            if (effectiveUrl)
-                *effectiveUrl = res.effectiveUrl;
+            DownloadRequest request2(url);
+            request2.expectedETag = expectedETag;
+            auto res = download(request2);
+            result.effectiveUri = res.effectiveUri;
+            result.etag = res.etag;
 
             if (!res.cached) {
                 ValidPathInfo info;
                 StringSink sink;
                 dumpString(*res.data, sink);
-                Hash hash = hashString(expectedHash ? expectedHash.type : htSHA256, *res.data);
+                Hash hash = hashString(request.expectedHash ? request.expectedHash.type : htSHA256, *res.data);
                 info.path = store->makeFixedOutputPath(false, hash, name);
                 info.narHash = hashString(htSHA256, *sink.s);
                 info.narSize = sink.s->size();
@@ -851,11 +827,12 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa
             writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
         } catch (DownloadError & e) {
             if (storePath.empty()) throw;
-            printError(format("warning: %1%; using cached result") % e.msg());
+            warn("%s; using cached result", e.msg());
+            result.etag = expectedETag;
         }
     }
 
-    if (unpack) {
+    if (request.unpack) {
         Path unpackedLink = cacheDir + "/" + baseNameOf(storePath) + "-unpacked";
         PathLocks lock2({unpackedLink}, fmt("waiting for lock on '%1%'...", unpackedLink));
         Path unpackedStorePath;
@@ -878,14 +855,16 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa
     }
 
     if (expectedStorePath != "" && storePath != expectedStorePath) {
-        Hash gotHash = unpack
-            ? hashPath(expectedHash.type, store->toRealPath(storePath)).first
-            : hashFile(expectedHash.type, store->toRealPath(storePath));
+        Hash gotHash = request.unpack
+            ? hashPath(request.expectedHash.type, store->toRealPath(storePath)).first
+            : hashFile(request.expectedHash.type, store->toRealPath(storePath));
         throw nix::Error("hash mismatch in file downloaded from '%s':\n  wanted: %s\n  got:    %s",
-            url, expectedHash.to_string(), gotHash.to_string());
+            url, request.expectedHash.to_string(), gotHash.to_string());
     }
 
-    return store->toRealPath(storePath);
+    result.storePath = storePath;
+    result.path = store->toRealPath(storePath);
+    return result;
 }
 
 
@@ -898,5 +877,4 @@ bool isUri(const string & s)
     return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
 }
 
-
 }