Diffstat (limited to 'src/libstore')
32 files changed, 924 insertions, 505 deletions
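A recurring theme in the diff below is that BinaryCacheStore::upsertFile() gains a MIME-type argument that every backend (local, HTTP, S3) must now accept. The following is a minimal, self-contained sketch of that calling convention; the MimeStore/MemoryStore names are illustrative only and mirror the declaration shown in binary-cache-store.hh below, not the actual class hierarchy.

    #include <map>
    #include <string>
    #include <utility>
    #include <iostream>

    // Sketch only: mirrors the shape of the new three-argument upsertFile().
    struct MimeStore
    {
        virtual void upsertFile(const std::string & path,
            const std::string & data,
            const std::string & mimeType) = 0;
        virtual ~MimeStore() { }
    };

    struct MemoryStore : MimeStore
    {
        std::map<std::string, std::pair<std::string, std::string>> files;

        void upsertFile(const std::string & path,
            const std::string & data,
            const std::string & mimeType) override
        {
            // The MIME type travels with the data so that a backend such as
            // the S3 store can set Content-Type (and, with the new
            // *-compression options, Content-Encoding) on upload.
            files[path] = {data, mimeType};
        }
    };

    int main()
    {
        MemoryStore store;
        store.upsertFile("nix-cache-info", "StoreDir: /nix/store\n", "text/x-nix-cache-info");
        std::cout << store.files["nix-cache-info"].second << "\n";
    }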
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 3e07a2aa2b60..25ad0d75b70a 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -97,7 +97,7 @@ void BinaryCacheStore::init() auto cacheInfo = getFile(cacheInfoFile); if (!cacheInfo) { - upsertFile(cacheInfoFile, "StoreDir: " + storeDir + "\n"); + upsertFile(cacheInfoFile, "StoreDir: " + storeDir + "\n", "text/x-nix-cache-info"); } else { for (auto & line : tokenizeString<Strings>(*cacheInfo, "\n")) { size_t colon = line.find(':'); @@ -224,7 +224,7 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, const ref<std::str } } - upsertFile(storePathToHash(info.path) + ".ls.xz", *compress("xz", jsonOut.str())); + upsertFile(storePathToHash(info.path) + ".ls", jsonOut.str(), "application/json"); } else { @@ -250,10 +250,11 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, const ref<std::str narInfo->url = "nar/" + printHash32(narInfo->fileHash) + ".nar" + (compression == "xz" ? ".xz" : compression == "bzip2" ? ".bz2" : + compression == "br" ? ".br" : ""); if (repair || !fileExists(narInfo->url)) { stats.narWrite++; - upsertFile(narInfo->url, *narCompressed); + upsertFile(narInfo->url, *narCompressed, "application/x-nix-nar"); } else stats.narWriteAverted++; @@ -264,7 +265,7 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, const ref<std::str /* Atomically write the NAR info file.*/ if (secretKey) narInfo->sign(*secretKey); - upsertFile(narInfoFile, narInfo->to_string()); + upsertFile(narInfoFile, narInfo->to_string(), "text/x-nix-narinfo"); auto hashPart = storePathToHash(narInfo->path); @@ -382,4 +383,28 @@ ref<FSAccessor> BinaryCacheStore::getFSAccessor() return make_ref<RemoteFSAccessor>(ref<Store>(shared_from_this())); } +std::shared_ptr<std::string> BinaryCacheStore::getBuildLog(const Path & path) +{ + Path drvPath; + + if (isDerivation(path)) + drvPath = path; + else { + try { + auto info = queryPathInfo(path); + // FIXME: add a "Log" field to .narinfo + if (info->deriver == "") return nullptr; + drvPath = info->deriver; + } catch (InvalidPath &) { + return nullptr; + } + } + + auto logPath = "log/" + baseNameOf(drvPath); + + debug("fetching build log from binary cache ‘%s/%s’", getUri(), logPath); + + return getFile(logPath); +} + } diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh index a70d50d4949c..d42b1abd2455 100644 --- a/src/libstore/binary-cache-store.hh +++ b/src/libstore/binary-cache-store.hh @@ -31,7 +31,9 @@ public: virtual bool fileExists(const std::string & path) = 0; - virtual void upsertFile(const std::string & path, const std::string & data) = 0; + virtual void upsertFile(const std::string & path, + const std::string & data, + const std::string & mimeType) = 0; /* Return the contents of the specified file, or null if it doesn't exist. */ @@ -122,6 +124,8 @@ public: void addSignatures(const Path & storePath, const StringSet & sigs) override { notImpl(); } + std::shared_ptr<std::string> getBuildLog(const Path & path) override; + }; } diff --git a/src/libstore/build.cc b/src/libstore/build.cc index 5d6fff4e349f..fc840df81a56 100644 --- a/src/libstore/build.cc +++ b/src/libstore/build.cc @@ -1,5 +1,3 @@ -#include "config.h" - #include "references.hh" #include "pathlocks.hh" #include "globals.hh" @@ -644,7 +642,7 @@ HookInstance::~HookInstance() { try { toHook.writeSide = -1; - if (pid != -1) pid.kill(true); + if (pid != -1) pid.kill(); } catch (...) 
{ ignoreException(); } @@ -1439,7 +1437,7 @@ void DerivationGoal::buildDone() to have terminated. In fact, the builder could also have simply have closed its end of the pipe, so just to be sure, kill it. */ - int status = hook ? hook->pid.kill(true) : pid.kill(true); + int status = hook ? hook->pid.kill() : pid.kill(); debug(format("builder process for ‘%1%’ finished") % drvPath); @@ -1582,36 +1580,48 @@ HookReply DerivationGoal::tryBuildHook() if (!worker.hook) worker.hook = std::make_unique<HookInstance>(); - /* Tell the hook about system features (beyond the system type) - required from the build machine. (The hook could parse the - drv file itself, but this is easier.) */ - Strings features = tokenizeString<Strings>(get(drv->env, "requiredSystemFeatures")); - for (auto & i : features) checkStoreName(i); /* !!! abuse */ - - /* Send the request to the hook. */ - writeLine(worker.hook->toHook.writeSide.get(), (format("%1% %2% %3% %4%") - % (worker.getNrLocalBuilds() < settings.maxBuildJobs ? "1" : "0") - % drv->platform % drvPath % concatStringsSep(",", features)).str()); + try { - /* Read the first line of input, which should be a word indicating - whether the hook wishes to perform the build. */ - string reply; - while (true) { - string s = readLine(worker.hook->fromHook.readSide.get()); - if (string(s, 0, 2) == "# ") { - reply = string(s, 2); - break; + /* Tell the hook about system features (beyond the system type) + required from the build machine. (The hook could parse the + drv file itself, but this is easier.) */ + Strings features = tokenizeString<Strings>(get(drv->env, "requiredSystemFeatures")); + for (auto & i : features) checkStoreName(i); /* !!! abuse */ + + /* Send the request to the hook. */ + writeLine(worker.hook->toHook.writeSide.get(), (format("%1% %2% %3% %4%") + % (worker.getNrLocalBuilds() < settings.maxBuildJobs ? "1" : "0") + % drv->platform % drvPath % concatStringsSep(",", features)).str()); + + /* Read the first line of input, which should be a word indicating + whether the hook wishes to perform the build. */ + string reply; + while (true) { + string s = readLine(worker.hook->fromHook.readSide.get()); + if (string(s, 0, 2) == "# ") { + reply = string(s, 2); + break; + } + s += "\n"; + writeToStderr(s); } - s += "\n"; - writeToStderr(s); - } - debug(format("hook reply is ‘%1%’") % reply); + debug(format("hook reply is ‘%1%’") % reply); - if (reply == "decline" || reply == "postpone") - return reply == "decline" ? rpDecline : rpPostpone; - else if (reply != "accept") - throw Error(format("bad hook reply ‘%1%’") % reply); + if (reply == "decline" || reply == "postpone") + return reply == "decline" ? rpDecline : rpPostpone; + else if (reply != "accept") + throw Error(format("bad hook reply ‘%1%’") % reply); + + } catch (SysError & e) { + if (e.errNo == EPIPE) { + printError("build hook died unexpectedly: %s", + chomp(drainFD(worker.hook->fromHook.readSide.get()))); + worker.hook = 0; + return rpDecline; + } else + throw; + } printMsg(lvlTalkative, format("using hook to build path(s) %1%") % showPaths(missingPaths)); @@ -2309,6 +2319,14 @@ void DerivationGoal::runChild() bool setUser = true; + /* Make the contents of netrc available to builtin:fetchurl + (which may run under a different uid and/or in a sandbox). 
*/ + std::string netrcData; + try { + if (drv->isBuiltin() && drv->builder == "builtin:fetchurl") + netrcData = readFile(settings.netrcFile); + } catch (SysError &) { } + #if __linux__ if (useChroot) { @@ -2677,7 +2695,7 @@ void DerivationGoal::runChild() if (drv->isBuiltin()) { try { if (drv->builder == "builtin:fetchurl") - builtinFetchurl(*drv); + builtinFetchurl(*drv, netrcData); else throw Error(format("unsupported builtin function ‘%1%’") % string(drv->builder, 8)); _exit(0); @@ -2747,6 +2765,8 @@ void DerivationGoal::registerOutputs() Path path = i.second.path; if (missingPaths.find(path) == missingPaths.end()) continue; + ValidPathInfo info; + Path actualPath = path; if (useChroot) { actualPath = chrootRootDir + path; @@ -2849,6 +2869,8 @@ void DerivationGoal::registerOutputs() format("output path ‘%1%’ has %2% hash ‘%3%’ when ‘%4%’ was expected") % path % i.second.hashAlgo % printHash16or32(h2) % printHash16or32(h)); } + + info.ca = makeFixedOutputCA(recursive, h2); } /* Get rid of all weird permissions. This also checks that @@ -2948,7 +2970,6 @@ void DerivationGoal::registerOutputs() worker.markContentsGood(path); } - ValidPathInfo info; info.path = path; info.narHash = hash.first; info.narSize = hash.second; @@ -3027,9 +3048,6 @@ void DerivationGoal::registerOutputs() } -string drvsLogDir = "drvs"; - - Path DerivationGoal::openLogFile() { logSize = 0; @@ -3039,7 +3057,7 @@ Path DerivationGoal::openLogFile() string baseName = baseNameOf(drvPath); /* Create a log file. */ - Path dir = (format("%1%/%2%/%3%/") % worker.store.logDir % drvsLogDir % string(baseName, 0, 2)).str(); + Path dir = (format("%1%/%2%/%3%/") % worker.store.logDir % worker.store.drvsLogDir % string(baseName, 0, 2)).str(); createDirs(dir); Path logFileName = (format("%1%/%2%%3%") @@ -3074,7 +3092,9 @@ void DerivationGoal::closeLogFile() void DerivationGoal::deleteTmpDir(bool force) { if (tmpDir != "") { - if (settings.keepFailed && !force) { + /* Don't keep temporary directories for builtins because they + might have privileged stuff (like a copy of netrc). */ + if (settings.keepFailed && !force && !drv->isBuiltin()) { printError( format("note: keeping build directory ‘%2%’") % drvPath % tmpDir); diff --git a/src/libstore/builtins.cc b/src/libstore/builtins.cc index a30f30906f01..c5dbd57f8bc6 100644 --- a/src/libstore/builtins.cc +++ b/src/libstore/builtins.cc @@ -6,8 +6,16 @@ namespace nix { -void builtinFetchurl(const BasicDerivation & drv) +void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData) { + /* Make the host's netrc data available. Too bad curl requires + this to be stored in a file. It would be nice if we could just + pass a pointer to the data. 
*/ + if (netrcData != "") { + settings.netrcFile = "netrc"; + writeFile(settings.netrcFile, netrcData, 0600); + } + auto getAttr = [&](const string & name) { auto i = drv.env.find(name); if (i == drv.env.end()) throw Error(format("attribute ‘%s’ missing") % name); diff --git a/src/libstore/builtins.hh b/src/libstore/builtins.hh index 4b2431aa08cf..0cc6ba31f658 100644 --- a/src/libstore/builtins.hh +++ b/src/libstore/builtins.hh @@ -4,6 +4,6 @@ namespace nix { -void builtinFetchurl(const BasicDerivation & drv); +void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData); } diff --git a/src/libstore/crypto.cc b/src/libstore/crypto.cc index 747483afb30b..0fc86a1fe921 100644 --- a/src/libstore/crypto.cc +++ b/src/libstore/crypto.cc @@ -105,7 +105,9 @@ PublicKeys getDefaultPublicKeys() // FIXME: filter duplicates - for (auto s : settings.get("binary-cache-public-keys", Strings())) { + for (auto s : settings.get("binary-cache-public-keys", + Strings{"cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY="})) + { PublicKey key(s); publicKeys.emplace(key.name, key); } diff --git a/src/libstore/derivations.cc b/src/libstore/derivations.cc index 79526c594f71..0c6ceb9f6741 100644 --- a/src/libstore/derivations.cc +++ b/src/libstore/derivations.cc @@ -4,7 +4,7 @@ #include "util.hh" #include "worker-protocol.hh" #include "fs-accessor.hh" - +#include "istringstream_nocopy.hh" namespace nix { @@ -152,7 +152,7 @@ static StringSet parseStrings(std::istream & str, bool arePaths) static Derivation parseDerivation(const string & s) { Derivation drv; - std::istringstream str(s); + istringstream_nocopy str(s); expect(str, "Derive(["); /* Parse the list of outputs. */ @@ -397,8 +397,8 @@ PathSet BasicDerivation::outputPaths() const Source & readDerivation(Source & in, Store & store, BasicDerivation & drv) { drv.outputs.clear(); - auto nr = readInt(in); - for (unsigned int n = 0; n < nr; n++) { + auto nr = readNum<size_t>(in); + for (size_t n = 0; n < nr; n++) { auto name = readString(in); DerivationOutput o; in >> o.path >> o.hashAlgo >> o.hash; @@ -410,8 +410,8 @@ Source & readDerivation(Source & in, Store & store, BasicDerivation & drv) in >> drv.platform >> drv.builder; drv.args = readStrings<Strings>(in); - nr = readInt(in); - for (unsigned int n = 0; n < nr; n++) { + nr = readNum<size_t>(in); + for (size_t n = 0; n < nr; n++) { auto key = readString(in); auto value = readString(in); drv.env[key] = value; diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 074e0ca6642a..22bde086e6a2 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -4,6 +4,12 @@ #include "hash.hh" #include "store-api.hh" #include "archive.hh" +#include "s3.hh" +#include "compression.hh" + +#ifdef ENABLE_S3 +#include <aws/core/client/ClientConfiguration.h> +#endif #include <unistd.h> #include <fcntl.h> @@ -33,6 +39,16 @@ std::string resolveUri(const std::string & uri) return uri; } +ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data) +{ + if (encoding == "") + return data; + else if (encoding == "br") + return decompress(encoding, *data); + else + throw Error("unsupported Content-Encoding ‘%s’", encoding); +} + struct CurlDownloader : public Downloader { CURLM * curlm = 0; @@ -66,6 +82,8 @@ struct CurlDownloader : public Downloader struct curl_slist * requestHeaders = 0; + std::string encoding; + DownloadItem(CurlDownloader & downloader, const DownloadRequest & request) : downloader(downloader), request(request) { @@ -123,6 +141,7 @@ struct 
CurlDownloader : public Downloader auto ss = tokenizeString<vector<string>>(line, " "); status = ss.size() >= 2 ? ss[1] : ""; result.data = std::make_shared<std::string>(); + encoding = ""; } else { auto i = line.find(':'); if (i != string::npos) { @@ -138,7 +157,8 @@ struct CurlDownloader : public Downloader debug(format("shutting down on 200 HTTP response with expected ETag")); return 0; } - } + } else if (name == "content-encoding") + encoding = trim(string(line, i + 1));; } } return realSize; @@ -200,7 +220,7 @@ struct CurlDownloader : public Downloader curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str()); curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1); - curl_easy_setopt(req, CURLOPT_USERAGENT, ("Nix/" + nixVersion).c_str()); + curl_easy_setopt(req, CURLOPT_USERAGENT, ("curl/" LIBCURL_VERSION " Nix/" + nixVersion).c_str()); #if LIBCURL_VERSION_NUM >= 0x072b00 curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1); #endif @@ -223,13 +243,17 @@ struct CurlDownloader : public Downloader curl_easy_setopt(req, CURLOPT_NOBODY, 1); if (request.verifyTLS) - curl_easy_setopt(req, CURLOPT_CAINFO, - getEnv("NIX_SSL_CERT_FILE", getEnv("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt")).c_str()); + curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.c_str()); else { curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0); curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0); } + /* If no file exist in the specified path, curl continues to work + anyway as if netrc support was disabled. */ + curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.c_str()); + curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL); + result.data = std::make_shared<std::string>(); } @@ -260,14 +284,26 @@ struct CurlDownloader : public Downloader { result.cached = httpStatus == 304; done = true; - callSuccess(success, failure, const_cast<const DownloadResult &>(result)); + + try { + result.data = decodeContent(encoding, ref<std::string>(result.data)); + callSuccess(success, failure, const_cast<const DownloadResult &>(result)); + } catch (...) { + done = true; + callFailure(failure, std::current_exception()); + } } else { Error err = (httpStatus == 404 || code == CURLE_FILE_COULDNT_READ_FILE) ? NotFound : httpStatus == 403 ? Forbidden : (httpStatus == 408 || httpStatus == 500 || httpStatus == 503 || httpStatus == 504 || httpStatus == 522 || httpStatus == 524 - || code == CURLE_COULDNT_RESOLVE_HOST) ? Transient : + || code == CURLE_COULDNT_RESOLVE_HOST + || code == CURLE_RECV_ERROR +#if LIBCURL_VERSION_NUM >= 0x073200 + || code == CURLE_HTTP2_STREAM +#endif + ) ? Transient : Misc; attempt++; @@ -480,6 +516,31 @@ struct CurlDownloader : public Downloader std::function<void(const DownloadResult &)> success, std::function<void(std::exception_ptr exc)> failure) override { + /* Ugly hack to support s3:// URIs. 
*/ + if (hasPrefix(request.uri, "s3://")) { + // FIXME: do this on a worker thread + sync2async<DownloadResult>(success, failure, [&]() -> DownloadResult { +#ifdef ENABLE_S3 + S3Helper s3Helper(Aws::Region::US_EAST_1); // FIXME: make configurable + auto slash = request.uri.find('/', 5); + if (slash == std::string::npos) + throw nix::Error("bad S3 URI ‘%s’", request.uri); + std::string bucketName(request.uri, 5, slash - 5); + std::string key(request.uri, slash + 1); + // FIXME: implement ETag + auto s3Res = s3Helper.getObject(bucketName, key); + DownloadResult res; + if (!s3Res.data) + throw DownloadError(NotFound, fmt("S3 object ‘%s’ does not exist", request.uri)); + res.data = s3Res.data; + return res; +#else + throw nix::Error("cannot download ‘%s’ because Nix is not built with S3 support", request.uri); +#endif + }); + return; + } + auto item = std::make_shared<DownloadItem>(*this, request); item->success = success; item->failure = failure; @@ -581,6 +642,7 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa Hash hash = hashString(expectedHash ? expectedHash.type : htSHA256, *res.data); info.path = store->makeFixedOutputPath(false, hash, name); info.narHash = hashString(htSHA256, *sink.s); + info.ca = makeFixedOutputCA(false, hash); store->addToStore(info, sink.s, false, true); storePath = info.path; } @@ -609,7 +671,7 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa Path tmpDir = createTempDir(); AutoDelete autoDelete(tmpDir, true); // FIXME: this requires GNU tar for decompression. - runProgram("tar", true, {"xf", storePath, "-C", tmpDir, "--strip-components", "1"}, ""); + runProgram("tar", true, {"xf", storePath, "-C", tmpDir, "--strip-components", "1"}); unpackedStorePath = store->addToStore(name, tmpDir, true, htSHA256, defaultPathFilter, false); } replaceSymlink(unpackedStorePath, unpackedLink); @@ -629,7 +691,7 @@ bool isUri(const string & s) size_t pos = s.find("://"); if (pos == string::npos) return false; string scheme(s, 0, pos); - return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git"; + return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3"; } diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 82b5d641fde9..e2e16b361036 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -23,7 +23,7 @@ struct DownloadRequest struct DownloadResult { - bool cached; + bool cached = false; std::string etag; std::string effectiveUrl; std::shared_ptr<std::string> data; @@ -73,4 +73,7 @@ public: bool isUri(const string & s); +/* Decode data according to the Content-Encoding header. 
*/ +ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data); + } diff --git a/src/libstore/export-import.cc b/src/libstore/export-import.cc index c5618c826c54..2b8ab063e18e 100644 --- a/src/libstore/export-import.cc +++ b/src/libstore/export-import.cc @@ -61,39 +61,17 @@ void Store::exportPath(const Path & path, Sink & sink) hashAndWriteSink << exportMagic << path << info->references << info->deriver << 0; } -struct TeeSource : Source -{ - Source & readSource; - ref<std::string> data; - TeeSource(Source & readSource) - : readSource(readSource) - , data(make_ref<std::string>()) - { - } - size_t read(unsigned char * data, size_t len) - { - size_t n = readSource.read(data, len); - this->data->append((char *) data, n); - return n; - } -}; - -struct NopSink : ParseSink -{ -}; - Paths Store::importPaths(Source & source, std::shared_ptr<FSAccessor> accessor, bool dontCheckSigs) { Paths res; while (true) { - unsigned long long n = readLongLong(source); + auto n = readNum<uint64_t>(source); if (n == 0) break; if (n != 1) throw Error("input doesn't look like something created by ‘nix-store --export’"); /* Extract the NAR from the source. */ - TeeSource tee(source); - NopSink sink; - parseDump(sink, tee); + TeeSink tee(source); + parseDump(tee, tee.source); uint32_t magic = readInt(source); if (magic != exportMagic) @@ -110,14 +88,14 @@ Paths Store::importPaths(Source & source, std::shared_ptr<FSAccessor> accessor, info.deriver = readString(source); if (info.deriver != "") assertStorePath(info.deriver); - info.narHash = hashString(htSHA256, *tee.data); - info.narSize = tee.data->size(); + info.narHash = hashString(htSHA256, *tee.source.data); + info.narSize = tee.source.data->size(); // Ignore optional legacy signature. if (readInt(source) == 1) readString(source); - addToStore(info, tee.data, false, dontCheckSigs, accessor); + addToStore(info, tee.source.data, false, dontCheckSigs, accessor); res.push_back(info.path); } diff --git a/src/libstore/globals.cc b/src/libstore/globals.cc index 00b468892529..012b3d5b8b98 100644 --- a/src/libstore/globals.cc +++ b/src/libstore/globals.cc @@ -1,12 +1,10 @@ -#include "config.h" - #include "globals.hh" #include "util.hh" #include "archive.hh" -#include <map> #include <algorithm> -#include <unistd.h> +#include <map> +#include <thread> namespace nix { @@ -25,15 +23,26 @@ Settings settings; Settings::Settings() { + nixPrefix = NIX_PREFIX; + nixStore = canonPath(getEnv("NIX_STORE_DIR", getEnv("NIX_STORE", NIX_STORE_DIR))); + nixDataDir = canonPath(getEnv("NIX_DATA_DIR", NIX_DATA_DIR)); + nixLogDir = canonPath(getEnv("NIX_LOG_DIR", NIX_LOG_DIR)); + nixStateDir = canonPath(getEnv("NIX_STATE_DIR", NIX_STATE_DIR)); + nixConfDir = canonPath(getEnv("NIX_CONF_DIR", NIX_CONF_DIR)); + nixLibexecDir = canonPath(getEnv("NIX_LIBEXEC_DIR", NIX_LIBEXEC_DIR)); + nixBinDir = canonPath(getEnv("NIX_BIN_DIR", NIX_BIN_DIR)); + nixDaemonSocketFile = canonPath(nixStateDir + DEFAULT_SOCKET_PATH); + + // should be set with the other config options, but depends on nixLibexecDir +#ifdef __APPLE__ + preBuildHook = nixLibexecDir + "/nix/resolve-system-dependencies"; +#endif + keepFailed = false; keepGoing = false; tryFallback = false; maxBuildJobs = 1; - buildCores = 1; -#ifdef _SC_NPROCESSORS_ONLN - long res = sysconf(_SC_NPROCESSORS_ONLN); - if (res > 0) buildCores = res; -#endif + buildCores = std::max(1U, std::thread::hardware_concurrency()); readOnlyMode = false; thisSystem = SYSTEM; maxSilentTime = 0; @@ -59,25 +68,9 @@ Settings::Settings() lockCPU = 
getEnv("NIX_AFFINITY_HACK", "1") == "1"; showTrace = false; enableImportNative = false; -} - - -void Settings::processEnvironment() -{ - nixPrefix = NIX_PREFIX; - nixStore = canonPath(getEnv("NIX_STORE_DIR", getEnv("NIX_STORE", NIX_STORE_DIR))); - nixDataDir = canonPath(getEnv("NIX_DATA_DIR", NIX_DATA_DIR)); - nixLogDir = canonPath(getEnv("NIX_LOG_DIR", NIX_LOG_DIR)); - nixStateDir = canonPath(getEnv("NIX_STATE_DIR", NIX_STATE_DIR)); - nixConfDir = canonPath(getEnv("NIX_CONF_DIR", NIX_CONF_DIR)); - nixLibexecDir = canonPath(getEnv("NIX_LIBEXEC_DIR", NIX_LIBEXEC_DIR)); - nixBinDir = canonPath(getEnv("NIX_BIN_DIR", NIX_BIN_DIR)); - nixDaemonSocketFile = canonPath(nixStateDir + DEFAULT_SOCKET_PATH); - - // should be set with the other config options, but depends on nixLibexecDir -#ifdef __APPLE__ - preBuildHook = nixLibexecDir + "/nix/resolve-system-dependencies"; -#endif + netrcFile = fmt("%s/%s", nixConfDir, "netrc"); + caFile = getEnv("NIX_SSL_CERT_FILE", getEnv("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt")); + enableImportFromDerivation = true; } @@ -156,7 +149,14 @@ int Settings::get(const string & name, int def) void Settings::update() { _get(tryFallback, "build-fallback"); - _get(maxBuildJobs, "build-max-jobs"); + + auto s = get("build-max-jobs", std::string("1")); + if (s == "auto") + maxBuildJobs = std::max(1U, std::thread::hardware_concurrency()); + else + if (!string2Int(s, maxBuildJobs)) + throw Error("configuration setting ‘build-max-jobs’ should be ‘auto’ or an integer"); + _get(buildCores, "build-cores"); _get(thisSystem, "system"); _get(maxSilentTime, "build-max-silent-time"); @@ -179,12 +179,13 @@ void Settings::update() _get(envKeepDerivations, "env-keep-derivations"); _get(sshSubstituterHosts, "ssh-substituter-hosts"); _get(useSshSubstituter, "use-ssh-substituter"); - _get(logServers, "log-servers"); _get(enableImportNative, "allow-unsafe-native-code-during-evaluation"); _get(useCaseHack, "use-case-hack"); _get(preBuildHook, "pre-build-hook"); _get(keepGoing, "keep-going"); _get(keepFailed, "keep-failed"); + _get(netrcFile, "netrc-file"); + _get(enableImportFromDerivation, "allow-import-from-derivation"); } diff --git a/src/libstore/globals.hh b/src/libstore/globals.hh index a423b4e5c0f4..462721681912 100644 --- a/src/libstore/globals.hh +++ b/src/libstore/globals.hh @@ -16,8 +16,6 @@ struct Settings { Settings(); - void processEnvironment(); - void loadConfFile(); void set(const string & name, const string & value); @@ -183,9 +181,6 @@ struct Settings { /* Whether to show a stack trace if Nix evaluation fails. */ bool showTrace; - /* A list of URL prefixes that can return Nix build logs. */ - Strings logServers; - /* Whether the importNative primop should be enabled */ bool enableImportNative; @@ -193,6 +188,16 @@ struct Settings { build settings */ Path preBuildHook; + /* Path to the netrc file used to obtain usernames/passwords for + downloads. 
*/ + Path netrcFile; + + /* Path to the SSL CA file used */ + Path caFile; + + /* Whether we allow import-from-derivation */ + bool enableImportFromDerivation; + private: SettingsMap settings, overrides; diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 9d31f77c921f..37a7d6ace142 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -64,7 +64,9 @@ protected: } } - void upsertFile(const std::string & path, const std::string & data) override + void upsertFile(const std::string & path, + const std::string & data, + const std::string & mimeType) override { throw UploadToHTTP("uploading to an HTTP binary cache is not supported"); } diff --git a/src/libstore/legacy-ssh-store.cc b/src/libstore/legacy-ssh-store.cc index 5d9e5aad6e0a..0e838846c794 100644 --- a/src/libstore/legacy-ssh-store.cc +++ b/src/libstore/legacy-ssh-store.cc @@ -4,80 +4,50 @@ #include "serve-protocol.hh" #include "store-api.hh" #include "worker-protocol.hh" +#include "ssh.hh" namespace nix { -static std::string uriScheme = "legacy-ssh://"; +static std::string uriScheme = "ssh://"; struct LegacySSHStore : public Store { - string host; - struct Connection { - Pid sshPid; - AutoCloseFD out; - AutoCloseFD in; + std::unique_ptr<SSHMaster::Connection> sshConn; FdSink to; FdSource from; }; - AutoDelete tmpDir; - - Path socketPath; - - Pid sshMaster; + std::string host; ref<Pool<Connection>> connections; - Path key; + SSHMaster master; - LegacySSHStore(const string & host, const Params & params, - size_t maxConnections = std::numeric_limits<size_t>::max()) + LegacySSHStore(const string & host, const Params & params) : Store(params) , host(host) - , tmpDir(createTempDir("", "nix", true, true, 0700)) - , socketPath((Path) tmpDir + "/ssh.sock") , connections(make_ref<Pool<Connection>>( - maxConnections, + std::max(1, std::stoi(get(params, "max-connections", "1"))), [this]() { return openConnection(); }, [](const ref<Connection> & r) { return true; } )) - , key(get(params, "ssh-key", "")) + , master( + host, + get(params, "ssh-key", ""), + // Use SSH master only if using more than 1 connection. 
+ connections->capacity() > 1, + get(params, "compress", "") == "true") { } ref<Connection> openConnection() { - if ((pid_t) sshMaster == -1) { - sshMaster = startProcess([&]() { - restoreSignals(); - Strings args{ "ssh", "-M", "-S", socketPath, "-N", "-x", "-a", host }; - if (!key.empty()) - args.insert(args.end(), {"-i", key}); - execvp("ssh", stringsToCharPtrs(args).data()); - throw SysError("starting SSH master connection to host ‘%s’", host); - }); - } - auto conn = make_ref<Connection>(); - Pipe in, out; - in.create(); - out.create(); - conn->sshPid = startProcess([&]() { - if (dup2(in.readSide.get(), STDIN_FILENO) == -1) - throw SysError("duping over STDIN"); - if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1) - throw SysError("duping over STDOUT"); - execlp("ssh", "ssh", "-S", socketPath.c_str(), host.c_str(), "nix-store", "--serve", "--write", nullptr); - throw SysError("executing ‘nix-store --serve’ on remote host ‘%s’", host); - }); - in.readSide = -1; - out.writeSide = -1; - conn->out = std::move(out.readSide); - conn->in = std::move(in.writeSide); - conn->to = FdSink(conn->in.get()); - conn->from = FdSource(conn->out.get()); + conn->sshConn = master.startCommand("nix-store --serve --write"); + conn->to = FdSink(conn->sshConn->in.get()); + conn->from = FdSource(conn->sshConn->out.get()); int remoteVersion; @@ -169,9 +139,9 @@ struct LegacySSHStore : public Store /* FIXME: inefficient. */ ParseSink parseSink; /* null sink; just parse the NAR */ - SavingSourceAdapter savedNAR(conn->from); + TeeSource savedNAR(conn->from); parseDump(parseSink, savedNAR); - sink(savedNAR.s); + sink(*savedNAR.data); } /* Unsupported methods. */ @@ -225,7 +195,7 @@ struct LegacySSHStore : public Store void collectGarbage(const GCOptions & options, GCResults & results) override { unsupported(); } - ref<FSAccessor> getFSAccessor() + ref<FSAccessor> getFSAccessor() override { unsupported(); } void addSignatures(const Path & storePath, const StringSet & sigs) override @@ -234,6 +204,41 @@ struct LegacySSHStore : public Store bool isTrusted() override { return true; } + void computeFSClosure(const PathSet & paths, + PathSet & out, bool flipDirection = false, + bool includeOutputs = false, bool includeDerivers = false) override + { + if (flipDirection || includeDerivers) { + Store::computeFSClosure(paths, out, flipDirection, includeOutputs, includeDerivers); + return; + } + + auto conn(connections->get()); + + conn->to + << cmdQueryClosure + << includeOutputs + << paths; + conn->to.flush(); + + auto res = readStorePaths<PathSet>(*this, conn->from); + + out.insert(res.begin(), res.end()); + } + + PathSet queryValidPaths(const PathSet & paths, bool maybeSubstitute = false) override + { + auto conn(connections->get()); + + conn->to + << cmdQueryValidPaths + << false // lock + << maybeSubstitute + << paths; + conn->to.flush(); + + return readStorePaths<PathSet>(*this, conn->from); + } }; static RegisterStoreImplementation regStore([]( diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 0f377989bd89..aff22f9fcc22 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -30,7 +30,9 @@ protected: bool fileExists(const std::string & path) override; - void upsertFile(const std::string & path, const std::string & data) override; + void upsertFile(const std::string & path, + const std::string & data, + const std::string & mimeType) override; void getFile(const std::string & path, 
std::function<void(std::shared_ptr<std::string>)> success, @@ -83,7 +85,9 @@ bool LocalBinaryCacheStore::fileExists(const std::string & path) return pathExists(binaryCacheDir + "/" + path); } -void LocalBinaryCacheStore::upsertFile(const std::string & path, const std::string & data) +void LocalBinaryCacheStore::upsertFile(const std::string & path, + const std::string & data, + const std::string & mimeType) { atomicWrite(binaryCacheDir + "/" + path, data); } diff --git a/src/libstore/local-fs-store.cc b/src/libstore/local-fs-store.cc index 4571a2211cd2..57e1b8a09fe6 100644 --- a/src/libstore/local-fs-store.cc +++ b/src/libstore/local-fs-store.cc @@ -2,6 +2,8 @@ #include "fs-accessor.hh" #include "store-api.hh" #include "globals.hh" +#include "compression.hh" +#include "derivations.hh" namespace nix { @@ -84,4 +86,46 @@ void LocalFSStore::narFromPath(const Path & path, Sink & sink) dumpPath(getRealStoreDir() + std::string(path, storeDir.size()), sink); } +const string LocalFSStore::drvsLogDir = "drvs"; + +std::shared_ptr<std::string> LocalFSStore::getBuildLog(const Path & path_) +{ + auto path(path_); + + assertStorePath(path); + + + if (!isDerivation(path)) { + try { + path = queryPathInfo(path)->deriver; + } catch (InvalidPath &) { + return nullptr; + } + if (path == "") return nullptr; + } + + string baseName = baseNameOf(path); + + for (int j = 0; j < 2; j++) { + + Path logPath = + j == 0 + ? (format("%1%/%2%/%3%/%4%") % logDir % drvsLogDir % string(baseName, 0, 2) % string(baseName, 2)).str() + : (format("%1%/%2%/%3%") % logDir % drvsLogDir % baseName).str(); + Path logBz2Path = logPath + ".bz2"; + + if (pathExists(logPath)) + return std::make_shared<std::string>(readFile(logPath)); + + else if (pathExists(logBz2Path)) { + try { + return decompress("bzip2", readFile(logBz2Path)); + } catch (Error &) { } + } + + } + + return nullptr; +} + } diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc index 612efde7bb8f..8610841d7229 100644 --- a/src/libstore/local-store.cc +++ b/src/libstore/local-store.cc @@ -1,4 +1,3 @@ -#include "config.h" #include "local-store.hh" #include "globals.hh" #include "archive.hh" @@ -45,7 +44,7 @@ LocalStore::LocalStore(const Params & params) , reservedPath(dbDir + "/reserved") , schemaPath(dbDir + "/schema") , trashDir(realStoreDir + "/trash") - , requireSigs(trim(settings.get("signed-binary-caches", std::string(""))) != "") // FIXME: rename option + , requireSigs(trim(settings.get("signed-binary-caches", std::string("*"))) != "") // FIXME: rename option , publicKeys(getDefaultPublicKeys()) { auto state(_state.lock()); @@ -520,6 +519,8 @@ void LocalStore::checkDerivationOutputs(const Path & drvPath, const Derivation & uint64_t LocalStore::addValidPath(State & state, const ValidPathInfo & info, bool checkOutputs) { + assert(info.ca == "" || info.isContentAddressed(*this)); + state.stmtRegisterValidPath.use() (info.path) ("sha256:" + printHash(info.narHash)) @@ -668,7 +669,7 @@ bool LocalStore::isValidPathUncached(const Path & path) } -PathSet LocalStore::queryValidPaths(const PathSet & paths) +PathSet LocalStore::queryValidPaths(const PathSet & paths, bool maybeSubstitute) { PathSet res; for (auto & i : paths) @@ -919,7 +920,7 @@ void LocalStore::addToStore(const ValidPathInfo & info, const ref<std::string> & info.path % info.narHash.to_string() % h.to_string()); if (requireSigs && !dontCheckSigs && !info.checkSignatures(*this, publicKeys)) - throw Error(format("cannot import path ‘%s’ because it lacks a valid signature") % info.path); + throw 
Error("cannot add path ‘%s’ because it lacks a valid signature", info.path); addTempRoot(info.path); @@ -1003,7 +1004,7 @@ Path LocalStore::addToStoreFromDump(const string & dump, const string & name, info.narHash = hash.first; info.narSize = hash.second; info.ultimate = true; - info.ca = "fixed:" + (recursive ? (std::string) "r:" : "") + h.to_string(); + info.ca = makeFixedOutputCA(recursive, h); registerValidPath(info); } diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh index 511209d8404a..28e9a31c9feb 100644 --- a/src/libstore/local-store.hh +++ b/src/libstore/local-store.hh @@ -21,9 +21,6 @@ namespace nix { const int nixSchemaVersion = 10; -extern string drvsLogDir; - - struct Derivation; @@ -102,7 +99,7 @@ public: bool isValidPathUncached(const Path & path) override; - PathSet queryValidPaths(const PathSet & paths) override; + PathSet queryValidPaths(const PathSet & paths, bool maybeSubstitute = false) override; PathSet queryAllValidPaths() override; diff --git a/src/libstore/nar-info-disk-cache.cc b/src/libstore/nar-info-disk-cache.cc index 13b67b81f35e..180a936edb85 100644 --- a/src/libstore/nar-info-disk-cache.cc +++ b/src/libstore/nar-info-disk-cache.cc @@ -106,25 +106,27 @@ public: "select * from NARs where cache = ? and hashPart = ? and ((present = 0 and timestamp > ?) or (present = 1 and timestamp > ?))"); /* Periodically purge expired entries from the database. */ - auto now = time(0); - - SQLiteStmt queryLastPurge(state->db, "select value from LastPurge"); - auto queryLastPurge_(queryLastPurge.use()); - - if (!queryLastPurge_.next() || queryLastPurge_.getInt(0) < now - purgeInterval) { - SQLiteStmt(state->db, - "delete from NARs where ((present = 0 and timestamp < ?) or (present = 1 and timestamp < ?))") - .use() - (now - ttlNegative) - (now - ttlPositive) - .exec(); - - debug("deleted %d entries from the NAR info disk cache", sqlite3_changes(state->db)); - - SQLiteStmt(state->db, - "insert or replace into LastPurge(dummy, value) values ('', ?)") - .use()(now).exec(); - } + retrySQLite<void>([&]() { + auto now = time(0); + + SQLiteStmt queryLastPurge(state->db, "select value from LastPurge"); + auto queryLastPurge_(queryLastPurge.use()); + + if (!queryLastPurge_.next() || queryLastPurge_.getInt(0) < now - purgeInterval) { + SQLiteStmt(state->db, + "delete from NARs where ((present = 0 and timestamp < ?) 
or (present = 1 and timestamp < ?))") + .use() + (now - ttlNegative) + (now - ttlPositive) + .exec(); + + debug("deleted %d entries from the NAR info disk cache", sqlite3_changes(state->db)); + + SQLiteStmt(state->db, + "insert or replace into LastPurge(dummy, value) values ('', ?)") + .use()(now).exec(); + } + }); } Cache & getCache(State & state, const std::string & uri) @@ -136,114 +138,123 @@ public: void createCache(const std::string & uri, const Path & storeDir, bool wantMassQuery, int priority) override { - auto state(_state.lock()); + retrySQLite<void>([&]() { + auto state(_state.lock()); - // FIXME: race + // FIXME: race - state->insertCache.use()(uri)(time(0))(storeDir)(wantMassQuery)(priority).exec(); - assert(sqlite3_changes(state->db) == 1); - state->caches[uri] = Cache{(int) sqlite3_last_insert_rowid(state->db), storeDir, wantMassQuery, priority}; + state->insertCache.use()(uri)(time(0))(storeDir)(wantMassQuery)(priority).exec(); + assert(sqlite3_changes(state->db) == 1); + state->caches[uri] = Cache{(int) sqlite3_last_insert_rowid(state->db), storeDir, wantMassQuery, priority}; + }); } bool cacheExists(const std::string & uri, bool & wantMassQuery, int & priority) override { - auto state(_state.lock()); + return retrySQLite<bool>([&]() { + auto state(_state.lock()); - auto i = state->caches.find(uri); - if (i == state->caches.end()) { - auto queryCache(state->queryCache.use()(uri)); - if (!queryCache.next()) return false; - state->caches.emplace(uri, - Cache{(int) queryCache.getInt(0), queryCache.getStr(1), queryCache.getInt(2) != 0, (int) queryCache.getInt(3)}); - } + auto i = state->caches.find(uri); + if (i == state->caches.end()) { + auto queryCache(state->queryCache.use()(uri)); + if (!queryCache.next()) return false; + state->caches.emplace(uri, + Cache{(int) queryCache.getInt(0), queryCache.getStr(1), queryCache.getInt(2) != 0, (int) queryCache.getInt(3)}); + } - auto & cache(getCache(*state, uri)); + auto & cache(getCache(*state, uri)); - wantMassQuery = cache.wantMassQuery; - priority = cache.priority; + wantMassQuery = cache.wantMassQuery; + priority = cache.priority; - return true; + return true; + }); } std::pair<Outcome, std::shared_ptr<NarInfo>> lookupNarInfo( const std::string & uri, const std::string & hashPart) override { - auto state(_state.lock()); + return retrySQLite<std::pair<Outcome, std::shared_ptr<NarInfo>>>( + [&]() -> std::pair<Outcome, std::shared_ptr<NarInfo>> { + auto state(_state.lock()); + + auto & cache(getCache(*state, uri)); - auto & cache(getCache(*state, uri)); - - auto now = time(0); - - auto queryNAR(state->queryNAR.use() - (cache.id) - (hashPart) - (now - ttlNegative) - (now - ttlPositive)); - - if (!queryNAR.next()) - return {oUnknown, 0}; - - if (!queryNAR.getInt(13)) - return {oInvalid, 0}; - - auto narInfo = make_ref<NarInfo>(); - - auto namePart = queryNAR.getStr(2); - narInfo->path = cache.storeDir + "/" + - hashPart + (namePart.empty() ? 
"" : "-" + namePart); - narInfo->url = queryNAR.getStr(3); - narInfo->compression = queryNAR.getStr(4); - if (!queryNAR.isNull(5)) - narInfo->fileHash = parseHash(queryNAR.getStr(5)); - narInfo->fileSize = queryNAR.getInt(6); - narInfo->narHash = parseHash(queryNAR.getStr(7)); - narInfo->narSize = queryNAR.getInt(8); - for (auto & r : tokenizeString<Strings>(queryNAR.getStr(9), " ")) - narInfo->references.insert(cache.storeDir + "/" + r); - if (!queryNAR.isNull(10)) - narInfo->deriver = cache.storeDir + "/" + queryNAR.getStr(10); - for (auto & sig : tokenizeString<Strings>(queryNAR.getStr(11), " ")) - narInfo->sigs.insert(sig); - - return {oValid, narInfo}; + auto now = time(0); + + auto queryNAR(state->queryNAR.use() + (cache.id) + (hashPart) + (now - ttlNegative) + (now - ttlPositive)); + + if (!queryNAR.next()) + return {oUnknown, 0}; + + if (!queryNAR.getInt(13)) + return {oInvalid, 0}; + + auto narInfo = make_ref<NarInfo>(); + + auto namePart = queryNAR.getStr(2); + narInfo->path = cache.storeDir + "/" + + hashPart + (namePart.empty() ? "" : "-" + namePart); + narInfo->url = queryNAR.getStr(3); + narInfo->compression = queryNAR.getStr(4); + if (!queryNAR.isNull(5)) + narInfo->fileHash = parseHash(queryNAR.getStr(5)); + narInfo->fileSize = queryNAR.getInt(6); + narInfo->narHash = parseHash(queryNAR.getStr(7)); + narInfo->narSize = queryNAR.getInt(8); + for (auto & r : tokenizeString<Strings>(queryNAR.getStr(9), " ")) + narInfo->references.insert(cache.storeDir + "/" + r); + if (!queryNAR.isNull(10)) + narInfo->deriver = cache.storeDir + "/" + queryNAR.getStr(10); + for (auto & sig : tokenizeString<Strings>(queryNAR.getStr(11), " ")) + narInfo->sigs.insert(sig); + + return {oValid, narInfo}; + }); } void upsertNarInfo( const std::string & uri, const std::string & hashPart, std::shared_ptr<ValidPathInfo> info) override { - auto state(_state.lock()); - - auto & cache(getCache(*state, uri)); - - if (info) { - - auto narInfo = std::dynamic_pointer_cast<NarInfo>(info); - - assert(hashPart == storePathToHash(info->path)); - - state->insertNAR.use() - (cache.id) - (hashPart) - (storePathToName(info->path)) - (narInfo ? narInfo->url : "", narInfo != 0) - (narInfo ? narInfo->compression : "", narInfo != 0) - (narInfo && narInfo->fileHash ? narInfo->fileHash.to_string() : "", narInfo && narInfo->fileHash) - (narInfo ? narInfo->fileSize : 0, narInfo != 0 && narInfo->fileSize) - (info->narHash.to_string()) - (info->narSize) - (concatStringsSep(" ", info->shortRefs())) - (info->deriver != "" ? baseNameOf(info->deriver) : "", info->deriver != "") - (concatStringsSep(" ", info->sigs)) - (time(0)).exec(); - - } else { - state->insertMissingNAR.use() - (cache.id) - (hashPart) - (time(0)).exec(); - } + retrySQLite<void>([&]() { + auto state(_state.lock()); + + auto & cache(getCache(*state, uri)); + + if (info) { + + auto narInfo = std::dynamic_pointer_cast<NarInfo>(info); + + assert(hashPart == storePathToHash(info->path)); + + state->insertNAR.use() + (cache.id) + (hashPart) + (storePathToName(info->path)) + (narInfo ? narInfo->url : "", narInfo != 0) + (narInfo ? narInfo->compression : "", narInfo != 0) + (narInfo && narInfo->fileHash ? narInfo->fileHash.to_string() : "", narInfo && narInfo->fileHash) + (narInfo ? narInfo->fileSize : 0, narInfo != 0 && narInfo->fileSize) + (info->narHash.to_string()) + (info->narSize) + (concatStringsSep(" ", info->shortRefs())) + (info->deriver != "" ? 
baseNameOf(info->deriver) : "", info->deriver != "") + (concatStringsSep(" ", info->sigs)) + (time(0)).exec(); + + } else { + state->insertMissingNAR.use() + (cache.id) + (hashPart) + (time(0)).exec(); + } + }); } }; diff --git a/src/libstore/nar-info.cc b/src/libstore/nar-info.cc index 201cac671a55..d1042c6de25e 100644 --- a/src/libstore/nar-info.cc +++ b/src/libstore/nar-info.cc @@ -59,9 +59,11 @@ NarInfo::NarInfo(const Store & store, const std::string & s, const std::string & } } else if (name == "Deriver") { - auto p = store.storeDir + "/" + value; - if (!store.isStorePath(p)) corrupt(); - deriver = p; + if (value != "unknown-deriver") { + auto p = store.storeDir + "/" + value; + if (!store.isStorePath(p)) corrupt(); + deriver = p; + } } else if (name == "System") system = value; diff --git a/src/libstore/optimise-store.cc b/src/libstore/optimise-store.cc index b71c7e905ff1..cf234e35d373 100644 --- a/src/libstore/optimise-store.cc +++ b/src/libstore/optimise-store.cc @@ -1,5 +1,3 @@ -#include "config.h" - #include "util.hh" #include "local-store.hh" #include "globals.hh" diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 42c09ec7e0b6..a1f2db5b0ec8 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -40,21 +40,34 @@ template PathSet readStorePaths(Store & store, Source & from); template Paths readStorePaths(Store & store, Source & from); /* TODO: Separate these store impls into different files, give them better names */ -RemoteStore::RemoteStore(const Params & params, size_t maxConnections) +RemoteStore::RemoteStore(const Params & params) : Store(params) , connections(make_ref<Pool<Connection>>( - maxConnections, - [this]() { return openConnection(); }, + std::max(1, std::stoi(get(params, "max-connections", "1"))), + [this]() { return openConnectionWrapper(); }, [](const ref<Connection> & r) { return r->to.good() && r->from.good(); } )) { } -UDSRemoteStore::UDSRemoteStore(const Params & params, size_t maxConnections) +ref<RemoteStore::Connection> RemoteStore::openConnectionWrapper() +{ + if (failed) + throw Error("opening a connection to remote store ‘%s’ previously failed", getUri()); + try { + return openConnection(); + } catch (...) 
{ + failed = true; + throw; + } +} + + +UDSRemoteStore::UDSRemoteStore(const Params & params) : Store(params) , LocalFSStore(params) - , RemoteStore(params, maxConnections) + , RemoteStore(params) { } @@ -108,7 +121,7 @@ void RemoteStore::initConnection(Connection & conn) unsigned int magic = readInt(conn.from); if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch"); - conn.daemonVersion = readInt(conn.from); + conn.from >> conn.daemonVersion; if (GET_PROTOCOL_MAJOR(conn.daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION)) throw Error("Nix daemon protocol version not supported"); if (GET_PROTOCOL_MINOR(conn.daemonVersion) < 10) @@ -129,7 +142,7 @@ void RemoteStore::initConnection(Connection & conn) conn.processStderr(); } catch (Error & e) { - throw Error(format("cannot start daemon worker: %1%") % e.msg()); + throw Error("cannot open connection to remote store ‘%s’: %s", getUri(), e.what()); } setOptions(conn); @@ -170,12 +183,11 @@ bool RemoteStore::isValidPathUncached(const Path & path) auto conn(connections->get()); conn->to << wopIsValidPath << path; conn->processStderr(); - unsigned int reply = readInt(conn->from); - return reply != 0; + return readInt(conn->from); } -PathSet RemoteStore::queryValidPaths(const PathSet & paths) +PathSet RemoteStore::queryValidPaths(const PathSet & paths, bool maybeSubstitute) { auto conn(connections->get()); if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { @@ -246,8 +258,8 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths, conn->to << wopQuerySubstitutablePathInfos << paths; conn->processStderr(); - unsigned int count = readInt(conn->from); - for (unsigned int n = 0; n < count; n++) { + size_t count = readNum<size_t>(conn->from); + for (size_t n = 0; n < count; n++) { Path path = readStorePath(*this, conn->from); SubstitutablePathInfo & info(infos[path]); info.deriver = readString(conn->from); @@ -277,7 +289,7 @@ void RemoteStore::queryPathInfoUncached(const Path & path, throw; } if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) { - bool valid = readInt(conn->from) != 0; + bool valid; conn->from >> valid; if (!valid) throw InvalidPath(format("path ‘%s’ is not valid") % path); } auto info = std::make_shared<ValidPathInfo>(); @@ -286,12 +298,11 @@ void RemoteStore::queryPathInfoUncached(const Path & path, if (info->deriver != "") assertStorePath(info->deriver); info->narHash = parseHash(htSHA256, readString(conn->from)); info->references = readStorePaths<PathSet>(*this, conn->from); - info->registrationTime = readInt(conn->from); - info->narSize = readLongLong(conn->from); + conn->from >> info->registrationTime >> info->narSize; if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { - info->ultimate = readInt(conn->from) != 0; + conn->from >> info->ultimate; info->sigs = readStrings<StringSet>(conn->from); - info->ca = readString(conn->from); + conn->from >> info->ca; } return info; }); @@ -380,8 +391,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, const ref<std::string> conn->to << wopAddToStoreNar << info.path << info.deriver << printHash(info.narHash) << info.references << info.registrationTime << info.narSize - << info.ultimate << info.sigs << *nar << repair << dontCheckSigs; - // FIXME: don't send nar as a string + << info.ultimate << info.sigs << info.ca + << repair << dontCheckSigs; + conn->to(*nar); conn->processStderr(); } } @@ -515,7 +527,7 @@ Roots RemoteStore::findRoots() auto conn(connections->get()); conn->to << wopFindRoots; conn->processStderr(); - unsigned int count = readInt(conn->from); + 
size_t count = readNum<size_t>(conn->from); Roots result; while (count--) { Path link = readString(conn->from); @@ -563,7 +575,7 @@ bool RemoteStore::verifyStore(bool checkContents, bool repair) auto conn(connections->get()); conn->to << wopVerifyStore << checkContents << repair; conn->processStderr(); - return readInt(conn->from) != 0; + return readInt(conn->from); } @@ -599,7 +611,7 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source) } else if (msg == STDERR_READ) { if (!source) throw Error("no source"); - size_t len = readInt(from); + size_t len = readNum<size_t>(from); auto buf = std::make_unique<unsigned char[]>(len); writeString(buf.get(), source->read(buf.get(), len), to); to.flush(); diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index 40f17da300d0..a08bd305639d 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -22,13 +22,13 @@ class RemoteStore : public virtual Store { public: - RemoteStore(const Params & params, size_t maxConnections = std::numeric_limits<size_t>::max()); + RemoteStore(const Params & params); /* Implementations of abstract store API methods. */ bool isValidPathUncached(const Path & path) override; - PathSet queryValidPaths(const PathSet & paths) override; + PathSet queryValidPaths(const PathSet & paths, bool maybeSubstitute = false) override; PathSet queryAllValidPaths() override; @@ -98,6 +98,8 @@ protected: void processStderr(Sink * sink = 0, Source * source = 0); }; + ref<Connection> openConnectionWrapper(); + virtual ref<Connection> openConnection() = 0; void initConnection(Connection & conn); @@ -106,6 +108,8 @@ protected: private: + std::atomic_bool failed{false}; + void setOptions(Connection & conn); }; @@ -113,7 +117,7 @@ class UDSRemoteStore : public LocalFSStore, public RemoteStore { public: - UDSRemoteStore(const Params & params, size_t maxConnections = std::numeric_limits<size_t>::max()); + UDSRemoteStore(const Params & params); std::string getUri() override; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index ccb71f1eefe5..3053f908c4e2 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -1,15 +1,17 @@ -#include "config.h" - #if ENABLE_S3 -#if __linux__ +#include "s3.hh" #include "s3-binary-cache-store.hh" #include "nar-info.hh" #include "nar-info-disk-cache.hh" #include "globals.hh" +#include "compression.hh" +#include "download.hh" +#include "istringstream_nocopy.hh" #include <aws/core/Aws.h> #include <aws/core/client/ClientConfiguration.h> +#include <aws/core/client/DefaultRetryStrategy.h> #include <aws/s3/S3Client.h> #include <aws/s3/model/CreateBucketRequest.h> #include <aws/s3/model/GetBucketLocationRequest.h> @@ -20,15 +22,6 @@ namespace nix { -struct istringstream_nocopy : public std::stringstream -{ - istringstream_nocopy(const std::string & s) - { - rdbuf()->pubsetbuf( - (char *) s.data(), s.size()); - } -}; - struct S3Error : public Error { Aws::S3::S3Errors err; @@ -62,21 +55,92 @@ static void initAWS() }); } +S3Helper::S3Helper(const string & region) + : config(makeConfig(region)) + , client(make_ref<Aws::S3::S3Client>(*config)) +{ +} + +/* Log AWS retries. 
 */
+class RetryStrategy : public Aws::Client::DefaultRetryStrategy
+{
+    long CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override
+    {
+        auto res = Aws::Client::DefaultRetryStrategy::CalculateDelayBeforeNextRetry(error, attemptedRetries);
+        printError("AWS error '%s' (%s), will retry in %d ms",
+            error.GetExceptionName(), error.GetMessage(), res);
+        return res;
+    }
+};
+
+ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region)
+{
+    initAWS();
+    auto res = make_ref<Aws::Client::ClientConfiguration>();
+    res->region = region;
+    res->requestTimeoutMs = 600 * 1000;
+    res->retryStrategy = std::make_shared<RetryStrategy>();
+    res->caFile = settings.caFile;
+    return res;
+}
+
+S3Helper::DownloadResult S3Helper::getObject(
+    const std::string & bucketName, const std::string & key)
+{
+    debug("fetching ‘s3://%s/%s’...", bucketName, key);
+
+    auto request =
+        Aws::S3::Model::GetObjectRequest()
+        .WithBucket(bucketName)
+        .WithKey(key);
+
+    request.SetResponseStreamFactory([&]() {
+        return Aws::New<std::stringstream>("STRINGSTREAM");
+    });
+
+    DownloadResult res;
+
+    auto now1 = std::chrono::steady_clock::now();
+
+    try {
+
+        auto result = checkAws(fmt("AWS error fetching ‘%s’", key),
+            client->GetObject(request));
+
+        res.data = decodeContent(
+            result.GetContentEncoding(),
+            make_ref<std::string>(
+                dynamic_cast<std::stringstream &>(result.GetBody()).str()));
+
+    } catch (S3Error & e) {
+        if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
+    }
+
+    auto now2 = std::chrono::steady_clock::now();
+
+    res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+
+    return res;
+}
+
 struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
 {
     std::string bucketName;
 
-    ref<Aws::Client::ClientConfiguration> config;
-    ref<Aws::S3::S3Client> client;
-
     Stats stats;
 
+    S3Helper s3Helper;
+
+    std::string narinfoCompression, lsCompression, logCompression;
+
     S3BinaryCacheStoreImpl(
         const Params & params, const std::string & bucketName)
         : S3BinaryCacheStore(params)
         , bucketName(bucketName)
-        , config(makeConfig())
-        , client(make_ref<Aws::S3::S3Client>(*config))
+        , s3Helper(get(params, "aws-region", Aws::Region::US_EAST_1))
+        , narinfoCompression(get(params, "narinfo-compression", ""))
+        , lsCompression(get(params, "ls-compression", ""))
+        , logCompression(get(params, "log-compression", ""))
     {
         diskCache = getNarInfoDiskCache();
     }
@@ -86,15 +150,6 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         return "s3://" + bucketName;
     }
 
-    ref<Aws::Client::ClientConfiguration> makeConfig()
-    {
-        initAWS();
-        auto res = make_ref<Aws::Client::ClientConfiguration>();
-        res->region = Aws::Region::US_EAST_1; // FIXME: make configurable
-        res->requestTimeoutMs = 600 * 1000;
-        return res;
-    }
-
     void init() override
     {
         if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) {
 
            /* Create the bucket if it doesn't already exists. */
            // FIXME: HeadBucket would be more appropriate, but doesn't return
            // an easily parsed 404 message.
-            auto res = client->GetBucketLocation(
+            auto res = s3Helper.client->GetBucketLocation(
                 Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName));
 
             if (!res.IsSuccess()) {
@@ -110,7 +165,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
                     throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage());
 
                 checkAws(format("AWS error creating bucket ‘%s’") % bucketName,
-                    client->CreateBucket(
+                    s3Helper.client->CreateBucket(
                         Aws::S3::Model::CreateBucketRequest()
                         .WithBucket(bucketName)
                         .WithCreateBucketConfiguration(
@@ -148,7 +203,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
    {
        stats.head++;
 
-        auto res = client->HeadObject(
+        auto res = s3Helper.client->HeadObject(
            Aws::S3::Model::HeadObjectRequest()
            .WithBucket(bucketName)
            .WithKey(path));
@@ -164,13 +219,20 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         return true;
     }
 
-    void upsertFile(const std::string & path, const std::string & data) override
+    void uploadFile(const std::string & path, const std::string & data,
+        const std::string & mimeType,
+        const std::string & contentEncoding)
     {
         auto request =
             Aws::S3::Model::PutObjectRequest()
             .WithBucket(bucketName)
             .WithKey(path);
 
+        request.SetContentType(mimeType);
+
+        if (contentEncoding != "")
+            request.SetContentEncoding(contentEncoding);
+
         auto stream = std::make_shared<istringstream_nocopy>(data);
 
         request.SetBody(stream);
@@ -181,7 +243,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         auto now1 = std::chrono::steady_clock::now();
 
         auto result = checkAws(format("AWS error uploading ‘%s’") % path,
-            client->PutObject(request));
+            s3Helper.client->PutObject(request));
 
         auto now2 = std::chrono::steady_clock::now();
 
@@ -193,6 +255,19 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         stats.putTimeMs += duration;
     }
 
+    void upsertFile(const std::string & path, const std::string & data,
+        const std::string & mimeType) override
+    {
+        if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
+            uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression);
+        else if (lsCompression != "" && hasSuffix(path, ".ls"))
+            uploadFile(path, *compress(lsCompression, data), mimeType, lsCompression);
+        else if (logCompression != "" && hasPrefix(path, "log/"))
+            uploadFile(path, *compress(logCompression, data), mimeType, logCompression);
+        else
+            uploadFile(path, data, mimeType, "");
+    }
+
     void getFile(const std::string & path,
         std::function<void(std::shared_ptr<std::string>)> success,
         std::function<void(std::exception_ptr exc)> failure) override
@@ -200,42 +275,18 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
             debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path);
 
-            auto request =
-                Aws::S3::Model::GetObjectRequest()
-                .WithBucket(bucketName)
-                .WithKey(path);
-
-            request.SetResponseStreamFactory([&]() {
-                return Aws::New<std::stringstream>("STRINGSTREAM");
-            });
-
             stats.get++;
 
-            try {
-
-                auto now1 = std::chrono::steady_clock::now();
+            auto res = s3Helper.getObject(bucketName, path);
 
-                auto result = checkAws(format("AWS error fetching ‘%s’") % path,
-                    client->GetObject(request));
+            stats.getBytes += res.data ? res.data->size() : 0;
+            stats.getTimeMs += res.durationMs;
 
-                auto now2 = std::chrono::steady_clock::now();
+            if (res.data)
+                printTalkative("downloaded ‘s3://%s/%s’ (%d bytes) in %d ms",
+                    bucketName, path, res.data->size(), res.durationMs);
 
-                auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
-
-                auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
-
-                printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms")
-                    % bucketName % path % res.size() % duration);
-
-                stats.getBytes += res.size();
-                stats.getTimeMs += duration;
-
-                return std::make_shared<std::string>(res);
-
-            } catch (S3Error & e) {
-                if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>();
-                throw;
-            }
+            return res.data;
         });
     }
 
@@ -248,7 +299,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
             debug(format("listing bucket ‘s3://%s’ from key ‘%s’...") % bucketName % marker);
 
             auto res = checkAws(format("AWS error listing bucket ‘%s’") % bucketName,
-                client->ListObjects(
+                s3Helper.client->ListObjects(
                     Aws::S3::Model::ListObjectsRequest()
                     .WithBucket(bucketName)
                     .WithDelimiter("/")
@@ -286,4 +337,3 @@ static RegisterStoreImplementation regStore([](
 }
 
 #endif
-#endif
diff --git a/src/libstore/s3.hh b/src/libstore/s3.hh
new file mode 100644
index 000000000000..08a7fbf96e98
--- /dev/null
+++ b/src/libstore/s3.hh
@@ -0,0 +1,33 @@
+#pragma once
+
+#if ENABLE_S3
+
+#include "ref.hh"
+
+namespace Aws { namespace Client { class ClientConfiguration; } }
+namespace Aws { namespace S3 { class S3Client; } }
+
+namespace nix {
+
+struct S3Helper
+{
+    ref<Aws::Client::ClientConfiguration> config;
+    ref<Aws::S3::S3Client> client;
+
+    S3Helper(const std::string & region);
+
+    ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region);
+
+    struct DownloadResult
+    {
+        std::shared_ptr<std::string> data;
+        unsigned int durationMs;
+    };
+
+    DownloadResult getObject(
+        const std::string & bucketName, const std::string & key);
+};
+
+}
+
+#endif
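Illustration (not part of the diff): the upsertFile() override above picks a per-object compression method from the new narinfo-compression, ls-compression and log-compression store parameters, and uploadFile() records it as the object's Content-Encoding so that getObject()/decodeContent() can undo it transparently on download. A minimal standalone C++ sketch of that dispatch; pickCompression and the local hasSuffix/hasPrefix helpers are illustrative stand-ins for the real code:

    #include <iostream>
    #include <string>

    // Local re-implementations so the sketch stands alone; libutil provides
    // the real hasSuffix()/hasPrefix() used by the override above.
    static bool hasSuffix(const std::string & s, const std::string & suffix)
    {
        return s.size() >= suffix.size()
            && s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
    }

    static bool hasPrefix(const std::string & s, const std::string & prefix)
    {
        return s.compare(0, prefix.size(), prefix) == 0;
    }

    // Returns the compression method (and thus Content-Encoding) for an
    // uploaded object, or "" for none, mirroring the upsertFile() dispatch.
    static std::string pickCompression(const std::string & path,
        const std::string & narinfoCompression,
        const std::string & lsCompression,
        const std::string & logCompression)
    {
        if (!narinfoCompression.empty() && hasSuffix(path, ".narinfo"))
            return narinfoCompression;
        if (!lsCompression.empty() && hasSuffix(path, ".ls"))
            return lsCompression;
        if (!logCompression.empty() && hasPrefix(path, "log/"))
            return logCompression;
        return "";
    }

    int main()
    {
        // Hypothetical store URI: s3://bucket?narinfo-compression=xz&ls-compression=br&log-compression=br
        std::cout << pickCompression("abc.narinfo", "xz", "br", "br") << "\n";      // xz
        std::cout << pickCompression("log/abc-foo.drv", "xz", "br", "br") << "\n";  // br
        std::cout << pickCompression("nar/abc.nar.xz", "xz", "br", "br") << "\n";   // (none)
    }

The real override also runs the data through compress() with the chosen method before handing it to uploadFile().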
fmt("SQLite database ‘%s’ is busy (SQLITE_PROTOCOL)", path) + : fmt("SQLite database ‘%s’ is busy", path)); } else - throw SQLiteError(format("%1%: %2%") % f.str() % sqlite3_errmsg(db)); + throw SQLiteError("%s: %s (in ‘%s’)", f.str(), sqlite3_errstr(err), path); } SQLite::SQLite(const Path & path) @@ -54,24 +43,27 @@ SQLite::~SQLite() void SQLite::exec(const std::string & stmt) { - if (sqlite3_exec(db, stmt.c_str(), 0, 0, 0) != SQLITE_OK) - throwSQLiteError(db, format("executing SQLite statement ‘%s’") % stmt); + retrySQLite<void>([&]() { + if (sqlite3_exec(db, stmt.c_str(), 0, 0, 0) != SQLITE_OK) + throwSQLiteError(db, format("executing SQLite statement ‘%s’") % stmt); + }); } -void SQLiteStmt::create(sqlite3 * db, const string & s) +void SQLiteStmt::create(sqlite3 * db, const string & sql) { checkInterrupt(); assert(!stmt); - if (sqlite3_prepare_v2(db, s.c_str(), -1, &stmt, 0) != SQLITE_OK) - throwSQLiteError(db, "creating statement"); + if (sqlite3_prepare_v2(db, sql.c_str(), -1, &stmt, 0) != SQLITE_OK) + throwSQLiteError(db, fmt("creating statement ‘%s’", sql)); this->db = db; + this->sql = sql; } SQLiteStmt::~SQLiteStmt() { try { if (stmt && sqlite3_finalize(stmt) != SQLITE_OK) - throwSQLiteError(db, "finalizing statement"); + throwSQLiteError(db, fmt("finalizing statement ‘%s’", sql)); } catch (...) { ignoreException(); } @@ -128,14 +120,14 @@ void SQLiteStmt::Use::exec() int r = step(); assert(r != SQLITE_ROW); if (r != SQLITE_DONE) - throwSQLiteError(stmt.db, "executing SQLite statement"); + throwSQLiteError(stmt.db, fmt("executing SQLite statement ‘%s’", stmt.sql)); } bool SQLiteStmt::Use::next() { int r = step(); if (r != SQLITE_DONE && r != SQLITE_ROW) - throwSQLiteError(stmt.db, "executing SQLite query"); + throwSQLiteError(stmt.db, fmt("executing SQLite query ‘%s’", stmt.sql)); return r == SQLITE_ROW; } @@ -182,4 +174,24 @@ SQLiteTxn::~SQLiteTxn() } } +void handleSQLiteBusy(const SQLiteBusy & e) +{ + static std::atomic<time_t> lastWarned{0}; + + time_t now = time(0); + + if (now > lastWarned + 10) { + lastWarned = now; + printError("warning: %s", e.what()); + } + + /* Sleep for a while since retrying the transaction right away + is likely to fail again. */ + checkInterrupt(); + struct timespec t; + t.tv_sec = 0; + t.tv_nsec = (random() % 100) * 1000 * 1000; /* <= 0.1s */ + nanosleep(&t, 0); +} + } diff --git a/src/libstore/sqlite.hh b/src/libstore/sqlite.hh index 4d347a2e56ab..14a7a0dd8996 100644 --- a/src/libstore/sqlite.hh +++ b/src/libstore/sqlite.hh @@ -30,8 +30,9 @@ struct SQLiteStmt { sqlite3 * db = 0; sqlite3_stmt * stmt = 0; + std::string sql; SQLiteStmt() { } - SQLiteStmt(sqlite3 * db, const std::string & s) { create(db, s); } + SQLiteStmt(sqlite3 * db, const std::string & sql) { create(db, sql); } void create(sqlite3 * db, const std::string & s); ~SQLiteStmt(); operator sqlite3_stmt * () { return stmt; } @@ -94,6 +95,8 @@ MakeError(SQLiteBusy, SQLiteError); [[noreturn]] void throwSQLiteError(sqlite3 * db, const format & f); +void handleSQLiteBusy(const SQLiteBusy & e); + /* Convenience function for retrying a SQLite transaction when the database is busy. 
diff --git a/src/libstore/sqlite.hh b/src/libstore/sqlite.hh
index 4d347a2e56ab..14a7a0dd8996 100644
--- a/src/libstore/sqlite.hh
+++ b/src/libstore/sqlite.hh
@@ -30,8 +30,9 @@ struct SQLiteStmt
 {
     sqlite3 * db = 0;
     sqlite3_stmt * stmt = 0;
+    std::string sql;
     SQLiteStmt() { }
-    SQLiteStmt(sqlite3 * db, const std::string & s) { create(db, s); }
+    SQLiteStmt(sqlite3 * db, const std::string & sql) { create(db, sql); }
     void create(sqlite3 * db, const std::string & s);
     ~SQLiteStmt();
     operator sqlite3_stmt * () { return stmt; }
@@ -94,6 +95,8 @@ MakeError(SQLiteBusy, SQLiteError);
 
 [[noreturn]] void throwSQLiteError(sqlite3 * db, const format & f);
 
+void handleSQLiteBusy(const SQLiteBusy & e);
+
 /* Convenience function for retrying a SQLite transaction when the
    database is busy. */
 template<typename T>
@@ -103,6 +106,7 @@ T retrySQLite(std::function<T()> fun)
         try {
             return fun();
         } catch (SQLiteBusy & e) {
+            handleSQLiteBusy(e);
         }
     }
 }
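Illustration (not part of the diff): the busy handling moved out of throwSQLiteError() and into handleSQLiteBusy(), which retrySQLite<T>() now calls between attempts. A self-contained sketch of that retry loop, with stand-ins for SQLiteBusy and the warn-and-back-off handler so it runs outside the Nix tree (the real handler additionally calls checkInterrupt() so a stuck database does not block Ctrl-C):

    #include <chrono>
    #include <cstdio>
    #include <cstdlib>
    #include <ctime>
    #include <functional>
    #include <stdexcept>
    #include <thread>

    // Stand-in for the real SQLiteBusy exception type.
    struct SQLiteBusy : std::runtime_error { using std::runtime_error::runtime_error; };

    static void handleSQLiteBusy(const SQLiteBusy & e)
    {
        static time_t lastWarned = 0;
        time_t now = time(nullptr);
        if (now > lastWarned + 10) {        // warn at most once every 10 s
            lastWarned = now;
            std::fprintf(stderr, "warning: %s\n", e.what());
        }
        // Back off briefly; retrying the transaction immediately tends to fail again.
        std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100));
    }

    // Same shape as retrySQLite<T>() in sqlite.hh: loop until fun() stops throwing.
    template<typename T>
    T retrySQLite(std::function<T()> fun)
    {
        while (true) {
            try {
                return fun();
            } catch (SQLiteBusy & e) {
                handleSQLiteBusy(e);
            }
        }
    }

    int main()
    {
        int attempts = 0;
        int result = retrySQLite<int>([&]() {
            if (++attempts < 3) throw SQLiteBusy("database is busy");
            return 42;
        });
        std::printf("succeeded after %d attempts: %d\n", attempts, result);
    }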
diff --git a/src/libstore/ssh-store.cc b/src/libstore/ssh-store.cc
index 6f1862afa899..2a81a8b1ebe5 100644
--- a/src/libstore/ssh-store.cc
+++ b/src/libstore/ssh-store.cc
@@ -4,18 +4,33 @@
 #include "archive.hh"
 #include "worker-protocol.hh"
 #include "pool.hh"
+#include "ssh.hh"
 
 namespace nix {
 
-static std::string uriScheme = "ssh://";
+static std::string uriScheme = "ssh-ng://";
 
 class SSHStore : public RemoteStore
 {
 public:
 
-    SSHStore(string host, const Params & params, size_t maxConnections = std::numeric_limits<size_t>::max());
+    SSHStore(const std::string & host, const Params & params)
+        : Store(params)
+        , RemoteStore(params)
+        , host(host)
+        , master(
+            host,
+            get(params, "ssh-key", ""),
+            // Use SSH master only if using more than 1 connection.
+            connections->capacity() > 1,
+            get(params, "compress", "") == "true")
+    {
+    }
 
-    std::string getUri() override;
+    std::string getUri() override
+    {
+        return uriScheme + host;
+    }
 
     void narFromPath(const Path & path, Sink & sink) override;
 
@@ -25,43 +40,16 @@ private:
 
     struct Connection : RemoteStore::Connection
     {
-        Pid sshPid;
-        AutoCloseFD out;
-        AutoCloseFD in;
+        std::unique_ptr<SSHMaster::Connection> sshConn;
     };
 
     ref<RemoteStore::Connection> openConnection() override;
 
-    AutoDelete tmpDir;
-
-    Path socketPath;
-
-    Pid sshMaster;
-
-    string host;
-
-    Path key;
+    std::string host;
 
-    bool compress;
+    SSHMaster master;
 };
 
-SSHStore::SSHStore(string host, const Params & params, size_t maxConnections)
-    : Store(params)
-    , RemoteStore(params, maxConnections)
-    , tmpDir(createTempDir("", "nix", true, true, 0700))
-    , socketPath((Path) tmpDir + "/ssh.sock")
-    , host(std::move(host))
-    , key(get(params, "ssh-key", ""))
-    , compress(get(params, "compress", "") == "true")
-{
-    /* open a connection and perform the handshake to verify all is well */
-    connections->get();
-}
-
-string SSHStore::getUri()
-{
-    return uriScheme + host;
-}
 
 class ForwardSource : public Source
 {
@@ -94,35 +82,10 @@ ref<FSAccessor> SSHStore::getFSAccessor()
 
 ref<RemoteStore::Connection> SSHStore::openConnection()
 {
-    if ((pid_t) sshMaster == -1) {
-        sshMaster = startProcess([&]() {
-            restoreSignals();
-            if (key.empty())
-                execlp("ssh", "ssh", "-N", "-M", "-S", socketPath.c_str(), host.c_str(), NULL);
-            else
-                execlp("ssh", "ssh", "-N", "-M", "-S", socketPath.c_str(), "-i", key.c_str(), host.c_str(), NULL);
-            throw SysError("starting ssh master");
-        });
-    }
-
     auto conn = make_ref<Connection>();
-    Pipe in, out;
-    in.create();
-    out.create();
-    conn->sshPid = startProcess([&]() {
-        if (dup2(in.readSide.get(), STDIN_FILENO) == -1)
-            throw SysError("duping over STDIN");
-        if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
-            throw SysError("duping over STDOUT");
-        execlp("ssh", "ssh", "-S", socketPath.c_str(), host.c_str(), "nix-daemon", "--stdio", NULL);
-        throw SysError("executing nix-daemon --stdio over ssh");
-    });
-    in.readSide = -1;
-    out.writeSide = -1;
-    conn->out = std::move(out.readSide);
-    conn->in = std::move(in.writeSide);
-    conn->to = FdSink(conn->in.get());
-    conn->from = FdSource(conn->out.get());
+    conn->sshConn = master.startCommand("nix-daemon --stdio");
+    conn->to = FdSink(conn->sshConn->in.get());
+    conn->from = FdSource(conn->sshConn->out.get());
     initConnection(*conn);
     return conn;
 }
diff --git a/src/libstore/ssh.cc b/src/libstore/ssh.cc
new file mode 100644
index 000000000000..e54f3f4ba284
--- /dev/null
+++ b/src/libstore/ssh.cc
@@ -0,0 +1,102 @@
+#include "ssh.hh"
+
+namespace nix {
+
+void SSHMaster::addCommonSSHOpts(Strings & args)
+{
+    for (auto & i : tokenizeString<Strings>(getEnv("NIX_SSHOPTS")))
+        args.push_back(i);
+    if (!keyFile.empty())
+        args.insert(args.end(), {"-i", keyFile});
+    if (compress)
+        args.push_back("-C");
+}
+
+std::unique_ptr<SSHMaster::Connection> SSHMaster::startCommand(const std::string & command)
+{
+    Path socketPath = startMaster();
+
+    Pipe in, out;
+    in.create();
+    out.create();
+
+    auto conn = std::make_unique<Connection>();
+    conn->sshPid = startProcess([&]() {
+        restoreSignals();
+
+        close(in.writeSide.get());
+        close(out.readSide.get());
+
+        if (dup2(in.readSide.get(), STDIN_FILENO) == -1)
+            throw SysError("duping over stdin");
+        if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
+            throw SysError("duping over stdout");
+
+        Strings args = { "ssh", host.c_str(), "-x", "-a" };
+        addCommonSSHOpts(args);
+        if (socketPath != "")
+            args.insert(args.end(), {"-S", socketPath});
+        args.push_back(command);
+        execvp(args.begin()->c_str(), stringsToCharPtrs(args).data());
+
+        throw SysError("executing ‘%s’ on ‘%s’", command, host);
+    });
+
+
+    in.readSide = -1;
+    out.writeSide = -1;
+
+    conn->out = std::move(out.readSide);
+    conn->in = std::move(in.writeSide);
+
+    return conn;
+}
+
+Path SSHMaster::startMaster()
+{
+    if (!useMaster) return "";
+
+    auto state(state_.lock());
+
+    if (state->sshMaster != -1) return state->socketPath;
+
+    state->tmpDir = std::make_unique<AutoDelete>(createTempDir("", "nix", true, true, 0700));
+
+    state->socketPath = (Path) *state->tmpDir + "/ssh.sock";
+
+    Pipe out;
+    out.create();
+
+    state->sshMaster = startProcess([&]() {
+        restoreSignals();
+
+        close(out.readSide.get());
+
+        if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
+            throw SysError("duping over stdout");
+
+        Strings args =
+            { "ssh", host.c_str(), "-M", "-N", "-S", state->socketPath
+            , "-o", "LocalCommand=echo started"
+            , "-o", "PermitLocalCommand=yes"
+            };
+        addCommonSSHOpts(args);
+        execvp(args.begin()->c_str(), stringsToCharPtrs(args).data());
+
+        throw SysError("starting SSH master");
+    });
+
+    out.writeSide = -1;
+
+    std::string reply;
+    try {
+        reply = readLine(out.readSide.get());
+    } catch (EndOfFile & e) { }
+
+    if (reply != "started")
+        throw Error("failed to start SSH master connection to ‘%s’", host);
+
+    return state->socketPath;
+}
+
+}
diff --git a/src/libstore/ssh.hh b/src/libstore/ssh.hh
new file mode 100644
index 000000000000..b4396467e54e
--- /dev/null
+++ b/src/libstore/ssh.hh
@@ -0,0 +1,49 @@
+#pragma once
+
+#include "util.hh"
+#include "sync.hh"
+
+namespace nix {
+
+class SSHMaster
+{
+private:
+
+    const std::string host;
+    const std::string keyFile;
+    const bool useMaster;
+    const bool compress;
+
+    struct State
+    {
+        Pid sshMaster;
+        std::unique_ptr<AutoDelete> tmpDir;
+        Path socketPath;
+    };
+
+    Sync<State> state_;
+
+    void addCommonSSHOpts(Strings & args);
+
+public:
+
+    SSHMaster(const std::string & host, const std::string & keyFile, bool useMaster, bool compress)
+        : host(host)
+        , keyFile(keyFile)
+        , useMaster(useMaster)
+        , compress(compress)
+    {
+    }
+
+    struct Connection
+    {
+        Pid sshPid;
+        AutoCloseFD out, in;
+    };
+
+    std::unique_ptr<Connection> startCommand(const std::string & command);
+
+    Path startMaster();
+};
+
+}
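Illustration (not part of the diff): SSHMaster drives two kinds of ssh invocations, one long-lived ControlMaster process per host and one client per command that reuses its control socket. A standalone sketch that only assembles and prints those argument vectors; the host, socket path and helper names are made up, and NIX_SSHOPTS handling is omitted:

    #include <iostream>
    #include <string>
    #include <vector>

    using Strings = std::vector<std::string>;

    // Arguments for the control master (cf. SSHMaster::startMaster()).
    static Strings masterArgs(const std::string & host,
        const std::string & socketPath, const std::string & keyFile, bool compress)
    {
        Strings args = { "ssh", host, "-M", "-N", "-S", socketPath,
            // "echo started" is what startMaster() waits for on the master's stdout.
            "-o", "LocalCommand=echo started", "-o", "PermitLocalCommand=yes" };
        if (!keyFile.empty()) { args.push_back("-i"); args.push_back(keyFile); }
        if (compress) args.push_back("-C");
        return args;
    }

    // Arguments for one remote command reusing the master's socket
    // (cf. SSHMaster::startCommand()).
    static Strings commandArgs(const std::string & host,
        const std::string & socketPath, const std::string & command)
    {
        Strings args = { "ssh", host, "-x", "-a" };   // no X11 or agent forwarding
        if (!socketPath.empty()) { args.push_back("-S"); args.push_back(socketPath); }
        args.push_back(command);
        return args;
    }

    int main()
    {
        for (auto & a : masterArgs("builder.example.org", "/tmp/nix-abc/ssh.sock", "", true))
            std::cout << a << ' ';
        std::cout << '\n';
        for (auto & a : commandArgs("builder.example.org", "/tmp/nix-abc/ssh.sock", "nix-daemon --stdio"))
            std::cout << a << ' ';
        std::cout << '\n';
    }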
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index b5934a0d1232..441166d04d8f 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -377,7 +377,7 @@ void Store::queryPathInfo(const Path & storePath,
 }
 
 
-PathSet Store::queryValidPaths(const PathSet & paths)
+PathSet Store::queryValidPaths(const PathSet & paths, bool maybeSubstitute)
 {
     struct State
     {
@@ -550,6 +550,8 @@ void copyClosure(ref<Store> srcStore, ref<Store> dstStore,
     for (auto & path : storePaths)
         srcStore->computeFSClosure(path, closure);
 
+    // FIXME: use copyStorePaths()
+
     PathSet valid = dstStore->queryValidPaths(closure);
 
     if (valid.size() == closure.size()) return;
@@ -676,6 +678,12 @@ Strings ValidPathInfo::shortRefs() const
 }
 
 
+std::string makeFixedOutputCA(bool recursive, const Hash & hash)
+{
+    return "fixed:" + (recursive ? (std::string) "r:" : "") + hash.to_string();
+}
+
+
 }
 
 
@@ -702,7 +710,11 @@ ref<Store> openStore(const std::string & uri_)
         }
         uri = uri_.substr(0, q);
     }
+    return openStore(uri, params);
+}
 
+ref<Store> openStore(const std::string & uri, const Store::Params & params)
+{
     for (auto fun : *RegisterStoreImplementation::implementations) {
         auto store = fun(uri, params);
         if (store) return ref<Store>(store);
@@ -766,10 +778,11 @@ std::list<ref<Store>> getDefaultSubstituters()
             state->stores.push_back(openStore(uri));
         };
 
-        for (auto uri : settings.get("substituters", Strings()))
-            addStore(uri);
+        Strings defaultSubstituters;
+        if (settings.nixStore == "/nix/store")
+            defaultSubstituters.push_back("https://cache.nixos.org/");
 
-        for (auto uri : settings.get("binary-caches", Strings()))
+        for (auto uri : settings.get("substituters", settings.get("binary-caches", defaultSubstituters)))
             addStore(uri);
 
         for (auto uri : settings.get("extra-binary-caches", Strings()))
@@ -781,37 +794,25 @@ std::list<ref<Store>> getDefaultSubstituters()
 }
 
 
-void copyPaths(ref<Store> from, ref<Store> to, const Paths & storePaths, bool substitute)
-{
-    if (substitute) {
-        /* Filter out .drv files (we don't want to build anything). */
-        PathSet paths2;
-        for (auto & path : storePaths)
-            if (!isDerivation(path)) paths2.insert(path);
-        unsigned long long downloadSize, narSize;
-        PathSet willBuild, willSubstitute, unknown;
-        to->queryMissing(PathSet(paths2.begin(), paths2.end()),
-            willBuild, willSubstitute, unknown, downloadSize, narSize);
-        /* FIXME: should use ensurePath(), but it only
-           does one path at a time. */
-        if (!willSubstitute.empty())
-            try {
-                to->buildPaths(willSubstitute);
-            } catch (Error & e) {
-                printMsg(lvlError, format("warning: %1%") % e.msg());
-            }
-    }
+void copyPaths(ref<Store> from, ref<Store> to, const PathSet & storePaths, bool substitute)
+{
+    PathSet valid = to->queryValidPaths(storePaths, substitute);
+
+    PathSet missing;
+    for (auto & path : storePaths)
+        if (!valid.count(path)) missing.insert(path);
 
     std::string copiedLabel = "copied";
-    logger->setExpected(copiedLabel, storePaths.size());
+    logger->setExpected(copiedLabel, missing.size());
 
     ThreadPool pool;
 
     processGraph<Path>(pool,
-        PathSet(storePaths.begin(), storePaths.end()),
+        PathSet(missing.begin(), missing.end()),
 
         [&](const Path & storePath) {
+            if (to->isValidPath(storePath)) return PathSet();
             return from->queryPathInfo(storePath)->references;
         },
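Illustration (not part of the diff): two small pieces of the store-api.cc changes above, the "fixed:[r:]<hash>" string built by makeFixedOutputCA() and the missing-path set that copyPaths() now derives from queryValidPaths() before copying anything. Standalone sketch with the Hash argument reduced to a pre-rendered string:

    #include <iostream>
    #include <set>
    #include <string>

    using PathSet = std::set<std::string>;

    // Same string format as makeFixedOutputCA() above; in Nix the hash text
    // would come from Hash::to_string().
    static std::string makeFixedOutputCA(bool recursive, const std::string & hash)
    {
        return "fixed:" + (recursive ? std::string("r:") : "") + hash;
    }

    int main()
    {
        std::cout << makeFixedOutputCA(true, "sha256:<base32 hash>") << "\n";  // fixed:r:sha256:<base32 hash>

        PathSet storePaths = { "/nix/store/aaa-foo", "/nix/store/bbb-bar" };
        PathSet valid      = { "/nix/store/aaa-foo" };  // what queryValidPaths() might report

        PathSet missing;
        for (auto & path : storePaths)
            if (!valid.count(path)) missing.insert(path);

        for (auto & path : missing)
            std::cout << "would copy " << path << "\n";  // only /nix/store/bbb-bar
    }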
diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh
index d03e70849f93..98f2803f8136 100644
--- a/src/libstore/store-api.hh
+++ b/src/libstore/store-api.hh
@@ -128,7 +128,7 @@ struct ValidPathInfo
        of an output path of a derivation were actually produced by
        that derivation.  In the intensional model, we have to trust
        that a particular output path was produced by a derivation; the
-       path name then implies the contents.)
+       path then implies the contents.)
 
        Ideally, the content-addressability assertion would just be a
        Boolean, and the store path would be computed from
@@ -324,8 +324,10 @@ protected:
 
 public:
 
-    /* Query which of the given paths is valid. */
-    virtual PathSet queryValidPaths(const PathSet & paths);
+    /* Query which of the given paths is valid. Optionally, try to
+       substitute missing paths. */
+    virtual PathSet queryValidPaths(const PathSet & paths,
+        bool maybeSubstitute = false);
 
     /* Query the set of all valid paths. Note that for some store
        backends, the name part of store paths may be omitted
@@ -511,7 +513,7 @@ public:
        `storePath' is returned; that is, the closures under the
        `referrers' relation instead of the `references' relation is
        returned. */
-    void computeFSClosure(const PathSet & paths,
+    virtual void computeFSClosure(const PathSet & paths,
         PathSet & out, bool flipDirection = false,
         bool includeOutputs = false, bool includeDerivers = false);
 
@@ -566,6 +568,11 @@ public:
        if they lack a signature. */
     virtual bool isTrusted() { return false; }
 
+    /* Return the build log of the specified store path, if available,
+       or null otherwise. */
+    virtual std::shared_ptr<std::string> getBuildLog(const Path & path)
+    { return nullptr; }
+
 protected:
 
     Stats stats;
@@ -579,6 +586,7 @@ public:
     const Path rootDir;
     const Path stateDir;
     const Path logDir;
+    const static string drvsLogDir;
 
     LocalFSStore(const Params & params);
 
@@ -595,6 +603,8 @@ public:
     {
         return getRealStoreDir() + "/" + baseNameOf(storePath);
     }
+
+    std::shared_ptr<std::string> getBuildLog(const Path & path) override;
 };
 
 
@@ -642,8 +652,10 @@ void removeTempRoots();
    set to true *unless* you're going to collect garbage. */
 ref<Store> openStore(const std::string & uri = getEnv("NIX_REMOTE"));
 
+ref<Store> openStore(const std::string & uri, const Store::Params & params);
+
 
-void copyPaths(ref<Store> from, ref<Store> to, const Paths & storePaths, bool substitute = false);
+void copyPaths(ref<Store> from, ref<Store> to, const PathSet & storePaths, bool substitute = false);
 
 enum StoreType {
     tDaemon,
@@ -687,6 +699,11 @@ ValidPathInfo decodeValidPathInfo(std::istream & str, bool hashGiven = false);
 
 
+/* Compute the content-addressability assertion (ValidPathInfo::ca)
+   for paths created by makeFixedOutputPath() / addToStore(). */
+std::string makeFixedOutputCA(bool recursive, const Hash & hash);
+
+
 MakeError(SubstError, Error)
 MakeError(BuildError, Error) /* denotes a permanent build failure */
 MakeError(InvalidPath, Error)
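Illustration (not part of the diff): a hypothetical caller of the two declarations added above, the openStore(uri, params) overload and Store::getBuildLog(). This assumes it is built against libstore inside the Nix tree; the bucket name, parameter values and store path are made up:

    #include "store-api.hh"

    #include <iostream>

    int main()
    {
        using namespace nix;

        // Store parameters now travel alongside the URI via the new overload.
        Store::Params params;
        params["narinfo-compression"] = "xz";
        params["log-compression"] = "br";

        auto store = openStore("s3://example-nix-cache", params);

        // getBuildLog() returns null when a store has no log for the path.
        if (auto log = store->getBuildLog("/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-example.drv"))
            std::cout << *log;
        else
            std::cout << "no build log available\n";
    }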