about summary refs log tree commit diff
path: root/src/libstore/download.cc
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2018-03-27T22·01+0200
committerEelco Dolstra <edolstra@gmail.com>2018-05-30T11·42+0200
commite87e4a60d617bffadfedf23032254130cdb4d54d (patch)
tree2d17899e41f6372d0b64e74b2c260170248d1acd /src/libstore/download.cc
parent08ec757726e5ef47e71bf16ed0b252b288bcf0f3 (diff)
Make HttpBinaryCacheStore::narFromPath() run in constant memory
This reduces memory consumption of

  nix copy --from https://cache.nixos.org --to ~/my-nix /nix/store/95cwv4q54dc6giaqv6q6p4r02ia2km35-blender-2.79

from 176 MiB to 82 MiB. (The remaining memory is probably due to xz
decompression overhead.)

Issue https://github.com/NixOS/nix/issues/1681.
Issue https://github.com/NixOS/nix/issues/1969.
Diffstat (limited to 'src/libstore/download.cc')
-rw-r--r--src/libstore/download.cc92
1 files changed, 91 insertions, 1 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index afb066e1468e..d450714ca50f 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -7,6 +7,7 @@
 #include "s3.hh"
 #include "compression.hh"
 #include "pathlocks.hh"
+#include "finally.hh"
 
 #ifdef ENABLE_S3
 #include <aws/core/client/ClientConfiguration.h>
@@ -137,7 +138,10 @@ struct CurlDownloader : public Downloader
         size_t writeCallback(void * contents, size_t size, size_t nmemb)
         {
             size_t realSize = size * nmemb;
-            result.data->append((char *) contents, realSize);
+            if (request.dataCallback)
+                request.dataCallback((char *) contents, realSize);
+            else
+                result.data->append((char *) contents, realSize);
             return realSize;
         }
 
@@ -635,6 +639,92 @@ DownloadResult Downloader::download(const DownloadRequest & request)
     return enqueueDownload(request).get();
 }
 
+void Downloader::download(DownloadRequest && request, Sink & sink)
+{
+    /* Note: we can't call 'sink' via request.dataCallback, because
+       that would cause the sink to execute on the downloader
+       thread. If 'sink' is a coroutine, this will fail. Also, if the
+       sink is expensive (e.g. one that does decompression and writing
+       to the Nix store), it would stall the download thread too much.
+       Therefore we use a buffer to communicate data between the
+       download thread and the calling thread. */
+
+    struct State {
+        bool quit = false;
+        std::exception_ptr exc;
+        std::string data;
+        std::condition_variable avail, request;
+    };
+
+    auto _state = std::make_shared<Sync<State>>();
+
+    /* In case of an exception, wake up the download thread. FIXME:
+       abort the download request. */
+    Finally finally([&]() {
+        auto state(_state->lock());
+        state->quit = true;
+        state->request.notify_one();
+    });
+
+    request.dataCallback = [_state](char * buf, size_t len) {
+
+        auto state(_state->lock());
+
+        if (state->quit) return;
+
+        /* If the buffer is full, then go to sleep until the calling
+           thread wakes us up (i.e. when it has removed data from the
+           buffer). Note: this does stall the download thread. */
+        while (state->data.size() > 1024 * 1024) {
+            if (state->quit) return;
+            debug("download buffer is full; going to sleep");
+            state.wait(state->request);
+        }
+
+        /* Append data to the buffer and wake up the calling
+           thread. */
+        state->data.append(buf, len);
+        state->avail.notify_one();
+    };
+
+    enqueueDownload(request,
+        {[_state](std::future<DownloadResult> fut) {
+            auto state(_state->lock());
+            state->quit = true;
+            try {
+                fut.get();
+            } catch (...) {
+                state->exc = std::current_exception();
+            }
+            state->avail.notify_one();
+            state->request.notify_one();
+        }});
+
+    auto state(_state->lock());
+
+    while (true) {
+        checkInterrupt();
+
+        if (state->quit) {
+            if (state->exc) std::rethrow_exception(state->exc);
+            break;
+        }
+
+        /* If no data is available, then wait for the download thread
+           to wake us up. */
+        if (state->data.empty())
+            state.wait(state->avail);
+
+        /* If data is available, then flush it to the sink and wake up
+           the download thread if it's blocked on a full buffer. */
+        if (!state->data.empty()) {
+            sink((unsigned char *) state->data.data(), state->data.size());
+            state->data.clear();
+            state->request.notify_one();
+        }
+    }
+}
+
 Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl, int ttl)
 {
     auto url = resolveUri(url_);