about summary refs log tree commit diff
path: root/src/libstore/download.cc
diff options
context:
space:
mode:
authorEelco Dolstra <eelco.dolstra@logicblox.com>2016-09-16T16·54+0200
committerEelco Dolstra <eelco.dolstra@logicblox.com>2016-09-16T16·54+0200
commit75989bdca773eedb8b8d1cc8a7675900358acd25 (patch)
tree2d1dce1431662f441cead67d8754e96eb4db6807 /src/libstore/download.cc
parent054be5025762c5e1c7e853c4fa5d7eed8da1727f (diff)
Make computeFSClosure() single-threaded again
The fact that queryPathInfo() is synchronous meant that we needed a
thread for every concurrent binary cache lookup, even though they end
up being handled by the same download thread. Requiring hundreds of
threads is not a good idea. So now there is an asynchronous version of
queryPathInfo() that takes a callback function to process the
result. Similarly, enqueueDownload() now takes a callback rather than
returning a future.

Thus, a command like

  nix path-info --store https://cache.nixos.org/ -r /nix/store/slljrzwmpygy1daay14kjszsr9xix063-nixos-16.09beta231.dccf8c5

that returns 4941 paths now takes 1.87s using only 2 threads (the main
thread and the downloader thread). (This is with a prewarmed
CloudFront.)
Diffstat (limited to 'src/libstore/download.cc')
-rw-r--r--src/libstore/download.cc38
1 files changed, 27 insertions, 11 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index b2d223da96..ca324595a3 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -47,8 +47,9 @@ struct CurlDownloader : public Downloader
         CurlDownloader & downloader;
         DownloadRequest request;
         DownloadResult result;
-        bool done = false; // whether the promise has been set
-        std::promise<DownloadResult> promise;
+        bool done = false; // whether either the success or failure function has been called
+        std::function<void(const DownloadResult &)> success;
+        std::function<void(std::exception_ptr exc)> failure;
         CURL * req = 0;
         bool active = false; // whether the handle has been added to the multi object
         std::string status;
@@ -86,7 +87,7 @@ struct CurlDownloader : public Downloader
             if (requestHeaders) curl_slist_free_all(requestHeaders);
             try {
                 if (!done)
-                    fail(DownloadError(Transient, format("download of ‘%s’ was interrupted") % request.uri));
+                    fail(DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri));
             } catch (...) {
                 ignoreException();
             }
@@ -95,8 +96,9 @@ struct CurlDownloader : public Downloader
         template<class T>
         void fail(const T & e)
         {
-            promise.set_exception(std::make_exception_ptr(e));
+            assert(!done);
             done = true;
+            failure(std::make_exception_ptr(e));
         }
 
         size_t writeCallback(void * contents, size_t size, size_t nmemb)
@@ -239,7 +241,7 @@ struct CurlDownloader : public Downloader
                 (httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
             {
                 result.cached = httpStatus == 304;
-                promise.set_value(result);
+                success(result);
                 done = true;
             } else {
                 Error err =
@@ -253,9 +255,11 @@ struct CurlDownloader : public Downloader
                 attempt++;
 
                 auto exc =
-                    httpStatus != 0
-                    ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus)
-                    : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code);
+                    code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
+                    ? DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri)
+                    : httpStatus != 0
+                      ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus)
+                      : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code);
 
                 /* If this is a transient error, then maybe retry the
                    download after a while. */
@@ -414,7 +418,7 @@ struct CurlDownloader : public Downloader
     {
         try {
             workerThreadMain();
-        } catch (Interrupted & e) {
+        } catch (nix::Interrupted & e) {
         } catch (std::exception & e) {
             printMsg(lvlError, format("unexpected error in download thread: %s") % e.what());
         }
@@ -437,11 +441,14 @@ struct CurlDownloader : public Downloader
         writeFull(wakeupPipe.writeSide.get(), " ");
     }
 
-    std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) override
+    void enqueueDownload(const DownloadRequest & request,
+        std::function<void(const DownloadResult &)> success,
+        std::function<void(std::exception_ptr exc)> failure) override
     {
         auto item = std::make_shared<DownloadItem>(*this, request);
+        item->success = success;
+        item->failure = failure;
         enqueueItem(item);
-        return item->promise.get_future();
     }
 };
 
@@ -458,6 +465,15 @@ ref<Downloader> makeDownloader()
     return make_ref<CurlDownloader>();
 }
 
+std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & request)
+{
+    auto promise = std::make_shared<std::promise<DownloadResult>>();
+    enqueueDownload(request,
+        [promise](const DownloadResult & result) { promise->set_value(result); },
+        [promise](std::exception_ptr exc) { promise->set_exception(exc); });
+    return promise->get_future();
+}
+
 DownloadResult Downloader::download(const DownloadRequest & request)
 {
     return enqueueDownload(request).get();