about summary refs log tree commit diff
path: root/src/libstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore')
-rw-r--r--src/libstore/binary-cache-store.cc18
-rw-r--r--src/libstore/build.cc38
-rw-r--r--src/libstore/download.cc103
-rw-r--r--src/libstore/download.hh11
-rw-r--r--src/libstore/http-binary-cache-store.cc56
-rw-r--r--src/libstore/store-api.cc107
6 files changed, 199 insertions, 134 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc
index 8b736056e01d..4527ee6ba660 100644
--- a/src/libstore/binary-cache-store.cc
+++ b/src/libstore/binary-cache-store.cc
@@ -10,8 +10,6 @@
 #include "nar-info-disk-cache.hh"
 #include "nar-accessor.hh"
 #include "json.hh"
-#include "retry.hh"
-#include "download.hh"
 
 #include <chrono>
 
@@ -81,15 +79,13 @@ void BinaryCacheStore::getFile(const std::string & path, Sink & sink)
 
 std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
 {
-    return retry<std::shared_ptr<std::string>>(downloadSettings.tries, [&]() -> std::shared_ptr<std::string> {
-        StringSink sink;
-        try {
-            getFile(path, sink);
-        } catch (NoSuchBinaryCacheFile &) {
-            return nullptr;
-        }
-        return sink.s;
-    });
+    StringSink sink;
+    try {
+        getFile(path, sink);
+    } catch (NoSuchBinaryCacheFile &) {
+        return nullptr;
+    }
+    return sink.s;
 }
 
 Path BinaryCacheStore::narInfoFileFor(const Path & storePath)
diff --git a/src/libstore/build.cc b/src/libstore/build.cc
index 813d7e2c2d08..cf6428e12467 100644
--- a/src/libstore/build.cc
+++ b/src/libstore/build.cc
@@ -266,6 +266,12 @@ public:
     /* Set if at least one derivation had a timeout. */
     bool timedOut;
 
+    /* Set if at least one derivation fails with a hash mismatch. */
+    bool hashMismatch;
+
+    /* Set if at least one derivation is not deterministic in check mode. */
+    bool checkMismatch;
+
     LocalStore & store;
 
     std::unique_ptr<HookInstance> hook;
@@ -3219,6 +3225,7 @@ void DerivationGoal::registerOutputs()
 
                 /* Throw an error after registering the path as
                    valid. */
+                worker.hashMismatch = true;
                 delayedException = std::make_exception_ptr(
                     BuildError("hash mismatch in fixed-output derivation '%s':\n  wanted: %s\n  got:    %s",
                         dest, h.to_string(), h2.to_string()));
@@ -3261,6 +3268,7 @@ void DerivationGoal::registerOutputs()
             if (!worker.store.isValidPath(path)) continue;
             auto info = *worker.store.queryPathInfo(path);
             if (hash.first != info.narHash) {
+                worker.checkMismatch = true;
                 if (settings.runDiffHook || settings.keepFailed) {
                     Path dst = worker.store.toRealPath(path + checkSuffix);
                     deletePath(dst);
@@ -3272,10 +3280,10 @@ void DerivationGoal::registerOutputs()
                         buildUser ? buildUser->getGID() : getgid(),
                         path, dst, drvPath, tmpDir);
 
-                    throw Error(format("derivation '%1%' may not be deterministic: output '%2%' differs from '%3%'")
+                    throw NotDeterministic(format("derivation '%1%' may not be deterministic: output '%2%' differs from '%3%'")
                         % drvPath % path % dst);
                 } else
-                    throw Error(format("derivation '%1%' may not be deterministic: output '%2%' differs")
+                    throw NotDeterministic(format("derivation '%1%' may not be deterministic: output '%2%' differs")
                         % drvPath % path);
             }
 
@@ -4107,6 +4115,8 @@ Worker::Worker(LocalStore & store)
     lastWokenUp = steady_time_point::min();
     permanentFailure = false;
     timedOut = false;
+    hashMismatch = false;
+    checkMismatch = false;
 }
 
 
@@ -4467,7 +4477,29 @@ void Worker::waitForInput()
 
 unsigned int Worker::exitStatus()
 {
-    return timedOut ? 101 : (permanentFailure ? 100 : 1);
+    /*
+     * 1100100
+     *    ^^^^
+     *    |||`- timeout
+     *    ||`-- output hash mismatch
+     *    |`--- build failure
+     *    `---- not deterministic
+     */
+    unsigned int mask = 0;
+    bool buildFailure = permanentFailure || timedOut || hashMismatch;
+    if (buildFailure)
+        mask |= 0x04;  // 100
+    if (timedOut)
+        mask |= 0x01;  // 101
+    if (hashMismatch)
+        mask |= 0x02;  // 102
+    if (checkMismatch) {
+        mask |= 0x08;  // 104
+    }
+
+    if (mask)
+        mask |= 0x60;
+    return mask ? mask : 1;
 }
 
 
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 6ce9525c38de..91087eebcfcb 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -8,7 +8,6 @@
 #include "compression.hh"
 #include "pathlocks.hh"
 #include "finally.hh"
-#include "retry.hh"
 
 #ifdef ENABLE_S3
 #include <aws/core/client/ClientConfiguration.h>
@@ -20,9 +19,11 @@
 #include <curl/curl.h>
 
 #include <algorithm>
+#include <cmath>
 #include <cstring>
 #include <iostream>
 #include <queue>
+#include <random>
 #include <thread>
 
 using namespace std::string_literals;
@@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader
 {
     CURLM * curlm = 0;
 
+    std::random_device rd;
+    std::mt19937 mt19937;
+
     struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
     {
         CurlDownloader & downloader;
@@ -57,10 +61,20 @@ struct CurlDownloader : public Downloader
         bool active = false; // whether the handle has been added to the multi object
         std::string status;
 
+        unsigned int attempt = 0;
+
+        /* Don't start this download until the specified time point
+           has been reached. */
+        std::chrono::steady_clock::time_point embargo;
+
         struct curl_slist * requestHeaders = 0;
 
         std::string encoding;
 
+        bool acceptRanges = false;
+
+        curl_off_t writtenToSink = 0;
+
         DownloadItem(CurlDownloader & downloader,
             const DownloadRequest & request,
             Callback<DownloadResult> callback)
@@ -71,9 +85,10 @@ struct CurlDownloader : public Downloader
                 {request.uri}, request.parentAct)
             , callback(callback)
             , finalSink([this](const unsigned char * data, size_t len) {
-                if (this->request.dataCallback)
+                if (this->request.dataCallback) {
+                    writtenToSink += len;
                     this->request.dataCallback((char *) data, len);
-                else
+                } else
                     this->result.data->append((char *) data, len);
               })
         {
@@ -151,6 +166,7 @@ struct CurlDownloader : public Downloader
                 status = ss.size() >= 2 ? ss[1] : "";
                 result.data = std::make_shared<std::string>();
                 result.bodySize = 0;
+                acceptRanges = false;
                 encoding = "";
             } else {
                 auto i = line.find(':');
@@ -168,7 +184,9 @@ struct CurlDownloader : public Downloader
                             return 0;
                         }
                     } else if (name == "content-encoding")
-                        encoding = trim(string(line, i + 1));;
+                        encoding = trim(string(line, i + 1));
+                    else if (name == "accept-ranges" && toLower(trim(std::string(line, i + 1))) == "bytes")
+                        acceptRanges = true;
                 }
             }
             return realSize;
@@ -244,6 +262,8 @@ struct CurlDownloader : public Downloader
             #if LIBCURL_VERSION_NUM >= 0x072f00
             if (downloadSettings.enableHttp2)
                 curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
+            else
+                curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
             #endif
             curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper);
             curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
@@ -284,6 +304,9 @@ struct CurlDownloader : public Downloader
             curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str());
             curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);
 
+            if (writtenToSink)
+                curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink);
+
             result.data = std::make_shared<std::string>();
             result.bodySize = 0;
         }
@@ -318,7 +341,7 @@ struct CurlDownloader : public Downloader
                 failEx(writeException);
 
             else if (code == CURLE_OK &&
-                (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
+                (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 206 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
             {
                 result.cached = httpStatus == 304;
                 done = true;
@@ -375,7 +398,9 @@ struct CurlDownloader : public Downloader
                     }
                 }
 
-                fail(
+                attempt++;
+
+                auto exc =
                     code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
                     ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
                     : httpStatus != 0
@@ -386,15 +411,41 @@ struct CurlDownloader : public Downloader
                         )
                     : DownloadError(err,
                         fmt("unable to %s '%s': %s (%d)",
-                            request.verb(), request.uri, curl_easy_strerror(code), code)));
+                            request.verb(), request.uri, curl_easy_strerror(code), code));
+
+                /* If this is a transient error, then maybe retry the
+                   download after a while. If we're writing to a
+                   sink, we can only retry if the server supports
+                   ranged requests. */
+                if (err == Transient
+                    && attempt < request.tries
+                    && (!this->request.dataCallback
+                        || writtenToSink == 0
+                        || (acceptRanges && encoding.empty())))
+                {
+                    int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
+                    if (writtenToSink)
+                        warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms);
+                    else
+                        warn("%s; retrying in %d ms", exc.what(), ms);
+                    embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
+                    downloader.enqueueItem(shared_from_this());
+                }
+                else
+                    fail(exc);
             }
         }
     };
 
     struct State
     {
+        struct EmbargoComparator {
+            bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
+                return i1->embargo > i2->embargo;
+            }
+        };
         bool quit = false;
-        std::vector<std::shared_ptr<DownloadItem>> incoming;
+        std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
     };
 
     Sync<State> state_;
@@ -407,6 +458,7 @@ struct CurlDownloader : public Downloader
     std::thread workerThread;
 
     CurlDownloader()
+        : mt19937(rd())
     {
         static std::once_flag globalInit;
         std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
@@ -500,7 +552,9 @@ struct CurlDownloader : public Downloader
 
             nextWakeup = std::chrono::steady_clock::time_point();
 
-            /* Add new curl requests from the incoming requests queue. */
+            /* Add new curl requests from the incoming requests queue,
+               except for requests that are embargoed (waiting for a
+               retry timeout to expire). */
             if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
                 char buf[1024];
                 auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@@ -509,9 +563,22 @@ struct CurlDownloader : public Downloader
             }
 
             std::vector<std::shared_ptr<DownloadItem>> incoming;
+            auto now = std::chrono::steady_clock::now();
+
             {
                 auto state(state_.lock());
-                std::swap(state->incoming, incoming);
+                while (!state->incoming.empty()) {
+                    auto item = state->incoming.top();
+                    if (item->embargo <= now) {
+                        incoming.push_back(item);
+                        state->incoming.pop();
+                    } else {
+                        if (nextWakeup == std::chrono::steady_clock::time_point()
+                            || item->embargo < nextWakeup)
+                            nextWakeup = item->embargo;
+                        break;
+                    }
+                }
                 quit = state->quit;
             }
 
@@ -533,12 +600,12 @@ struct CurlDownloader : public Downloader
             workerThreadMain();
         } catch (nix::Interrupted & e) {
         } catch (std::exception & e) {
-            printError(format("unexpected error in download thread: %s") % e.what());
+            printError("unexpected error in download thread: %s", e.what());
         }
 
         {
             auto state(state_.lock());
-            state->incoming.clear();
+            while (!state->incoming.empty()) state->incoming.pop();
             state->quit = true;
         }
     }
@@ -554,7 +621,7 @@ struct CurlDownloader : public Downloader
             auto state(state_.lock());
             if (state->quit)
                 throw nix::Error("cannot enqueue download request because the download thread is shutting down");
-            state->incoming.push_back(item);
+            state->incoming.push(item);
         }
         writeFull(wakeupPipe.writeSide.get(), " ");
     }
@@ -637,9 +704,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
 
 DownloadResult Downloader::download(const DownloadRequest & request)
 {
-    return retry<DownloadResult>(request.tries, [&]() {
-        return enqueueDownload(request).get();
-    });
+    return enqueueDownload(request).get();
 }
 
 void Downloader::download(DownloadRequest && request, Sink & sink)
@@ -825,7 +890,7 @@ CachedDownloadResult Downloader::downloadCached(
             writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
         } catch (DownloadError & e) {
             if (storePath.empty()) throw;
-            warn("%s; using cached result", e.msg());
+            warn("warning: %s; using cached result", e.msg());
             result.etag = expectedETag;
         }
     }
@@ -853,10 +918,11 @@ CachedDownloadResult Downloader::downloadCached(
     }
 
     if (expectedStorePath != "" && storePath != expectedStorePath) {
+        unsigned int statusCode = 102;
         Hash gotHash = request.unpack
             ? hashPath(request.expectedHash.type, store->toRealPath(storePath)).first
             : hashFile(request.expectedHash.type, store->toRealPath(storePath));
-        throw nix::Error("hash mismatch in file downloaded from '%s':\n  wanted: %s\n  got:    %s",
+        throw nix::Error(statusCode, "hash mismatch in file downloaded from '%s':\n  wanted: %s\n  got:    %s",
             url, request.expectedHash.to_string(), gotHash.to_string());
     }
 
@@ -875,4 +941,5 @@ bool isUri(const string & s)
     return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
 }
 
+
 }
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index 9e965b506d0a..3b7fff3ba4cb 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -96,13 +96,11 @@ struct Downloader
 
     std::future<DownloadResult> enqueueDownload(const DownloadRequest & request);
 
-    /* Synchronously download a file. The request will be retried in
-       case of transient failures. */
+    /* Synchronously download a file. */
     DownloadResult download(const DownloadRequest & request);
 
     /* Download a file, writing its data to a sink. The sink will be
-       invoked on the thread of the caller. The request will not be
-       retried in case of transient failures. */
+       invoked on the thread of the caller. */
     void download(DownloadRequest && request, Sink & sink);
 
     /* Check if the specified file is already in ~/.cache/nix/tarballs
@@ -128,11 +126,6 @@ public:
     DownloadError(Downloader::Error error, const FormatOrString & fs)
         : Error(fs), error(error)
     { }
-
-    bool isTransient() override
-    {
-        return error == Downloader::Error::Transient;
-    }
 };
 
 bool isUri(const string & s);
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index 5633b4355d25..df2fb93320fc 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -2,7 +2,6 @@
 #include "download.hh"
 #include "globals.hh"
 #include "nar-info-disk-cache.hh"
-#include "retry.hh"
 
 namespace nix {
 
@@ -136,46 +135,21 @@ protected:
     {
         checkEnabled();
 
-        struct State
-        {
-            DownloadRequest request;
-            std::function<void()> tryDownload;
-            unsigned int attempt = 0;
-            State(DownloadRequest && request) : request(request) {}
-        };
-
-        auto state = std::make_shared<State>(makeRequest(path));
-
-        state->tryDownload = [callback, state, this]() {
-            getDownloader()->enqueueDownload(state->request,
-                {[callback, state, this](std::future<DownloadResult> result) {
-                    try {
-                        callback(result.get().data);
-                    } catch (DownloadError & e) {
-                        if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
-                            return callback(std::shared_ptr<std::string>());
-                        ++state->attempt;
-                        if (state->attempt < state->request.tries && e.isTransient()) {
-                            auto ms = retrySleepTime(state->attempt);
-                            warn("%s; retrying in %d ms", e.what(), ms);
-                            /* We can't sleep here because that would
-                               block the download thread. So use a
-                               separate thread for sleeping. */
-                            std::thread([state, ms]() {
-                                std::this_thread::sleep_for(std::chrono::milliseconds(ms));
-                                state->tryDownload();
-                            }).detach();
-                        } else {
-                            maybeDisable();
-                            callback.rethrow();
-                        }
-                    } catch (...) {
-                        callback.rethrow();
-                    }
-                }});
-        };
-
-        state->tryDownload();
+        auto request(makeRequest(path));
+
+        getDownloader()->enqueueDownload(request,
+            {[callback, this](std::future<DownloadResult> result) {
+                try {
+                    callback(result.get().data);
+                } catch (DownloadError & e) {
+                    if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
+                        return callback(std::shared_ptr<std::string>());
+                    maybeDisable();
+                    callback.rethrow();
+                } catch (...) {
+                    callback.rethrow();
+                }
+            }});
     }
 
 };
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index 28ad7c019a94..f5608d3849f1 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -6,11 +6,10 @@
 #include "thread-pool.hh"
 #include "json.hh"
 #include "derivations.hh"
-#include "retry.hh"
-#include "download.hh"
 
 #include <future>
 
+
 namespace nix {
 
 
@@ -86,18 +85,25 @@ string storePathToHash(const Path & path)
 void checkStoreName(const string & name)
 {
     string validChars = "+-._?=";
+
+    auto baseError = format("The path name '%2%' is invalid: %3%. "
+        "Path names are alphanumeric and can include the symbols %1% "
+        "and must not begin with a period. "
+        "Note: If '%2%' is a source file and you cannot rename it on "
+        "disk, builtins.path { name = ... } can be used to give it an "
+        "alternative name.") % validChars % name;
+
     /* Disallow names starting with a dot for possible security
        reasons (e.g., "." and ".."). */
     if (string(name, 0, 1) == ".")
-        throw Error(format("illegal name: '%1%'") % name);
+        throw Error(baseError % "it is illegal to start the name with a period");
     for (auto & i : name)
         if (!((i >= 'A' && i <= 'Z') ||
               (i >= 'a' && i <= 'z') ||
               (i >= '0' && i <= '9') ||
               validChars.find(i) != string::npos))
         {
-            throw Error(format("invalid character '%1%' in name '%2%'")
-                % i % name);
+            throw Error(baseError % (format("the '%1%' character is invalid") % i));
         }
 }
 
@@ -573,57 +579,54 @@ void Store::buildPaths(const PathSet & paths, BuildMode buildMode)
 void copyStorePath(ref<Store> srcStore, ref<Store> dstStore,
     const Path & storePath, RepairFlag repair, CheckSigsFlag checkSigs)
 {
-    retry<void>(downloadSettings.tries, [&]() {
-
-        auto srcUri = srcStore->getUri();
-        auto dstUri = dstStore->getUri();
-
-        Activity act(*logger, lvlInfo, actCopyPath,
-            srcUri == "local" || srcUri == "daemon"
-              ? fmt("copying path '%s' to '%s'", storePath, dstUri)
-              : dstUri == "local" || dstUri == "daemon"
-                ? fmt("copying path '%s' from '%s'", storePath, srcUri)
-                : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri),
-            {storePath, srcUri, dstUri});
-        PushActivity pact(act.id);
-
-        auto info = srcStore->queryPathInfo(storePath);
-
-        uint64_t total = 0;
-
-        if (!info->narHash) {
-            StringSink sink;
-            srcStore->narFromPath({storePath}, sink);
-            auto info2 = make_ref<ValidPathInfo>(*info);
-            info2->narHash = hashString(htSHA256, *sink.s);
-            if (!info->narSize) info2->narSize = sink.s->size();
-            if (info->ultimate) info2->ultimate = false;
-            info = info2;
-
-            StringSource source(*sink.s);
-            dstStore->addToStore(*info, source, repair, checkSigs);
-            return;
-        }
+    auto srcUri = srcStore->getUri();
+    auto dstUri = dstStore->getUri();
+
+    Activity act(*logger, lvlInfo, actCopyPath,
+        srcUri == "local" || srcUri == "daemon"
+          ? fmt("copying path '%s' to '%s'", storePath, dstUri)
+          : dstUri == "local" || dstUri == "daemon"
+            ? fmt("copying path '%s' from '%s'", storePath, srcUri)
+            : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri),
+        {storePath, srcUri, dstUri});
+    PushActivity pact(act.id);
+
+    auto info = srcStore->queryPathInfo(storePath);
+
+    uint64_t total = 0;
+
+    if (!info->narHash) {
+        StringSink sink;
+        srcStore->narFromPath({storePath}, sink);
+        auto info2 = make_ref<ValidPathInfo>(*info);
+        info2->narHash = hashString(htSHA256, *sink.s);
+        if (!info->narSize) info2->narSize = sink.s->size();
+        if (info->ultimate) info2->ultimate = false;
+        info = info2;
+
+        StringSource source(*sink.s);
+        dstStore->addToStore(*info, source, repair, checkSigs);
+        return;
+    }
 
-        if (info->ultimate) {
-            auto info2 = make_ref<ValidPathInfo>(*info);
-            info2->ultimate = false;
-            info = info2;
-        }
+    if (info->ultimate) {
+        auto info2 = make_ref<ValidPathInfo>(*info);
+        info2->ultimate = false;
+        info = info2;
+    }
 
-        auto source = sinkToSource([&](Sink & sink) {
-            LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
-                sink(data, len);
-                total += len;
-                act.progress(total, info->narSize);
-            });
-            srcStore->narFromPath({storePath}, wrapperSink);
-        }, [&]() {
-            throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri());
+    auto source = sinkToSource([&](Sink & sink) {
+        LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
+            sink(data, len);
+            total += len;
+            act.progress(total, info->narSize);
         });
-
-        dstStore->addToStore(*info, *source, repair, checkSigs);
+        srcStore->narFromPath({storePath}, wrapperSink);
+    }, [&]() {
+        throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri());
     });
+
+    dstStore->addToStore(*info, *source, repair, checkSigs);
 }