diff options
Diffstat (limited to 'src/libstore')
-rw-r--r-- | src/libstore/binary-cache-store.cc | 18 | ||||
-rw-r--r-- | src/libstore/build.cc | 38 | ||||
-rw-r--r-- | src/libstore/download.cc | 103 | ||||
-rw-r--r-- | src/libstore/download.hh | 11 | ||||
-rw-r--r-- | src/libstore/http-binary-cache-store.cc | 56 | ||||
-rw-r--r-- | src/libstore/store-api.cc | 107 |
6 files changed, 199 insertions, 134 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 8b736056e01d..4527ee6ba660 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -10,8 +10,6 @@ #include "nar-info-disk-cache.hh" #include "nar-accessor.hh" #include "json.hh" -#include "retry.hh" -#include "download.hh" #include <chrono> @@ -81,15 +79,13 @@ void BinaryCacheStore::getFile(const std::string & path, Sink & sink) std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) { - return retry<std::shared_ptr<std::string>>(downloadSettings.tries, [&]() -> std::shared_ptr<std::string> { - StringSink sink; - try { - getFile(path, sink); - } catch (NoSuchBinaryCacheFile &) { - return nullptr; - } - return sink.s; - }); + StringSink sink; + try { + getFile(path, sink); + } catch (NoSuchBinaryCacheFile &) { + return nullptr; + } + return sink.s; } Path BinaryCacheStore::narInfoFileFor(const Path & storePath) diff --git a/src/libstore/build.cc b/src/libstore/build.cc index 813d7e2c2d08..cf6428e12467 100644 --- a/src/libstore/build.cc +++ b/src/libstore/build.cc @@ -266,6 +266,12 @@ public: /* Set if at least one derivation had a timeout. */ bool timedOut; + /* Set if at least one derivation fails with a hash mismatch. */ + bool hashMismatch; + + /* Set if at least one derivation is not deterministic in check mode. */ + bool checkMismatch; + LocalStore & store; std::unique_ptr<HookInstance> hook; @@ -3219,6 +3225,7 @@ void DerivationGoal::registerOutputs() /* Throw an error after registering the path as valid. */ + worker.hashMismatch = true; delayedException = std::make_exception_ptr( BuildError("hash mismatch in fixed-output derivation '%s':\n wanted: %s\n got: %s", dest, h.to_string(), h2.to_string())); @@ -3261,6 +3268,7 @@ void DerivationGoal::registerOutputs() if (!worker.store.isValidPath(path)) continue; auto info = *worker.store.queryPathInfo(path); if (hash.first != info.narHash) { + worker.checkMismatch = true; if (settings.runDiffHook || settings.keepFailed) { Path dst = worker.store.toRealPath(path + checkSuffix); deletePath(dst); @@ -3272,10 +3280,10 @@ void DerivationGoal::registerOutputs() buildUser ? buildUser->getGID() : getgid(), path, dst, drvPath, tmpDir); - throw Error(format("derivation '%1%' may not be deterministic: output '%2%' differs from '%3%'") + throw NotDeterministic(format("derivation '%1%' may not be deterministic: output '%2%' differs from '%3%'") % drvPath % path % dst); } else - throw Error(format("derivation '%1%' may not be deterministic: output '%2%' differs") + throw NotDeterministic(format("derivation '%1%' may not be deterministic: output '%2%' differs") % drvPath % path); } @@ -4107,6 +4115,8 @@ Worker::Worker(LocalStore & store) lastWokenUp = steady_time_point::min(); permanentFailure = false; timedOut = false; + hashMismatch = false; + checkMismatch = false; } @@ -4467,7 +4477,29 @@ void Worker::waitForInput() unsigned int Worker::exitStatus() { - return timedOut ? 101 : (permanentFailure ? 100 : 1); + /* + * 1100100 + * ^^^^ + * |||`- timeout + * ||`-- output hash mismatch + * |`--- build failure + * `---- not deterministic + */ + unsigned int mask = 0; + bool buildFailure = permanentFailure || timedOut || hashMismatch; + if (buildFailure) + mask |= 0x04; // 100 + if (timedOut) + mask |= 0x01; // 101 + if (hashMismatch) + mask |= 0x02; // 102 + if (checkMismatch) { + mask |= 0x08; // 104 + } + + if (mask) + mask |= 0x60; + return mask ? mask : 1; } diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 6ce9525c38de..91087eebcfcb 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -8,7 +8,6 @@ #include "compression.hh" #include "pathlocks.hh" #include "finally.hh" -#include "retry.hh" #ifdef ENABLE_S3 #include <aws/core/client/ClientConfiguration.h> @@ -20,9 +19,11 @@ #include <curl/curl.h> #include <algorithm> +#include <cmath> #include <cstring> #include <iostream> #include <queue> +#include <random> #include <thread> using namespace std::string_literals; @@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader { CURLM * curlm = 0; + std::random_device rd; + std::mt19937 mt19937; + struct DownloadItem : public std::enable_shared_from_this<DownloadItem> { CurlDownloader & downloader; @@ -57,10 +61,20 @@ struct CurlDownloader : public Downloader bool active = false; // whether the handle has been added to the multi object std::string status; + unsigned int attempt = 0; + + /* Don't start this download until the specified time point + has been reached. */ + std::chrono::steady_clock::time_point embargo; + struct curl_slist * requestHeaders = 0; std::string encoding; + bool acceptRanges = false; + + curl_off_t writtenToSink = 0; + DownloadItem(CurlDownloader & downloader, const DownloadRequest & request, Callback<DownloadResult> callback) @@ -71,9 +85,10 @@ struct CurlDownloader : public Downloader {request.uri}, request.parentAct) , callback(callback) , finalSink([this](const unsigned char * data, size_t len) { - if (this->request.dataCallback) + if (this->request.dataCallback) { + writtenToSink += len; this->request.dataCallback((char *) data, len); - else + } else this->result.data->append((char *) data, len); }) { @@ -151,6 +166,7 @@ struct CurlDownloader : public Downloader status = ss.size() >= 2 ? ss[1] : ""; result.data = std::make_shared<std::string>(); result.bodySize = 0; + acceptRanges = false; encoding = ""; } else { auto i = line.find(':'); @@ -168,7 +184,9 @@ struct CurlDownloader : public Downloader return 0; } } else if (name == "content-encoding") - encoding = trim(string(line, i + 1));; + encoding = trim(string(line, i + 1)); + else if (name == "accept-ranges" && toLower(trim(std::string(line, i + 1))) == "bytes") + acceptRanges = true; } } return realSize; @@ -244,6 +262,8 @@ struct CurlDownloader : public Downloader #if LIBCURL_VERSION_NUM >= 0x072f00 if (downloadSettings.enableHttp2) curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS); + else + curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); #endif curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper); curl_easy_setopt(req, CURLOPT_WRITEDATA, this); @@ -284,6 +304,9 @@ struct CurlDownloader : public Downloader curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str()); curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL); + if (writtenToSink) + curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink); + result.data = std::make_shared<std::string>(); result.bodySize = 0; } @@ -318,7 +341,7 @@ struct CurlDownloader : public Downloader failEx(writeException); else if (code == CURLE_OK && - (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) + (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 206 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) { result.cached = httpStatus == 304; done = true; @@ -375,7 +398,9 @@ struct CurlDownloader : public Downloader } } - fail( + attempt++; + + auto exc = code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri)) : httpStatus != 0 @@ -386,15 +411,41 @@ struct CurlDownloader : public Downloader ) : DownloadError(err, fmt("unable to %s '%s': %s (%d)", - request.verb(), request.uri, curl_easy_strerror(code), code))); + request.verb(), request.uri, curl_easy_strerror(code), code)); + + /* If this is a transient error, then maybe retry the + download after a while. If we're writing to a + sink, we can only retry if the server supports + ranged requests. */ + if (err == Transient + && attempt < request.tries + && (!this->request.dataCallback + || writtenToSink == 0 + || (acceptRanges && encoding.empty()))) + { + int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); + if (writtenToSink) + warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms); + else + warn("%s; retrying in %d ms", exc.what(), ms); + embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); + downloader.enqueueItem(shared_from_this()); + } + else + fail(exc); } } }; struct State { + struct EmbargoComparator { + bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) { + return i1->embargo > i2->embargo; + } + }; bool quit = false; - std::vector<std::shared_ptr<DownloadItem>> incoming; + std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming; }; Sync<State> state_; @@ -407,6 +458,7 @@ struct CurlDownloader : public Downloader std::thread workerThread; CurlDownloader() + : mt19937(rd()) { static std::once_flag globalInit; std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); @@ -500,7 +552,9 @@ struct CurlDownloader : public Downloader nextWakeup = std::chrono::steady_clock::time_point(); - /* Add new curl requests from the incoming requests queue. */ + /* Add new curl requests from the incoming requests queue, + except for requests that are embargoed (waiting for a + retry timeout to expire). */ if (extraFDs[0].revents & CURL_WAIT_POLLIN) { char buf[1024]; auto res = read(extraFDs[0].fd, buf, sizeof(buf)); @@ -509,9 +563,22 @@ struct CurlDownloader : public Downloader } std::vector<std::shared_ptr<DownloadItem>> incoming; + auto now = std::chrono::steady_clock::now(); + { auto state(state_.lock()); - std::swap(state->incoming, incoming); + while (!state->incoming.empty()) { + auto item = state->incoming.top(); + if (item->embargo <= now) { + incoming.push_back(item); + state->incoming.pop(); + } else { + if (nextWakeup == std::chrono::steady_clock::time_point() + || item->embargo < nextWakeup) + nextWakeup = item->embargo; + break; + } + } quit = state->quit; } @@ -533,12 +600,12 @@ struct CurlDownloader : public Downloader workerThreadMain(); } catch (nix::Interrupted & e) { } catch (std::exception & e) { - printError(format("unexpected error in download thread: %s") % e.what()); + printError("unexpected error in download thread: %s", e.what()); } { auto state(state_.lock()); - state->incoming.clear(); + while (!state->incoming.empty()) state->incoming.pop(); state->quit = true; } } @@ -554,7 +621,7 @@ struct CurlDownloader : public Downloader auto state(state_.lock()); if (state->quit) throw nix::Error("cannot enqueue download request because the download thread is shutting down"); - state->incoming.push_back(item); + state->incoming.push(item); } writeFull(wakeupPipe.writeSide.get(), " "); } @@ -637,9 +704,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & DownloadResult Downloader::download(const DownloadRequest & request) { - return retry<DownloadResult>(request.tries, [&]() { - return enqueueDownload(request).get(); - }); + return enqueueDownload(request).get(); } void Downloader::download(DownloadRequest && request, Sink & sink) @@ -825,7 +890,7 @@ CachedDownloadResult Downloader::downloadCached( writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n"); } catch (DownloadError & e) { if (storePath.empty()) throw; - warn("%s; using cached result", e.msg()); + warn("warning: %s; using cached result", e.msg()); result.etag = expectedETag; } } @@ -853,10 +918,11 @@ CachedDownloadResult Downloader::downloadCached( } if (expectedStorePath != "" && storePath != expectedStorePath) { + unsigned int statusCode = 102; Hash gotHash = request.unpack ? hashPath(request.expectedHash.type, store->toRealPath(storePath)).first : hashFile(request.expectedHash.type, store->toRealPath(storePath)); - throw nix::Error("hash mismatch in file downloaded from '%s':\n wanted: %s\n got: %s", + throw nix::Error(statusCode, "hash mismatch in file downloaded from '%s':\n wanted: %s\n got: %s", url, request.expectedHash.to_string(), gotHash.to_string()); } @@ -875,4 +941,5 @@ bool isUri(const string & s) return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh"; } + } diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 9e965b506d0a..3b7fff3ba4cb 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -96,13 +96,11 @@ struct Downloader std::future<DownloadResult> enqueueDownload(const DownloadRequest & request); - /* Synchronously download a file. The request will be retried in - case of transient failures. */ + /* Synchronously download a file. */ DownloadResult download(const DownloadRequest & request); /* Download a file, writing its data to a sink. The sink will be - invoked on the thread of the caller. The request will not be - retried in case of transient failures. */ + invoked on the thread of the caller. */ void download(DownloadRequest && request, Sink & sink); /* Check if the specified file is already in ~/.cache/nix/tarballs @@ -128,11 +126,6 @@ public: DownloadError(Downloader::Error error, const FormatOrString & fs) : Error(fs), error(error) { } - - bool isTransient() override - { - return error == Downloader::Error::Transient; - } }; bool isUri(const string & s); diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 5633b4355d25..df2fb93320fc 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -2,7 +2,6 @@ #include "download.hh" #include "globals.hh" #include "nar-info-disk-cache.hh" -#include "retry.hh" namespace nix { @@ -136,46 +135,21 @@ protected: { checkEnabled(); - struct State - { - DownloadRequest request; - std::function<void()> tryDownload; - unsigned int attempt = 0; - State(DownloadRequest && request) : request(request) {} - }; - - auto state = std::make_shared<State>(makeRequest(path)); - - state->tryDownload = [callback, state, this]() { - getDownloader()->enqueueDownload(state->request, - {[callback, state, this](std::future<DownloadResult> result) { - try { - callback(result.get().data); - } catch (DownloadError & e) { - if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) - return callback(std::shared_ptr<std::string>()); - ++state->attempt; - if (state->attempt < state->request.tries && e.isTransient()) { - auto ms = retrySleepTime(state->attempt); - warn("%s; retrying in %d ms", e.what(), ms); - /* We can't sleep here because that would - block the download thread. So use a - separate thread for sleeping. */ - std::thread([state, ms]() { - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); - state->tryDownload(); - }).detach(); - } else { - maybeDisable(); - callback.rethrow(); - } - } catch (...) { - callback.rethrow(); - } - }}); - }; - - state->tryDownload(); + auto request(makeRequest(path)); + + getDownloader()->enqueueDownload(request, + {[callback, this](std::future<DownloadResult> result) { + try { + callback(result.get().data); + } catch (DownloadError & e) { + if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + return callback(std::shared_ptr<std::string>()); + maybeDisable(); + callback.rethrow(); + } catch (...) { + callback.rethrow(); + } + }}); } }; diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 28ad7c019a94..f5608d3849f1 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -6,11 +6,10 @@ #include "thread-pool.hh" #include "json.hh" #include "derivations.hh" -#include "retry.hh" -#include "download.hh" #include <future> + namespace nix { @@ -86,18 +85,25 @@ string storePathToHash(const Path & path) void checkStoreName(const string & name) { string validChars = "+-._?="; + + auto baseError = format("The path name '%2%' is invalid: %3%. " + "Path names are alphanumeric and can include the symbols %1% " + "and must not begin with a period. " + "Note: If '%2%' is a source file and you cannot rename it on " + "disk, builtins.path { name = ... } can be used to give it an " + "alternative name.") % validChars % name; + /* Disallow names starting with a dot for possible security reasons (e.g., "." and ".."). */ if (string(name, 0, 1) == ".") - throw Error(format("illegal name: '%1%'") % name); + throw Error(baseError % "it is illegal to start the name with a period"); for (auto & i : name) if (!((i >= 'A' && i <= 'Z') || (i >= 'a' && i <= 'z') || (i >= '0' && i <= '9') || validChars.find(i) != string::npos)) { - throw Error(format("invalid character '%1%' in name '%2%'") - % i % name); + throw Error(baseError % (format("the '%1%' character is invalid") % i)); } } @@ -573,57 +579,54 @@ void Store::buildPaths(const PathSet & paths, BuildMode buildMode) void copyStorePath(ref<Store> srcStore, ref<Store> dstStore, const Path & storePath, RepairFlag repair, CheckSigsFlag checkSigs) { - retry<void>(downloadSettings.tries, [&]() { - - auto srcUri = srcStore->getUri(); - auto dstUri = dstStore->getUri(); - - Activity act(*logger, lvlInfo, actCopyPath, - srcUri == "local" || srcUri == "daemon" - ? fmt("copying path '%s' to '%s'", storePath, dstUri) - : dstUri == "local" || dstUri == "daemon" - ? fmt("copying path '%s' from '%s'", storePath, srcUri) - : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri), - {storePath, srcUri, dstUri}); - PushActivity pact(act.id); - - auto info = srcStore->queryPathInfo(storePath); - - uint64_t total = 0; - - if (!info->narHash) { - StringSink sink; - srcStore->narFromPath({storePath}, sink); - auto info2 = make_ref<ValidPathInfo>(*info); - info2->narHash = hashString(htSHA256, *sink.s); - if (!info->narSize) info2->narSize = sink.s->size(); - if (info->ultimate) info2->ultimate = false; - info = info2; - - StringSource source(*sink.s); - dstStore->addToStore(*info, source, repair, checkSigs); - return; - } + auto srcUri = srcStore->getUri(); + auto dstUri = dstStore->getUri(); + + Activity act(*logger, lvlInfo, actCopyPath, + srcUri == "local" || srcUri == "daemon" + ? fmt("copying path '%s' to '%s'", storePath, dstUri) + : dstUri == "local" || dstUri == "daemon" + ? fmt("copying path '%s' from '%s'", storePath, srcUri) + : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri), + {storePath, srcUri, dstUri}); + PushActivity pact(act.id); + + auto info = srcStore->queryPathInfo(storePath); + + uint64_t total = 0; + + if (!info->narHash) { + StringSink sink; + srcStore->narFromPath({storePath}, sink); + auto info2 = make_ref<ValidPathInfo>(*info); + info2->narHash = hashString(htSHA256, *sink.s); + if (!info->narSize) info2->narSize = sink.s->size(); + if (info->ultimate) info2->ultimate = false; + info = info2; + + StringSource source(*sink.s); + dstStore->addToStore(*info, source, repair, checkSigs); + return; + } - if (info->ultimate) { - auto info2 = make_ref<ValidPathInfo>(*info); - info2->ultimate = false; - info = info2; - } + if (info->ultimate) { + auto info2 = make_ref<ValidPathInfo>(*info); + info2->ultimate = false; + info = info2; + } - auto source = sinkToSource([&](Sink & sink) { - LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { - sink(data, len); - total += len; - act.progress(total, info->narSize); - }); - srcStore->narFromPath({storePath}, wrapperSink); - }, [&]() { - throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri()); + auto source = sinkToSource([&](Sink & sink) { + LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { + sink(data, len); + total += len; + act.progress(total, info->narSize); }); - - dstStore->addToStore(*info, *source, repair, checkSigs); + srcStore->narFromPath({storePath}, wrapperSink); + }, [&]() { + throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri()); }); + + dstStore->addToStore(*info, *source, repair, checkSigs); } |