Diffstat (limited to 'src/libstore')
26 files changed, 1266 insertions, 556 deletions
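A recurring pattern in the diff below (BinaryCacheStore::getFile and Downloader::download) is bridging the new callback-based asynchronous API back to a synchronous call by blocking on a std::promise/std::future pair that the success and failure callbacks fulfil. The following is a minimal, self-contained sketch of that pattern only; getFileAsync and getFileSync are illustrative placeholder names, not the actual Nix API.

// Sketch of the callback-to-future bridge used by the new asynchronous API.
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>

// An asynchronous operation that reports its outcome through callbacks,
// analogous in shape to the new getFile / enqueueDownload interfaces.
void getFileAsync(const std::string & path,
    std::function<void(std::shared_ptr<std::string>)> success,
    std::function<void(std::exception_ptr)> failure)
{
    try {
        if (path.empty()) throw std::runtime_error("empty path");
        success(std::make_shared<std::string>("contents of " + path));
    } catch (...) {
        failure(std::current_exception());
    }
}

// Synchronous wrapper: block on a promise that the callbacks fulfil. An
// exception passed to the failure callback is rethrown by get().
std::shared_ptr<std::string> getFileSync(const std::string & path)
{
    std::promise<std::shared_ptr<std::string>> promise;
    getFileAsync(path,
        [&](std::shared_ptr<std::string> result) { promise.set_value(result); },
        [&](std::exception_ptr exc) { promise.set_exception(exc); });
    return promise.get_future().get();
}

int main()
{
    std::cout << *getFileSync("nar/0123abcd.nar.xz") << std::endl;
    try {
        getFileSync("");
    } catch (const std::exception & e) {
        std::cout << "error: " << e.what() << std::endl;
    }
}

Because promise.set_exception() makes get_future().get() rethrow the original exception in the caller, existing synchronous call sites keep their error handling unchanged.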
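The rewritten CurlDownloader further down retries transient failures with exponential backoff plus a random jitter of up to half a step, using a per-downloader std::mt19937 and a baseRetryTimeMs of 250. Below is a minimal sketch of just that delay computation; retryDelayMs is an illustrative helper, not part of the actual code.

// Sketch of the retry-delay computation for transient download failures.
#include <cmath>
#include <iostream>
#include <random>

// Exponential backoff with up to half a step of random jitter, mirroring the
// formula in the diff: baseRetryTimeMs * 2^(attempt - 1 + jitter).
int retryDelayMs(unsigned int attempt, unsigned int baseRetryTimeMs, std::mt19937 & rng)
{
    // attempt is 1 for the first retry; the jitter spreads out retries so that
    // clients hitting the same transient error don't all retry in lockstep.
    std::uniform_real_distribution<> jitter(0.0, 0.5);
    return int(baseRetryTimeMs * std::pow(2.0f, attempt - 1 + jitter(rng)));
}

int main()
{
    std::mt19937 rng(std::random_device{}());
    for (unsigned int attempt = 1; attempt <= 5; ++attempt)
        std::cout << "retry " << attempt << ": ~" << retryDelayMs(attempt, 250, rng) << " ms\n";
}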
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index ab80e4032f04..3e07a2aa2b60 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -8,14 +8,79 @@ #include "sync.hh" #include "remote-fs-accessor.hh" #include "nar-info-disk-cache.hh" +#include "nar-accessor.hh" +#include "json.hh" #include <chrono> +#include <future> + namespace nix { +/* Given requests for a path /nix/store/<x>/<y>, this accessor will + first download the NAR for /nix/store/<x> from the binary cache, + build a NAR accessor for that NAR, and use that to access <y>. */ +struct BinaryCacheStoreAccessor : public FSAccessor +{ + ref<BinaryCacheStore> store; + + std::map<Path, ref<FSAccessor>> nars; + + BinaryCacheStoreAccessor(ref<BinaryCacheStore> store) + : store(store) + { + } + + std::pair<ref<FSAccessor>, Path> fetch(const Path & path_) + { + auto path = canonPath(path_); + + auto storePath = store->toStorePath(path); + std::string restPath = std::string(path, storePath.size()); + + if (!store->isValidPath(storePath)) + throw InvalidPath(format("path ‘%1%’ is not a valid store path") % storePath); + + auto i = nars.find(storePath); + if (i != nars.end()) return {i->second, restPath}; + + StringSink sink; + store->narFromPath(storePath, sink); + + auto accessor = makeNarAccessor(sink.s); + nars.emplace(storePath, accessor); + return {accessor, restPath}; + } + + Stat stat(const Path & path) override + { + auto res = fetch(path); + return res.first->stat(res.second); + } + + StringSet readDirectory(const Path & path) override + { + auto res = fetch(path); + return res.first->readDirectory(res.second); + } + + std::string readFile(const Path & path) override + { + auto res = fetch(path); + return res.first->readFile(res.second); + } + + std::string readLink(const Path & path) override + { + auto res = fetch(path); + return res.first->readLink(res.second); + } +}; + BinaryCacheStore::BinaryCacheStore(const Params & params) : Store(params) , compression(get(params, "compression", "xz")) + , writeNARListing(get(params, "write-nar-listing", "0") == "1") { auto secretKeyFile = get(params, "secret-key", ""); if (secretKeyFile != "") @@ -57,14 +122,27 @@ void BinaryCacheStore::notImpl() throw Error("operation not implemented for binary cache stores"); } +std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) +{ + std::promise<std::shared_ptr<std::string>> promise; + getFile(path, + [&](std::shared_ptr<std::string> result) { + promise.set_value(result); + }, + [&](std::exception_ptr exc) { + promise.set_exception(exc); + }); + return promise.get_future().get(); +} + Path BinaryCacheStore::narInfoFileFor(const Path & storePath) { assertStorePath(storePath); return storePathToHash(storePath) + ".narinfo"; } -void BinaryCacheStore::addToStore(const ValidPathInfo & info, const std::string & nar, - bool repair, bool dontCheckSigs) +void BinaryCacheStore::addToStore(const ValidPathInfo & info, const ref<std::string> & nar, + bool repair, bool dontCheckSigs, std::shared_ptr<FSAccessor> accessor) { if (!repair && isValidPath(info.path)) return; @@ -81,20 +159,83 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, const std::string auto narInfoFile = narInfoFileFor(info.path); - assert(nar.compare(0, narMagic.size(), narMagic) == 0); + assert(nar->compare(0, narMagic.size(), narMagic) == 0); auto narInfo = make_ref<NarInfo>(info); - narInfo->narSize = nar.size(); - narInfo->narHash = hashString(htSHA256, nar); + 
narInfo->narSize = nar->size(); + narInfo->narHash = hashString(htSHA256, *nar); if (info.narHash && info.narHash != narInfo->narHash) throw Error(format("refusing to copy corrupted path ‘%1%’ to binary cache") % info.path); + auto accessor_ = std::dynamic_pointer_cast<BinaryCacheStoreAccessor>(accessor); + + /* Optionally write a JSON file containing a listing of the + contents of the NAR. */ + if (writeNARListing) { + std::ostringstream jsonOut; + + { + JSONObject jsonRoot(jsonOut); + jsonRoot.attr("version", 1); + + auto narAccessor = makeNarAccessor(nar); + + if (accessor_) + accessor_->nars.emplace(info.path, narAccessor); + + std::function<void(const Path &, JSONPlaceholder &)> recurse; + + recurse = [&](const Path & path, JSONPlaceholder & res) { + auto st = narAccessor->stat(path); + + auto obj = res.object(); + + switch (st.type) { + case FSAccessor::Type::tRegular: + obj.attr("type", "regular"); + obj.attr("size", st.fileSize); + if (st.isExecutable) + obj.attr("executable", true); + break; + case FSAccessor::Type::tDirectory: + obj.attr("type", "directory"); + { + auto res2 = obj.object("entries"); + for (auto & name : narAccessor->readDirectory(path)) { + auto res3 = res2.placeholder(name); + recurse(path + "/" + name, res3); + } + } + break; + case FSAccessor::Type::tSymlink: + obj.attr("type", "symlink"); + obj.attr("target", narAccessor->readLink(path)); + break; + default: + abort(); + } + }; + + { + auto res = jsonRoot.placeholder("root"); + recurse("", res); + } + } + + upsertFile(storePathToHash(info.path) + ".ls.xz", *compress("xz", jsonOut.str())); + } + + else { + if (accessor_) + accessor_->nars.emplace(info.path, makeNarAccessor(nar)); + } + /* Compress the NAR. */ narInfo->compression = compression; auto now1 = std::chrono::steady_clock::now(); - auto narCompressed = compress(compression, nar); + auto narCompressed = compress(compression, *nar); auto now2 = std::chrono::steady_clock::now(); narInfo->fileHash = hashString(htSHA256, *narCompressed); narInfo->fileSize = narCompressed->size(); @@ -102,7 +243,7 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, const std::string auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); printMsg(lvlTalkative, format("copying path ‘%1%’ (%2% bytes, compressed %3$.1f%% in %4% ms) to binary cache") % narInfo->path % narInfo->narSize - % ((1.0 - (double) narCompressed->size() / nar.size()) * 100.0) + % ((1.0 - (double) narCompressed->size() / nar->size()) * 100.0) % duration); /* Atomically write the NAR file. 
*/ @@ -116,7 +257,7 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, const std::string } else stats.narWriteAverted++; - stats.narWriteBytes += nar.size(); + stats.narWriteBytes += nar->size(); stats.narWriteCompressedBytes += narCompressed->size(); stats.narWriteCompressionTimeMs += duration; @@ -175,17 +316,22 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) sink((unsigned char *) nar->c_str(), nar->size()); } -std::shared_ptr<ValidPathInfo> BinaryCacheStore::queryPathInfoUncached(const Path & storePath) +void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) { auto narInfoFile = narInfoFileFor(storePath); - auto data = getFile(narInfoFile); - if (!data) return 0; - auto narInfo = make_ref<NarInfo>(*this, *data, narInfoFile); + getFile(narInfoFile, + [=](std::shared_ptr<std::string> data) { + if (!data) return success(0); - stats.narInfoRead++; + stats.narInfoRead++; - return std::shared_ptr<NarInfo>(narInfo); + callSuccess(success, failure, (std::shared_ptr<ValidPathInfo>) + std::make_shared<NarInfo>(*this, *data, narInfoFile)); + }, + failure); } Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath, @@ -210,7 +356,7 @@ Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath, ValidPathInfo info; info.path = makeFixedOutputPath(recursive, h, name); - addToStore(info, *sink.s, repair); + addToStore(info, sink.s, repair, false, 0); return info.path; } @@ -225,7 +371,7 @@ Path BinaryCacheStore::addTextToStore(const string & name, const string & s, if (repair || !isValidPath(info.path)) { StringSink sink; dumpString(s, sink); - addToStore(info, *sink.s, repair); + addToStore(info, sink.s, repair, false, 0); } return info.path; diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh index 2d10179f32ab..31878bbb2476 100644 --- a/src/libstore/binary-cache-store.hh +++ b/src/libstore/binary-cache-store.hh @@ -19,19 +19,29 @@ private: std::string compression; + bool writeNARListing; + protected: BinaryCacheStore(const Params & params); [[noreturn]] void notImpl(); +public: + virtual bool fileExists(const std::string & path) = 0; virtual void upsertFile(const std::string & path, const std::string & data) = 0; /* Return the contents of the specified file, or null if it doesn't exist. 
*/ - virtual std::shared_ptr<std::string> getFile(const std::string & path) = 0; + virtual void getFile(const std::string & path, + std::function<void(std::shared_ptr<std::string>)> success, + std::function<void(std::exception_ptr exc)> failure) = 0; + + std::shared_ptr<std::string> getFile(const std::string & path); + +protected: bool wantMassQuery_ = false; int priority = 50; @@ -50,13 +60,12 @@ public: bool isValidPathUncached(const Path & path) override; - PathSet queryValidPaths(const PathSet & paths) override - { notImpl(); } - PathSet queryAllValidPaths() override { notImpl(); } - std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override; + void queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) override; void queryReferrers(const Path & path, PathSet & referrers) override @@ -83,23 +92,24 @@ public: bool wantMassQuery() override { return wantMassQuery_; } - void addToStore(const ValidPathInfo & info, const std::string & nar, - bool repair = false, bool dontCheckSigs = false) override; + void addToStore(const ValidPathInfo & info, const ref<std::string> & nar, + bool repair, bool dontCheckSigs, + std::shared_ptr<FSAccessor> accessor) override; Path addToStore(const string & name, const Path & srcPath, - bool recursive = true, HashType hashAlgo = htSHA256, - PathFilter & filter = defaultPathFilter, bool repair = false) override; + bool recursive, HashType hashAlgo, + PathFilter & filter, bool repair) override; Path addTextToStore(const string & name, const string & s, - const PathSet & references, bool repair = false) override; + const PathSet & references, bool repair) override; void narFromPath(const Path & path, Sink & sink) override; - void buildPaths(const PathSet & paths, BuildMode buildMode = bmNormal) override + void buildPaths(const PathSet & paths, BuildMode buildMode) override { notImpl(); } BuildResult buildDerivation(const Path & drvPath, const BasicDerivation & drv, - BuildMode buildMode = bmNormal) override + BuildMode buildMode) override { notImpl(); } void ensurePath(const Path & path) override @@ -128,6 +138,8 @@ public: ref<FSAccessor> getFSAccessor() override; +public: + void addSignatures(const Path & storePath, const StringSet & sigs) override { notImpl(); } diff --git a/src/libstore/build.cc b/src/libstore/build.cc index e0eb702a4f82..b61ea5298e1e 100644 --- a/src/libstore/build.cc +++ b/src/libstore/build.cc @@ -768,7 +768,14 @@ private: GoalState state; /* Stuff we need to pass to initChild(). */ - typedef map<Path, Path> DirsInChroot; // maps target path to source path + struct ChrootPath { + Path source; + bool optional; + ChrootPath(Path source = "", bool optional = false) + : source(source), optional(optional) + { } + }; + typedef map<Path, ChrootPath> DirsInChroot; // maps target path to source path DirsInChroot dirsInChroot; typedef map<string, string> Environment; Environment env; @@ -804,6 +811,9 @@ private: result. 
*/ ValidPathInfos prevInfos; + const uid_t sandboxUid = 1000; + const gid_t sandboxGid = 100; + public: DerivationGoal(const Path & drvPath, const StringSet & wantedOutputs, Worker & worker, BuildMode buildMode = bmNormal); @@ -1011,7 +1021,7 @@ void DerivationGoal::loadDerivation() trace("loading derivation"); if (nrFailed != 0) { - printMsg(lvlError, format("cannot build missing derivation ‘%1%’") % drvPath); + printError(format("cannot build missing derivation ‘%1%’") % drvPath); done(BuildResult::MiscFailure); return; } @@ -1165,7 +1175,7 @@ void DerivationGoal::repairClosure() PathSet broken; for (auto & i : outputClosure) { if (worker.pathContentsGood(i)) continue; - printMsg(lvlError, format("found corrupted or missing path ‘%1%’ in the output closure of ‘%2%’") % i % drvPath); + printError(format("found corrupted or missing path ‘%1%’ in the output closure of ‘%2%’") % i % drvPath); Path drvPath2 = outputsToDrv[i]; if (drvPath2 == "") addWaitee(worker.makeSubstitutionGoal(i, true)); @@ -1198,7 +1208,7 @@ void DerivationGoal::inputsRealised() if (nrFailed != 0) { if (!useDerivation) throw Error(format("some dependencies of ‘%1%’ are missing") % drvPath); - printMsg(lvlError, + printError( format("cannot build derivation ‘%1%’: %2% dependencies couldn't be built") % drvPath % nrFailed); done(BuildResult::DependencyFailed); @@ -1363,7 +1373,7 @@ void DerivationGoal::tryToBuild() startBuilder(); } catch (BuildError & e) { - printMsg(lvlError, e.msg()); + printError(e.msg()); outputLocks.unlock(); buildUser.release(); worker.permanentFailure = true; @@ -1512,7 +1522,7 @@ void DerivationGoal::buildDone() } catch (BuildError & e) { if (!hook) - printMsg(lvlError, e.msg()); + printError(e.msg()); outputLocks.unlock(); buildUser.release(); @@ -1641,7 +1651,7 @@ void DerivationGoal::startBuilder() nrRounds > 1 ? "building path(s) %1% (round %2%/%3%)" : "building path(s) %1%"); f.exceptions(boost::io::all_error_bits ^ boost::io::too_many_args_bit); - printMsg(lvlInfo, f % showPaths(missingPaths) % curRound % nrRounds); + printInfo(f % showPaths(missingPaths) % curRound % nrRounds); /* Right platform? */ if (!drv->canBuildLocally()) { @@ -1862,20 +1872,30 @@ void DerivationGoal::startBuilder() dirsInChroot.clear(); - for (auto & i : dirs) { + for (auto i : dirs) { + if (i.empty()) continue; + bool optional = false; + if (i[i.size() - 1] == '?') { + optional = true; + i.pop_back(); + } size_t p = i.find('='); if (p == string::npos) - dirsInChroot[i] = i; + dirsInChroot[i] = {i, optional}; else - dirsInChroot[string(i, 0, p)] = string(i, p + 1); + dirsInChroot[string(i, 0, p)] = {string(i, p + 1), optional}; } dirsInChroot[tmpDirInSandbox] = tmpDir; /* Add the closure of store paths to the chroot. 
*/ PathSet closure; for (auto & i : dirsInChroot) - if (worker.store.isInStore(i.second)) - worker.store.computeFSClosure(worker.store.toStorePath(i.second), closure); + try { + if (worker.store.isInStore(i.second.source)) + worker.store.computeFSClosure(worker.store.toStorePath(i.second.source), closure); + } catch (Error & e) { + throw Error(format("while processing ‘build-sandbox-paths’: %s") % e.what()); + } for (auto & i : closure) dirsInChroot[i] = i; @@ -1937,14 +1957,18 @@ void DerivationGoal::startBuilder() createDirs(chrootRootDir + "/etc"); writeFile(chrootRootDir + "/etc/passwd", - "root:x:0:0:Nix build user:/:/noshell\n" - "nobody:x:65534:65534:Nobody:/:/noshell\n"); + (format( + "root:x:0:0:Nix build user:/:/noshell\n" + "nixbld:x:%1%:%2%:Nix build user:/:/noshell\n" + "nobody:x:65534:65534:Nobody:/:/noshell\n") % sandboxUid % sandboxGid).str()); /* Declare the build user's group so that programs get a consistent view of the system (e.g., "id -gn"). */ writeFile(chrootRootDir + "/etc/group", - "root:x:0:\n" - "nobody:x:65534:\n"); + (format( + "root:x:0:\n" + "nixbld:!:%1%:\n" + "nogroup:x:65534:\n") % sandboxGid).str()); /* Create /etc/hosts with localhost entry. */ if (!fixedOutput) @@ -2126,7 +2150,12 @@ void DerivationGoal::startBuilder() Pid helper = startProcess([&]() { /* Drop additional groups here because we can't do it - after we've created the new user namespace. */ + after we've created the new user namespace. FIXME: + this means that if we're not root in the parent + namespace, we can't drop additional groups; they will + be mapped to nogroup in the child namespace. There does + not seem to be a workaround for this. (But who can tell + from reading user_namespaces(7)?)*/ if (getuid() == 0 && setgroups(0, 0) == -1) throw SysError("setgroups failed"); @@ -2159,19 +2188,19 @@ void DerivationGoal::startBuilder() if (!string2Int<pid_t>(readLine(builderOut.readSide.get()), tmp)) abort(); pid = tmp; - /* Set the UID/GID mapping of the builder's user - namespace such that root maps to the build user, or to the - calling user (if build users are disabled). */ - uid_t targetUid = buildUser.enabled() ? buildUser.getUID() : getuid(); - uid_t targetGid = buildUser.enabled() ? buildUser.getGID() : getgid(); + /* Set the UID/GID mapping of the builder's user namespace + such that the sandbox user maps to the build user, or to + the calling user (if build users are disabled). */ + uid_t hostUid = buildUser.enabled() ? buildUser.getUID() : getuid(); + uid_t hostGid = buildUser.enabled() ? buildUser.getGID() : getgid(); writeFile("/proc/" + std::to_string(pid) + "/uid_map", - (format("0 %d 1") % targetUid).str()); + (format("%d %d 1") % sandboxUid % hostUid).str()); writeFile("/proc/" + std::to_string(pid) + "/setgroups", "deny"); writeFile("/proc/" + std::to_string(pid) + "/gid_map", - (format("0 %d 1") % targetGid).str()); + (format("%d %d 1") % sandboxGid % hostGid).str()); /* Signal the builder that we've updated its user namespace. 
*/ @@ -2200,7 +2229,7 @@ void DerivationGoal::startBuilder() if (msg.size() == 1) break; throw Error(string(msg, 1)); } - printMsg(lvlDebug, msg); + debug(msg); } } @@ -2284,6 +2313,8 @@ void DerivationGoal::runChild() ss.push_back("/dev/tty"); ss.push_back("/dev/urandom"); ss.push_back("/dev/zero"); + ss.push_back("/dev/ptmx"); + ss.push_back("/dev/pts"); createSymlink("/proc/self/fd", chrootRootDir + "/dev/fd"); createSymlink("/proc/self/fd/0", chrootRootDir + "/dev/stdin"); createSymlink("/proc/self/fd/1", chrootRootDir + "/dev/stdout"); @@ -2307,12 +2338,16 @@ void DerivationGoal::runChild() environment. */ for (auto & i : dirsInChroot) { struct stat st; - Path source = i.second; + Path source = i.second.source; Path target = chrootRootDir + i.first; if (source == "/proc") continue; // backwards compatibility debug(format("bind mounting ‘%1%’ to ‘%2%’") % source % target); - if (stat(source.c_str(), &st) == -1) - throw SysError(format("getting attributes of path ‘%1%’") % source); + if (stat(source.c_str(), &st) == -1) { + if (i.second.optional && errno == ENOENT) + continue; + else + throw SysError(format("getting attributes of path ‘%1%’") % source); + } if (S_ISDIR(st.st_mode)) createDirs(target); else { @@ -2330,9 +2365,14 @@ void DerivationGoal::runChild() /* Mount a new tmpfs on /dev/shm to ensure that whatever the builder puts in /dev/shm is cleaned up automatically. */ - if (pathExists("/dev/shm") && mount("none", (chrootRootDir + "/dev/shm").c_str(), "tmpfs", 0, 0) == -1) + if (pathExists("/dev/shm") && mount("none", (chrootRootDir + "/dev/shm").c_str(), "tmpfs", 0, + fmt("size=%s", settings.get("sandbox-dev-shm-size", std::string("50%"))).c_str()) == -1) throw SysError("mounting /dev/shm"); +#if 0 + // FIXME: can't figure out how to do this in a user + // namespace. + /* Mount a new devpts on /dev/pts. Note that this requires the kernel to be compiled with CONFIG_DEVPTS_MULTIPLE_INSTANCES=y (which is the case @@ -2349,6 +2389,7 @@ void DerivationGoal::runChild() Linux versions, it is created with permissions 0. */ chmod_(chrootRootDir + "/dev/pts/ptmx", 0666); } +#endif /* Do the chroot(). */ if (chdir(chrootRootDir.c_str()) == -1) @@ -2369,11 +2410,12 @@ void DerivationGoal::runChild() if (rmdir("real-root") == -1) throw SysError("cannot remove real-root directory"); - /* Become root in the user namespace, which corresponds to - the build user or calling user in the parent namespace. */ - if (setgid(0) == -1) + /* Switch to the sandbox uid/gid in the user namespace, + which corresponds to the build user or calling user in + the parent namespace. */ + if (setgid(sandboxGid) == -1) throw SysError("setgid failed"); - if (setuid(0) == -1) + if (setuid(sandboxUid) == -1) throw SysError("setuid failed"); setUser = false; @@ -2685,7 +2727,7 @@ void DerivationGoal::registerOutputs() /* Apply hash rewriting if necessary. */ bool rewritten = false; if (!outputRewrites.empty()) { - printMsg(lvlError, format("warning: rewriting hashes in ‘%1%’; cross fingers") % path); + printError(format("warning: rewriting hashes in ‘%1%’; cross fingers") % path); /* Canonicalise first. This ensures that the path we're rewriting doesn't contain a hard link to /etc/shadow or @@ -2724,7 +2766,7 @@ void DerivationGoal::registerOutputs() Hash h2 = recursive ? 
hashPath(h.type, actualPath).first : hashFile(h.type, actualPath); if (buildMode == bmHash) { Path dest = worker.store.makeFixedOutputPath(recursive, h2, drv->env["name"]); - printMsg(lvlError, format("build produced path ‘%1%’ with %2% hash ‘%3%’") + printError(format("build produced path ‘%1%’ with %2% hash ‘%3%’") % dest % printHashType(h.type) % printHash16or32(h2)); if (worker.store.isValidPath(dest)) return; @@ -2948,7 +2990,7 @@ void DerivationGoal::deleteTmpDir(bool force) { if (tmpDir != "") { if (settings.keepFailed && !force) { - printMsg(lvlError, + printError( format("note: keeping build directory ‘%2%’") % drvPath % tmpDir); chmod(tmpDir.c_str(), 0755); @@ -2967,7 +3009,7 @@ void DerivationGoal::handleChildOutput(int fd, const string & data) { logSize += data.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - printMsg(lvlError, + printError( format("%1% killed after writing more than %2% bytes of log output") % getName() % settings.maxLogSize); killChild(); @@ -2990,7 +3032,7 @@ void DerivationGoal::handleChildOutput(int fd, const string & data) } if (hook && fd == hook->fromHook.readSide.get()) - printMsg(lvlError, data); // FIXME? + printError(data); // FIXME? } @@ -3004,7 +3046,7 @@ void DerivationGoal::handleEOF(int fd) void DerivationGoal::flushLine() { if (settings.verboseBuild) - printMsg(lvlInfo, filterANSIEscapes(currentLogLine, true)); + printError(filterANSIEscapes(currentLogLine, true)); else { logTail.push_back(currentLogLine); if (logTail.size() > settings.logLines) logTail.pop_front(); @@ -3217,7 +3259,7 @@ void SubstitutionGoal::tryNext() signature. LocalStore::addToStore() also checks for this, but only after we've downloaded the path. */ if (worker.store.requireSigs && !info->checkSignatures(worker.store, worker.store.publicKeys)) { - printMsg(lvlInfo, format("warning: substituter ‘%s’ does not have a valid signature for path ‘%s’") + printInfo(format("warning: substituter ‘%s’ does not have a valid signature for path ‘%s’") % sub->getUri() % storePath); tryNext(); return; @@ -3268,7 +3310,7 @@ void SubstitutionGoal::tryToRun() return; } - printMsg(lvlInfo, format("fetching path ‘%1%’...") % storePath); + printInfo(format("fetching path ‘%1%’...") % storePath); outPipe.create(); @@ -3304,7 +3346,7 @@ void SubstitutionGoal::finished() try { promise.get_future().get(); } catch (Error & e) { - printMsg(lvlInfo, e.msg()); + printInfo(e.msg()); /* Try the next substitute. 
*/ state = &SubstitutionGoal::tryNext; @@ -3466,7 +3508,7 @@ void Worker::childTerminated(Goal * goal, bool wakeSleepers) { auto i = std::find_if(children.begin(), children.end(), [&](const Child & child) { return child.goal2 == goal; }); - assert(i != children.end()); + if (i == children.end()) return; if (i->inBuildSlot) { assert(nrLocalBuilds > 0); @@ -3598,7 +3640,7 @@ void Worker::waitForInput() if (!waitingForAWhile.empty()) { useTimeout = true; if (lastWokenUp == 0) - printMsg(lvlError, "waiting for locks or build slots..."); + printError("waiting for locks or build slots..."); if (lastWokenUp == 0 || lastWokenUp > before) lastWokenUp = before; timeout.tv_sec = std::max((time_t) 1, (time_t) (lastWokenUp + settings.pollInterval - before)); } else lastWokenUp = 0; @@ -3661,7 +3703,7 @@ void Worker::waitForInput() j->respectTimeouts && after - j->lastOutput >= (time_t) settings.maxSilentTime) { - printMsg(lvlError, + printError( format("%1% timed out after %2% seconds of silence") % goal->getName() % settings.maxSilentTime); goal->timedOut(); @@ -3672,7 +3714,7 @@ void Worker::waitForInput() j->respectTimeouts && after - j->timeStarted >= (time_t) settings.buildTimeout) { - printMsg(lvlError, + printError( format("%1% timed out after %2% seconds") % goal->getName() % settings.buildTimeout); goal->timedOut(); @@ -3700,7 +3742,7 @@ bool Worker::pathContentsGood(const Path & path) { std::map<Path, bool>::iterator i = pathContentsGoodCache.find(path); if (i != pathContentsGoodCache.end()) return i->second; - printMsg(lvlInfo, format("checking path ‘%1%’...") % path); + printInfo(format("checking path ‘%1%’...") % path); auto info = store.queryPathInfo(path); bool res; if (!pathExists(path)) @@ -3711,7 +3753,7 @@ bool Worker::pathContentsGood(const Path & path) res = info->narHash == nullHash || info->narHash == current.first; } pathContentsGoodCache[path] = res; - if (!res) printMsg(lvlError, format("path ‘%1%’ is corrupted or missing!") % path); + if (!res) printError(format("path ‘%1%’ is corrupted or missing!") % path); return res; } @@ -3749,7 +3791,7 @@ void LocalStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode) } if (!failed.empty()) - throw Error(format("build of %1% failed") % showPaths(failed), worker.exitStatus()); + throw Error(worker.exitStatus(), "build of %s failed", showPaths(failed)); } @@ -3785,7 +3827,7 @@ void LocalStore::ensurePath(const Path & path) worker.run(goals); if (goal->getExitCode() != Goal::ecSuccess) - throw Error(format("path ‘%1%’ does not exist and cannot be created") % path, worker.exitStatus()); + throw Error(worker.exitStatus(), "path ‘%s’ does not exist and cannot be created", path); } @@ -3806,7 +3848,7 @@ void LocalStore::repairPath(const Path & path) goals.insert(worker.makeDerivationGoal(deriver, StringSet(), bmRepair)); worker.run(goals); } else - throw Error(format("cannot repair path ‘%1%’") % path, worker.exitStatus()); + throw Error(worker.exitStatus(), "cannot repair path ‘%s’", path); } } diff --git a/src/libstore/builtins.cc b/src/libstore/builtins.cc index d3194a905733..a30f30906f01 100644 --- a/src/libstore/builtins.cc +++ b/src/libstore/builtins.cc @@ -17,13 +17,15 @@ void builtinFetchurl(const BasicDerivation & drv) auto fetch = [&](const string & url) { /* No need to do TLS verification, because we check the hash of the result anyway. */ - DownloadOptions options; - options.verifyTLS = false; + DownloadRequest request(url); + request.verifyTLS = false; /* Show a progress indicator, even though stderr is not a tty. 
*/ - options.showProgress = DownloadOptions::yes; + request.showProgress = DownloadRequest::yes; - auto data = makeDownloader()->download(url, options); + /* Note: have to use a fresh downloader here because we're in + a forked process. */ + auto data = makeDownloader()->download(request); assert(data.data); return data.data; diff --git a/src/libstore/derivations.cc b/src/libstore/derivations.cc index f051f10bd018..d934bda38225 100644 --- a/src/libstore/derivations.cc +++ b/src/libstore/derivations.cc @@ -61,6 +61,7 @@ bool BasicDerivation::canBuildLocally() const #if __linux__ || (platform == "i686-linux" && settings.thisSystem == "x86_64-linux") || (platform == "armv6l-linux" && settings.thisSystem == "armv7l-linux") + || (platform == "armv5tel-linux" && (settings.thisSystem == "armv7l-linux" || settings.thisSystem == "armv6l-linux")) #elif __FreeBSD__ || (platform == "i686-linux" && settings.thisSystem == "x86_64-freebsd") || (platform == "i686-linux" && settings.thisSystem == "i686-freebsd") @@ -87,6 +88,38 @@ Path writeDerivation(ref<Store> store, } +MakeError(FormatError, Error) + + +/* Read string `s' from stream `str'. */ +static void expect(std::istream & str, const string & s) +{ + char s2[s.size()]; + str.read(s2, s.size()); + if (string(s2, s.size()) != s) + throw FormatError(format("expected string ‘%1%’") % s); +} + + +/* Read a C-style string from stream `str'. */ +static string parseString(std::istream & str) +{ + string res; + expect(str, "\""); + int c; + while ((c = str.get()) != '"') + if (c == '\\') { + c = str.get(); + if (c == 'n') res += '\n'; + else if (c == 'r') res += '\r'; + else if (c == 't') res += '\t'; + else res += c; + } + else res += c; + return res; +} + + static Path parsePath(std::istream & str) { string s = parseString(str); @@ -96,6 +129,20 @@ static Path parsePath(std::istream & str) } +static bool endOfList(std::istream & str) +{ + if (str.peek() == ',') { + str.get(); + return false; + } + if (str.peek() == ']') { + str.get(); + return true; + } + return false; +} + + static StringSet parseStrings(std::istream & str, bool arePaths) { StringSet res; diff --git a/src/libstore/download.cc b/src/libstore/download.cc index ed7e124d25f4..954044c2344f 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -5,11 +5,16 @@ #include "store-api.hh" #include "archive.hh" +#include <unistd.h> +#include <fcntl.h> + #include <curl/curl.h> +#include <queue> #include <iostream> #include <thread> - +#include <cmath> +#include <random> namespace nix { @@ -30,225 +35,462 @@ std::string resolveUri(const std::string & uri) struct CurlDownloader : public Downloader { - CURL * curl; - ref<std::string> data; - string etag, status, expectedETag, effectiveUrl; + CURLM * curlm = 0; - struct curl_slist * requestHeaders; + std::random_device rd; + std::mt19937 mt19937; - bool showProgress; - double prevProgressTime{0}, startTime{0}; - unsigned int moveBack{1}; + bool enableHttp2; - size_t writeCallback(void * contents, size_t size, size_t nmemb) + struct DownloadItem : public std::enable_shared_from_this<DownloadItem> { - size_t realSize = size * nmemb; - data->append((char *) contents, realSize); - return realSize; - } + CurlDownloader & downloader; + DownloadRequest request; + DownloadResult result; + bool done = false; // whether either the success or failure function has been called + std::function<void(const DownloadResult &)> success; + std::function<void(std::exception_ptr exc)> failure; + CURL * req = 0; + bool active = false; // whether the handle has 
been added to the multi object + std::string status; + + bool showProgress = false; + double prevProgressTime{0}, startTime{0}; + unsigned int moveBack{1}; + + unsigned int attempt = 0; + + /* Don't start this download until the specified time point + has been reached. */ + std::chrono::steady_clock::time_point embargo; + + struct curl_slist * requestHeaders = 0; + + DownloadItem(CurlDownloader & downloader, const DownloadRequest & request) + : downloader(downloader), request(request) + { + showProgress = + request.showProgress == DownloadRequest::yes || + (request.showProgress == DownloadRequest::automatic && isatty(STDERR_FILENO)); + + if (!request.expectedETag.empty()) + requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str()); + } - static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) - { - return ((CurlDownloader *) userp)->writeCallback(contents, size, nmemb); - } + ~DownloadItem() + { + if (req) { + if (active) + curl_multi_remove_handle(downloader.curlm, req); + curl_easy_cleanup(req); + } + if (requestHeaders) curl_slist_free_all(requestHeaders); + try { + if (!done) + fail(DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri)); + } catch (...) { + ignoreException(); + } + } - size_t headerCallback(void * contents, size_t size, size_t nmemb) - { - size_t realSize = size * nmemb; - string line = string((char *) contents, realSize); - printMsg(lvlVomit, format("got header: %1%") % trim(line)); - if (line.compare(0, 5, "HTTP/") == 0) { // new response starts - etag = ""; - auto ss = tokenizeString<vector<string>>(line, " "); - status = ss.size() >= 2 ? ss[1] : ""; - } else { - auto i = line.find(':'); - if (i != string::npos) { - string name = trim(string(line, 0, i)); - if (name == "ETag") { // FIXME: case - etag = trim(string(line, i + 1)); - /* Hack to work around a GitHub bug: it sends - ETags, but ignores If-None-Match. So if we get - the expected ETag on a 200 response, then shut - down the connection because we already have the - data. */ - printMsg(lvlDebug, format("got ETag: %1%") % etag); - if (etag == expectedETag && status == "200") { - printMsg(lvlDebug, format("shutting down on 200 HTTP response with expected ETag")); - return 0; + template<class T> + void fail(const T & e) + { + assert(!done); + done = true; + callFailure(failure, std::make_exception_ptr(e)); + } + + size_t writeCallback(void * contents, size_t size, size_t nmemb) + { + size_t realSize = size * nmemb; + result.data->append((char *) contents, realSize); + return realSize; + } + + static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) + { + return ((DownloadItem *) userp)->writeCallback(contents, size, nmemb); + } + + size_t headerCallback(void * contents, size_t size, size_t nmemb) + { + size_t realSize = size * nmemb; + std::string line((char *) contents, realSize); + printMsg(lvlVomit, format("got header for ‘%s’: %s") % request.uri % trim(line)); + if (line.compare(0, 5, "HTTP/") == 0) { // new response starts + result.etag = ""; + auto ss = tokenizeString<vector<string>>(line, " "); + status = ss.size() >= 2 ? ss[1] : ""; + result.data = std::make_shared<std::string>(); + } else { + auto i = line.find(':'); + if (i != string::npos) { + string name = toLower(trim(string(line, 0, i))); + if (name == "etag") { + result.etag = trim(string(line, i + 1)); + /* Hack to work around a GitHub bug: it sends + ETags, but ignores If-None-Match. 
So if we get + the expected ETag on a 200 response, then shut + down the connection because we already have the + data. */ + if (result.etag == request.expectedETag && status == "200") { + debug(format("shutting down on 200 HTTP response with expected ETag")); + return 0; + } } } } + return realSize; } - return realSize; - } - static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) - { - return ((CurlDownloader *) userp)->headerCallback(contents, size, nmemb); - } + static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) + { + return ((DownloadItem *) userp)->headerCallback(contents, size, nmemb); + } - int progressCallback(double dltotal, double dlnow) - { - if (showProgress) { - double now = getTime(); - if (prevProgressTime <= now - 1) { - string s = (format(" [%1$.0f/%2$.0f KiB, %3$.1f KiB/s]") - % (dlnow / 1024.0) - % (dltotal / 1024.0) - % (now == startTime ? 0 : dlnow / 1024.0 / (now - startTime))).str(); - std::cerr << "\e[" << moveBack << "D" << s; - moveBack = s.size(); + int progressCallback(double dltotal, double dlnow) + { + if (showProgress) { + double now = getTime(); + if (prevProgressTime <= now - 1) { + string s = (format(" [%1$.0f/%2$.0f KiB, %3$.1f KiB/s]") + % (dlnow / 1024.0) + % (dltotal / 1024.0) + % (now == startTime ? 0 : dlnow / 1024.0 / (now - startTime))).str(); + std::cerr << "\e[" << moveBack << "D" << s; + moveBack = s.size(); + std::cerr.flush(); + prevProgressTime = now; + } + } + return _isInterrupted; + } + + static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow) + { + return ((DownloadItem *) userp)->progressCallback(dltotal, dlnow); + } + + void init() + { + // FIXME: handle parallel downloads. + if (showProgress) { + std::cerr << (format("downloading ‘%1%’... 
") % request.uri); std::cerr.flush(); - prevProgressTime = now; + startTime = getTime(); } + + if (!req) req = curl_easy_init(); + + curl_easy_reset(req); + curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str()); + curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1); + curl_easy_setopt(req, CURLOPT_USERAGENT, ("Nix/" + nixVersion).c_str()); + #if LIBCURL_VERSION_NUM >= 0x072b00 + curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1); + #endif + #if LIBCURL_VERSION_NUM >= 0x072f00 + if (downloader.enableHttp2) + curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS); + #endif + curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper); + curl_easy_setopt(req, CURLOPT_WRITEDATA, this); + curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, DownloadItem::headerCallbackWrapper); + curl_easy_setopt(req, CURLOPT_HEADERDATA, this); + + curl_easy_setopt(req, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper); + curl_easy_setopt(req, CURLOPT_PROGRESSDATA, this); + curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0); + + curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders); + + if (request.head) + curl_easy_setopt(req, CURLOPT_NOBODY, 1); + + if (request.verifyTLS) + curl_easy_setopt(req, CURLOPT_CAINFO, + getEnv("NIX_SSL_CERT_FILE", getEnv("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt")).c_str()); + else { + curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0); + curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0); + } + + result.data = std::make_shared<std::string>(); } - return _isInterrupted; - } - static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow) - { - return ((CurlDownloader *) userp)->progressCallback(dltotal, dlnow); - } + void finish(CURLcode code) + { + if (showProgress) + //std::cerr << "\e[" << moveBack << "D\e[K\n"; + std::cerr << "\n"; - CurlDownloader() - : data(make_ref<std::string>()) - { - requestHeaders = 0; + long httpStatus = 0; + curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus); - curl = curl_easy_init(); - if (!curl) throw nix::Error("unable to initialize curl"); - } + char * effectiveUrlCStr; + curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr); + if (effectiveUrlCStr) + result.effectiveUrl = effectiveUrlCStr; - ~CurlDownloader() - { - if (curl) curl_easy_cleanup(curl); - if (requestHeaders) curl_slist_free_all(requestHeaders); - } + debug(format("finished download of ‘%s’; curl status = %d, HTTP status = %d, body = %d bytes") + % request.uri % code % httpStatus % (result.data ? result.data->size() : 0)); + + if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) { + code = CURLE_OK; + httpStatus = 304; + } + + if (code == CURLE_OK && + (httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) + { + result.cached = httpStatus == 304; + done = true; + callSuccess(success, failure, const_cast<const DownloadResult &>(result)); + } else { + Error err = + (httpStatus == 404 || code == CURLE_FILE_COULDNT_READ_FILE) ? NotFound : + httpStatus == 403 ? Forbidden : + (httpStatus == 408 || httpStatus == 500 || httpStatus == 503 + || httpStatus == 504 || httpStatus == 522 || httpStatus == 524 + || code == CURLE_COULDNT_RESOLVE_HOST) ? Transient : + Misc; + + attempt++; - bool fetch(const string & url, const DownloadOptions & options) + auto exc = + code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted + ? 
DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri) + : httpStatus != 0 + ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus) + : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code); + + /* If this is a transient error, then maybe retry the + download after a while. */ + if (err == Transient && attempt < request.tries) { + int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); + printError(format("warning: %s; retrying in %d ms") % exc.what() % ms); + embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); + downloader.enqueueItem(shared_from_this()); + } + else + fail(exc); + } + } + }; + + struct State { - showProgress = - options.showProgress == DownloadOptions::yes || - (options.showProgress == DownloadOptions::automatic && isatty(STDERR_FILENO)); + struct EmbargoComparator { + bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) { + return i1->embargo > i2->embargo; + } + }; + bool quit = false; + std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming; + }; - curl_easy_reset(curl); + Sync<State> state_; - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); - curl_easy_setopt(curl, CURLOPT_USERAGENT, ("Nix/" + nixVersion).c_str()); - curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1); + /* We can't use a std::condition_variable to wake up the curl + thread, because it only monitors file descriptors. So use a + pipe instead. */ + Pipe wakeupPipe; - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallbackWrapper); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) this); + std::thread workerThread; - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headerCallbackWrapper); - curl_easy_setopt(curl, CURLOPT_HEADERDATA, (void *) this); + CurlDownloader() + : mt19937(rd()) + { + static std::once_flag globalInit; + std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); - curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper); - curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, (void *) this); - curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0); + curlm = curl_multi_init(); - curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1); + #if LIBCURL_VERSION_NUM >= 0x072b00 // correct? + curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX); + #endif + curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, + settings.get("binary-caches-parallel-connections", 25)); - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + enableHttp2 = settings.get("enable-http2", true); - if (options.verifyTLS) - curl_easy_setopt(curl, CURLOPT_CAINFO, getEnv("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt").c_str()); - else { - curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0); - curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0); - } + wakeupPipe.create(); + fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK); - data = make_ref<std::string>(); + workerThread = std::thread([&]() { workerThreadEntry(); }); + } - if (requestHeaders) { - curl_slist_free_all(requestHeaders); - requestHeaders = 0; + ~CurlDownloader() + { + /* Signal the worker thread to exit. 
*/ + { + auto state(state_.lock()); + state->quit = true; } + writeFull(wakeupPipe.writeSide.get(), " "); - if (!options.expectedETag.empty()) { - this->expectedETag = options.expectedETag; - requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + options.expectedETag).c_str()); - } + workerThread.join(); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, requestHeaders); + if (curlm) curl_multi_cleanup(curlm); + } - if (options.head) - curl_easy_setopt(curl, CURLOPT_NOBODY, 1); + void workerThreadMain() + { + std::map<CURL *, std::shared_ptr<DownloadItem>> items; + + bool quit = false; + + std::chrono::steady_clock::time_point nextWakeup; + + while (!quit) { + checkInterrupt(); + + /* Let curl do its thing. */ + int running; + CURLMcode mc = curl_multi_perform(curlm, &running); + if (mc != CURLM_OK) + throw nix::Error(format("unexpected error from curl_multi_perform(): %s") % curl_multi_strerror(mc)); + + /* Set the promises of any finished requests. */ + CURLMsg * msg; + int left; + while ((msg = curl_multi_info_read(curlm, &left))) { + if (msg->msg == CURLMSG_DONE) { + auto i = items.find(msg->easy_handle); + assert(i != items.end()); + i->second->finish(msg->data.result); + curl_multi_remove_handle(curlm, i->second->req); + i->second->active = false; + items.erase(i); + } + } - if (showProgress) { - std::cerr << (format("downloading ‘%1%’... ") % url); - std::cerr.flush(); - startTime = getTime(); - } + /* Wait for activity, including wakeup events. */ + int numfds = 0; + struct curl_waitfd extraFDs[1]; + extraFDs[0].fd = wakeupPipe.readSide.get(); + extraFDs[0].events = CURL_WAIT_POLLIN; + extraFDs[0].revents = 0; + auto sleepTimeMs = + nextWakeup != std::chrono::steady_clock::time_point() + ? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count()) + : 1000000000; + //printMsg(lvlVomit, format("download thread waiting for %d ms") % sleepTimeMs); + mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds); + if (mc != CURLM_OK) + throw nix::Error(format("unexpected error from curl_multi_wait(): %s") % curl_multi_strerror(mc)); + + nextWakeup = std::chrono::steady_clock::time_point(); + + /* Add new curl requests from the incoming requests queue, + except for requests that are embargoed (waiting for a + retry timeout to expire). */ + if (extraFDs[0].revents & CURL_WAIT_POLLIN) { + char buf[1024]; + auto res = read(extraFDs[0].fd, buf, sizeof(buf)); + if (res == -1 && errno != EINTR) + throw SysError("reading curl wakeup socket"); + } + + std::vector<std::shared_ptr<DownloadItem>> incoming; + auto now = std::chrono::steady_clock::now(); + + { + auto state(state_.lock()); + while (!state->incoming.empty()) { + auto item = state->incoming.top(); + if (item->embargo <= now) { + incoming.push_back(item); + state->incoming.pop(); + } else { + if (nextWakeup == std::chrono::steady_clock::time_point() + || item->embargo < nextWakeup) + nextWakeup = item->embargo; + break; + } + } + quit = state->quit; + } - CURLcode res = curl_easy_perform(curl); - if (showProgress) - //std::cerr << "\e[" << moveBack << "D\e[K\n"; - std::cerr << "\n"; - checkInterrupt(); - if (res == CURLE_WRITE_ERROR && etag == options.expectedETag) return false; - - long httpStatus = -1; - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpStatus); - - if (res != CURLE_OK) { - Error err = - httpStatus == 404 ? NotFound : - httpStatus == 403 ? 
Forbidden : - (httpStatus == 408 || httpStatus == 500 || httpStatus == 503 - || httpStatus == 504 || httpStatus == 522 || httpStatus == 524 - || res == CURLE_COULDNT_RESOLVE_HOST) ? Transient : - Misc; - if (res == CURLE_HTTP_RETURNED_ERROR && httpStatus != -1) - throw DownloadError(err, format("unable to download ‘%s’: HTTP error %d") - % url % httpStatus); - else - throw DownloadError(err, format("unable to download ‘%s’: %s (%d)") - % url % curl_easy_strerror(res) % res); + for (auto & item : incoming) { + debug(format("starting download of %s") % item->request.uri); + item->init(); + curl_multi_add_handle(curlm, item->req); + item->active = true; + items[item->req] = item; + } } - char *effectiveUrlCStr; - curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr); - if (effectiveUrlCStr) - effectiveUrl = effectiveUrlCStr; + debug("download thread shutting down"); + } - if (httpStatus == 304) return false; + void workerThreadEntry() + { + try { + workerThreadMain(); + } catch (nix::Interrupted & e) { + } catch (std::exception & e) { + printError(format("unexpected error in download thread: %s") % e.what()); + } - return true; + { + auto state(state_.lock()); + while (!state->incoming.empty()) state->incoming.pop(); + state->quit = true; + } } - DownloadResult download(string url, const DownloadOptions & options) override + void enqueueItem(std::shared_ptr<DownloadItem> item) { - size_t attempt = 0; - - while (true) { - try { - DownloadResult res; - if (fetch(resolveUri(url), options)) { - res.cached = false; - res.data = data; - } else - res.cached = true; - res.effectiveUrl = effectiveUrl; - res.etag = etag; - return res; - } catch (DownloadError & e) { - attempt++; - if (e.error != Transient || attempt >= options.tries) throw; - auto ms = options.baseRetryTimeMs * (1 << (attempt - 1)); - printMsg(lvlError, format("warning: %s; retrying in %d ms") % e.what() % ms); - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); - } + { + auto state(state_.lock()); + if (state->quit) + throw nix::Error("cannot enqueue download request because the download thread is shutting down"); + state->incoming.push(item); } + writeFull(wakeupPipe.writeSide.get(), " "); + } + + void enqueueDownload(const DownloadRequest & request, + std::function<void(const DownloadResult &)> success, + std::function<void(std::exception_ptr exc)> failure) override + { + auto item = std::make_shared<DownloadItem>(*this, request); + item->success = success; + item->failure = failure; + enqueueItem(item); } }; +ref<Downloader> getDownloader() +{ + static std::shared_ptr<Downloader> downloader; + static std::once_flag downloaderCreated; + std::call_once(downloaderCreated, [&]() { downloader = makeDownloader(); }); + return ref<Downloader>(downloader); +} + ref<Downloader> makeDownloader() { return make_ref<CurlDownloader>(); } +std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & request) +{ + auto promise = std::make_shared<std::promise<DownloadResult>>(); + enqueueDownload(request, + [promise](const DownloadResult & result) { promise->set_value(result); }, + [promise](std::exception_ptr exc) { promise->set_exception(exc); }); + return promise->get_future(); +} + +DownloadResult Downloader::download(const DownloadRequest & request) +{ + return enqueueDownload(request).get(); +} + Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl) { auto url = resolveUri(url_); @@ -292,7 +534,7 @@ Path 
Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa if (effectiveUrl) *effectiveUrl = url_; } else if (!ss[1].empty()) { - printMsg(lvlDebug, format("verifying previous ETag ‘%1%’") % ss[1]); + debug(format("verifying previous ETag ‘%1%’") % ss[1]); expectedETag = ss[1]; } } @@ -303,9 +545,9 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa if (!skip) { try { - DownloadOptions options; - options.expectedETag = expectedETag; - auto res = download(url, options); + DownloadRequest request(url); + request.expectedETag = expectedETag; + auto res = download(request); if (effectiveUrl) *effectiveUrl = res.effectiveUrl; @@ -316,7 +558,7 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa Hash hash = hashString(expectedHash ? expectedHash.type : htSHA256, *res.data); info.path = store->makeFixedOutputPath(false, hash, name); info.narHash = hashString(htSHA256, *sink.s); - store->addToStore(info, *sink.s, false, true); + store->addToStore(info, sink.s, false, true); storePath = info.path; } @@ -326,7 +568,7 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n"); } catch (DownloadError & e) { if (storePath.empty()) throw; - printMsg(lvlError, format("warning: %1%; using cached result") % e.msg()); + printError(format("warning: %1%; using cached result") % e.msg()); } } @@ -340,7 +582,7 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa unpackedStorePath = ""; } if (unpackedStorePath.empty()) { - printMsg(lvlInfo, format("unpacking ‘%1%’...") % url); + printInfo(format("unpacking ‘%1%’...") % url); Path tmpDir = createTempDir(); AutoDelete autoDelete(tmpDir, true); // FIXME: this requires GNU tar for decompression. diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 011b85f4721b..82b5d641fde9 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -4,24 +4,28 @@ #include "hash.hh" #include <string> +#include <future> namespace nix { -struct DownloadOptions +struct DownloadRequest { + std::string uri; std::string expectedETag; bool verifyTLS = true; enum { yes, no, automatic } showProgress = yes; bool head = false; size_t tries = 1; - unsigned int baseRetryTimeMs = 100; + unsigned int baseRetryTimeMs = 250; + + DownloadRequest(const std::string & uri) : uri(uri) { } }; struct DownloadResult { bool cached; - string etag; - string effectiveUrl; + std::string etag; + std::string effectiveUrl; std::shared_ptr<std::string> data; }; @@ -29,14 +33,33 @@ class Store; struct Downloader { - virtual DownloadResult download(string url, const DownloadOptions & options) = 0; + /* Enqueue a download request, returning a future to the result of + the download. The future may throw a DownloadError + exception. */ + virtual void enqueueDownload(const DownloadRequest & request, + std::function<void(const DownloadResult &)> success, + std::function<void(std::exception_ptr exc)> failure) = 0; + + std::future<DownloadResult> enqueueDownload(const DownloadRequest & request); - Path downloadCached(ref<Store> store, const string & url, bool unpack, string name = "", - const Hash & expectedHash = Hash(), string * effectiveUrl = nullptr); + /* Synchronously download a file. 
*/ + DownloadResult download(const DownloadRequest & request); - enum Error { NotFound, Forbidden, Misc, Transient }; + /* Check if the specified file is already in ~/.cache/nix/tarballs + and is more recent than ‘tarball-ttl’ seconds. Otherwise, + use the recorded ETag to verify if the server has a more + recent version, and if so, download it to the Nix store. */ + Path downloadCached(ref<Store> store, const string & uri, bool unpack, string name = "", + const Hash & expectedHash = Hash(), string * effectiveUri = nullptr); + + enum Error { NotFound, Forbidden, Misc, Transient, Interrupted }; }; +/* Return a shared Downloader object. Using this object is preferred + because it enables connection reuse and HTTP/2 multiplexing. */ +ref<Downloader> getDownloader(); + +/* Return a new Downloader object. */ ref<Downloader> makeDownloader(); class DownloadError : public Error diff --git a/src/libstore/export-import.cc b/src/libstore/export-import.cc index 6090ee3e9f83..c5618c826c54 100644 --- a/src/libstore/export-import.cc +++ b/src/libstore/export-import.cc @@ -117,15 +117,7 @@ Paths Store::importPaths(Source & source, std::shared_ptr<FSAccessor> accessor, if (readInt(source) == 1) readString(source); - addToStore(info, *tee.data, false, dontCheckSigs); - - // FIXME: implement accessors? - assert(!accessor); -#if 0 - auto accessor_ = std::dynamic_pointer_cast<BinaryCacheStoreAccessor>(accessor); - if (accessor_) - accessor_->nars.emplace(info.path, makeNarAccessor(tee.data)); -#endif + addToStore(info, tee.data, false, dontCheckSigs, accessor); res.push_back(info.path); } diff --git a/src/libstore/gc.cc b/src/libstore/gc.cc index 2eab7de0d8bf..ae03604faf98 100644 --- a/src/libstore/gc.cc +++ b/src/libstore/gc.cc @@ -39,7 +39,7 @@ int LocalStore::openGCLock(LockType lockType) throw SysError(format("opening global GC lock ‘%1%’") % fnGCLock); if (!lockFile(fdGCLock.get(), lockType, false)) { - printMsg(lvlError, format("waiting for the big garbage collector lock...")); + printError(format("waiting for the big garbage collector lock...")); lockFile(fdGCLock.get(), lockType, true); } @@ -129,7 +129,7 @@ Path LocalFSStore::addPermRoot(const Path & _storePath, if (settings.checkRootReachability) { Roots roots = findRoots(); if (roots.find(gcRoot) == roots.end()) - printMsg(lvlError, + printError( format( "warning: ‘%1%’ is not in a directory where the garbage collector looks for roots; " "therefore, ‘%2%’ might be removed by the garbage collector") @@ -226,7 +226,7 @@ void LocalStore::readTempRoots(PathSet & tempRoots, FDs & fds) only succeed if the owning process has died. In that case we don't care about its temporary roots. 
*/ if (lockFile(fd->get(), ltWrite, false)) { - printMsg(lvlError, format("removing stale temporary roots file ‘%1%’") % path); + printError(format("removing stale temporary roots file ‘%1%’") % path); unlink(path.c_str()); writeFull(fd->get(), "d"); continue; @@ -264,7 +264,7 @@ void LocalStore::findRoots(const Path & path, unsigned char type, Roots & roots) if (isStorePath(storePath) && isValidPath(storePath)) roots[path] = storePath; else - printMsg(lvlInfo, format("skipping invalid root from ‘%1%’ to ‘%2%’") % path % storePath); + printInfo(format("skipping invalid root from ‘%1%’ to ‘%2%’") % path % storePath); }; try { @@ -287,7 +287,7 @@ void LocalStore::findRoots(const Path & path, unsigned char type, Roots & roots) target = absPath(target, dirOf(path)); if (!pathExists(target)) { if (isInDir(path, stateDir + "/" + gcRootsDir + "/auto")) { - printMsg(lvlInfo, format("removing stale link from ‘%1%’ to ‘%2%’") % path % target); + printInfo(format("removing stale link from ‘%1%’ to ‘%2%’") % path % target); unlink(path.c_str()); } } else { @@ -310,7 +310,7 @@ void LocalStore::findRoots(const Path & path, unsigned char type, Roots & roots) catch (SysError & e) { /* We only ignore permanent failures. */ if (e.errNo == EACCES || e.errNo == ENOENT || e.errNo == ENOTDIR) - printMsg(lvlInfo, format("cannot read potential root ‘%1%’") % path); + printInfo(format("cannot read potential root ‘%1%’") % path); else throw; } @@ -513,7 +513,7 @@ void LocalStore::deletePathRecursive(GCState & state, const Path & path) throw SysError(format("getting status of %1%") % realPath); } - printMsg(lvlInfo, format("deleting ‘%1%’") % path); + printInfo(format("deleting ‘%1%’") % path); state.results.paths.insert(path); @@ -535,7 +535,7 @@ void LocalStore::deletePathRecursive(GCState & state, const Path & path) state.bytesInvalidated += size; } catch (SysError & e) { if (e.errNo == ENOSPC) { - printMsg(lvlInfo, format("note: can't create move ‘%1%’: %2%") % realPath % e.msg()); + printInfo(format("note: can't create move ‘%1%’: %2%") % realPath % e.msg()); deleteGarbage(state, realPath); } } @@ -543,7 +543,7 @@ void LocalStore::deletePathRecursive(GCState & state, const Path & path) deleteGarbage(state, realPath); if (state.results.bytesFreed + state.bytesInvalidated > state.options.maxFreed) { - printMsg(lvlInfo, format("deleted or invalidated more than %1% bytes; stopping") % state.options.maxFreed); + printInfo(format("deleted or invalidated more than %1% bytes; stopping") % state.options.maxFreed); throw GCLimitReached(); } } @@ -562,7 +562,7 @@ bool LocalStore::canReachRoot(GCState & state, PathSet & visited, const Path & p } if (state.roots.find(path) != state.roots.end()) { - printMsg(lvlDebug, format("cannot delete ‘%1%’ because it's a root") % path); + debug(format("cannot delete ‘%1%’ because it's a root") % path); state.alive.insert(path); return true; } @@ -626,7 +626,7 @@ void LocalStore::tryToDelete(GCState & state, const Path & path) PathSet visited; if (canReachRoot(state, visited, path)) { - printMsg(lvlDebug, format("cannot delete ‘%1%’ because it's still reachable") % path); + debug(format("cannot delete ‘%1%’ because it's still reachable") % path); } else { /* No path we visited was a root, so everything is garbage. 
But we only delete ‘path’ and its referrers here so that @@ -682,7 +682,7 @@ void LocalStore::removeUnusedLinks(const GCState & state) throw SysError(format("statting ‘%1%’") % linksDir); long long overhead = st.st_blocks * 512ULL; - printMsg(lvlInfo, format("note: currently hard linking saves %.2f MiB") + printInfo(format("note: currently hard linking saves %.2f MiB") % ((unsharedSize - actualSize - overhead) / (1024.0 * 1024.0))); } @@ -715,7 +715,7 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results) /* Find the roots. Since we've grabbed the GC lock, the set of permanent roots cannot increase now. */ - printMsg(lvlError, format("finding garbage collector roots...")); + printError(format("finding garbage collector roots...")); Roots rootMap = options.ignoreLiveness ? Roots() : findRoots(); for (auto & i : rootMap) state.roots.insert(i.second); @@ -744,7 +744,7 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results) createDirs(trashDir); } catch (SysError & e) { if (e.errNo == ENOSPC) { - printMsg(lvlInfo, format("note: can't create trash directory: %1%") % e.msg()); + printInfo(format("note: can't create trash directory: %1%") % e.msg()); state.moveToTrash = false; } } @@ -765,9 +765,9 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results) } else if (options.maxFreed > 0) { if (state.shouldDelete) - printMsg(lvlError, format("deleting garbage...")); + printError(format("deleting garbage...")); else - printMsg(lvlError, format("determining live/dead paths...")); + printError(format("determining live/dead paths...")); try { @@ -825,12 +825,12 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results) fds.clear(); /* Delete the trash directory. */ - printMsg(lvlInfo, format("deleting ‘%1%’") % trashDir); + printInfo(format("deleting ‘%1%’") % trashDir); deleteGarbage(state, trashDir); /* Clean up the links directory. 
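Since collectGarbage() above is driven entirely through GCOptions and GCResults, a short sketch of a programmatic caller may help; it uses only the option and result fields that appear in these hunks (action, maxFreed, ignoreLiveness, results.paths, results.bytesFreed), while constructing the LocalStore itself is left out as an assumption:

    // Sketch: delete dead paths until roughly 1 GiB has been freed.
    #include "local-store.hh"   // assumed to pull in GCOptions/GCResults

    using namespace nix;

    void runGC(LocalStore & store)
    {
        GCOptions options;
        options.action = GCOptions::gcDeleteDead;   // as opposed to gcDeleteSpecific
        options.maxFreed = 1ULL << 30;              // stop early via GCLimitReached
        options.ignoreLiveness = false;             // honour the roots from findRoots()

        GCResults results;
        store.collectGarbage(options, results);

        printInfo(format("deleted %1% paths, freed %2% bytes")
            % results.paths.size() % results.bytesFreed);
    }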
*/ if (options.action == GCOptions::gcDeleteDead || options.action == GCOptions::gcDeleteSpecific) { - printMsg(lvlError, format("deleting unused links...")); + printError(format("deleting unused links...")); removeUnusedLinks(state); } diff --git a/src/libstore/globals.cc b/src/libstore/globals.cc index ecf81e8eb38e..00b468892529 100644 --- a/src/libstore/globals.cc +++ b/src/libstore/globals.cc @@ -183,6 +183,8 @@ void Settings::update() _get(enableImportNative, "allow-unsafe-native-code-during-evaluation"); _get(useCaseHack, "use-case-hack"); _get(preBuildHook, "pre-build-hook"); + _get(keepGoing, "keep-going"); + _get(keepFailed, "keep-failed"); } diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index bdcd2fd3998b..9d31f77c921f 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -13,17 +13,12 @@ private: Path cacheUri; - Pool<Downloader> downloaders; - public: HttpBinaryCacheStore( const Params & params, const Path & _cacheUri) : BinaryCacheStore(params) , cacheUri(_cacheUri) - , downloaders( - std::numeric_limits<size_t>::max(), - []() { return makeDownloader(); }) { if (cacheUri.back() == '/') cacheUri.pop_back(); @@ -54,12 +49,11 @@ protected: bool fileExists(const std::string & path) override { try { - auto downloader(downloaders.get()); - DownloadOptions options; - options.showProgress = DownloadOptions::no; - options.head = true; - options.tries = 5; - downloader->download(cacheUri + "/" + path, options); + DownloadRequest request(cacheUri + "/" + path); + request.showProgress = DownloadRequest::no; + request.head = true; + request.tries = 5; + getDownloader()->download(request); return true; } catch (DownloadError & e) { /* S3 buckets return 403 if a file doesn't exist and the @@ -75,20 +69,29 @@ protected: throw UploadToHTTP("uploading to an HTTP binary cache is not supported"); } - std::shared_ptr<std::string> getFile(const std::string & path) override + void getFile(const std::string & path, + std::function<void(std::shared_ptr<std::string>)> success, + std::function<void(std::exception_ptr exc)> failure) override { - auto downloader(downloaders.get()); - DownloadOptions options; - options.showProgress = DownloadOptions::no; - options.tries = 5; - options.baseRetryTimeMs = 1000; - try { - return downloader->download(cacheUri + "/" + path, options).data; - } catch (DownloadError & e) { - if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) - return 0; - throw; - } + DownloadRequest request(cacheUri + "/" + path); + request.showProgress = DownloadRequest::no; + request.tries = 8; + + getDownloader()->enqueueDownload(request, + [success](const DownloadResult & result) { + success(result.data); + }, + [success, failure](std::exception_ptr exc) { + try { + std::rethrow_exception(exc); + } catch (DownloadError & e) { + if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + return success(0); + failure(exc); + } catch (...) 
{ + failure(exc); + } + }); } }; diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 91d2650fe124..0f377989bd89 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -32,7 +32,19 @@ protected: void upsertFile(const std::string & path, const std::string & data) override; - std::shared_ptr<std::string> getFile(const std::string & path) override; + void getFile(const std::string & path, + std::function<void(std::shared_ptr<std::string>)> success, + std::function<void(std::exception_ptr exc)> failure) override + { + sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { + try { + return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path)); + } catch (SysError & e) { + if (e.errNo == ENOENT) return std::shared_ptr<std::string>(); + throw; + } + }); + } PathSet queryAllValidPaths() override { @@ -76,16 +88,6 @@ void LocalBinaryCacheStore::upsertFile(const std::string & path, const std::stri atomicWrite(binaryCacheDir + "/" + path, data); } -std::shared_ptr<std::string> LocalBinaryCacheStore::getFile(const std::string & path) -{ - try { - return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path)); - } catch (SysError & e) { - if (e.errNo == ENOENT) return 0; - throw; - } -} - static RegisterStoreImplementation regStore([]( const std::string & uri, const Store::Params & params) -> std::shared_ptr<Store> diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc index 272d48741114..612efde7bb8f 100644 --- a/src/libstore/local-store.cc +++ b/src/libstore/local-store.cc @@ -45,7 +45,7 @@ LocalStore::LocalStore(const Params & params) , reservedPath(dbDir + "/reserved") , schemaPath(dbDir + "/schema") , trashDir(realStoreDir + "/trash") - , requireSigs(settings.get("signed-binary-caches", std::string("")) != "") // FIXME: rename option + , requireSigs(trim(settings.get("signed-binary-caches", std::string(""))) != "") // FIXME: rename option , publicKeys(getDefaultPublicKeys()) { auto state(_state.lock()); @@ -77,7 +77,7 @@ LocalStore::LocalStore(const Params & params) struct group * gr = getgrnam(settings.buildUsersGroup.c_str()); if (!gr) - printMsg(lvlError, format("warning: the group ‘%1%’ specified in ‘build-users-group’ does not exist") + printError(format("warning: the group ‘%1%’ specified in ‘build-users-group’ does not exist") % settings.buildUsersGroup); else { struct stat st; @@ -125,7 +125,7 @@ LocalStore::LocalStore(const Params & params) #endif if (res == -1) { writeFull(fd.get(), string(settings.reservedSize, 'X')); - ftruncate(fd.get(), settings.reservedSize); + [[gnu::unused]] auto res2 = ftruncate(fd.get(), settings.reservedSize); } } } catch (SysError & e) { /* don't care about errors */ @@ -137,7 +137,7 @@ LocalStore::LocalStore(const Params & params) globalLock = openLockFile(globalLockPath.c_str(), true); if (!lockFile(globalLock.get(), ltRead, false)) { - printMsg(lvlError, "waiting for the big Nix store lock..."); + printError("waiting for the big Nix store lock..."); lockFile(globalLock.get(), ltRead, true); } @@ -168,7 +168,7 @@ LocalStore::LocalStore(const Params & params) "please upgrade Nix to version 1.11 first."); if (!lockFile(globalLock.get(), ltWrite, false)) { - printMsg(lvlError, "waiting for exclusive access to the Nix store..."); + printError("waiting for exclusive access to the Nix store..."); lockFile(globalLock.get(), ltWrite, true); } @@ -578,49 +578,54 @@ Hash parseHashField(const Path & path, const 
string & s) } -std::shared_ptr<ValidPathInfo> LocalStore::queryPathInfoUncached(const Path & path) +void LocalStore::queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) { - auto info = std::make_shared<ValidPathInfo>(); - info->path = path; + sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() { - assertStorePath(path); + auto info = std::make_shared<ValidPathInfo>(); + info->path = path; - return retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() { - auto state(_state.lock()); + assertStorePath(path); + + return retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() { + auto state(_state.lock()); - /* Get the path info. */ - auto useQueryPathInfo(state->stmtQueryPathInfo.use()(path)); + /* Get the path info. */ + auto useQueryPathInfo(state->stmtQueryPathInfo.use()(path)); - if (!useQueryPathInfo.next()) - return std::shared_ptr<ValidPathInfo>(); + if (!useQueryPathInfo.next()) + return std::shared_ptr<ValidPathInfo>(); - info->id = useQueryPathInfo.getInt(0); + info->id = useQueryPathInfo.getInt(0); - info->narHash = parseHashField(path, useQueryPathInfo.getStr(1)); + info->narHash = parseHashField(path, useQueryPathInfo.getStr(1)); - info->registrationTime = useQueryPathInfo.getInt(2); + info->registrationTime = useQueryPathInfo.getInt(2); - auto s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 3); - if (s) info->deriver = s; + auto s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 3); + if (s) info->deriver = s; - /* Note that narSize = NULL yields 0. */ - info->narSize = useQueryPathInfo.getInt(4); + /* Note that narSize = NULL yields 0. */ + info->narSize = useQueryPathInfo.getInt(4); - info->ultimate = useQueryPathInfo.getInt(5) == 1; + info->ultimate = useQueryPathInfo.getInt(5) == 1; - s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 6); - if (s) info->sigs = tokenizeString<StringSet>(s, " "); + s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 6); + if (s) info->sigs = tokenizeString<StringSet>(s, " "); - s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 7); - if (s) info->ca = s; + s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 7); + if (s) info->ca = s; - /* Get the references. */ - auto useQueryReferences(state->stmtQueryReferences.use()(info->id)); + /* Get the references. 
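The SQLite lookup above is adapted to the new success/failure callback interface with a sync2async helper whose definition is not part of this diff. Judging from every call site, it presumably just runs the synchronous body and routes its result or exception into the two callbacks, roughly as follows (a hypothetical reconstruction, not the actual implementation):

    // Hypothetical shape of sync2async: adapt a synchronous computation to the
    // success/failure callback interface used throughout this change.
    #include <exception>
    #include <functional>

    template<typename T>
    void sync2async(
        std::function<void(T)> success,
        std::function<void(std::exception_ptr)> failure,
        std::function<T()> fun)
    {
        try {
            success(fun());
        } catch (...) {
            failure(std::current_exception());
        }
    }

The callSuccess/callFailure helpers used further down in store-api.cc presumably guard the callbacks themselves in a similar way.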
*/ + auto useQueryReferences(state->stmtQueryReferences.use()(info->id)); - while (useQueryReferences.next()) - info->references.insert(useQueryReferences.getStr(0)); + while (useQueryReferences.next()) + info->references.insert(useQueryReferences.getStr(0)); - return info; + return info; + }); }); } @@ -777,18 +782,27 @@ Path LocalStore::queryPathFromHashPart(const string & hashPart) PathSet LocalStore::querySubstitutablePaths(const PathSet & paths) { if (!settings.useSubstitutes) return PathSet(); + + auto remaining = paths; PathSet res; + for (auto & sub : getDefaultSubstituters()) { + if (remaining.empty()) break; if (sub->storeDir != storeDir) continue; if (!sub->wantMassQuery()) continue; - for (auto & path : paths) { - if (res.count(path)) continue; - debug(format("checking substituter ‘%s’ for path ‘%s’") - % sub->getUri() % path); - if (sub->isValidPath(path)) + + auto valid = sub->queryValidPaths(remaining); + + PathSet remaining2; + for (auto & path : remaining) + if (valid.count(path)) res.insert(path); - } + else + remaining2.insert(path); + + std::swap(remaining, remaining2); } + return res; } @@ -896,10 +910,10 @@ void LocalStore::invalidatePath(State & state, const Path & path) } -void LocalStore::addToStore(const ValidPathInfo & info, const std::string & nar, - bool repair, bool dontCheckSigs) +void LocalStore::addToStore(const ValidPathInfo & info, const ref<std::string> & nar, + bool repair, bool dontCheckSigs, std::shared_ptr<FSAccessor> accessor) { - Hash h = hashString(htSHA256, nar); + Hash h = hashString(htSHA256, *nar); if (h != info.narHash) throw Error(format("hash mismatch importing path ‘%s’; expected hash ‘%s’, got ‘%s’") % info.path % info.narHash.to_string() % h.to_string()); @@ -926,7 +940,7 @@ void LocalStore::addToStore(const ValidPathInfo & info, const std::string & nar, deletePath(realPath); - StringSource source(nar); + StringSource source(*nar); restorePath(realPath, source); canonicalisePathMetaData(realPath, -1); @@ -1104,7 +1118,7 @@ void LocalStore::invalidatePathChecked(const Path & path) bool LocalStore::verifyStore(bool checkContents, bool repair) { - printMsg(lvlError, format("reading the Nix store...")); + printError(format("reading the Nix store...")); bool errors = false; @@ -1115,7 +1129,7 @@ bool LocalStore::verifyStore(bool checkContents, bool repair) for (auto & i : readDirectory(realStoreDir)) store.insert(i.name); /* Check whether all valid paths actually exist. */ - printMsg(lvlInfo, "checking path existence..."); + printInfo("checking path existence..."); PathSet validPaths2 = queryAllValidPaths(), validPaths, done; @@ -1128,7 +1142,7 @@ bool LocalStore::verifyStore(bool checkContents, bool repair) /* Optionally, check the content hashes (slow). */ if (checkContents) { - printMsg(lvlInfo, "checking hashes..."); + printInfo("checking hashes..."); Hash nullHash(htSHA256); @@ -1141,7 +1155,7 @@ bool LocalStore::verifyStore(bool checkContents, bool repair) HashResult current = hashPath(info->narHash.type, i); if (info->narHash != nullHash && info->narHash != current.first) { - printMsg(lvlError, format("path ‘%1%’ was modified! " + printError(format("path ‘%1%’ was modified! " "expected hash ‘%2%’, got ‘%3%’") % i % printHash(info->narHash) % printHash(current.first)); if (repair) repairPath(i); else errors = true; @@ -1151,14 +1165,14 @@ bool LocalStore::verifyStore(bool checkContents, bool repair) /* Fill in missing hashes. 
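addToStore() now takes the NAR by ref<std::string> and, as the check at the top of LocalStore::addToStore() shows, insists that info.narHash and narSize describe exactly those bytes. A hedged caller-side sketch of honouring that contract, reusing only calls that appear elsewhere in this diff (narFromPath, queryPathInfo, hashString); the wrapper function itself is illustrative:

    // Sketch: serialise a path from one store and hand it to another store's
    // addToStore(), keeping narHash/narSize consistent with the NAR being sent.
    void pushPath(Store & src, Store & dst, const Path & storePath)
    {
        StringSink sink;
        src.narFromPath({storePath}, sink);              // NAR serialisation of the path

        ValidPathInfo info = *src.queryPathInfo(storePath);
        info.narHash = hashString(htSHA256, *sink.s);    // must match the hash check above
        info.narSize = sink.s->size();

        dst.addToStore(info, sink.s, false, false);      // repair = false, dontCheckSigs = false
    }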
*/ if (info->narHash == nullHash) { - printMsg(lvlError, format("fixing missing hash on ‘%1%’") % i); + printError(format("fixing missing hash on ‘%1%’") % i); info->narHash = current.first; update = true; } /* Fill in missing narSize fields (from old stores). */ if (info->narSize == 0) { - printMsg(lvlError, format("updating size field on ‘%1%’ to %2%") % i % current.second); + printError(format("updating size field on ‘%1%’ to %2%") % i % current.second); info->narSize = current.second; update = true; } @@ -1174,9 +1188,9 @@ bool LocalStore::verifyStore(bool checkContents, bool repair) /* It's possible that the path got GC'ed, so ignore errors on invalid paths. */ if (isValidPath(i)) - printMsg(lvlError, format("error: %1%") % e.msg()); + printError(format("error: %1%") % e.msg()); else - printMsg(lvlError, format("warning: %1%") % e.msg()); + printError(format("warning: %1%") % e.msg()); errors = true; } } @@ -1195,7 +1209,7 @@ void LocalStore::verifyPath(const Path & path, const PathSet & store, done.insert(path); if (!isStorePath(path)) { - printMsg(lvlError, format("path ‘%1%’ is not in the Nix store") % path); + printError(format("path ‘%1%’ is not in the Nix store") % path); auto state(_state.lock()); invalidatePath(*state, path); return; @@ -1214,16 +1228,16 @@ void LocalStore::verifyPath(const Path & path, const PathSet & store, } if (canInvalidate) { - printMsg(lvlError, format("path ‘%1%’ disappeared, removing from database...") % path); + printError(format("path ‘%1%’ disappeared, removing from database...") % path); auto state(_state.lock()); invalidatePath(*state, path); } else { - printMsg(lvlError, format("path ‘%1%’ disappeared, but it still has valid referrers!") % path); + printError(format("path ‘%1%’ disappeared, but it still has valid referrers!") % path); if (repair) try { repairPath(path); } catch (Error & e) { - printMsg(lvlError, format("warning: %1%") % e.msg()); + printError(format("warning: %1%") % e.msg()); errors = true; } else errors = true; @@ -1275,7 +1289,7 @@ static void makeMutable(const Path & path) void LocalStore::upgradeStore7() { if (getuid() != 0) return; - printMsg(lvlError, "removing immutable bits from the Nix store (this may take a while)..."); + printError("removing immutable bits from the Nix store (this may take a while)..."); makeMutable(realStoreDir); } diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh index 5b5960cf245f..511209d8404a 100644 --- a/src/libstore/local-store.hh +++ b/src/libstore/local-store.hh @@ -106,7 +106,9 @@ public: PathSet queryAllValidPaths() override; - std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override; + void queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) override; void queryReferrers(const Path & path, PathSet & referrers) override; @@ -123,12 +125,13 @@ public: void querySubstitutablePathInfos(const PathSet & paths, SubstitutablePathInfos & infos) override; - void addToStore(const ValidPathInfo & info, const std::string & nar, - bool repair, bool dontCheckSigs) override; + void addToStore(const ValidPathInfo & info, const ref<std::string> & nar, + bool repair, bool dontCheckSigs, + std::shared_ptr<FSAccessor> accessor) override; Path addToStore(const string & name, const Path & srcPath, - bool recursive = true, HashType hashAlgo = htSHA256, - PathFilter & filter = defaultPathFilter, bool repair = false) override; + bool recursive, HashType hashAlgo, + 
PathFilter & filter, bool repair) override; /* Like addToStore(), but the contents of the path are contained in `dump', which is either a NAR serialisation (if recursive == @@ -138,7 +141,7 @@ public: bool recursive = true, HashType hashAlgo = htSHA256, bool repair = false); Path addTextToStore(const string & name, const string & s, - const PathSet & references, bool repair = false) override; + const PathSet & references, bool repair) override; void buildPaths(const PathSet & paths, BuildMode buildMode) override; diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc index da654ba0d2c3..0c2c49e5531f 100644 --- a/src/libstore/misc.cc +++ b/src/libstore/misc.cc @@ -8,66 +8,90 @@ namespace nix { -void Store::computeFSClosure(const Path & path, - PathSet & paths, bool flipDirection, bool includeOutputs, bool includeDerivers) +void Store::computeFSClosure(const Path & startPath, + PathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) { - ThreadPool pool; + struct State + { + size_t pending; + PathSet & paths; + std::exception_ptr exc; + }; - Sync<bool> state_; + Sync<State> state_(State{0, paths_, 0}); - std::function<void(Path)> doPath; + std::function<void(const Path &)> enqueue; - doPath = [&](const Path & path) { + std::condition_variable done; + + enqueue = [&](const Path & path) -> void { { auto state(state_.lock()); - if (paths.count(path)) return; - paths.insert(path); + if (state->exc) return; + if (state->paths.count(path)) return; + state->paths.insert(path); + state->pending++; } - auto info = queryPathInfo(path); + queryPathInfo(path, + [&, path](ref<ValidPathInfo> info) { + // FIXME: calls to isValidPath() should be async - if (flipDirection) { + if (flipDirection) { - PathSet referrers; - queryReferrers(path, referrers); - for (auto & ref : referrers) - if (ref != path) - pool.enqueue(std::bind(doPath, ref)); + PathSet referrers; + queryReferrers(path, referrers); + for (auto & ref : referrers) + if (ref != path) + enqueue(ref); - if (includeOutputs) { - PathSet derivers = queryValidDerivers(path); - for (auto & i : derivers) - pool.enqueue(std::bind(doPath, i)); - } + if (includeOutputs) + for (auto & i : queryValidDerivers(path)) + enqueue(i); - if (includeDerivers && isDerivation(path)) { - PathSet outputs = queryDerivationOutputs(path); - for (auto & i : outputs) - if (isValidPath(i) && queryPathInfo(i)->deriver == path) - pool.enqueue(std::bind(doPath, i)); - } + if (includeDerivers && isDerivation(path)) + for (auto & i : queryDerivationOutputs(path)) + if (isValidPath(i) && queryPathInfo(i)->deriver == path) + enqueue(i); - } else { + } else { - for (auto & ref : info->references) - if (ref != path) - pool.enqueue(std::bind(doPath, ref)); + for (auto & ref : info->references) + if (ref != path) + enqueue(ref); - if (includeOutputs && isDerivation(path)) { - PathSet outputs = queryDerivationOutputs(path); - for (auto & i : outputs) - if (isValidPath(i)) pool.enqueue(std::bind(doPath, i)); - } + if (includeOutputs && isDerivation(path)) + for (auto & i : queryDerivationOutputs(path)) + if (isValidPath(i)) enqueue(i); - if (includeDerivers && isValidPath(info->deriver)) - pool.enqueue(std::bind(doPath, info->deriver)); + if (includeDerivers && isValidPath(info->deriver)) + enqueue(info->deriver); - } + } + + { + auto state(state_.lock()); + assert(state->pending); + if (!--state->pending) done.notify_one(); + } + + }, + + [&, path](std::exception_ptr exc) { + auto state(state_.lock()); + if (!state->exc) state->exc = exc; + assert(state->pending); 
+ if (!--state->pending) done.notify_one(); + }); }; - pool.enqueue(std::bind(doPath, path)); + enqueue(startPath); - pool.process(); + { + auto state(state_.lock()); + while (state->pending) state.wait(done); + if (state->exc) std::rethrow_exception(state->exc); + } } diff --git a/src/libstore/optimise-store.cc b/src/libstore/optimise-store.cc index 927478121244..1bf8b7d83bbc 100644 --- a/src/libstore/optimise-store.cc +++ b/src/libstore/optimise-store.cc @@ -43,7 +43,7 @@ struct MakeReadOnly LocalStore::InodeHash LocalStore::loadInodeHash() { - printMsg(lvlDebug, "loading hash inodes in memory"); + debug("loading hash inodes in memory"); InodeHash inodeHash; AutoCloseDir dir = opendir(linksDir.c_str()); @@ -75,7 +75,7 @@ Strings LocalStore::readDirectoryIgnoringInodes(const Path & path, const InodeHa checkInterrupt(); if (inodeHash.count(dirent->d_ino)) { - printMsg(lvlDebug, format("‘%1%’ is already linked") % dirent->d_name); + debug(format("‘%1%’ is already linked") % dirent->d_name); continue; } @@ -116,13 +116,13 @@ void LocalStore::optimisePath_(OptimiseStats & stats, const Path & path, InodeHa NixOS (example: $fontconfig/var/cache being modified). Skip those files. FIXME: check the modification time. */ if (S_ISREG(st.st_mode) && (st.st_mode & S_IWUSR)) { - printMsg(lvlError, format("skipping suspicious writable file ‘%1%’") % path); + printError(format("skipping suspicious writable file ‘%1%’") % path); return; } /* This can still happen on top-level files. */ if (st.st_nlink > 1 && inodeHash.count(st.st_ino)) { - printMsg(lvlDebug, format("‘%1%’ is already linked, with %2% other file(s)") % path % (st.st_nlink - 2)); + debug(format("‘%1%’ is already linked, with %2% other file(s)") % path % (st.st_nlink - 2)); return; } @@ -136,7 +136,7 @@ void LocalStore::optimisePath_(OptimiseStats & stats, const Path & path, InodeHa contents of the symlink (i.e. the result of readlink()), not the contents of the target (which may not even exist). */ Hash hash = hashPath(htSHA256, path).first; - printMsg(lvlDebug, format("‘%1%’ has hash ‘%2%’") % path % printHash(hash)); + debug(format("‘%1%’ has hash ‘%2%’") % path % printHash(hash)); /* Check if this is a known hash. */ Path linkPath = linksDir + "/" + printHash32(hash); @@ -161,12 +161,12 @@ void LocalStore::optimisePath_(OptimiseStats & stats, const Path & path, InodeHa throw SysError(format("getting attributes of path ‘%1%’") % linkPath); if (st.st_ino == stLink.st_ino) { - printMsg(lvlDebug, format("‘%1%’ is already linked to ‘%2%’") % path % linkPath); + debug(format("‘%1%’ is already linked to ‘%2%’") % path % linkPath); return; } if (st.st_size != stLink.st_size) { - printMsg(lvlError, format("removing corrupted link ‘%1%’") % linkPath); + printError(format("removing corrupted link ‘%1%’") % linkPath); unlink(linkPath.c_str()); goto retry; } @@ -192,7 +192,7 @@ void LocalStore::optimisePath_(OptimiseStats & stats, const Path & path, InodeHa systems). This is likely to happen with empty files. Just shrug and ignore. */ if (st.st_size) - printMsg(lvlInfo, format("‘%1%’ has maximum number of links") % linkPath); + printInfo(format("‘%1%’ has maximum number of links") % linkPath); return; } throw SysError(format("cannot link ‘%1%’ to ‘%2%’") % tempLink % linkPath); @@ -201,14 +201,14 @@ void LocalStore::optimisePath_(OptimiseStats & stats, const Path & path, InodeHa /* Atomically replace the old file with the new hard link. 
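Despite the internal switch from a thread pool to queryPathInfo() callbacks with a pending counter and condition variable, computeFSClosure() keeps its blocking signature, so existing callers are unchanged. A small usage sketch (the logging is illustrative):

    // Sketch: compute the forward closure of a path, then the set of paths
    // that can reach it, using the signature shown above.
    void showClosure(Store & store, const Path & path)
    {
        PathSet closure;
        store.computeFSClosure(path, closure);                      // references, recursively
        printInfo(format("closure of ‘%1%’ has %2% paths") % path % closure.size());

        PathSet referrers;
        store.computeFSClosure(path, referrers, true /* flipDirection */);
        printInfo(format("‘%1%’ is reachable from %2% paths") % path % referrers.size());
    }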
*/ if (rename(tempLink.c_str(), path.c_str()) == -1) { if (unlink(tempLink.c_str()) == -1) - printMsg(lvlError, format("unable to unlink ‘%1%’") % tempLink); + printError(format("unable to unlink ‘%1%’") % tempLink); if (errno == EMLINK) { /* Some filesystems generate too many links on the rename, rather than on the original link. (Probably it temporarily increases the st_nlink field before decreasing it again.) */ if (st.st_size) - printMsg(lvlInfo, format("‘%1%’ has maximum number of links") % linkPath); + printInfo(format("‘%1%’ has maximum number of links") % linkPath); return; } throw SysError(format("cannot rename ‘%1%’ to ‘%2%’") % tempLink % path); @@ -244,7 +244,7 @@ void LocalStore::optimiseStore() optimiseStore(stats); - printMsg(lvlError, + printError( format("%1% freed by hard-linking %2% files") % showBytes(stats.bytesFreed) % stats.filesLinked); diff --git a/src/libstore/pathlocks.cc b/src/libstore/pathlocks.cc index b9e178d61f3c..8788ee1649fb 100644 --- a/src/libstore/pathlocks.cc +++ b/src/libstore/pathlocks.cc @@ -121,7 +121,7 @@ bool PathLocks::lockPaths(const PathSet & _paths, /* Acquire an exclusive lock. */ if (!lockFile(fd.get(), ltWrite, false)) { if (wait) { - if (waitMsg != "") printMsg(lvlError, waitMsg); + if (waitMsg != "") printError(waitMsg); lockFile(fd.get(), ltWrite, true); } else { /* Failed to lock this path; release all other @@ -174,7 +174,7 @@ void PathLocks::unlock() lockedPaths.erase(i.second); if (close(i.first) == -1) - printMsg(lvlError, + printError( format("error (ignored): cannot close lock file on ‘%1%’") % i.second); debug(format("lock released on ‘%1%’") % i.second); diff --git a/src/libstore/profiles.cc b/src/libstore/profiles.cc index 449c88b576b6..f24daa8862a1 100644 --- a/src/libstore/profiles.cc +++ b/src/libstore/profiles.cc @@ -132,9 +132,9 @@ void deleteGeneration(const Path & profile, unsigned int gen) static void deleteGeneration2(const Path & profile, unsigned int gen, bool dryRun) { if (dryRun) - printMsg(lvlInfo, format("would remove generation %1%") % gen); + printInfo(format("would remove generation %1%") % gen); else { - printMsg(lvlInfo, format("removing generation %1%") % gen); + printInfo(format("removing generation %1%") % gen); deleteGeneration(profile, gen); } } diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index f03f33fc175c..77faa2f801f1 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -260,36 +260,40 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths, } -std::shared_ptr<ValidPathInfo> RemoteStore::queryPathInfoUncached(const Path & path) -{ - auto conn(connections->get()); - conn->to << wopQueryPathInfo << path; - try { - conn->processStderr(); - } catch (Error & e) { - // Ugly backwards compatibility hack. 
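The pathlocks.cc hunks above only change their logging, but since the wait/waitMsg behaviour of lockPaths() is visible there, a brief usage sketch may still be useful; the parameter defaults and the release-on-destruction behaviour are assumptions, not taken from this diff:

    // Sketch: serialise access to a profile by taking its lock file.
    void withProfileLock(const Path & profile)
    {
        PathLocks locks;
        locks.lockPaths(PathSet{profile + ".lock"},
            (format("waiting for lock on profile ‘%1%’...") % profile).str());

        /* ... read or switch the profile here ... */

        /* presumably released in ~PathLocks(), or explicitly via unlock() */
    }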
- if (e.msg().find("is not valid") != std::string::npos) - throw InvalidPath(e.what()); - throw; - } - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) { - bool valid = readInt(conn->from) != 0; - if (!valid) throw InvalidPath(format("path ‘%s’ is not valid") % path); - } - auto info = std::make_shared<ValidPathInfo>(); - info->path = path; - info->deriver = readString(conn->from); - if (info->deriver != "") assertStorePath(info->deriver); - info->narHash = parseHash(htSHA256, readString(conn->from)); - info->references = readStorePaths<PathSet>(*this, conn->from); - info->registrationTime = readInt(conn->from); - info->narSize = readLongLong(conn->from); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { - info->ultimate = readInt(conn->from) != 0; - info->sigs = readStrings<StringSet>(conn->from); - info->ca = readString(conn->from); - } - return info; +void RemoteStore::queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) +{ + sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() { + auto conn(connections->get()); + conn->to << wopQueryPathInfo << path; + try { + conn->processStderr(); + } catch (Error & e) { + // Ugly backwards compatibility hack. + if (e.msg().find("is not valid") != std::string::npos) + throw InvalidPath(e.what()); + throw; + } + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) { + bool valid = readInt(conn->from) != 0; + if (!valid) throw InvalidPath(format("path ‘%s’ is not valid") % path); + } + auto info = std::make_shared<ValidPathInfo>(); + info->path = path; + info->deriver = readString(conn->from); + if (info->deriver != "") assertStorePath(info->deriver); + info->narHash = parseHash(htSHA256, readString(conn->from)); + info->references = readStorePaths<PathSet>(*this, conn->from); + info->registrationTime = readInt(conn->from); + info->narSize = readLongLong(conn->from); + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { + info->ultimate = readInt(conn->from) != 0; + info->sigs = readStrings<StringSet>(conn->from); + info->ca = readString(conn->from); + } + return info; + }); } @@ -342,15 +346,43 @@ Path RemoteStore::queryPathFromHashPart(const string & hashPart) } -void RemoteStore::addToStore(const ValidPathInfo & info, const std::string & nar, - bool repair, bool dontCheckSigs) +void RemoteStore::addToStore(const ValidPathInfo & info, const ref<std::string> & nar, + bool repair, bool dontCheckSigs, std::shared_ptr<FSAccessor> accessor) { auto conn(connections->get()); - conn->to << wopAddToStoreNar - << info.path << info.deriver << printHash(info.narHash) - << info.references << info.registrationTime << info.narSize - << info.ultimate << info.sigs << nar << repair << dontCheckSigs; - conn->processStderr(); + + if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) { + conn->to << wopImportPaths; + + StringSink sink; + sink << 1 // == path follows + ; + assert(nar->size() % 8 == 0); + sink((unsigned char *) nar->data(), nar->size()); + sink + << exportMagic + << info.path + << info.references + << info.deriver + << 0 // == no legacy signature + << 0 // == no path follows + ; + + StringSource source(*sink.s); + conn->processStderr(0, &source); + + auto importedPaths = readStorePaths<PathSet>(*this, conn->from); + assert(importedPaths.size() <= 1); + } + + else { + conn->to << wopAddToStoreNar + << info.path << info.deriver << printHash(info.narHash) + << info.references << info.registrationTime << info.narSize + << info.ultimate << 
info.sigs << *nar << repair << dontCheckSigs; + // FIXME: don't send nar as a string + conn->processStderr(); + } } @@ -573,12 +605,12 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source) to.flush(); } else - printMsg(lvlError, chomp(readString(from))); + printError(chomp(readString(from))); } if (msg == STDERR_ERROR) { string error = readString(from); unsigned int status = readInt(from); - throw Error(format("%1%") % error, status); + throw Error(status, error); } else if (msg != STDERR_LAST) throw Error("protocol error processing standard error"); diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index 5c9c617d93e4..40f17da300d0 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -32,7 +32,9 @@ public: PathSet queryAllValidPaths() override; - std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override; + void queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) override; void queryReferrers(const Path & path, PathSet & referrers) override; @@ -49,8 +51,9 @@ public: void querySubstitutablePathInfos(const PathSet & paths, SubstitutablePathInfos & infos) override; - void addToStore(const ValidPathInfo & info, const std::string & nar, - bool repair, bool dontCheckSigs) override; + void addToStore(const ValidPathInfo & info, const ref<std::string> & nar, + bool repair, bool dontCheckSigs, + std::shared_ptr<FSAccessor> accessor) override; Path addToStore(const string & name, const Path & srcPath, bool recursive = true, HashType hashAlgo = htSHA256, diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index ed95620bbd7c..c11f2b06b990 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -99,7 +99,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore } } - const Stats & getS3Stats() + const Stats & getS3Stats() override { return stats; } @@ -161,52 +161,56 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); - printMsg(lvlInfo, format("uploaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") + printInfo(format("uploaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") % bucketName % path % data.size() % duration); stats.putTimeMs += duration; } - std::shared_ptr<std::string> getFile(const std::string & path) override + void getFile(const std::string & path, + std::function<void(std::shared_ptr<std::string>)> success, + std::function<void(std::exception_ptr exc)> failure) override { - debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path); + sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { + debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path); - auto request = - Aws::S3::Model::GetObjectRequest() - .WithBucket(bucketName) - .WithKey(path); + auto request = + Aws::S3::Model::GetObjectRequest() + .WithBucket(bucketName) + .WithKey(path); - request.SetResponseStreamFactory([&]() { - return Aws::New<std::stringstream>("STRINGSTREAM"); - }); + request.SetResponseStreamFactory([&]() { + return Aws::New<std::stringstream>("STRINGSTREAM"); + }); - stats.get++; + stats.get++; - try { + try { - auto now1 = std::chrono::steady_clock::now(); + auto now1 = std::chrono::steady_clock::now(); - auto result = checkAws(format("AWS error fetching ‘%s’") % path, - client->GetObject(request)); + auto result = 
checkAws(format("AWS error fetching ‘%s’") % path, + client->GetObject(request)); - auto now2 = std::chrono::steady_clock::now(); + auto now2 = std::chrono::steady_clock::now(); - auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str(); + auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str(); - auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); - printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") - % bucketName % path % res.size() % duration); + printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") + % bucketName % path % res.size() % duration); - stats.getBytes += res.size(); - stats.getTimeMs += duration; + stats.getBytes += res.size(); + stats.getTimeMs += duration; - return std::make_shared<std::string>(res); + return std::make_shared<std::string>(res); - } catch (S3Error & e) { - if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return 0; - throw; - } + } catch (S3Error & e) { + if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>(); + throw; + } + }); } PathSet queryAllValidPaths() override diff --git a/src/libstore/s3-binary-cache-store.hh b/src/libstore/s3-binary-cache-store.hh index 79ab72e5a940..4d43fe4d23d8 100644 --- a/src/libstore/s3-binary-cache-store.hh +++ b/src/libstore/s3-binary-cache-store.hh @@ -27,7 +27,7 @@ public: std::atomic<uint64_t> head{0}; }; - const Stats & getS3Stats(); + virtual const Stats & getS3Stats() = 0; }; } diff --git a/src/libstore/sqlite.cc b/src/libstore/sqlite.cc index ea0b843f5752..0197b091cd12 100644 --- a/src/libstore/sqlite.cc +++ b/src/libstore/sqlite.cc @@ -10,11 +10,11 @@ namespace nix { int err = sqlite3_errcode(db); if (err == SQLITE_BUSY || err == SQLITE_PROTOCOL) { if (err == SQLITE_PROTOCOL) - printMsg(lvlError, "warning: SQLite database is busy (SQLITE_PROTOCOL)"); + printError("warning: SQLite database is busy (SQLITE_PROTOCOL)"); else { static bool warned = false; if (!warned) { - printMsg(lvlError, "warning: SQLite database is busy"); + printError("warning: SQLite database is busy"); warned = true; } } diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 1ce483ca991a..f7f6c9696688 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -4,6 +4,8 @@ #include "util.hh" #include "nar-info-disk-cache.hh" +#include <future> + namespace nix { @@ -283,51 +285,125 @@ bool Store::isValidPath(const Path & storePath) ref<const ValidPathInfo> Store::queryPathInfo(const Path & storePath) { + std::promise<ref<ValidPathInfo>> promise; + + queryPathInfo(storePath, + [&](ref<ValidPathInfo> info) { + promise.set_value(info); + }, + [&](std::exception_ptr exc) { + promise.set_exception(exc); + }); + + return promise.get_future().get(); +} + + +void Store::queryPathInfo(const Path & storePath, + std::function<void(ref<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) +{ auto hashPart = storePathToHash(storePath); - { - auto state_(state.lock()); - auto res = state_->pathInfoCache.get(hashPart); - if (res) { - stats.narInfoReadAverted++; - if (!*res) - throw InvalidPath(format("path ‘%s’ is not valid") % storePath); - return ref<ValidPathInfo>(*res); + try { + + { + auto res = state.lock()->pathInfoCache.get(hashPart); + if (res) { + stats.narInfoReadAverted++; + if (!*res) + throw InvalidPath(format("path ‘%s’ is not valid") % storePath); + 
return success(ref<ValidPathInfo>(*res)); + } } - } - if (diskCache) { - auto res = diskCache->lookupNarInfo(getUri(), hashPart); - if (res.first != NarInfoDiskCache::oUnknown) { - stats.narInfoReadAverted++; - auto state_(state.lock()); - state_->pathInfoCache.upsert(hashPart, - res.first == NarInfoDiskCache::oInvalid ? 0 : res.second); - if (res.first == NarInfoDiskCache::oInvalid || - (res.second->path != storePath && storePathToName(storePath) != "")) - throw InvalidPath(format("path ‘%s’ is not valid") % storePath); - return ref<ValidPathInfo>(res.second); + if (diskCache) { + auto res = diskCache->lookupNarInfo(getUri(), hashPart); + if (res.first != NarInfoDiskCache::oUnknown) { + stats.narInfoReadAverted++; + { + auto state_(state.lock()); + state_->pathInfoCache.upsert(hashPart, + res.first == NarInfoDiskCache::oInvalid ? 0 : res.second); + if (res.first == NarInfoDiskCache::oInvalid || + (res.second->path != storePath && storePathToName(storePath) != "")) + throw InvalidPath(format("path ‘%s’ is not valid") % storePath); + } + return success(ref<ValidPathInfo>(res.second)); + } } + + } catch (std::exception & e) { + return callFailure(failure); } - auto info = queryPathInfoUncached(storePath); + queryPathInfoUncached(storePath, + [this, storePath, hashPart, success, failure](std::shared_ptr<ValidPathInfo> info) { - if (diskCache) - diskCache->upsertNarInfo(getUri(), hashPart, info); + if (diskCache) + diskCache->upsertNarInfo(getUri(), hashPart, info); + + { + auto state_(state.lock()); + state_->pathInfoCache.upsert(hashPart, info); + } + + if (!info + || (info->path != storePath && storePathToName(storePath) != "")) + { + stats.narInfoMissing++; + return failure(std::make_exception_ptr(InvalidPath(format("path ‘%s’ is not valid") % storePath))); + } + + callSuccess(success, failure, ref<ValidPathInfo>(info)); + + }, failure); +} - { - auto state_(state.lock()); - state_->pathInfoCache.upsert(hashPart, info); - } - if (!info - || (info->path != storePath && storePathToName(storePath) != "")) +PathSet Store::queryValidPaths(const PathSet & paths) +{ + struct State { - stats.narInfoMissing++; - throw InvalidPath(format("path ‘%s’ is not valid") % storePath); - } + size_t left; + PathSet valid; + std::exception_ptr exc; + }; - return ref<ValidPathInfo>(info); + Sync<State> state_(State{paths.size(), PathSet()}); + + std::condition_variable wakeup; + + for (auto & path : paths) + queryPathInfo(path, + [path, &state_, &wakeup](ref<ValidPathInfo> info) { + auto state(state_.lock()); + state->valid.insert(path); + assert(state->left); + if (!--state->left) + wakeup.notify_one(); + }, + [path, &state_, &wakeup](std::exception_ptr exc) { + auto state(state_.lock()); + try { + std::rethrow_exception(exc); + } catch (InvalidPath &) { + } catch (...) 
{ + state->exc = exc; + } + assert(state->left); + if (!--state->left) + wakeup.notify_one(); + }); + + while (true) { + auto state(state_.lock()); + if (!state->left) { + if (state->exc) std::rethrow_exception(state->exc); + return state->valid; + } + state.wait(wakeup); + } } @@ -380,7 +456,31 @@ void copyStorePath(ref<Store> srcStore, ref<Store> dstStore, StringSink sink; srcStore->narFromPath({storePath}, sink); - dstStore->addToStore(*info, *sink.s, repair); + dstStore->addToStore(*info, sink.s, repair); +} + + +void copyClosure(ref<Store> srcStore, ref<Store> dstStore, + const PathSet & storePaths, bool repair) +{ + PathSet closure; + for (auto & path : storePaths) + srcStore->computeFSClosure(path, closure); + + PathSet valid = dstStore->queryValidPaths(closure); + + if (valid.size() == closure.size()) return; + + Paths sorted = srcStore->topoSortPaths(closure); + + Paths missing; + for (auto i = sorted.rbegin(); i != sorted.rend(); ++i) + if (!valid.count(*i)) missing.push_back(*i); + + printMsg(lvlDebug, format("copying %1% missing paths") % missing.size()); + + for (auto & i : missing) + copyStorePath(srcStore, dstStore, i, repair); } @@ -442,7 +542,7 @@ void ValidPathInfo::sign(const SecretKey & secretKey) bool ValidPathInfo::isContentAddressed(const Store & store) const { auto warn = [&]() { - printMsg(lvlError, format("warning: path ‘%s’ claims to be content-addressed but isn't") % path); + printError(format("warning: path ‘%s’ claims to be content-addressed but isn't") % path); }; if (hasPrefix(ca, "text:")) { diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh index ae1d51016e99..6762852cf30e 100644 --- a/src/libstore/store-api.hh +++ b/src/libstore/store-api.hh @@ -189,6 +189,9 @@ enum BuildMode { bmNormal, bmRepair, bmCheck, bmHash }; struct BuildResult { + /* Note: don't remove status codes, and only add new status codes + at the end of the list, to prevent client/server + incompatibilities in the nix-store --serve protocol. */ enum Status { Built = 0, Substituted, @@ -197,6 +200,7 @@ struct BuildResult InputRejected, OutputRejected, TransientFailure, // possibly transient + CachedFailure, // no longer used TimedOut, MiscFailure, DependencyFailed, @@ -307,7 +311,7 @@ protected: public: /* Query which of the given paths is valid. */ - virtual PathSet queryValidPaths(const PathSet & paths) = 0; + virtual PathSet queryValidPaths(const PathSet & paths); /* Query the set of all valid paths. Note that for some store backends, the name part of store paths may be omitted @@ -320,9 +324,16 @@ public: the name part of the store path. */ ref<const ValidPathInfo> queryPathInfo(const Path & path); + /* Asynchronous version of queryPathInfo(). */ + void queryPathInfo(const Path & path, + std::function<void(ref<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure); + protected: - virtual std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) = 0; + virtual void queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) = 0; public: @@ -359,8 +370,9 @@ public: virtual bool wantMassQuery() { return false; } /* Import a path into the store. 
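Both the synchronous queryPathInfo() wrapper and the new default queryValidPaths() above are built on the asynchronous overload; calling it directly looks like the following sketch (store and path are assumed to be in scope, and the logging is only illustrative):

    // Sketch: use the asynchronous queryPathInfo() overload declared above.
    // Success receives a ref<ValidPathInfo>; all failures, including InvalidPath,
    // arrive as an exception_ptr, as in queryValidPaths() above.
    store->queryPathInfo(path,
        [](ref<ValidPathInfo> info) {
            printInfo(format("‘%1%’: narSize = %2%") % info->path % info->narSize);
        },
        [path](std::exception_ptr exc) {
            try {
                std::rethrow_exception(exc);
            } catch (InvalidPath &) {
                printInfo(format("‘%1%’ is not valid") % path);
            } catch (std::exception & e) {
                printError(format("querying ‘%1%’ failed: %2%") % path % e.what());
            }
        });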
 */
-    virtual void addToStore(const ValidPathInfo & info, const std::string & nar,
-        bool repair = false, bool dontCheckSigs = false) = 0;
+    virtual void addToStore(const ValidPathInfo & info, const ref<std::string> & nar,
+        bool repair = false, bool dontCheckSigs = false,
+        std::shared_ptr<FSAccessor> accessor = 0) = 0;
 
     /* Copy the contents of a path to the store and register the
        validity the resulting path. The resulting path is returned.
@@ -568,6 +580,11 @@ void copyStorePath(ref<Store> srcStore, ref<Store> dstStore,
     const Path & storePath, bool repair = false);
 
 
+/* Copy the closure of the specified paths from one store to another. */
+void copyClosure(ref<Store> srcStore, ref<Store> dstStore,
+    const PathSet & storePaths, bool repair = false);
+
+
 /* Remove the temporary roots file for this process. Any temporary
    root becomes garbage after this point unless it has been registered
    as a (permanent) root. */
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index 2cd246dab5df..6a4ed47cc9fa 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -6,7 +6,7 @@ namespace nix {
 #define WORKER_MAGIC_1 0x6e697863
 #define WORKER_MAGIC_2 0x6478696f
 
-#define PROTOCOL_VERSION 0x111
+#define PROTOCOL_VERSION 0x112
 
 #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
 #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
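Two details worth noting at the end. First, copyClosure() gives callers a one-shot way to mirror a set of paths and everything they reference into another store, copying only what the destination is missing. Second, the bump from 0x111 to 0x112 makes GET_PROTOCOL_MINOR() evaluate to 18, which is exactly the cut-off the new RemoteStore::addToStore() uses to choose between wopAddToStoreNar and the wopImportPaths fallback. A usage sketch for the former; how the two Store objects are opened is left out as an assumption:

    // Sketch: push the closure of one path into some other store (for example
    // a binary cache).  Only copyClosure()'s declaration above is relied upon.
    void pushClosure(ref<Store> src, ref<Store> dst, const Path & path)
    {
        copyClosure(src, dst, PathSet{path});   // skips paths dst already has
    }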