Diffstat (limited to 'src/libstore')
30 files changed, 722 insertions, 497 deletions
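The common thread in these changes is an API refactor: the paired success/failure std::function arguments (in queryPathInfoUncached, enqueueDownload, and the binary-cache getFile) are replaced by a single Callback<T> completion handler, and BinaryCacheStore additionally gains a sink-based getFile(path, Sink &). The Callback type itself is defined outside this diff (in libutil), so the following is only a rough sketch inferred from the call sites below — callback(value), callback.rethrow(), and construction from a lambda taking a std::future<T> — not the actual implementation:

// Minimal sketch (assumed shape, not the real libutil definition) of the
// Callback<T> used throughout this diff: one handler that receives a
// std::future<T>, replacing separate success/failure std::functions.
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <string>

template<typename T>
struct Callback
{
    std::function<void(std::future<T>)> fun;

    Callback(std::function<void(std::future<T>)> fun) : fun(fun) { }

    // Deliver a result: wrap it in a ready future and invoke the handler.
    void operator()(T x) const
    {
        std::promise<T> p;
        p.set_value(std::move(x));
        fun(p.get_future());
    }

    // Deliver the current (or a given) exception through the same channel.
    void rethrow(std::exception_ptr exc = std::current_exception()) const
    {
        std::promise<T> p;
        p.set_exception(exc);
        fun(p.get_future());
    }
};

// Example: an async-style getFile() written in the new style, as used by
// BinaryCacheStore below (hypothetical body, for illustration only).
void getFile(const std::string & path,
    Callback<std::shared_ptr<std::string>> callback)
{
    try {
        callback(std::make_shared<std::string>("contents of " + path));
    } catch (...) {
        callback.rethrow();
    }
}

int main()
{
    getFile("nar/example.nar.xz",
        {[](std::future<std::shared_ptr<std::string>> fut) {
            try {
                std::cout << *fut.get() << "\n";
            } catch (...) {
                std::cerr << "getFile failed\n";
            }
        }});
}

Funnelling both results and exceptions through one std::future<T> is what lets the diff drop the old callSuccess/callFailure and sync2async helpers: a caller either gets the value from fut.get() or the stored exception is rethrown there.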
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 2e9a13e564ca..4527ee6ba660 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -54,17 +54,38 @@ void BinaryCacheStore::init() } } -std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) +void BinaryCacheStore::getFile(const std::string & path, + Callback<std::shared_ptr<std::string>> callback) +{ + try { + callback(getFile(path)); + } catch (...) { callback.rethrow(); } +} + +void BinaryCacheStore::getFile(const std::string & path, Sink & sink) { std::promise<std::shared_ptr<std::string>> promise; getFile(path, - [&](std::shared_ptr<std::string> result) { - promise.set_value(result); - }, - [&](std::exception_ptr exc) { - promise.set_exception(exc); - }); - return promise.get_future().get(); + {[&](std::future<std::shared_ptr<std::string>> result) { + try { + promise.set_value(result.get()); + } catch (...) { + promise.set_exception(std::current_exception()); + } + }}); + auto data = promise.get_future().get(); + sink((unsigned char *) data->data(), data->size()); +} + +std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) +{ + StringSink sink; + try { + getFile(path, sink); + } catch (NoSuchBinaryCacheFile &) { + return nullptr; + } + return sink.s; } Path BinaryCacheStore::narInfoFileFor(const Path & storePath) @@ -196,30 +217,30 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) { auto info = queryPathInfo(storePath).cast<const NarInfo>(); - auto nar = getFile(info->url); - - if (!nar) throw Error(format("file '%s' missing from binary cache") % info->url); - - stats.narRead++; - stats.narReadCompressedBytes += nar->size(); - uint64_t narSize = 0; - StringSource source(*nar); - LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { sink(data, len); narSize += len; }); - decompress(info->compression, source, wrapperSink); + auto decompressor = makeDecompressionSink(info->compression, wrapperSink); + try { + getFile(info->url, *decompressor); + } catch (NoSuchBinaryCacheFile & e) { + throw SubstituteGone(e.what()); + } + + decompressor->finish(); + + stats.narRead++; + //stats.narReadCompressedBytes += nar->size(); // FIXME stats.narReadBytes += narSize; } void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) + Callback<std::shared_ptr<ValidPathInfo>> callback) { auto uri = getUri(); auto act = std::make_shared<Activity>(*logger, lvlTalkative, actQueryPathInfo, @@ -229,17 +250,22 @@ void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, auto narInfoFile = narInfoFileFor(storePath); getFile(narInfoFile, - [=](std::shared_ptr<std::string> data) { - if (!data) return success(0); + {[=](std::future<std::shared_ptr<std::string>> fut) { + try { + auto data = fut.get(); - stats.narInfoRead++; + if (!data) return callback(nullptr); - callSuccess(success, failure, (std::shared_ptr<ValidPathInfo>) - std::make_shared<NarInfo>(*this, *data, narInfoFile)); + stats.narInfoRead++; - (void) act; // force Activity into this lambda to ensure it stays alive - }, - failure); + callback((std::shared_ptr<ValidPathInfo>) + std::make_shared<NarInfo>(*this, *data, narInfoFile)); + + (void) act; // force Activity into this lambda to ensure it stays alive + } catch (...) 
{ + callback.rethrow(); + } + }}); } Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath, diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh index e20b968442b7..6bc83fc50ca1 100644 --- a/src/libstore/binary-cache-store.hh +++ b/src/libstore/binary-cache-store.hh @@ -38,11 +38,16 @@ public: const std::string & data, const std::string & mimeType) = 0; - /* Return the contents of the specified file, or null if it - doesn't exist. */ + /* Note: subclasses must implement at least one of the two + following getFile() methods. */ + + /* Dump the contents of the specified file to a sink. */ + virtual void getFile(const std::string & path, Sink & sink); + + /* Fetch the specified file and call the specified callback with + the result. A subclass may implement this asynchronously. */ virtual void getFile(const std::string & path, - std::function<void(std::shared_ptr<std::string>)> success, - std::function<void(std::exception_ptr exc)> failure) = 0; + Callback<std::shared_ptr<std::string>> callback); std::shared_ptr<std::string> getFile(const std::string & path); @@ -71,8 +76,7 @@ public: { unsupported(); } void queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) override; + Callback<std::shared_ptr<ValidPathInfo>> callback) override; void queryReferrers(const Path & path, PathSet & referrers) override @@ -131,4 +135,6 @@ public: }; +MakeError(NoSuchBinaryCacheFile, Error); + } diff --git a/src/libstore/build.cc b/src/libstore/build.cc index f70ab8108fd7..cd37f7a3fc08 100644 --- a/src/libstore/build.cc +++ b/src/libstore/build.cc @@ -29,7 +29,9 @@ #include <sys/utsname.h> #include <sys/select.h> #include <sys/resource.h> +#include <sys/socket.h> #include <fcntl.h> +#include <netdb.h> #include <unistd.h> #include <errno.h> #include <cstring> @@ -672,8 +674,10 @@ HookInstance::HookInstance() toHook.readSide = -1; sink = FdSink(toHook.writeSide.get()); - for (auto & setting : settings.getSettings()) - sink << 1 << setting.first << setting.second; + std::map<std::string, Config::SettingInfo> settings; + globalConfig.getSettings(settings); + for (auto & setting : settings) + sink << 1 << setting.first << setting.second.value; sink << 0; } @@ -731,7 +735,7 @@ private: /* Whether to retry substituting the outputs after building the inputs. */ - bool retrySubstitution = false; + bool retrySubstitution; /* The derivation stored at drvPath. */ std::unique_ptr<BasicDerivation> drv; @@ -1121,6 +1125,8 @@ void DerivationGoal::haveDerivation() { trace("have derivation"); + retrySubstitution = false; + for (auto & i : drv->outputs) worker.store.addTempRoot(i.second.path); @@ -1159,7 +1165,7 @@ void DerivationGoal::outputsSubstituted() /* If the substitutes form an incomplete closure, then we should build the dependencies of this derivation, but after that, we can still use the substitutes for this derivation itself. */ - if (nrIncompleteClosure > 0 && !retrySubstitution) retrySubstitution = true; + if (nrIncompleteClosure > 0) retrySubstitution = true; nrFailed = nrNoSubstituters = nrIncompleteClosure = 0; @@ -1773,12 +1779,14 @@ static std::once_flag dns_resolve_flag; static void preloadNSS() { /* builtin:fetchurl can trigger a DNS lookup, which with glibc can trigger a dynamic library load of one of the glibc NSS libraries in a sandboxed child, which will fail unless the library's already - been loaded in the parent. 
So we force a download of an invalid URL to force the NSS machinery to + been loaded in the parent. So we force a lookup of an invalid domain to force the NSS machinery to load its lookup libraries in the parent before any child gets a chance to. */ std::call_once(dns_resolve_flag, []() { - DownloadRequest request("http://this.pre-initializes.the.dns.resolvers.invalid"); - request.tries = 1; // We only need to do it once, and this also suppresses an annoying warning - try { getDownloader()->download(request); } catch (...) {} + struct addrinfo *res = NULL; + + if (getaddrinfo("this.pre-initializes.the.dns.resolvers.invalid.", "http", NULL, &res) != 0) { + if (res) freeaddrinfo(res); + } }); } @@ -1999,7 +2007,7 @@ void DerivationGoal::startBuilder() /* Create /etc/hosts with localhost entry. */ if (!fixedOutput) - writeFile(chrootRootDir + "/etc/hosts", "127.0.0.1 localhost\n"); + writeFile(chrootRootDir + "/etc/hosts", "127.0.0.1 localhost\n::1 localhost\n"); /* Make the closure of the inputs available in the chroot, rather than the whole Nix store. This prevents any access @@ -3522,8 +3530,8 @@ private: /* The current substituter. */ std::shared_ptr<Store> sub; - /* Whether any substituter can realise this path. */ - bool hasSubstitute; + /* Whether a substituter failed. */ + bool substituterFailed = false; /* Path info returned by the substituter's query info operation. */ std::shared_ptr<const ValidPathInfo> info; @@ -3587,7 +3595,6 @@ public: SubstitutionGoal::SubstitutionGoal(const Path & storePath, Worker & worker, RepairFlag repair) : Goal(worker) - , hasSubstitute(false) , repair(repair) { this->storePath = storePath; @@ -3651,9 +3658,9 @@ void SubstitutionGoal::tryNext() /* Hack: don't indicate failure if there were no substituters. In that case the calling derivation should just do a build. */ - amDone(hasSubstitute ? ecFailed : ecNoSubstituters); + amDone(substituterFailed ? ecFailed : ecNoSubstituters); - if (hasSubstitute) { + if (substituterFailed) { worker.failedSubstitutions++; worker.updateProgress(); } @@ -3689,8 +3696,6 @@ void SubstitutionGoal::tryNext() worker.updateProgress(); - hasSubstitute = true; - /* Bail out early if this substituter lacks a valid signature. LocalStore::addToStore() also checks for this, but only after we've downloaded the path. */ @@ -3805,8 +3810,19 @@ void SubstitutionGoal::finished() state = &SubstitutionGoal::init; worker.waitForAWhile(shared_from_this()); return; - } catch (Error & e) { - printError(e.msg()); + } catch (std::exception & e) { + printError(e.what()); + + /* Cause the parent build to fail unless --fallback is given, + or the substitute has disappeared. The latter case behaves + the same as the substitute never having existed in the + first place. */ + try { + throw; + } catch (SubstituteGone &) { + } catch (...) { + substituterFailed = true; + } /* Try the next substitute. */ state = &SubstitutionGoal::tryNext; diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc index 4ca4a838e3c4..b4dcb35f951a 100644 --- a/src/libstore/builtins/fetchurl.cc +++ b/src/libstore/builtins/fetchurl.cc @@ -22,52 +22,55 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData) return i->second; }; - auto fetch = [&](const string & url) { - /* No need to do TLS verification, because we check the hash of - the result anyway. 
*/ - DownloadRequest request(url); - request.verifyTLS = false; - request.decompress = false; + Path storePath = getAttr("out"); + auto mainUrl = getAttr("url"); - /* Note: have to use a fresh downloader here because we're in - a forked process. */ - auto data = makeDownloader()->download(request); - assert(data.data); + /* Note: have to use a fresh downloader here because we're in + a forked process. */ + auto downloader = makeDownloader(); - return data.data; - }; + auto fetch = [&](const std::string & url) { + + auto source = sinkToSource([&](Sink & sink) { + + /* No need to do TLS verification, because we check the hash of + the result anyway. */ + DownloadRequest request(url); + request.verifyTLS = false; + request.decompress = false; + + auto decompressor = makeDecompressionSink( + hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink); + downloader->download(std::move(request), *decompressor); + decompressor->finish(); + }); + + if (get(drv.env, "unpack", "") == "1") + restorePath(storePath, *source); + else + writeFile(storePath, *source); - std::shared_ptr<std::string> data; + auto executable = drv.env.find("executable"); + if (executable != drv.env.end() && executable->second == "1") { + if (chmod(storePath.c_str(), 0755) == -1) + throw SysError(format("making '%1%' executable") % storePath); + } + }; + /* Try the hashed mirrors first. */ if (getAttr("outputHashMode") == "flat") for (auto hashedMirror : settings.hashedMirrors.get()) try { if (!hasSuffix(hashedMirror, "/")) hashedMirror += '/'; auto ht = parseHashType(getAttr("outputHashAlgo")); - data = fetch(hashedMirror + printHashType(ht) + "/" + Hash(getAttr("outputHash"), ht).to_string(Base16, false)); - break; + fetch(hashedMirror + printHashType(ht) + "/" + Hash(getAttr("outputHash"), ht).to_string(Base16, false)); + return; } catch (Error & e) { debug(e.what()); } - if (!data) data = fetch(getAttr("url")); - - Path storePath = getAttr("out"); - - auto unpack = drv.env.find("unpack"); - if (unpack != drv.env.end() && unpack->second == "1") { - if (string(*data, 0, 6) == string("\xfd" "7zXZ\0", 6)) - data = decompress("xz", *data); - StringSource source(*data); - restorePath(storePath, source); - } else - writeFile(storePath, *data); - - auto executable = drv.env.find("executable"); - if (executable != drv.env.end() && executable->second == "1") { - if (chmod(storePath.c_str(), 0755) == -1) - throw SysError(format("making '%1%' executable") % storePath); - } + /* Otherwise try the specified URL. 
*/ + fetch(mainUrl); } } diff --git a/src/libstore/derivations.cc b/src/libstore/derivations.cc index 74b861281ee0..1e187ec5e954 100644 --- a/src/libstore/derivations.cc +++ b/src/libstore/derivations.cc @@ -342,7 +342,7 @@ Hash hashDerivationModulo(Store & store, Derivation drv) Hash h = drvHashes[i.first]; if (!h) { assert(store.isValidPath(i.first)); - Derivation drv2 = readDerivation(i.first); + Derivation drv2 = readDerivation(store.toRealPath(i.first)); h = hashDerivationModulo(store, drv2); drvHashes[i.first] = h; } diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 54f4dd218007..973fca0b130f 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -7,6 +7,7 @@ #include "s3.hh" #include "compression.hh" #include "pathlocks.hh" +#include "finally.hh" #ifdef ENABLE_S3 #include <aws/core/client/ClientConfiguration.h> @@ -29,12 +30,25 @@ using namespace std::string_literals; namespace nix { -double getTime() +struct DownloadSettings : Config { - struct timeval tv; - gettimeofday(&tv, 0); - return tv.tv_sec + (tv.tv_usec / 1000000.0); -} + Setting<bool> enableHttp2{this, true, "http2", + "Whether to enable HTTP/2 support."}; + + Setting<std::string> userAgentSuffix{this, "", "user-agent-suffix", + "String appended to the user agent in HTTP requests."}; + + Setting<size_t> httpConnections{this, 25, "http-connections", + "Number of parallel HTTP connections.", + {"binary-caches-parallel-connections"}}; + + Setting<unsigned long> connectTimeout{this, 0, "connect-timeout", + "Timeout for connecting to servers during downloads. 0 means use curl's builtin default."}; +}; + +static DownloadSettings downloadSettings; + +static GlobalConfig::Register r1(&downloadSettings); std::string resolveUri(const std::string & uri) { @@ -44,16 +58,6 @@ std::string resolveUri(const std::string & uri) return uri; } -ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data) -{ - if (encoding == "") - return data; - else if (encoding == "br") - return decompress(encoding, *data); - else - throw Error("unsupported Content-Encoding '%s'", encoding); -} - struct CurlDownloader : public Downloader { CURLM * curlm = 0; @@ -61,8 +65,6 @@ struct CurlDownloader : public Downloader std::random_device rd; std::mt19937 mt19937; - bool enableHttp2; - struct DownloadItem : public std::enable_shared_from_this<DownloadItem> { CurlDownloader & downloader; @@ -70,8 +72,7 @@ struct CurlDownloader : public Downloader DownloadResult result; Activity act; bool done = false; // whether either the success or failure function has been called - std::function<void(const DownloadResult &)> success; - std::function<void(std::exception_ptr exc)> failure; + Callback<DownloadResult> callback; CURL * req = 0; bool active = false; // whether the handle has been added to the multi object std::string status; @@ -86,10 +87,21 @@ struct CurlDownloader : public Downloader std::string encoding; - DownloadItem(CurlDownloader & downloader, const DownloadRequest & request) + DownloadItem(CurlDownloader & downloader, + const DownloadRequest & request, + Callback<DownloadResult> callback) : downloader(downloader) , request(request) - , act(*logger, lvlTalkative, actDownload, fmt("downloading '%s'", request.uri), {request.uri}, request.parentAct) + , act(*logger, lvlTalkative, actDownload, + fmt(request.data ? 
"uploading '%s'" : "downloading '%s'", request.uri), + {request.uri}, request.parentAct) + , callback(callback) + , finalSink([this](const unsigned char * data, size_t len) { + if (this->request.dataCallback) + this->request.dataCallback((char *) data, len); + else + this->result.data->append((char *) data, len); + }) { if (!request.expectedETag.empty()) requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str()); @@ -113,19 +125,40 @@ struct CurlDownloader : public Downloader } } - template<class T> - void fail(const T & e) + void failEx(std::exception_ptr ex) { assert(!done); done = true; - callFailure(failure, std::make_exception_ptr(e)); + callback.rethrow(ex); + } + + template<class T> + void fail(const T & e) + { + failEx(std::make_exception_ptr(e)); } + LambdaSink finalSink; + std::shared_ptr<CompressionSink> decompressionSink; + + std::exception_ptr writeException; + size_t writeCallback(void * contents, size_t size, size_t nmemb) { - size_t realSize = size * nmemb; - result.data->append((char *) contents, realSize); - return realSize; + try { + size_t realSize = size * nmemb; + result.bodySize += realSize; + + if (!decompressionSink) + decompressionSink = makeDecompressionSink(encoding, finalSink); + + (*decompressionSink)((unsigned char *) contents, realSize); + + return realSize; + } catch (...) { + writeException = std::current_exception(); + return 0; + } } static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) @@ -143,6 +176,7 @@ struct CurlDownloader : public Downloader auto ss = tokenizeString<vector<string>>(line, " "); status = ss.size() >= 2 ? ss[1] : ""; result.data = std::make_shared<std::string>(); + result.bodySize = 0; encoding = ""; } else { auto i = line.find(':'); @@ -194,7 +228,7 @@ struct CurlDownloader : public Downloader } size_t readOffset = 0; - int readCallback(char *buffer, size_t size, size_t nitems) + size_t readCallback(char *buffer, size_t size, size_t nitems) { if (readOffset == request.data->length()) return 0; @@ -205,7 +239,7 @@ struct CurlDownloader : public Downloader return count; } - static int readCallbackWrapper(char *buffer, size_t size, size_t nitems, void * userp) + static size_t readCallbackWrapper(char *buffer, size_t size, size_t nitems, void * userp) { return ((DownloadItem *) userp)->readCallback(buffer, size, nitems); } @@ -225,15 +259,16 @@ struct CurlDownloader : public Downloader curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str()); curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(req, CURLOPT_MAXREDIRS, 10); curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1); curl_easy_setopt(req, CURLOPT_USERAGENT, ("curl/" LIBCURL_VERSION " Nix/" + nixVersion + - (settings.userAgentSuffix != "" ? " " + settings.userAgentSuffix.get() : "")).c_str()); + (downloadSettings.userAgentSuffix != "" ? 
" " + downloadSettings.userAgentSuffix.get() : "")).c_str()); #if LIBCURL_VERSION_NUM >= 0x072b00 curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1); #endif #if LIBCURL_VERSION_NUM >= 0x072f00 - if (downloader.enableHttp2) + if (downloadSettings.enableHttp2) curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS); #endif curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper); @@ -265,7 +300,7 @@ struct CurlDownloader : public Downloader curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0); } - curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, settings.connectTimeout.get()); + curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, downloadSettings.connectTimeout.get()); curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L); curl_easy_setopt(req, CURLOPT_LOW_SPEED_TIME, lowSpeedTimeout); @@ -276,6 +311,7 @@ struct CurlDownloader : public Downloader curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL); result.data = std::make_shared<std::string>(); + result.bodySize = 0; } void finish(CURLcode code) @@ -288,34 +324,40 @@ struct CurlDownloader : public Downloader if (effectiveUrlCStr) result.effectiveUrl = effectiveUrlCStr; - debug(format("finished download of '%s'; curl status = %d, HTTP status = %d, body = %d bytes") - % request.uri % code % httpStatus % (result.data ? result.data->size() : 0)); + debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes", + request.verb(), request.uri, code, httpStatus, result.bodySize); + + if (decompressionSink) + decompressionSink->finish(); if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) { code = CURLE_OK; httpStatus = 304; } - if (code == CURLE_OK && + if (writeException) + failEx(writeException); + + else if (code == CURLE_OK && (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) { result.cached = httpStatus == 304; done = true; try { - if (request.decompress) - result.data = decodeContent(encoding, ref<std::string>(result.data)); - callSuccess(success, failure, const_cast<const DownloadResult &>(result)); act.progress(result.data->size(), result.data->size()); + callback(std::move(result)); } catch (...) { done = true; - callFailure(failure, std::current_exception()); + callback.rethrow(); } - } else { + } + + else { // We treat most errors as transient, but won't retry when hopeless Error err = Transient; - if (httpStatus == 404 || code == CURLE_FILE_COULDNT_READ_FILE) { + if (httpStatus == 404 || httpStatus == 410 || code == CURLE_FILE_COULDNT_READ_FILE) { // The file is definitely not there err = NotFound; } else if (httpStatus == 401 || httpStatus == 403 || httpStatus == 407) { @@ -345,6 +387,8 @@ struct CurlDownloader : public Downloader case CURLE_INTERFACE_FAILED: case CURLE_UNKNOWN_OPTION: case CURLE_SSL_CACERT_BADFILE: + case CURLE_TOO_MANY_REDIRECTS: + case CURLE_WRITE_ERROR: err = Misc; break; default: // Shut up warnings @@ -356,10 +400,16 @@ struct CurlDownloader : public Downloader auto exc = code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted - ? DownloadError(Interrupted, format("download of '%s' was interrupted") % request.uri) + ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri)) : httpStatus != 0 - ? 
DownloadError(err, format("unable to download '%s': HTTP error %d (curl error: %s)") % request.uri % httpStatus % curl_easy_strerror(code)) - : DownloadError(err, format("unable to download '%s': %s (%d)") % request.uri % curl_easy_strerror(code) % code); + ? DownloadError(err, + fmt("unable to %s '%s': HTTP error %d", + request.verb(), request.uri, httpStatus) + + (code == CURLE_OK ? "" : fmt(" (curl error: %s)", curl_easy_strerror(code))) + ) + : DownloadError(err, + fmt("unable to %s '%s': %s (%d)", + request.verb(), request.uri, curl_easy_strerror(code), code)); /* If this is a transient error, then maybe retry the download after a while. */ @@ -408,11 +458,9 @@ struct CurlDownloader : public Downloader #endif #if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0 curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, - settings.binaryCachesParallelConnections.get()); + downloadSettings.httpConnections.get()); #endif - enableHttp2 = settings.enableHttp2; - wakeupPipe.create(); fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK); @@ -522,7 +570,7 @@ struct CurlDownloader : public Downloader } for (auto & item : incoming) { - debug(format("starting download of %s") % item->request.uri); + debug("starting %s of %s", item->request.verb(), item->request.uri); item->init(); curl_multi_add_handle(curlm, item->req); item->active = true; @@ -551,6 +599,11 @@ struct CurlDownloader : public Downloader void enqueueItem(std::shared_ptr<DownloadItem> item) { + if (item->request.data + && !hasPrefix(item->request.uri, "http://") + && !hasPrefix(item->request.uri, "https://")) + throw nix::Error("uploading to '%s' is not supported", item->request.uri); + { auto state(state_.lock()); if (state->quit) @@ -561,15 +614,14 @@ struct CurlDownloader : public Downloader } void enqueueDownload(const DownloadRequest & request, - std::function<void(const DownloadResult &)> success, - std::function<void(std::exception_ptr exc)> failure) override + Callback<DownloadResult> callback) override { /* Ugly hack to support s3:// URIs. */ if (hasPrefix(request.uri, "s3://")) { // FIXME: do this on a worker thread - sync2async<DownloadResult>(success, failure, [&]() -> DownloadResult { + try { #ifdef ENABLE_S3 - S3Helper s3Helper("", Aws::Region::US_EAST_1); // FIXME: make configurable + S3Helper s3Helper("", Aws::Region::US_EAST_1, ""); // FIXME: make configurable auto slash = request.uri.find('/', 5); if (slash == std::string::npos) throw nix::Error("bad S3 URI '%s'", request.uri); @@ -581,27 +633,22 @@ struct CurlDownloader : public Downloader if (!s3Res.data) throw DownloadError(NotFound, fmt("S3 object '%s' does not exist", request.uri)); res.data = s3Res.data; - return res; + callback(std::move(res)); #else throw nix::Error("cannot download '%s' because Nix is not built with S3 support", request.uri); #endif - }); + } catch (...) 
{ callback.rethrow(); } return; } - auto item = std::make_shared<DownloadItem>(*this, request); - item->success = success; - item->failure = failure; - enqueueItem(item); + enqueueItem(std::make_shared<DownloadItem>(*this, request, callback)); } }; ref<Downloader> getDownloader() { - static std::shared_ptr<Downloader> downloader; - static std::once_flag downloaderCreated; - std::call_once(downloaderCreated, [&]() { downloader = makeDownloader(); }); - return ref<Downloader>(downloader); + static ref<Downloader> downloader = makeDownloader(); + return downloader; } ref<Downloader> makeDownloader() @@ -613,8 +660,13 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & { auto promise = std::make_shared<std::promise<DownloadResult>>(); enqueueDownload(request, - [promise](const DownloadResult & result) { promise->set_value(result); }, - [promise](std::exception_ptr exc) { promise->set_exception(exc); }); + {[promise](std::future<DownloadResult> fut) { + try { + promise->set_value(fut.get()); + } catch (...) { + promise->set_exception(std::current_exception()); + } + }}); return promise->get_future(); } @@ -623,6 +675,94 @@ DownloadResult Downloader::download(const DownloadRequest & request) return enqueueDownload(request).get(); } +void Downloader::download(DownloadRequest && request, Sink & sink) +{ + /* Note: we can't call 'sink' via request.dataCallback, because + that would cause the sink to execute on the downloader + thread. If 'sink' is a coroutine, this will fail. Also, if the + sink is expensive (e.g. one that does decompression and writing + to the Nix store), it would stall the download thread too much. + Therefore we use a buffer to communicate data between the + download thread and the calling thread. */ + + struct State { + bool quit = false; + std::exception_ptr exc; + std::string data; + std::condition_variable avail, request; + }; + + auto _state = std::make_shared<Sync<State>>(); + + /* In case of an exception, wake up the download thread. FIXME: + abort the download request. */ + Finally finally([&]() { + auto state(_state->lock()); + state->quit = true; + state->request.notify_one(); + }); + + request.dataCallback = [_state](char * buf, size_t len) { + + auto state(_state->lock()); + + if (state->quit) return; + + /* If the buffer is full, then go to sleep until the calling + thread wakes us up (i.e. when it has removed data from the + buffer). Note: this does stall the download thread. */ + while (state->data.size() > 1024 * 1024) { + if (state->quit) return; + debug("download buffer is full; going to sleep"); + state.wait(state->request); + } + + /* Append data to the buffer and wake up the calling + thread. */ + state->data.append(buf, len); + state->avail.notify_one(); + }; + + enqueueDownload(request, + {[_state](std::future<DownloadResult> fut) { + auto state(_state->lock()); + state->quit = true; + try { + fut.get(); + } catch (...) { + state->exc = std::current_exception(); + } + state->avail.notify_one(); + state->request.notify_one(); + }}); + + auto state(_state->lock()); + + while (true) { + checkInterrupt(); + + /* If no data is available, then wait for the download thread + to wake us up. */ + if (state->data.empty()) { + + if (state->quit) { + if (state->exc) std::rethrow_exception(state->exc); + break; + } + + state.wait(state->avail); + } + + /* If data is available, then flush it to the sink and wake up + the download thread if it's blocked on a full buffer. 
*/ + if (!state->data.empty()) { + sink((unsigned char *) state->data.data(), state->data.size()); + state->data.clear(); + state->request.notify_one(); + } + } +} + Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl, int ttl) { auto url = resolveUri(url_); diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 7ade756fc356..f0228f7d053a 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -21,9 +21,15 @@ struct DownloadRequest bool decompress = true; std::shared_ptr<std::string> data; std::string mimeType; + std::function<void(char *, size_t)> dataCallback; DownloadRequest(const std::string & uri) : uri(uri), parentAct(getCurActivity()) { } + + std::string verb() + { + return data ? "upload" : "download"; + } }; struct DownloadResult @@ -32,6 +38,7 @@ struct DownloadResult std::string etag; std::string effectiveUrl; std::shared_ptr<std::string> data; + uint64_t bodySize = 0; }; class Store; @@ -42,14 +49,17 @@ struct Downloader the download. The future may throw a DownloadError exception. */ virtual void enqueueDownload(const DownloadRequest & request, - std::function<void(const DownloadResult &)> success, - std::function<void(std::exception_ptr exc)> failure) = 0; + Callback<DownloadResult> callback) = 0; std::future<DownloadResult> enqueueDownload(const DownloadRequest & request); /* Synchronously download a file. */ DownloadResult download(const DownloadRequest & request); + /* Download a file, writing its data to a sink. The sink will be + invoked on the thread of the caller. */ + void download(DownloadRequest && request, Sink & sink); + /* Check if the specified file is already in ~/.cache/nix/tarballs and is more recent than โtarball-ttlโ seconds. Otherwise, use the recorded ETag to verify if the server has a more @@ -78,7 +88,4 @@ public: bool isUri(const string & s); -/* Decode data according to the Content-Encoding header. 
*/ -ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data); - } diff --git a/src/libstore/gc.cc b/src/libstore/gc.cc index ba49749d830a..b415d5421476 100644 --- a/src/libstore/gc.cc +++ b/src/libstore/gc.cc @@ -7,6 +7,7 @@ #include <queue> #include <algorithm> #include <regex> +#include <random> #include <sys/types.h> #include <sys/stat.h> @@ -365,7 +366,7 @@ try_again: char buf[bufsiz]; auto res = readlink(file.c_str(), buf, bufsiz); if (res == -1) { - if (errno == ENOENT || errno == EACCES) + if (errno == ENOENT || errno == EACCES || errno == ESRCH) return; throw SysError("reading symlink"); } @@ -425,25 +426,28 @@ PathSet LocalStore::findRuntimeRoots() readProcLink((format("%1%/%2%") % fdStr % fd_ent->d_name).str(), paths); } } - if (errno) + if (errno) { + if (errno == ESRCH) + continue; throw SysError(format("iterating /proc/%1%/fd") % ent->d_name); - fdDir.reset(); - - auto mapLines = - tokenizeString<std::vector<string>>(readFile((format("/proc/%1%/maps") % ent->d_name).str(), true), "\n"); - for (const auto& line : mapLines) { - auto match = std::smatch{}; - if (std::regex_match(line, match, mapRegex)) - paths.emplace(match[1]); } + fdDir.reset(); try { + auto mapLines = + tokenizeString<std::vector<string>>(readFile((format("/proc/%1%/maps") % ent->d_name).str(), true), "\n"); + for (const auto& line : mapLines) { + auto match = std::smatch{}; + if (std::regex_match(line, match, mapRegex)) + paths.emplace(match[1]); + } + auto envString = readFile((format("/proc/%1%/environ") % ent->d_name).str(), true); auto env_end = std::sregex_iterator{}; for (auto i = std::sregex_iterator{envString.begin(), envString.end(), storePathRegex}; i != env_end; ++i) paths.emplace(i->str()); } catch (SysError & e) { - if (errno == ENOENT || errno == EACCES) + if (errno == ENOENT || errno == EACCES || errno == ESRCH) continue; throw; } @@ -829,7 +833,8 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results) alphabetically first (e.g. /nix/store/000...). This matters when using --max-freed etc. */ vector<Path> entries_(entries.begin(), entries.end()); - random_shuffle(entries_.begin(), entries_.end()); + std::mt19937 gen(1); + std::shuffle(entries_.begin(), entries_.end(), gen); for (auto & i : entries_) tryToDelete(state, i); diff --git a/src/libstore/globals.cc b/src/libstore/globals.cc index 544566e0b573..d95db56726cb 100644 --- a/src/libstore/globals.cc +++ b/src/libstore/globals.cc @@ -28,9 +28,10 @@ namespace nix { Settings settings; +static GlobalConfig::Register r1(&settings); + Settings::Settings() - : Config({}) - , nixPrefix(NIX_PREFIX) + : nixPrefix(NIX_PREFIX) , nixStore(canonPath(getEnv("NIX_STORE_DIR", getEnv("NIX_STORE", NIX_STORE_DIR)))) , nixDataDir(canonPath(getEnv("NIX_DATA_DIR", NIX_DATA_DIR))) , nixLogDir(canonPath(getEnv("NIX_LOG_DIR", NIX_LOG_DIR))) @@ -69,20 +70,15 @@ Settings::Settings() allowedImpureHostPrefixes = tokenizeString<StringSet>(DEFAULT_ALLOWED_IMPURE_PREFIXES); } -void Settings::loadConfFile() +void loadConfFile() { - applyConfigFile(nixConfDir + "/nix.conf"); + globalConfig.applyConfigFile(settings.nixConfDir + "/nix.conf"); /* We only want to send overrides to the daemon, i.e. stuff from ~/.nix/nix.conf or the command line. 
*/ - resetOverriden(); + globalConfig.resetOverriden(); - applyConfigFile(getConfigDir() + "/nix/nix.conf"); -} - -void Settings::set(const string & name, const string & value) -{ - Config::set(name, value); + globalConfig.applyConfigFile(getConfigDir() + "/nix/nix.conf"); } unsigned int Settings::getDefaultCores() @@ -162,23 +158,11 @@ void initPlugins() throw Error("could not dynamically open plugin file '%s': %s", file, dlerror()); } } - /* We handle settings registrations here, since plugins can add settings */ - if (RegisterSetting::settingRegistrations) { - for (auto & registration : *RegisterSetting::settingRegistrations) - settings.addSetting(registration); - delete RegisterSetting::settingRegistrations; - } - settings.handleUnknownSettings(); -} - -RegisterSetting::SettingRegistrations * RegisterSetting::settingRegistrations; -RegisterSetting::RegisterSetting(AbstractSetting * s) -{ - if (!settingRegistrations) - settingRegistrations = new SettingRegistrations; - settingRegistrations->emplace_back(s); + /* Since plugins can add settings, try to re-apply previously + unknown settings. */ + globalConfig.reapplyUnknownSettings(); + globalConfig.warnUnknownSettings(); } - } diff --git a/src/libstore/globals.hh b/src/libstore/globals.hh index 9360096aae8c..f589078dbb98 100644 --- a/src/libstore/globals.hh +++ b/src/libstore/globals.hh @@ -13,26 +13,6 @@ namespace nix { typedef enum { smEnabled, smRelaxed, smDisabled } SandboxMode; -extern bool useCaseHack; // FIXME - -struct CaseHackSetting : public BaseSetting<bool> -{ - CaseHackSetting(Config * options, - const std::string & name, - const std::string & description, - const std::set<std::string> & aliases = {}) - : BaseSetting<bool>(useCaseHack, name, description, aliases) - { - options->addSetting(this); - } - - void set(const std::string & str) override - { - BaseSetting<bool>::set(str); - nix::useCaseHack = value; - } -}; - struct MaxBuildJobsSetting : public BaseSetting<unsigned int> { MaxBuildJobsSetting(Config * options, @@ -56,10 +36,6 @@ public: Settings(); - void loadConfFile(); - - void set(const string & name, const string & value); - Path nixPrefix; /* The directory where we store sources and derived files. */ @@ -217,9 +193,6 @@ public: Setting<bool> showTrace{this, false, "show-trace", "Whether to show a stack trace on evaluation errors."}; - Setting<bool> enableNativeCode{this, false, "allow-unsafe-native-code-during-evaluation", - "Whether builtin functions that allow executing native code should be enabled."}; - Setting<SandboxMode> sandboxMode{this, smDisabled, "sandbox", "Whether to enable sandboxed builds. 
Can be \"true\", \"false\" or \"relaxed\".", {"build-use-chroot", "build-use-sandbox"}}; @@ -232,13 +205,6 @@ public: "Additional paths to make available inside the build sandbox.", {"build-extra-chroot-dirs", "build-extra-sandbox-paths"}}; - Setting<bool> restrictEval{this, false, "restrict-eval", - "Whether to restrict file system access to paths in $NIX_PATH, " - "and network access to the URI prefixes listed in 'allowed-uris'."}; - - Setting<bool> pureEval{this, false, "pure-eval", - "Whether to restrict file system and network access to files specified by cryptographic hash."}; - Setting<size_t> buildRepeat{this, 0, "repeat", "The number of times to repeat a build in order to verify determinism.", {"build-repeat"}}; @@ -280,13 +246,6 @@ public: Setting<Strings> secretKeyFiles{this, {}, "secret-key-files", "Secret keys with which to sign local builds."}; - Setting<size_t> binaryCachesParallelConnections{this, 25, "http-connections", - "Number of parallel HTTP connections.", - {"binary-caches-parallel-connections"}}; - - Setting<bool> enableHttp2{this, true, "http2", - "Whether to enable HTTP/2 support."}; - Setting<unsigned int> tarballTtl{this, 60 * 60, "tarball-ttl", "How soon to expire files fetched by builtins.fetchTarball and builtins.fetchurl."}; @@ -350,18 +309,6 @@ public: /* Path to the SSL CA file used */ Path caFile; - Setting<bool> enableImportFromDerivation{this, true, "allow-import-from-derivation", - "Whether the evaluator allows importing the result of a derivation."}; - - CaseHackSetting useCaseHack{this, "use-case-hack", - "Whether to enable a Darwin-specific hack for dealing with file name collisions."}; - - Setting<unsigned long> connectTimeout{this, 0, "connect-timeout", - "Timeout for connecting to servers during downloads. 
0 means use curl's builtin default."}; - - Setting<std::string> userAgentSuffix{this, "", "user-agent-suffix", - "String appended to the user agent in HTTP requests."}; - #if __linux__ Setting<bool> filterSyscalls{this, true, "filter-syscalls", "Whether to prevent certain dangerous system calls, such as " @@ -383,9 +330,6 @@ public: Setting<uint64_t> maxFree{this, std::numeric_limits<uint64_t>::max(), "max-free", "Stop deleting garbage when free disk space is above the specified amount."}; - Setting<Strings> allowedUris{this, {}, "allowed-uris", - "Prefixes of URIs that builtin functions such as fetchurl and fetchGit are allowed to fetch."}; - Setting<Paths> pluginFiles{this, {}, "plugin-files", "Plugins to dynamically load at nix initialization time."}; }; @@ -398,15 +342,8 @@ extern Settings settings; anything else */ void initPlugins(); +void loadConfFile(); extern const string nixVersion; -struct RegisterSetting -{ - typedef std::vector<AbstractSetting *> SettingRegistrations; - static SettingRegistrations * settingRegistrations; - RegisterSetting(AbstractSetting * s); -}; - - } diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index b9e9cd5daba5..ab524d523cf2 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -73,32 +73,46 @@ protected: try { getDownloader()->download(req); } catch (DownloadError & e) { - throw UploadToHTTP(format("uploading to HTTP binary cache at %1% not supported: %2%") % cacheUri % e.msg()); + throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", cacheUri, e.msg()); } } - void getFile(const std::string & path, - std::function<void(std::shared_ptr<std::string>)> success, - std::function<void(std::exception_ptr exc)> failure) override + DownloadRequest makeRequest(const std::string & path) { DownloadRequest request(cacheUri + "/" + path); request.tries = 8; + return request; + } + + void getFile(const std::string & path, Sink & sink) override + { + auto request(makeRequest(path)); + try { + getDownloader()->download(std::move(request), sink); + } catch (DownloadError & e) { + if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri()); + throw; + } + } + + void getFile(const std::string & path, + Callback<std::shared_ptr<std::string>> callback) override + { + auto request(makeRequest(path)); getDownloader()->enqueueDownload(request, - [success](const DownloadResult & result) { - success(result.data); - }, - [success, failure](std::exception_ptr exc) { + {[callback](std::future<DownloadResult> result) { try { - std::rethrow_exception(exc); + callback(result.get().data); } catch (DownloadError & e) { if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) - return success(0); - failure(exc); + return callback(std::shared_ptr<std::string>()); + callback.rethrow(); } catch (...) 
{ - failure(exc); + callback.rethrow(); } - }); + }}); } }; diff --git a/src/libstore/legacy-ssh-store.cc b/src/libstore/legacy-ssh-store.cc index 5dee25308f7f..88d2574e86ef 100644 --- a/src/libstore/legacy-ssh-store.cc +++ b/src/libstore/legacy-ssh-store.cc @@ -17,6 +17,7 @@ struct LegacySSHStore : public Store const Setting<Path> sshKey{this, "", "ssh-key", "path to an SSH private key"}; const Setting<bool> compress{this, false, "compress", "whether to compress the connection"}; const Setting<Path> remoteProgram{this, "nix-store", "remote-program", "path to the nix-store executable on the remote system"}; + const Setting<std::string> remoteStore{this, "", "remote-store", "URI of the store on the remote system"}; // Hack for getting remote build log output. const Setting<int> logFD{this, -1, "log-fd", "file descriptor to which SSH's stderr is connected"}; @@ -27,6 +28,7 @@ struct LegacySSHStore : public Store FdSink to; FdSource from; int remoteVersion; + bool good = true; }; std::string host; @@ -41,7 +43,7 @@ struct LegacySSHStore : public Store , connections(make_ref<Pool<Connection>>( std::max(1, (int) maxConnections), [this]() { return openConnection(); }, - [](const ref<Connection> & r) { return true; } + [](const ref<Connection> & r) { return r->good; } )) , master( host, @@ -56,7 +58,9 @@ struct LegacySSHStore : public Store ref<Connection> openConnection() { auto conn = make_ref<Connection>(); - conn->sshConn = master.startCommand(fmt("%s --serve --write", remoteProgram)); + conn->sshConn = master.startCommand( + fmt("%s --serve --write", remoteProgram) + + (remoteStore.get() == "" ? "" : " --store " + shellEscape(remoteStore.get()))); conn->to = FdSink(conn->sshConn->in.get()); conn->from = FdSource(conn->sshConn->out.get()); @@ -84,10 +88,9 @@ struct LegacySSHStore : public Store } void queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) override + Callback<std::shared_ptr<ValidPathInfo>> callback) override { - sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() -> std::shared_ptr<ValidPathInfo> { + try { auto conn(connections->get()); debug("querying remote host '%s' for info on '%s'", host, path); @@ -97,7 +100,7 @@ struct LegacySSHStore : public Store auto info = std::make_shared<ValidPathInfo>(); conn->from >> info->path; - if (info->path.empty()) return nullptr; + if (info->path.empty()) return callback(nullptr); assert(path == info->path); PathSet references; @@ -116,8 +119,8 @@ struct LegacySSHStore : public Store auto s = readString(conn->from); assert(s == ""); - return info; - }); + callback(std::move(info)); + } catch (...) { callback.rethrow(); } } void addToStore(const ValidPathInfo & info, Source & source, @@ -128,18 +131,48 @@ struct LegacySSHStore : public Store auto conn(connections->get()); - conn->to - << cmdImportPaths - << 1; - copyNAR(source, conn->to); - conn->to - << exportMagic - << info.path - << info.references - << info.deriver - << 0 - << 0; - conn->to.flush(); + if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 5) { + + conn->to + << cmdAddToStoreNar + << info.path + << info.deriver + << info.narHash.to_string(Base16, false) + << info.references + << info.registrationTime + << info.narSize + << info.ultimate + << info.sigs + << info.ca; + try { + copyNAR(source, conn->to); + } catch (...) 
{ + conn->good = false; + throw; + } + conn->to.flush(); + + } else { + + conn->to + << cmdImportPaths + << 1; + try { + copyNAR(source, conn->to); + } catch (...) { + conn->good = false; + throw; + } + conn->to + << exportMagic + << info.path + << info.references + << info.deriver + << 0 + << 0; + conn->to.flush(); + + } if (readInt(conn->from) != 1) throw Error("failed to add path '%s' to remote host '%s', info.path, host"); diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 2577e90aef23..b7001795be4d 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -34,18 +34,14 @@ protected: const std::string & data, const std::string & mimeType) override; - void getFile(const std::string & path, - std::function<void(std::shared_ptr<std::string>)> success, - std::function<void(std::exception_ptr exc)> failure) override + void getFile(const std::string & path, Sink & sink) override { - sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { - try { - return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path)); - } catch (SysError & e) { - if (e.errNo == ENOENT) return std::shared_ptr<std::string>(); - throw; - } - }); + try { + readFile(binaryCacheDir + "/" + path, sink); + } catch (SysError & e) { + if (e.errNo == ENOENT) + throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache", path); + } } PathSet queryAllValidPaths() override diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc index ef8c2811bd86..c91dbf241bcf 100644 --- a/src/libstore/local-store.cc +++ b/src/libstore/local-store.cc @@ -450,7 +450,7 @@ static void canonicalisePathMetaData_(const Path & path, uid_t fromUid, InodesSe ssize_t eaSize = llistxattr(path.c_str(), nullptr, 0); if (eaSize < 0) { - if (errno != ENOTSUP) + if (errno != ENOTSUP && errno != ENODATA) throw SysError("querying extended attributes of '%s'", path); } else if (eaSize > 0) { std::vector<char> eaBuf(eaSize); @@ -629,17 +629,15 @@ uint64_t LocalStore::addValidPath(State & state, void LocalStore::queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) + Callback<std::shared_ptr<ValidPathInfo>> callback) { - sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() { - + try { auto info = std::make_shared<ValidPathInfo>(); info->path = path; assertStorePath(path); - return retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() { + callback(retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() { auto state(_state.lock()); /* Get the path info. */ @@ -679,8 +677,9 @@ void LocalStore::queryPathInfoUncached(const Path & path, info->references.insert(useQueryReferences.getStr(0)); return info; - }); - }); + })); + + } catch (...) 
{ callback.rethrow(); } } @@ -976,7 +975,8 @@ const PublicKeys & LocalStore::getPublicKeys() void LocalStore::addToStore(const ValidPathInfo & info, Source & source, RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor) { - assert(info.narHash); + if (!info.narHash) + throw Error("cannot add path '%s' because it lacks a hash", info.path); if (requireSigs && checkSigs && !info.checkSignatures(*this, getPublicKeys())) throw Error("cannot add path '%s' because it lacks a valid signature", info.path); diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh index 1209a06356f7..746bdbeed793 100644 --- a/src/libstore/local-store.hh +++ b/src/libstore/local-store.hh @@ -127,8 +127,7 @@ public: PathSet queryAllValidPaths() override; void queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) override; + Callback<std::shared_ptr<ValidPathInfo>> callback) override; void queryReferrers(const Path & path, PathSet & referrers) override; diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc index a82aa4e9cfa5..adcce026fa1d 100644 --- a/src/libstore/misc.cc +++ b/src/libstore/misc.cc @@ -33,9 +33,11 @@ void Store::computeFSClosure(const PathSet & startPaths, state->pending++; } - queryPathInfo(path, - [&, path](ref<ValidPathInfo> info) { - // FIXME: calls to isValidPath() should be async + queryPathInfo(path, {[&, path](std::future<ref<ValidPathInfo>> fut) { + // FIXME: calls to isValidPath() should be async + + try { + auto info = fut.get(); if (flipDirection) { @@ -75,14 +77,13 @@ void Store::computeFSClosure(const PathSet & startPaths, if (!--state->pending) done.notify_one(); } - }, - - [&, path](std::exception_ptr exc) { + } catch (...) { auto state(state_.lock()); - if (!state->exc) state->exc = exc; + if (!state->exc) state->exc = std::current_exception(); assert(state->pending); if (!--state->pending) done.notify_one(); - }); + }; + }}); }; for (auto & startPath : startPaths) diff --git a/src/libstore/optimise-store.cc b/src/libstore/optimise-store.cc index 7840167d7772..991512f21795 100644 --- a/src/libstore/optimise-store.cc +++ b/src/libstore/optimise-store.cc @@ -104,8 +104,7 @@ void LocalStore::optimisePath_(Activity * act, OptimiseStats & stats, *.app/Contents/Resources/\*.lproj seem to be the only paths affected. See https://github.com/NixOS/nix/issues/1443 for more discussion. 
*/ - if (std::regex_search(path, std::regex("\\.app/Contents/PkgInfo$")) || - std::regex_search(path, std::regex("\\.app/Contents/Resources/.+\\.lproj$"))) + if (std::regex_search(path, std::regex("\\.app/Contents/.+$"))) { debug(format("'%1%' is not allowed to be linked in macOS") % path); return; diff --git a/src/libstore/profiles.cc b/src/libstore/profiles.cc index 4a607b584506..4c6af567ae6f 100644 --- a/src/libstore/profiles.cc +++ b/src/libstore/profiles.cc @@ -157,6 +157,29 @@ void deleteGenerations(const Path & profile, const std::set<unsigned int> & gens } } +void deleteGenerationsGreaterThan(const Path & profile, int max, bool dryRun) +{ + PathLocks lock; + lockProfile(lock, profile); + + int curGen; + bool fromCurGen = false; + Generations gens = findGenerations(profile, curGen); + for (auto i = gens.rbegin(); i != gens.rend(); ++i) { + if (i->number == curGen) { + fromCurGen = true; + max--; + continue; + } + if (fromCurGen) { + if (max) { + max--; + continue; + } + deleteGeneration2(profile, i->number, dryRun); + } + } +} void deleteOldGenerations(const Path & profile, bool dryRun) { diff --git a/src/libstore/profiles.hh b/src/libstore/profiles.hh index 1d4e6d3037db..5fa1533de311 100644 --- a/src/libstore/profiles.hh +++ b/src/libstore/profiles.hh @@ -39,6 +39,8 @@ void deleteGeneration(const Path & profile, unsigned int gen); void deleteGenerations(const Path & profile, const std::set<unsigned int> & gensToDelete, bool dryRun); +void deleteGenerationsGreaterThan(const Path & profile, const int max, bool dryRun); + void deleteOldGenerations(const Path & profile, bool dryRun); void deleteGenerationsOlderThan(const Path & profile, time_t t, bool dryRun); diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 080cef93d214..ea86ef052f53 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -7,6 +7,7 @@ #include "globals.hh" #include "derivations.hh" #include "pool.hh" +#include "finally.hh" #include <sys/types.h> #include <sys/stat.h> @@ -187,10 +188,11 @@ void RemoteStore::setOptions(Connection & conn) << settings.useSubstitutes; if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 12) { - auto overrides = settings.getSettings(true); + std::map<std::string, Config::SettingInfo> overrides; + globalConfig.getSettings(overrides, true); conn.to << overrides.size(); for (auto & i : overrides) - conn.to << i.first << i.second; + conn.to << i.first << i.second.value; } conn.processStderr(); @@ -293,38 +295,40 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths, void RemoteStore::queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) + Callback<std::shared_ptr<ValidPathInfo>> callback) { - sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() { - auto conn(connections->get()); - conn->to << wopQueryPathInfo << path; - try { - conn->processStderr(); - } catch (Error & e) { - // Ugly backwards compatibility hack. 
- if (e.msg().find("is not valid") != std::string::npos) - throw InvalidPath(e.what()); - throw; - } - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) { - bool valid; conn->from >> valid; - if (!valid) throw InvalidPath(format("path '%s' is not valid") % path); - } - auto info = std::make_shared<ValidPathInfo>(); - info->path = path; - info->deriver = readString(conn->from); - if (info->deriver != "") assertStorePath(info->deriver); - info->narHash = Hash(readString(conn->from), htSHA256); - info->references = readStorePaths<PathSet>(*this, conn->from); - conn->from >> info->registrationTime >> info->narSize; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { - conn->from >> info->ultimate; - info->sigs = readStrings<StringSet>(conn->from); - conn->from >> info->ca; + try { + std::shared_ptr<ValidPathInfo> info; + { + auto conn(connections->get()); + conn->to << wopQueryPathInfo << path; + try { + conn->processStderr(); + } catch (Error & e) { + // Ugly backwards compatibility hack. + if (e.msg().find("is not valid") != std::string::npos) + throw InvalidPath(e.what()); + throw; + } + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) { + bool valid; conn->from >> valid; + if (!valid) throw InvalidPath(format("path '%s' is not valid") % path); + } + info = std::make_shared<ValidPathInfo>(); + info->path = path; + info->deriver = readString(conn->from); + if (info->deriver != "") assertStorePath(info->deriver); + info->narHash = Hash(readString(conn->from), htSHA256); + info->references = readStorePaths<PathSet>(*this, conn->from); + conn->from >> info->registrationTime >> info->narSize; + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { + conn->from >> info->ultimate; + info->sigs = readStrings<StringSet>(conn->from); + conn->from >> info->ca; + } } - return info; - }); + callback(std::move(info)); + } catch (...) { callback.rethrow(); } } @@ -411,8 +415,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, << info.references << info.registrationTime << info.narSize << info.ultimate << info.sigs << info.ca << repair << !checkSigs; - copyNAR(source, conn->to); - conn->processStderr(); + bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21; + if (!tunnel) copyNAR(source, conn->to); + conn->processStderr(0, tunnel ? 
&source : nullptr); } } @@ -435,8 +440,10 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath, conn->to.written = 0; conn->to.warn = true; connections->incCapacity(); - dumpPath(srcPath, conn->to, filter); - connections->decCapacity(); + { + Finally cleanup([&]() { connections->decCapacity(); }); + dumpPath(srcPath, conn->to, filter); + } conn->to.warn = false; conn->processStderr(); } catch (SysError & e) { diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index 95fa59a2069d..b488e34ce263 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -40,8 +40,7 @@ public: PathSet queryAllValidPaths() override; void queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) override; + Callback<std::shared_ptr<ValidPathInfo>> callback) override; void queryReferrers(const Path & path, PathSet & referrers) override; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 103f141a1a11..7711388f05a9 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -84,8 +84,8 @@ static void initAWS() }); } -S3Helper::S3Helper(const std::string & profile, const std::string & region) - : config(makeConfig(region)) +S3Helper::S3Helper(const std::string & profile, const std::string & region, const std::string & endpoint) + : config(makeConfig(region, endpoint)) , client(make_ref<Aws::S3::S3Client>( profile == "" ? std::dynamic_pointer_cast<Aws::Auth::AWSCredentialsProvider>( @@ -99,7 +99,7 @@ S3Helper::S3Helper(const std::string & profile, const std::string & region) #else Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, #endif - false)) + endpoint.empty())) { } @@ -116,11 +116,14 @@ class RetryStrategy : public Aws::Client::DefaultRetryStrategy } }; -ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region) +ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region, const string & endpoint) { initAWS(); auto res = make_ref<Aws::Client::ClientConfiguration>(); res->region = region; + if (!endpoint.empty()) { + res->endpointOverride = endpoint; + } res->requestTimeoutMs = 600 * 1000; res->retryStrategy = std::make_shared<RetryStrategy>(); res->caFile = settings.caFile; @@ -150,10 +153,8 @@ S3Helper::DownloadResult S3Helper::getObject( auto result = checkAws(fmt("AWS error fetching '%s'", key), client->GetObject(request)); - res.data = decodeContent( - result.GetContentEncoding(), - make_ref<std::string>( - dynamic_cast<std::stringstream &>(result.GetBody()).str())); + res.data = decompress(result.GetContentEncoding(), + dynamic_cast<std::stringstream &>(result.GetBody()).str()); } catch (S3Error & e) { if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw; @@ -170,6 +171,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore { const Setting<std::string> profile{this, "", "profile", "The name of the AWS configuration profile to use."}; const Setting<std::string> region{this, Aws::Region::US_EAST_1, "region", {"aws-region"}}; + const Setting<std::string> endpoint{this, "", "endpoint", "An optional override of the endpoint to use when talking to S3."}; const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"}; const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"}; const Setting<std::string> 
@@ -186,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         const Params & params, const std::string & bucketName)
         : S3BinaryCacheStore(params)
         , bucketName(bucketName)
-        , s3Helper(profile, region)
+        , s3Helper(profile, region, endpoint)
     {
         diskCache = getNarInfoDiskCache();
     }
@@ -273,6 +275,9 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         return true;
     }
 
+    std::shared_ptr<TransferManager> transferManager;
+    std::once_flag transferManagerCreated;
+
     void uploadFile(const std::string & path, const std::string & data,
         const std::string & mimeType,
         const std::string & contentEncoding)
@@ -284,61 +289,49 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
             executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);
 
-        TransferManagerConfiguration transferConfig(executor.get());
-
-        transferConfig.s3Client = s3Helper.client;
-        transferConfig.bufferSize = bufferSize;
-
-        if (contentEncoding != "")
-            transferConfig.createMultipartUploadTemplate.SetContentEncoding(
-                contentEncoding);
-
-        transferConfig.uploadProgressCallback =
-            [&](const TransferManager *transferManager,
-                const std::shared_ptr<const TransferHandle>
-                    &transferHandle) {
-                //FIXME: find a way to properly abort the multipart upload.
-                checkInterrupt();
-                printTalkative("upload progress ('%s'): '%d' of '%d' bytes",
-                    path,
-                    transferHandle->GetBytesTransferred(),
-                    transferHandle->GetBytesTotalSize());
-            };
+        std::call_once(transferManagerCreated, [&]() {
 
-        transferConfig.transferStatusUpdatedCallback =
-            [&](const TransferManager *,
-                const std::shared_ptr<const TransferHandle>
-                    &transferHandle) {
-                switch (transferHandle->GetStatus()) {
-                    case TransferStatus::COMPLETED:
-                        printTalkative("upload of '%s' completed", path);
-                        stats.put++;
-                        stats.putBytes += data.size();
-                        break;
-                    case TransferStatus::IN_PROGRESS:
-                        break;
-                    case TransferStatus::FAILED:
-                        throw Error("AWS error: failed to upload 's3://%s/%s'",
-                            bucketName, path);
-                        break;
-                    default:
-                        throw Error("AWS error: transfer status of 's3://%s/%s' "
-                            "in unexpected state",
-                            bucketName, path);
-                };
-            };
+            TransferManagerConfiguration transferConfig(executor.get());
+
+            transferConfig.s3Client = s3Helper.client;
+            transferConfig.bufferSize = bufferSize;
 
-        std::shared_ptr<TransferManager> transferManager =
-            TransferManager::Create(transferConfig);
+            transferConfig.uploadProgressCallback =
+                [&](const TransferManager *transferManager,
+                    const std::shared_ptr<const TransferHandle>
+                    &transferHandle)
+                {
+                    //FIXME: find a way to properly abort the multipart upload.
+                    //checkInterrupt();
+                    debug("upload progress ('%s'): '%d' of '%d' bytes",
+                        path,
+                        transferHandle->GetBytesTransferred(),
+                        transferHandle->GetBytesTotalSize());
+                };
+
+            transferManager = TransferManager::Create(transferConfig);
+        });
 
         auto now1 = std::chrono::steady_clock::now();
 
         std::shared_ptr<TransferHandle> transferHandle =
-            transferManager->UploadFile(stream, bucketName, path, mimeType,
-                Aws::Map<Aws::String, Aws::String>());
+            transferManager->UploadFile(
+                stream, bucketName, path, mimeType,
+                Aws::Map<Aws::String, Aws::String>(),
+                nullptr, contentEncoding);
 
         transferHandle->WaitUntilFinished();
 
+        if (transferHandle->GetStatus() == TransferStatus::FAILED)
+            throw Error("AWS error: failed to upload 's3://%s/%s': %s",
+                bucketName, path, transferHandle->GetLastError().GetMessage());
+
+        if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
+            throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
+                bucketName, path);
+
+        printTalkative("upload of '%s' completed", path);
+
         auto now2 = std::chrono::steady_clock::now();
 
         auto duration =
@@ -349,6 +342,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
             bucketName % path % data.size() % duration);
 
         stats.putTimeMs += duration;
+        stats.putBytes += data.size();
+        stats.put++;
     }
 
     void upsertFile(const std::string & path, const std::string & data,
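The TransferManager is now created once per store instance instead of once per upload, with std::call_once on the transferManagerCreated flag making the lazy initialisation race-free. The same pattern in isolation; all names here are illustrative rather than taken from the change:

    #include <memory>
    #include <mutex>

    struct Uploader
    {
        std::shared_ptr<int> resource;   // stands in for the shared TransferManager
        std::once_flag resourceCreated;

        void upload()
        {
            // Every caller goes through call_once; only the first pays the
            // construction cost, and concurrent callers wait until it is done.
            std::call_once(resourceCreated, [&]() {
                resource = std::make_shared<int>(42);
            });
            // ... use *resource ...
        }
    };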
@@ -364,24 +359,23 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
             uploadFile(path, data, mimeType, "");
     }
 
-    void getFile(const std::string & path,
-        std::function<void(std::shared_ptr<std::string>)> success,
-        std::function<void(std::exception_ptr exc)> failure) override
+    void getFile(const std::string & path, Sink & sink) override
     {
-        sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
-            stats.get++;
+        stats.get++;
 
-            auto res = s3Helper.getObject(bucketName, path);
+        // FIXME: stream output to sink.
+        auto res = s3Helper.getObject(bucketName, path);
 
-            stats.getBytes += res.data ? res.data->size() : 0;
-            stats.getTimeMs += res.durationMs;
+        stats.getBytes += res.data ? res.data->size() : 0;
+        stats.getTimeMs += res.durationMs;
 
-            if (res.data)
-                printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms",
-                    bucketName, path, res.data->size(), res.durationMs);
+        if (res.data) {
+            printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms",
+                bucketName, path, res.data->size(), res.durationMs);
 
-            return res.data;
-        });
+            sink((unsigned char *) res.data->data(), res.data->size());
+        } else
+            throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
     }
 
     PathSet queryAllValidPaths() override
diff --git a/src/libstore/s3.hh b/src/libstore/s3.hh
index 4f996400343c..95d612b66335 100644
--- a/src/libstore/s3.hh
+++ b/src/libstore/s3.hh
@@ -14,9 +14,9 @@ struct S3Helper
     ref<Aws::Client::ClientConfiguration> config;
     ref<Aws::S3::S3Client> client;
 
-    S3Helper(const std::string & profile, const std::string & region);
+    S3Helper(const std::string & profile, const std::string & region, const std::string & endpoint);
 
-    ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region);
+    ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region, const std::string & endpoint);
 
     struct DownloadResult
     {
diff --git a/src/libstore/serve-protocol.hh b/src/libstore/serve-protocol.hh
index f67d1e2580a5..9fae6d5349f1 100644
--- a/src/libstore/serve-protocol.hh
+++ b/src/libstore/serve-protocol.hh
@@ -5,7 +5,7 @@ namespace nix {
 #define SERVE_MAGIC_1 0x390c9deb
 #define SERVE_MAGIC_2 0x5452eecb
 
-#define SERVE_PROTOCOL_VERSION 0x204
+#define SERVE_PROTOCOL_VERSION 0x205
 
 #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
 #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
@@ -18,6 +18,7 @@ typedef enum {
     cmdBuildPaths = 6,
     cmdQueryClosure = 7,
     cmdBuildDerivation = 8,
+    cmdAddToStoreNar = 9,
 } ServeCommand;
 
 }
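SERVE_PROTOCOL_VERSION moves from 0x204 to 0x205, i.e. the minor version (the low byte) goes from 4 to 5, and cmdAddToStoreNar = 9 is the first command that requires it. A hedged sketch of how one side of the serve protocol might gate the new command on the version negotiated at handshake time; remoteVersion and the out sink are stand-ins, and the exact payload is deliberately not spelled out here:

    // 0x205 & 0x00ff == 5, so a minor version >= 5 implies the peer
    // understands cmdAddToStoreNar.
    if (GET_PROTOCOL_MINOR(remoteVersion) >= 5) {
        out << cmdAddToStoreNar;
        // ... followed by the path info and the NAR stream ...
    } else {
        // fall back to the commands that already existed in 0x204
    }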
diff --git a/src/libstore/sqlite.cc b/src/libstore/sqlite.cc
index 42d40e71d8be..a061d64f36d8 100644
--- a/src/libstore/sqlite.cc
+++ b/src/libstore/sqlite.cc
@@ -10,6 +10,7 @@ namespace nix {
 [[noreturn]] void throwSQLiteError(sqlite3 * db, const FormatOrString & fs)
 {
     int err = sqlite3_errcode(db);
+    int exterr = sqlite3_extended_errcode(db);
 
     auto path = sqlite3_db_filename(db, nullptr);
     if (!path) path = "(in-memory)";
@@ -21,7 +22,7 @@ namespace nix {
             : fmt("SQLite database '%s' is busy", path));
     }
     else
-        throw SQLiteError("%s: %s (in '%s')", fs.s, sqlite3_errstr(err), path);
+        throw SQLiteError("%s: %s (in '%s')", fs.s, sqlite3_errstr(exterr), path);
 }
 
 SQLite::SQLite(const Path & path)
diff --git a/src/libstore/ssh.cc b/src/libstore/ssh.cc
index 033c580936ad..5e0e44935cca 100644
--- a/src/libstore/ssh.cc
+++ b/src/libstore/ssh.cc
@@ -4,8 +4,9 @@ namespace nix {
 
 SSHMaster::SSHMaster(const std::string & host, const std::string & keyFile, bool useMaster, bool compress, int logFD)
     : host(host)
+    , fakeSSH(host == "localhost")
     , keyFile(keyFile)
-    , useMaster(useMaster)
+    , useMaster(useMaster && !fakeSSH)
     , compress(compress)
     , logFD(logFD)
 {
@@ -45,12 +46,19 @@ std::unique_ptr<SSHMaster::Connection> SSHMaster::startCommand(const std::string
         if (logFD != -1 && dup2(logFD, STDERR_FILENO) == -1)
             throw SysError("duping over stderr");
 
-        Strings args = { "ssh", host.c_str(), "-x", "-a" };
-        addCommonSSHOpts(args);
-        if (socketPath != "")
-            args.insert(args.end(), {"-S", socketPath});
-        if (verbosity >= lvlChatty)
-            args.push_back("-v");
+        Strings args;
+
+        if (fakeSSH) {
+            args = { "bash", "-c" };
+        } else {
+            args = { "ssh", host.c_str(), "-x", "-a" };
+            addCommonSSHOpts(args);
+            if (socketPath != "")
+                args.insert(args.end(), {"-S", socketPath});
+            if (verbosity >= lvlChatty)
+                args.push_back("-v");
+        }
+
         args.push_back(command);
 
         execvp(args.begin()->c_str(), stringsToCharPtrs(args).data());
diff --git a/src/libstore/ssh.hh b/src/libstore/ssh.hh
index 1268e6d00054..4f0f0bd29f9f 100644
--- a/src/libstore/ssh.hh
+++ b/src/libstore/ssh.hh
@@ -10,6 +10,7 @@ class SSHMaster
 private:
 
     const std::string host;
+    bool fakeSSH;
     const std::string keyFile;
     const bool useMaster;
     const bool compress;
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index 1a0d12ca78c2..1f42097fccfb 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -253,6 +253,8 @@ std::string Store::getUri()
 
 bool Store::isValidPath(const Path & storePath)
 {
+    assertStorePath(storePath);
+
     auto hashPart = storePathToHash(storePath);
 
     {
@@ -303,20 +305,20 @@ ref<const ValidPathInfo> Store::queryPathInfo(const Path & storePath)
     std::promise<ref<ValidPathInfo>> promise;
 
     queryPathInfo(storePath,
-        [&](ref<ValidPathInfo> info) {
-            promise.set_value(info);
-        },
-        [&](std::exception_ptr exc) {
-            promise.set_exception(exc);
-        });
+        {[&](std::future<ref<ValidPathInfo>> result) {
+            try {
+                promise.set_value(result.get());
+            } catch (...) {
+                promise.set_exception(std::current_exception());
+            }
+        }});
 
     return promise.get_future().get();
 }
 
 
 void Store::queryPathInfo(const Path & storePath,
-    std::function<void(ref<ValidPathInfo>)> success,
-    std::function<void(std::exception_ptr exc)> failure)
+    Callback<ref<ValidPathInfo>> callback)
 {
     auto hashPart = storePathToHash(storePath);
 
@@ -328,7 +330,7 @@ void Store::queryPathInfo(const Path & storePath,
             stats.narInfoReadAverted++;
             if (!*res)
                 throw InvalidPath(format("path '%s' is not valid") % storePath);
-            return success(ref<ValidPathInfo>(*res));
+            return callback(ref<ValidPathInfo>(*res));
         }
     }
 
@@ -344,35 +346,36 @@ void Store::queryPathInfo(const Path & storePath,
                 (res.second->path != storePath && storePathToName(storePath) != ""))
                 throw InvalidPath(format("path '%s' is not valid") % storePath);
         }
-        return success(ref<ValidPathInfo>(res.second));
+        return callback(ref<ValidPathInfo>(res.second));
     }
     }
 
-    } catch (std::exception & e) {
-        return callFailure(failure);
-    }
+    } catch (...) { return callback.rethrow(); }
 
     queryPathInfoUncached(storePath,
-        [this, storePath, hashPart, success, failure](std::shared_ptr<ValidPathInfo> info) {
+        {[this, storePath, hashPart, callback](std::future<std::shared_ptr<ValidPathInfo>> fut) {
 
-            if (diskCache)
-                diskCache->upsertNarInfo(getUri(), hashPart, info);
+            try {
+                auto info = fut.get();
 
-            {
-                auto state_(state.lock());
-                state_->pathInfoCache.upsert(hashPart, info);
-            }
+                if (diskCache)
+                    diskCache->upsertNarInfo(getUri(), hashPart, info);
 
-            if (!info
-                || (info->path != storePath && storePathToName(storePath) != ""))
-            {
-                stats.narInfoMissing++;
-                return failure(std::make_exception_ptr(InvalidPath(format("path '%s' is not valid") % storePath)));
-            }
+                {
+                    auto state_(state.lock());
+                    state_->pathInfoCache.upsert(hashPart, info);
+                }
 
-            callSuccess(success, failure, ref<ValidPathInfo>(info));
+                if (!info
+                    || (info->path != storePath && storePathToName(storePath) != ""))
+                {
+                    stats.narInfoMissing++;
+                    throw InvalidPath("path '%s' is not valid", storePath);
+                }
 
-        }, failure);
+                callback(ref<ValidPathInfo>(info));
+            } catch (...) { callback.rethrow(); }
+        }});
 }
@@ -392,26 +395,19 @@ PathSet Store::queryValidPaths(const PathSet & paths, SubstituteFlag maybeSubsti
     auto doQuery = [&](const Path & path ) {
         checkInterrupt();
-        queryPathInfo(path,
-            [path, &state_, &wakeup](ref<ValidPathInfo> info) {
-                auto state(state_.lock());
+        queryPathInfo(path, {[path, &state_, &wakeup](std::future<ref<ValidPathInfo>> fut) {
+            auto state(state_.lock());
+            try {
+                auto info = fut.get();
                 state->valid.insert(path);
-                assert(state->left);
-                if (!--state->left)
-                    wakeup.notify_one();
-            },
-            [path, &state_, &wakeup](std::exception_ptr exc) {
-                auto state(state_.lock());
-                try {
-                    std::rethrow_exception(exc);
-                } catch (InvalidPath &) {
-                } catch (...) {
-                    state->exc = exc;
-                }
-                assert(state->left);
-                if (!--state->left)
-                    wakeup.notify_one();
-            });
+            } catch (InvalidPath &) {
+            } catch (...) {
+                state->exc = std::current_exception();
+            }
+            assert(state->left);
+            if (!--state->left)
+                wakeup.notify_one();
+        }});
     };
 
     for (auto & path : paths)
@@ -613,6 +609,8 @@ void copyStorePath(ref<Store> srcStore, ref<Store> dstStore,
             act.progress(total, info->narSize);
         });
         srcStore->narFromPath({storePath}, wrapperSink);
+    }, [&]() {
+        throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri());
     });
 
     dstStore->addToStore(*info, *source, repair, checkSigs);
@@ -633,11 +631,12 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const PathSet & storePa
     Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));
 
     std::atomic<size_t> nrDone{0};
+    std::atomic<size_t> nrFailed{0};
     std::atomic<uint64_t> bytesExpected{0};
    std::atomic<uint64_t> nrRunning{0};
 
     auto showProgress = [&]() {
-        act.progress(nrDone, missing.size(), nrRunning);
+        act.progress(nrDone, missing.size(), nrRunning, nrFailed);
     };
 
     ThreadPool pool;
@@ -666,7 +665,16 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const PathSet & storePa
             if (!dstStore->isValidPath(storePath)) {
                 MaintainCount<decltype(nrRunning)> mc(nrRunning);
                 showProgress();
-                copyStorePath(srcStore, dstStore, storePath, repair, checkSigs);
+                try {
+                    copyStorePath(srcStore, dstStore, storePath, repair, checkSigs);
+                } catch (Error &e) {
+                    nrFailed++;
+                    if (!settings.keepGoing)
+                        throw e;
+                    logger->log(lvlError, format("could not copy %s: %s") % storePath % e.what());
+                    showProgress();
+                    return;
+                }
             }
 
             nrDone++;
@@ -838,8 +846,24 @@ ref<Store> openStore(const std::string & uri_,
     if (q != std::string::npos) {
         for (auto s : tokenizeString<Strings>(uri.substr(q + 1), "&")) {
             auto e = s.find('=');
-            if (e != std::string::npos)
-                params[s.substr(0, e)] = s.substr(e + 1);
+            if (e != std::string::npos) {
+                auto value = s.substr(e + 1);
+                std::string decoded;
+                for (size_t i = 0; i < value.size(); ) {
+                    if (value[i] == '%') {
+                        if (i + 2 >= value.size())
+                            throw Error("invalid URI parameter '%s'", value);
+                        try {
+                            decoded += std::stoul(std::string(value, i + 1, 2), 0, 16);
+                            i += 3;
+                        } catch (...) {
+                            throw Error("invalid URI parameter '%s'", value);
+                        }
+                    } else
+                        decoded += value[i++];
+                }
+                params[s.substr(0, e)] = decoded;
+            }
         }
         uri = uri_.substr(0, q);
     }
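The decoder walks the parameter value byte by byte: a '%' must be followed by two hex digits, which std::stoul turns into the character that gets appended, and anything else is copied verbatim. For example, a hypothetical value endpoint=my-minio.local%3A9000 decodes %3A (0x3A) to ':' and yields my-minio.local:9000. A standalone sketch of the same loop, extracted into an invented helper so it can be exercised on its own (it uses std::runtime_error and an explicit char cast instead of the Error class above):

    #include <stdexcept>
    #include <string>

    static std::string decodeQueryParam(const std::string & value)
    {
        std::string decoded;
        for (size_t i = 0; i < value.size(); ) {
            if (value[i] == '%') {
                if (i + 2 >= value.size())
                    throw std::runtime_error("invalid URI parameter '" + value + "'");
                try {
                    decoded += (char) std::stoul(value.substr(i + 1, 2), nullptr, 16);
                    i += 3;
                } catch (...) {
                    throw std::runtime_error("invalid URI parameter '" + value + "'");
                }
            } else
                decoded += value[i++];
        }
        return decoded;
    }

    // decodeQueryParam("my-minio.local%3A9000") == "my-minio.local:9000"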
@@ -847,7 +871,7 @@ ref<Store> openStore(const std::string & uri_,
     for (auto fun : *RegisterStoreImplementation::implementations) {
         auto store = fun(uri, params);
         if (store) {
-            store->handleUnknownSettings();
+            store->warnUnknownSettings();
             return ref<Store>(store);
         }
     }
diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh
index ea259f07e8ab..7c5b495a4482 100644
--- a/src/libstore/store-api.hh
+++ b/src/libstore/store-api.hh
@@ -22,6 +22,7 @@ MakeError(SubstError, Error)
 MakeError(BuildError, Error) /* denotes a permanent build failure */
 MakeError(InvalidPath, Error)
 MakeError(Unsupported, Error)
+MakeError(SubstituteGone, Error)
 
 struct BasicDerivation;
 
@@ -355,14 +356,12 @@ public:
 
     /* Asynchronous version of queryPathInfo(). */
     void queryPathInfo(const Path & path,
-        std::function<void(ref<ValidPathInfo>)> success,
-        std::function<void(std::exception_ptr exc)> failure);
+        Callback<ref<ValidPathInfo>> callback);
 
 protected:
 
     virtual void queryPathInfoUncached(const Path & path,
-        std::function<void(std::shared_ptr<ValidPathInfo>)> success,
-        std::function<void(std::exception_ptr exc)> failure) = 0;
+        Callback<std::shared_ptr<ValidPathInfo>> callback) = 0;
 
 public:
 
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index 996e1d25355f..5ebdfaf134d6 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -6,7 +6,7 @@ namespace nix {
 #define WORKER_MAGIC_1 0x6e697863
 #define WORKER_MAGIC_2 0x6478696f
 
-#define PROTOCOL_VERSION 0x114
+#define PROTOCOL_VERSION 0x115
 
 #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
 #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
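With queryPathInfoUncached() now declared as a pure virtual taking a single Callback, an implementation only has to invoke the callback once, either with a value or by forwarding the in-flight exception. A minimal sketch for a hypothetical Store subclass; MyStore and the elided backend lookup are invented for illustration and mirror the RemoteStore pattern earlier in this diff:

    void MyStore::queryPathInfoUncached(const Path & path,
        Callback<std::shared_ptr<ValidPathInfo>> callback)
    {
        try {
            auto info = std::make_shared<ValidPathInfo>();
            info->path = path;
            // ... fill in narHash, narSize, references, etc. from the backend ...
            callback(std::move(info));   // success: hand over the result
        } catch (...) {
            callback.rethrow();          // failure: forward the current exception
        }
    }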