about summary refs log tree commit diff
path: root/src/libstore/s3-binary-cache-store.cc
diff options
context:
space:
mode:
authorEelco Dolstra <eelco.dolstra@logicblox.com>2016-09-16T16·54+0200
committerEelco Dolstra <eelco.dolstra@logicblox.com>2016-09-16T16·54+0200
commit75989bdca773eedb8b8d1cc8a7675900358acd25 (patch)
tree2d1dce1431662f441cead67d8754e96eb4db6807 /src/libstore/s3-binary-cache-store.cc
parent054be5025762c5e1c7e853c4fa5d7eed8da1727f (diff)
Make computeFSClosure() single-threaded again
The fact that queryPathInfo() is synchronous meant that we needed a
thread for every concurrent binary cache lookup, even though they end
up being handled by the same download thread. Requiring hundreds of
threads is not a good idea. So now there is an asynchronous version of
queryPathInfo() that takes a callback function to process the
result. Similarly, enqueueDownload() now takes a callback rather than
returning a future.

Thus, a command like

  nix path-info --store https://cache.nixos.org/ -r /nix/store/slljrzwmpygy1daay14kjszsr9xix063-nixos-16.09beta231.dccf8c5

that returns 4941 paths now takes 1.87s using only 2 threads (the main
thread and the downloader thread). (This is with a prewarmed
CloudFront.)
Diffstat (limited to 'src/libstore/s3-binary-cache-store.cc')
-rw-r--r--src/libstore/s3-binary-cache-store.cc56
1 files changed, 30 insertions, 26 deletions
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index ed95620bbd..0722e43d5e 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -167,46 +167,50 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         stats.putTimeMs += duration;
     }
 
-    std::shared_ptr<std::string> getFile(const std::string & path) override
+    void getFile(const std::string & path,
+        std::function<void(std::shared_ptr<std::string>)> success,
+        std::function<void(std::exception_ptr exc)> failure) override
     {
-        debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path);
+        sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
+            debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path);
 
-        auto request =
-            Aws::S3::Model::GetObjectRequest()
-            .WithBucket(bucketName)
-            .WithKey(path);
+            auto request =
+                Aws::S3::Model::GetObjectRequest()
+                .WithBucket(bucketName)
+                .WithKey(path);
 
-        request.SetResponseStreamFactory([&]() {
-            return Aws::New<std::stringstream>("STRINGSTREAM");
-        });
+            request.SetResponseStreamFactory([&]() {
+                return Aws::New<std::stringstream>("STRINGSTREAM");
+            });
 
-        stats.get++;
+            stats.get++;
 
-        try {
+            try {
 
-            auto now1 = std::chrono::steady_clock::now();
+                auto now1 = std::chrono::steady_clock::now();
 
-            auto result = checkAws(format("AWS error fetching ‘%s’") % path,
-                client->GetObject(request));
+                auto result = checkAws(format("AWS error fetching ‘%s’") % path,
+                    client->GetObject(request));
 
-            auto now2 = std::chrono::steady_clock::now();
+                auto now2 = std::chrono::steady_clock::now();
 
-            auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
+                auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
 
-            auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+                auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
 
-            printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms")
-                % bucketName % path % res.size() % duration);
+                printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms")
+                    % bucketName % path % res.size() % duration);
 
-            stats.getBytes += res.size();
-            stats.getTimeMs += duration;
+                stats.getBytes += res.size();
+                stats.getTimeMs += duration;
 
-            return std::make_shared<std::string>(res);
+                return std::make_shared<std::string>(res);
 
-        } catch (S3Error & e) {
-            if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return 0;
-            throw;
-        }
+            } catch (S3Error & e) {
+                if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>();
+                throw;
+            }
+        });
     }
 
     PathSet queryAllValidPaths() override