diff options
author | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-09-16T16·54+0200 |
---|---|---|
committer | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-09-16T16·54+0200 |
commit | 75989bdca773eedb8b8d1cc8a7675900358acd25 (patch) | |
tree | 2d1dce1431662f441cead67d8754e96eb4db6807 /src/libstore/s3-binary-cache-store.cc | |
parent | 054be5025762c5e1c7e853c4fa5d7eed8da1727f (diff) |
Make computeFSClosure() single-threaded again
The fact that queryPathInfo() is synchronous meant that we needed a thread for every concurrent binary cache lookup, even though they end up being handled by the same download thread. Requiring hundreds of threads is not a good idea. So now there is an asynchronous version of queryPathInfo() that takes a callback function to process the result. Similarly, enqueueDownload() now takes a callback rather than returning a future. Thus, a command like nix path-info --store https://cache.nixos.org/ -r /nix/store/slljrzwmpygy1daay14kjszsr9xix063-nixos-16.09beta231.dccf8c5 that returns 4941 paths now takes 1.87s using only 2 threads (the main thread and the downloader thread). (This is with a prewarmed CloudFront.)
Diffstat (limited to 'src/libstore/s3-binary-cache-store.cc')
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 56 |
1 files changed, 30 insertions, 26 deletions
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index ed95620bbd7c..0722e43d5e77 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -167,46 +167,50 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore stats.putTimeMs += duration; } - std::shared_ptr<std::string> getFile(const std::string & path) override + void getFile(const std::string & path, + std::function<void(std::shared_ptr<std::string>)> success, + std::function<void(std::exception_ptr exc)> failure) override { - debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path); + sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { + debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path); - auto request = - Aws::S3::Model::GetObjectRequest() - .WithBucket(bucketName) - .WithKey(path); + auto request = + Aws::S3::Model::GetObjectRequest() + .WithBucket(bucketName) + .WithKey(path); - request.SetResponseStreamFactory([&]() { - return Aws::New<std::stringstream>("STRINGSTREAM"); - }); + request.SetResponseStreamFactory([&]() { + return Aws::New<std::stringstream>("STRINGSTREAM"); + }); - stats.get++; + stats.get++; - try { + try { - auto now1 = std::chrono::steady_clock::now(); + auto now1 = std::chrono::steady_clock::now(); - auto result = checkAws(format("AWS error fetching ‘%s’") % path, - client->GetObject(request)); + auto result = checkAws(format("AWS error fetching ‘%s’") % path, + client->GetObject(request)); - auto now2 = std::chrono::steady_clock::now(); + auto now2 = std::chrono::steady_clock::now(); - auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str(); + auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str(); - auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); - printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") - % bucketName % path % res.size() % duration); + printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") + % bucketName % path % res.size() % duration); - stats.getBytes += res.size(); - stats.getTimeMs += duration; + stats.getBytes += res.size(); + stats.getTimeMs += duration; - return std::make_shared<std::string>(res); + return std::make_shared<std::string>(res); - } catch (S3Error & e) { - if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return 0; - throw; - } + } catch (S3Error & e) { + if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>(); + throw; + } + }); } PathSet queryAllValidPaths() override |