From 81ea8bd5ceb3dcae6af0b79c81a39ecbf2ba97a8 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Tue, 27 Mar 2018 22:16:01 +0200 Subject: Simplify the callback mechanism --- src/libstore/binary-cache-store.cc | 37 ++++++++------ src/libstore/binary-cache-store.hh | 6 +-- src/libstore/download.cc | 37 ++++++++------ src/libstore/download.hh | 3 +- src/libstore/http-binary-cache-store.cc | 18 +++---- src/libstore/legacy-ssh-store.cc | 11 ++-- src/libstore/local-binary-cache-store.cc | 17 +++--- src/libstore/local-store.cc | 13 +++-- src/libstore/local-store.hh | 3 +- src/libstore/misc.cc | 17 +++--- src/libstore/remote-store.cc | 9 ++-- src/libstore/remote-store.hh | 3 +- src/libstore/s3-binary-cache-store.cc | 9 ++-- src/libstore/store-api.cc | 88 +++++++++++++++----------------- src/libstore/store-api.hh | 6 +-- src/libutil/util.hh | 53 ++++++++----------- 16 files changed, 151 insertions(+), 179 deletions(-) (limited to 'src') diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 2e9a13e564ca..45be490765a4 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -58,12 +58,13 @@ std::shared_ptr BinaryCacheStore::getFile(const std::string & path) { std::promise> promise; getFile(path, - [&](std::shared_ptr result) { - promise.set_value(result); - }, - [&](std::exception_ptr exc) { - promise.set_exception(exc); - }); + {[&](std::future> result) { + try { + promise.set_value(result.get()); + } catch (...) { + promise.set_exception(std::current_exception()); + } + }}); return promise.get_future().get(); } @@ -218,8 +219,7 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) } void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, - std::function)> success, - std::function failure) + Callback> callback) { auto uri = getUri(); auto act = std::make_shared(*logger, lvlTalkative, actQueryPathInfo, @@ -229,17 +229,22 @@ void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, auto narInfoFile = narInfoFileFor(storePath); getFile(narInfoFile, - [=](std::shared_ptr data) { - if (!data) return success(0); + {[=](std::future> fut) { + try { + auto data = fut.get(); + + if (!data) return callback(nullptr); - stats.narInfoRead++; + stats.narInfoRead++; - callSuccess(success, failure, (std::shared_ptr) - std::make_shared(*this, *data, narInfoFile)); + callback((std::shared_ptr) + std::make_shared(*this, *data, narInfoFile)); - (void) act; // force Activity into this lambda to ensure it stays alive - }, - failure); + (void) act; // force Activity into this lambda to ensure it stays alive + } catch (...) { + callback.rethrow(); + } + }}); } Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath, diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh index e20b968442b7..fcde666beec6 100644 --- a/src/libstore/binary-cache-store.hh +++ b/src/libstore/binary-cache-store.hh @@ -41,8 +41,7 @@ public: /* Return the contents of the specified file, or null if it doesn't exist. */ virtual void getFile(const std::string & path, - std::function)> success, - std::function failure) = 0; + Callback> callback) = 0; std::shared_ptr getFile(const std::string & path); @@ -71,8 +70,7 @@ public: { unsupported(); } void queryPathInfoUncached(const Path & path, - std::function)> success, - std::function failure) override; + Callback> callback) override; void queryReferrers(const Path & path, PathSet & referrers) override diff --git a/src/libstore/download.cc b/src/libstore/download.cc index fce701a15a09..afb066e1468e 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -81,8 +81,7 @@ struct CurlDownloader : public Downloader DownloadResult result; Activity act; bool done = false; // whether either the success or failure function has been called - std::function success; - std::function failure; + Callback callback; CURL * req = 0; bool active = false; // whether the handle has been added to the multi object std::string status; @@ -97,10 +96,13 @@ struct CurlDownloader : public Downloader std::string encoding; - DownloadItem(CurlDownloader & downloader, const DownloadRequest & request) + DownloadItem(CurlDownloader & downloader, + const DownloadRequest & request, + Callback callback) : downloader(downloader) , request(request) , act(*logger, lvlTalkative, actDownload, fmt("downloading '%s'", request.uri), {request.uri}, request.parentAct) + , callback(callback) { if (!request.expectedETag.empty()) requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str()); @@ -129,7 +131,7 @@ struct CurlDownloader : public Downloader { assert(!done); done = true; - callFailure(failure, std::make_exception_ptr(e)); + callback.rethrow(std::make_exception_ptr(e)); } size_t writeCallback(void * contents, size_t size, size_t nmemb) @@ -316,11 +318,11 @@ struct CurlDownloader : public Downloader try { if (request.decompress) result.data = decodeContent(encoding, ref(result.data)); - callSuccess(success, failure, const_cast(result)); act.progress(result.data->size(), result.data->size()); + callback(std::move(result)); } catch (...) { done = true; - callFailure(failure, std::current_exception()); + callback.rethrow(); } } else { // We treat most errors as transient, but won't retry when hopeless @@ -570,13 +572,12 @@ struct CurlDownloader : public Downloader } void enqueueDownload(const DownloadRequest & request, - std::function success, - std::function failure) override + Callback callback) override { /* Ugly hack to support s3:// URIs. */ if (hasPrefix(request.uri, "s3://")) { // FIXME: do this on a worker thread - sync2async(success, failure, [&]() -> DownloadResult { + try { #ifdef ENABLE_S3 S3Helper s3Helper("", Aws::Region::US_EAST_1); // FIXME: make configurable auto slash = request.uri.find('/', 5); @@ -590,18 +591,15 @@ struct CurlDownloader : public Downloader if (!s3Res.data) throw DownloadError(NotFound, fmt("S3 object '%s' does not exist", request.uri)); res.data = s3Res.data; - return res; + callback(std::move(res)); #else throw nix::Error("cannot download '%s' because Nix is not built with S3 support", request.uri); #endif - }); + } catch (...) { callback.rethrow(); } return; } - auto item = std::make_shared(*this, request); - item->success = success; - item->failure = failure; - enqueueItem(item); + enqueueItem(std::make_shared(*this, request, callback)); } }; @@ -622,8 +620,13 @@ std::future Downloader::enqueueDownload(const DownloadRequest & { auto promise = std::make_shared>(); enqueueDownload(request, - [promise](const DownloadResult & result) { promise->set_value(result); }, - [promise](std::exception_ptr exc) { promise->set_exception(exc); }); + {[promise](std::future fut) { + try { + promise->set_value(fut.get()); + } catch (...) { + promise->set_exception(std::current_exception()); + } + }}); return promise->get_future(); } diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 7ade756fc356..01940f5447fe 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -42,8 +42,7 @@ struct Downloader the download. The future may throw a DownloadError exception. */ virtual void enqueueDownload(const DownloadRequest & request, - std::function success, - std::function failure) = 0; + Callback callback) = 0; std::future enqueueDownload(const DownloadRequest & request); diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index b9e9cd5daba5..b8d670417aa2 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -78,27 +78,23 @@ protected: } void getFile(const std::string & path, - std::function)> success, - std::function failure) override + Callback> callback) override { DownloadRequest request(cacheUri + "/" + path); request.tries = 8; getDownloader()->enqueueDownload(request, - [success](const DownloadResult & result) { - success(result.data); - }, - [success, failure](std::exception_ptr exc) { + {[callback](std::future result) { try { - std::rethrow_exception(exc); + callback(result.get().data); } catch (DownloadError & e) { if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) - return success(0); - failure(exc); + return callback(std::shared_ptr()); + callback.rethrow(); } catch (...) { - failure(exc); + callback.rethrow(); } - }); + }}); } }; diff --git a/src/libstore/legacy-ssh-store.cc b/src/libstore/legacy-ssh-store.cc index 5dee25308f7f..02d91ded04cd 100644 --- a/src/libstore/legacy-ssh-store.cc +++ b/src/libstore/legacy-ssh-store.cc @@ -84,10 +84,9 @@ struct LegacySSHStore : public Store } void queryPathInfoUncached(const Path & path, - std::function)> success, - std::function failure) override + Callback> callback) override { - sync2async>(success, failure, [&]() -> std::shared_ptr { + try { auto conn(connections->get()); debug("querying remote host '%s' for info on '%s'", host, path); @@ -97,7 +96,7 @@ struct LegacySSHStore : public Store auto info = std::make_shared(); conn->from >> info->path; - if (info->path.empty()) return nullptr; + if (info->path.empty()) return callback(nullptr); assert(path == info->path); PathSet references; @@ -116,8 +115,8 @@ struct LegacySSHStore : public Store auto s = readString(conn->from); assert(s == ""); - return info; - }); + callback(std::move(info)); + } catch (...) { callback.rethrow(); } } void addToStore(const ValidPathInfo & info, Source & source, diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 2577e90aef23..ae0ffa6a56ef 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -35,17 +35,14 @@ protected: const std::string & mimeType) override; void getFile(const std::string & path, - std::function)> success, - std::function failure) override + Callback> callback) override { - sync2async>(success, failure, [&]() { - try { - return std::make_shared(readFile(binaryCacheDir + "/" + path)); - } catch (SysError & e) { - if (e.errNo == ENOENT) return std::shared_ptr(); - throw; - } - }); + try { + // FIXME: O(n) space + callback(std::make_shared(readFile(binaryCacheDir + "/" + path))); + } catch (SysError & e) { + if (e.errNo == ENOENT) callback(nullptr); else callback.rethrow(); + } catch (...) { callback.rethrow(); } } PathSet queryAllValidPaths() override diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc index 681abafef237..3b2ba65f3b46 100644 --- a/src/libstore/local-store.cc +++ b/src/libstore/local-store.cc @@ -629,17 +629,15 @@ uint64_t LocalStore::addValidPath(State & state, void LocalStore::queryPathInfoUncached(const Path & path, - std::function)> success, - std::function failure) + Callback> callback) { - sync2async>(success, failure, [&]() { - + try { auto info = std::make_shared(); info->path = path; assertStorePath(path); - return retrySQLite>([&]() { + callback(retrySQLite>([&]() { auto state(_state.lock()); /* Get the path info. */ @@ -679,8 +677,9 @@ void LocalStore::queryPathInfoUncached(const Path & path, info->references.insert(useQueryReferences.getStr(0)); return info; - }); - }); + })); + + } catch (...) { callback.rethrow(); } } diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh index 1209a06356f7..746bdbeed793 100644 --- a/src/libstore/local-store.hh +++ b/src/libstore/local-store.hh @@ -127,8 +127,7 @@ public: PathSet queryAllValidPaths() override; void queryPathInfoUncached(const Path & path, - std::function)> success, - std::function failure) override; + Callback> callback) override; void queryReferrers(const Path & path, PathSet & referrers) override; diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc index a82aa4e9cfa5..adcce026fa1d 100644 --- a/src/libstore/misc.cc +++ b/src/libstore/misc.cc @@ -33,9 +33,11 @@ void Store::computeFSClosure(const PathSet & startPaths, state->pending++; } - queryPathInfo(path, - [&, path](ref info) { - // FIXME: calls to isValidPath() should be async + queryPathInfo(path, {[&, path](std::future> fut) { + // FIXME: calls to isValidPath() should be async + + try { + auto info = fut.get(); if (flipDirection) { @@ -75,14 +77,13 @@ void Store::computeFSClosure(const PathSet & startPaths, if (!--state->pending) done.notify_one(); } - }, - - [&, path](std::exception_ptr exc) { + } catch (...) { auto state(state_.lock()); - if (!state->exc) state->exc = exc; + if (!state->exc) state->exc = std::current_exception(); assert(state->pending); if (!--state->pending) done.notify_one(); - }); + }; + }}); }; for (auto & startPath : startPaths) diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index a157c6c48582..b72c940cdcf9 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -294,10 +294,9 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths, void RemoteStore::queryPathInfoUncached(const Path & path, - std::function)> success, - std::function failure) + Callback> callback) { - sync2async>(success, failure, [&]() { + try { auto conn(connections->get()); conn->to << wopQueryPathInfo << path; try { @@ -324,8 +323,8 @@ void RemoteStore::queryPathInfoUncached(const Path & path, info->sigs = readStrings(conn->from); conn->from >> info->ca; } - return info; - }); + callback(std::move(info)); + } catch (...) { callback.rethrow(); } } diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index 95fa59a2069d..b488e34ce263 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -40,8 +40,7 @@ public: PathSet queryAllValidPaths() override; void queryPathInfoUncached(const Path & path, - std::function)> success, - std::function failure) override; + Callback> callback) override; void queryReferrers(const Path & path, PathSet & referrers) override; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 103f141a1a11..f2e8efc16e60 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -365,10 +365,9 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore } void getFile(const std::string & path, - std::function)> success, - std::function failure) override + Callback> callback) override { - sync2async>(success, failure, [&]() { + try { stats.get++; auto res = s3Helper.getObject(bucketName, path); @@ -380,8 +379,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms", bucketName, path, res.data->size(), res.durationMs); - return res.data; - }); + callback(std::move(res.data)); + } catch (...) { callback.rethrow(); } } PathSet queryAllValidPaths() override diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 49b32d115849..9b0b7d6327e0 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -305,20 +305,20 @@ ref Store::queryPathInfo(const Path & storePath) std::promise> promise; queryPathInfo(storePath, - [&](ref info) { - promise.set_value(info); - }, - [&](std::exception_ptr exc) { - promise.set_exception(exc); - }); + {[&](std::future> result) { + try { + promise.set_value(result.get()); + } catch (...) { + promise.set_exception(std::current_exception()); + } + }}); return promise.get_future().get(); } void Store::queryPathInfo(const Path & storePath, - std::function)> success, - std::function failure) + Callback> callback) { auto hashPart = storePathToHash(storePath); @@ -330,7 +330,7 @@ void Store::queryPathInfo(const Path & storePath, stats.narInfoReadAverted++; if (!*res) throw InvalidPath(format("path '%s' is not valid") % storePath); - return success(ref(*res)); + return callback(ref(*res)); } } @@ -346,35 +346,36 @@ void Store::queryPathInfo(const Path & storePath, (res.second->path != storePath && storePathToName(storePath) != "")) throw InvalidPath(format("path '%s' is not valid") % storePath); } - return success(ref(res.second)); + return callback(ref(res.second)); } } - } catch (std::exception & e) { - return callFailure(failure); - } + } catch (...) { return callback.rethrow(); } queryPathInfoUncached(storePath, - [this, storePath, hashPart, success, failure](std::shared_ptr info) { + {[this, storePath, hashPart, callback](std::future> fut) { - if (diskCache) - diskCache->upsertNarInfo(getUri(), hashPart, info); + try { + auto info = fut.get(); - { - auto state_(state.lock()); - state_->pathInfoCache.upsert(hashPart, info); - } + if (diskCache) + diskCache->upsertNarInfo(getUri(), hashPart, info); - if (!info - || (info->path != storePath && storePathToName(storePath) != "")) - { - stats.narInfoMissing++; - return failure(std::make_exception_ptr(InvalidPath(format("path '%s' is not valid") % storePath))); - } + { + auto state_(state.lock()); + state_->pathInfoCache.upsert(hashPart, info); + } - callSuccess(success, failure, ref(info)); + if (!info + || (info->path != storePath && storePathToName(storePath) != "")) + { + stats.narInfoMissing++; + throw InvalidPath("path '%s' is not valid", storePath); + } - }, failure); + callback(ref(info)); + } catch (...) { callback.rethrow(); } + }}); } @@ -394,26 +395,19 @@ PathSet Store::queryValidPaths(const PathSet & paths, SubstituteFlag maybeSubsti auto doQuery = [&](const Path & path ) { checkInterrupt(); - queryPathInfo(path, - [path, &state_, &wakeup](ref info) { - auto state(state_.lock()); + queryPathInfo(path, {[path, &state_, &wakeup](std::future> fut) { + auto state(state_.lock()); + try { + auto info = fut.get(); state->valid.insert(path); - assert(state->left); - if (!--state->left) - wakeup.notify_one(); - }, - [path, &state_, &wakeup](std::exception_ptr exc) { - auto state(state_.lock()); - try { - std::rethrow_exception(exc); - } catch (InvalidPath &) { - } catch (...) { - state->exc = exc; - } - assert(state->left); - if (!--state->left) - wakeup.notify_one(); - }); + } catch (InvalidPath &) { + } catch (...) { + state->exc = std::current_exception(); + } + assert(state->left); + if (!--state->left) + wakeup.notify_one(); + }}); }; for (auto & path : paths) diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh index ea259f07e8ab..6ee2d550679b 100644 --- a/src/libstore/store-api.hh +++ b/src/libstore/store-api.hh @@ -355,14 +355,12 @@ public: /* Asynchronous version of queryPathInfo(). */ void queryPathInfo(const Path & path, - std::function)> success, - std::function failure); + Callback> callback); protected: virtual void queryPathInfoUncached(const Path & path, - std::function)> success, - std::function failure) = 0; + Callback> callback) = 0; public: diff --git a/src/libutil/util.hh b/src/libutil/util.hh index 743d238611fc..215c7cecafef 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -15,6 +15,7 @@ #include #include #include +#include #ifndef HAVE_STRUCT_DIRENT_D_TYPE #define DT_UNKNOWN 0 @@ -424,44 +425,30 @@ string get(const T & map, const string & key, const string & def = "") } -/* Call ‘failure’ with the current exception as argument. If ‘failure’ - throws an exception, abort the program. */ -void callFailure(const std::function & failure, - std::exception_ptr exc = std::current_exception()); +/* A callback is a wrapper around a lambda that accepts a valid of + type T or an exception. (We abuse std::future to pass the value or + exception.) */ +template +struct Callback +{ + std::function)> fun; + Callback(std::function)> fun) : fun(fun) { } -/* Evaluate the function ‘f’. If it returns a value, call ‘success’ - with that value as its argument. If it or ‘success’ throws an - exception, call ‘failure’. If ‘failure’ throws an exception, abort - the program. */ -template -void sync2async( - const std::function & success, - const std::function & failure, - const std::function & f) -{ - try { - success(f()); - } catch (...) { - callFailure(failure); + void operator()(T && t) const + { + std::promise promise; + promise.set_value(std::move(t)); + fun(promise.get_future()); } -} - -/* Call the function ‘success’. If it throws an exception, call - ‘failure’. If that throws an exception, abort the program. */ -template -void callSuccess( - const std::function & success, - const std::function & failure, - T && arg) -{ - try { - success(arg); - } catch (...) { - callFailure(failure); + void rethrow(const std::exception_ptr & exc = std::current_exception()) const + { + std::promise promise; + promise.set_exception(exc); + fun(promise.get_future()); } -} +}; /* Start a thread that handles various signals. Also block those signals -- cgit 1.4.1