about summary refs log tree commit diff
path: root/third_party/nix/src/libstore/download.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/nix/src/libstore/download.cc')
-rw-r--r--third_party/nix/src/libstore/download.cc1024
1 files changed, 0 insertions, 1024 deletions
diff --git a/third_party/nix/src/libstore/download.cc b/third_party/nix/src/libstore/download.cc
deleted file mode 100644
index fd472713e6a7..000000000000
--- a/third_party/nix/src/libstore/download.cc
+++ /dev/null
@@ -1,1024 +0,0 @@
-#include "libstore/download.hh"
-
-#include <absl/strings/ascii.h>
-#include <absl/strings/match.h>
-#include <absl/strings/numbers.h>
-#include <absl/strings/str_split.h>
-
-#include "libstore/globals.hh"
-#include "libstore/pathlocks.hh"
-#include "libstore/s3.hh"
-#include "libstore/store-api.hh"
-#include "libutil/archive.hh"
-#include "libutil/compression.hh"
-#include "libutil/finally.hh"
-#include "libutil/hash.hh"
-#include "libutil/util.hh"
-
-#ifdef ENABLE_S3
-#include <aws/core/client/ClientConfiguration.h>
-#endif
-
-#include <algorithm>
-#include <cmath>
-#include <cstring>
-#include <iostream>
-#include <queue>
-#include <random>
-#include <thread>
-
-#include <curl/curl.h>
-#include <fcntl.h>
-#include <glog/logging.h>
-#include <unistd.h>
-
-using namespace std::string_literals;
-
-namespace nix {
-
-DownloadSettings downloadSettings;
-
-static GlobalConfig::Register r1(&downloadSettings);
-
-std::string resolveUri(const std::string& uri) {
-  if (uri.compare(0, 8, "channel:") == 0) {
-    return "https://nixos.org/channels/" + std::string(uri, 8) +
-           "/nixexprs.tar.xz";
-  }
-  return uri;
-}
-
-struct CurlDownloader : public Downloader {
-  CURLM* curlm = nullptr;
-
-  std::random_device rd;
-  std::mt19937 mt19937;
-
-  struct DownloadItem : public std::enable_shared_from_this<DownloadItem> {
-    CurlDownloader& downloader;
-    DownloadRequest request;
-    DownloadResult result;
-    bool done = false;  // whether either the success or failure function has
-                        // been called
-    Callback<DownloadResult> callback;
-    CURL* req = nullptr;
-    bool active =
-        false;  // whether the handle has been added to the multi object
-    std::string status;
-
-    unsigned int attempt = 0;
-
-    /* Don't start this download until the specified time point
-       has been reached. */
-    std::chrono::steady_clock::time_point embargo;
-
-    struct curl_slist* requestHeaders = nullptr;
-
-    std::string encoding;
-
-    bool acceptRanges = false;
-
-    curl_off_t writtenToSink = 0;
-
-    DownloadItem(CurlDownloader& downloader, const DownloadRequest& request,
-                 Callback<DownloadResult>&& callback)
-        : downloader(downloader),
-          request(request),
-          callback(std::move(callback)),
-          finalSink([this](const unsigned char* data, size_t len) {
-            if (this->request.dataCallback) {
-              long httpStatus = 0;
-              curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);
-
-              /* Only write data to the sink if this is a
-                 successful response. */
-              if (httpStatus == 0 || httpStatus == 200 || httpStatus == 201 ||
-                  httpStatus == 206) {
-                writtenToSink += len;
-                this->request.dataCallback((char*)data, len);
-              }
-            } else {
-              this->result.data->append((char*)data, len);
-            }
-          }) {
-      LOG(INFO) << (request.data ? "uploading '" : "downloading '")
-                << request.uri << "'";
-
-      if (!request.expectedETag.empty()) {
-        requestHeaders = curl_slist_append(
-            requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
-      }
-      if (!request.mimeType.empty()) {
-        requestHeaders = curl_slist_append(
-            requestHeaders, ("Content-Type: " + request.mimeType).c_str());
-      }
-    }
-
-    ~DownloadItem() {
-      if (req != nullptr) {
-        if (active) {
-          curl_multi_remove_handle(downloader.curlm, req);
-        }
-        curl_easy_cleanup(req);
-      }
-      if (requestHeaders != nullptr) {
-        curl_slist_free_all(requestHeaders);
-      }
-      try {
-        if (!done) {
-          fail(DownloadError(
-              Interrupted,
-              format("download of '%s' was interrupted") % request.uri));
-        }
-      } catch (...) {
-        ignoreException();
-      }
-    }
-
-    void failEx(const std::exception_ptr& ex) {
-      assert(!done);
-      done = true;
-      callback.rethrow(ex);
-    }
-
-    template <class T>
-    void fail(const T& e) {
-      failEx(std::make_exception_ptr(e));
-    }
-
-    LambdaSink finalSink;
-    std::shared_ptr<CompressionSink> decompressionSink;
-
-    std::exception_ptr writeException;
-
-    size_t writeCallback(void* contents, size_t size, size_t nmemb) {
-      try {
-        size_t realSize = size * nmemb;
-        result.bodySize += realSize;
-
-        if (!decompressionSink) {
-          decompressionSink = makeDecompressionSink(encoding, finalSink);
-        }
-
-        (*decompressionSink)(static_cast<unsigned char*>(contents), realSize);
-
-        return realSize;
-      } catch (...) {
-        writeException = std::current_exception();
-        return 0;
-      }
-    }
-
-    static size_t writeCallbackWrapper(void* contents, size_t size,
-                                       size_t nmemb, void* userp) {
-      return (static_cast<DownloadItem*>(userp))
-          ->writeCallback(contents, size, nmemb);
-    }
-
-    size_t headerCallback(void* contents, size_t size, size_t nmemb) {
-      size_t realSize = size * nmemb;
-      std::string line(static_cast<char*>(contents), realSize);
-      DLOG(INFO) << "got header for '" << request.uri
-                 << "': " << absl::StripAsciiWhitespace(line);
-      if (line.compare(0, 5, "HTTP/") == 0) {  // new response starts
-        result.etag = "";
-        std::vector<std::string> ss =
-            absl::StrSplit(line, absl::ByChar(' '), absl::SkipEmpty());
-        status = ss.size() >= 2 ? ss[1] : "";
-        result.data = std::make_shared<std::string>();
-        result.bodySize = 0;
-        acceptRanges = false;
-        encoding = "";
-      } else {
-        auto i = line.find(':');
-        if (i != std::string::npos) {
-          std::string name = absl::AsciiStrToLower(
-              absl::StripAsciiWhitespace(std::string(line, 0, i)));
-          if (name == "etag") {
-            result.etag = absl::StripAsciiWhitespace(std::string(line, i + 1));
-            /* Hack to work around a GitHub bug: it sends
-               ETags, but ignores If-None-Match. So if we get
-               the expected ETag on a 200 response, then shut
-               down the connection because we already have the
-               data. */
-            if (result.etag == request.expectedETag && status == "200") {
-              DLOG(INFO)
-                  << "shutting down on 200 HTTP response with expected ETag";
-              return 0;
-            }
-          } else if (name == "content-encoding") {
-            encoding = absl::StripAsciiWhitespace(std::string(line, i + 1));
-          } else if (name == "accept-ranges" &&
-                     absl::AsciiStrToLower(absl::StripAsciiWhitespace(
-                         std::string(line, i + 1))) == "bytes") {
-            acceptRanges = true;
-          }
-        }
-      }
-      return realSize;
-    }
-
-    static size_t headerCallbackWrapper(void* contents, size_t size,
-                                        size_t nmemb, void* userp) {
-      return (static_cast<DownloadItem*>(userp))
-          ->headerCallback(contents, size, nmemb);
-    }
-
-    static int debugCallback(CURL* handle, curl_infotype type, char* data,
-                             size_t size, void* userptr) {
-      if (type == CURLINFO_TEXT) {
-        DLOG(INFO) << "curl: "
-                   << absl::StripTrailingAsciiWhitespace(
-                          std::string(data, size));
-      }
-      return 0;
-    }
-
-    size_t readOffset = 0;
-    size_t readCallback(char* buffer, size_t size, size_t nitems) {
-      if (readOffset == request.data->length()) {
-        return 0;
-      }
-      auto count = std::min(size * nitems, request.data->length() - readOffset);
-      assert(count);
-      memcpy(buffer, request.data->data() + readOffset, count);
-      readOffset += count;
-      return count;
-    }
-
-    static size_t readCallbackWrapper(char* buffer, size_t size, size_t nitems,
-                                      void* userp) {
-      return (static_cast<DownloadItem*>(userp))
-          ->readCallback(buffer, size, nitems);
-    }
-
-    void init() {
-      if (req == nullptr) {
-        req = curl_easy_init();
-      }
-
-      curl_easy_reset(req);
-
-      // TODO(tazjin): Add an Abseil flag for this
-      // if (verbosity >= lvlVomit) {
-      //   curl_easy_setopt(req, CURLOPT_VERBOSE, 1);
-      //   curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION,
-      //                    DownloadItem::debugCallback);
-      // }
-
-      curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str());
-      curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
-      curl_easy_setopt(req, CURLOPT_MAXREDIRS, 10);
-      curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
-      curl_easy_setopt(req, CURLOPT_USERAGENT,
-                       ("curl/" LIBCURL_VERSION " Nix/" + nixVersion +
-                        (downloadSettings.userAgentSuffix != ""
-                             ? " " + downloadSettings.userAgentSuffix.get()
-                             : ""))
-                           .c_str());
-#if LIBCURL_VERSION_NUM >= 0x072b00
-      curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
-#endif
-#if LIBCURL_VERSION_NUM >= 0x072f00
-      if (downloadSettings.enableHttp2) {
-        curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
-      } else {
-        curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
-      }
-#endif
-      curl_easy_setopt(req, CURLOPT_WRITEFUNCTION,
-                       DownloadItem::writeCallbackWrapper);
-      curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
-      curl_easy_setopt(req, CURLOPT_HEADERFUNCTION,
-                       DownloadItem::headerCallbackWrapper);
-      curl_easy_setopt(req, CURLOPT_HEADERDATA, this);
-
-      curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders);
-
-      if (request.head) {
-        curl_easy_setopt(req, CURLOPT_NOBODY, 1);
-      }
-
-      if (request.data) {
-        curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
-        curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
-        curl_easy_setopt(req, CURLOPT_READDATA, this);
-        curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE,
-                         (curl_off_t)request.data->length());
-      }
-
-      if (request.verifyTLS) {
-        if (!settings.caFile.empty()) {
-          curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.c_str());
-        }
-      } else {
-        curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0);
-        curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
-      }
-
-      curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT,
-                       downloadSettings.connectTimeout.get());
-
-      curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L);
-      curl_easy_setopt(req, CURLOPT_LOW_SPEED_TIME,
-                       downloadSettings.stalledDownloadTimeout.get());
-
-      /* If no file exist in the specified path, curl continues to work
-         anyway as if netrc support was disabled. */
-      curl_easy_setopt(req, CURLOPT_NETRC_FILE,
-                       settings.netrcFile.get().c_str());
-      curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);
-
-      if (writtenToSink != 0) {
-        curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink);
-      }
-
-      result.data = std::make_shared<std::string>();
-      result.bodySize = 0;
-    }
-
-    void finish(CURLcode code) {
-      long httpStatus = 0;
-      curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);
-
-      char* effectiveUriCStr;
-      curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr);
-      if (effectiveUriCStr != nullptr) {
-        result.effectiveUri = effectiveUriCStr;
-      }
-
-      DLOG(INFO) << "finished " << request.verb() << " of " << request.uri
-                 << "; curl status = " << code
-                 << ", HTTP status = " << httpStatus
-                 << ", body = " << result.bodySize << " bytes";
-
-      if (decompressionSink) {
-        try {
-          decompressionSink->finish();
-        } catch (...) {
-          writeException = std::current_exception();
-        }
-      }
-
-      if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
-        code = CURLE_OK;
-        httpStatus = 304;
-      }
-
-      if (writeException) {
-        failEx(writeException);
-
-      } else if (code == CURLE_OK &&
-                 (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 ||
-                  httpStatus == 206 || httpStatus == 304 ||
-                  httpStatus == 226 /* FTP */ ||
-                  httpStatus == 0 /* other protocol */)) {
-        result.cached = httpStatus == 304;
-        done = true;
-        callback(std::move(result));
-      }
-
-      else {
-        // We treat most errors as transient, but won't retry when hopeless
-        Error err = Transient;
-
-        if (httpStatus == 404 || httpStatus == 410 ||
-            code == CURLE_FILE_COULDNT_READ_FILE) {
-          // The file is definitely not there
-          err = NotFound;
-        } else if (httpStatus == 401 || httpStatus == 403 ||
-                   httpStatus == 407) {
-          // Don't retry on authentication/authorization failures
-          err = Forbidden;
-        } else if (httpStatus >= 400 && httpStatus < 500 && httpStatus != 408 &&
-                   httpStatus != 429) {
-          // Most 4xx errors are client errors and are probably not worth
-          // retrying:
-          //   * 408 means the server timed out waiting for us, so we try again
-          //   * 429 means too many requests, so we retry (with a delay)
-          err = Misc;
-        } else if (httpStatus == 501 || httpStatus == 505 ||
-                   httpStatus == 511) {
-          // Let's treat most 5xx (server) errors as transient, except for a
-          // handful:
-          //   * 501 not implemented
-          //   * 505 http version not supported
-          //   * 511 we're behind a captive portal
-          err = Misc;
-        } else {
-          // Don't bother retrying on certain cURL errors either
-          switch (code) {
-            case CURLE_FAILED_INIT:
-            case CURLE_URL_MALFORMAT:
-            case CURLE_NOT_BUILT_IN:
-            case CURLE_REMOTE_ACCESS_DENIED:
-            case CURLE_FILE_COULDNT_READ_FILE:
-            case CURLE_FUNCTION_NOT_FOUND:
-            case CURLE_ABORTED_BY_CALLBACK:
-            case CURLE_BAD_FUNCTION_ARGUMENT:
-            case CURLE_INTERFACE_FAILED:
-            case CURLE_UNKNOWN_OPTION:
-            case CURLE_SSL_CACERT_BADFILE:
-            case CURLE_TOO_MANY_REDIRECTS:
-            case CURLE_WRITE_ERROR:
-            case CURLE_UNSUPPORTED_PROTOCOL:
-              err = Misc;
-              break;
-            default:  // Shut up warnings
-              break;
-          }
-        }
-
-        attempt++;
-
-        auto exc =
-            code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
-                ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted",
-                                                 request.verb(), request.uri))
-            : httpStatus != 0
-                ? DownloadError(
-                      err,
-                      fmt("unable to %s '%s': HTTP error %d", request.verb(),
-                          request.uri, httpStatus) +
-                          (code == CURLE_OK ? ""
-                                            : fmt(" (curl error: %s)",
-                                                  curl_easy_strerror(code))))
-                : DownloadError(
-                      err, fmt("unable to %s '%s': %s (%d)", request.verb(),
-                               request.uri, curl_easy_strerror(code), code));
-
-        /* If this is a transient error, then maybe retry the
-           download after a while. If we're writing to a
-           sink, we can only retry if the server supports
-           ranged requests. */
-        if (err == Transient && attempt < request.tries &&
-            (!this->request.dataCallback || writtenToSink == 0 ||
-             (acceptRanges && encoding.empty()))) {
-          int ms = request.baseRetryTimeMs *
-                   std::pow(2.0F, attempt - 1 +
-                                      std::uniform_real_distribution<>(
-                                          0.0, 0.5)(downloader.mt19937));
-          if (writtenToSink != 0) {
-            LOG(WARNING) << exc.what() << "; retrying from offset "
-                         << writtenToSink << " in " << ms << "ms";
-          } else {
-            LOG(WARNING) << exc.what() << "; retrying in " << ms << "ms";
-          }
-          embargo =
-              std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
-          downloader.enqueueItem(shared_from_this());
-        } else {
-          fail(exc);
-        }
-      }
-    }
-  };
-
-  struct State {
-    struct EmbargoComparator {
-      bool operator()(const std::shared_ptr<DownloadItem>& i1,
-                      const std::shared_ptr<DownloadItem>& i2) {
-        return i1->embargo > i2->embargo;
-      }
-    };
-    bool quit = false;
-    std::priority_queue<std::shared_ptr<DownloadItem>,
-                        std::vector<std::shared_ptr<DownloadItem>>,
-                        EmbargoComparator>
-        incoming;
-  };
-
-  Sync<State> state_;
-
-  /* We can't use a std::condition_variable to wake up the curl
-     thread, because it only monitors file descriptors. So use a
-     pipe instead. */
-  Pipe wakeupPipe;
-
-  std::thread workerThread;
-
-  CurlDownloader() : mt19937(rd()) {
-    static std::once_flag globalInit;
-    std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
-
-    curlm = curl_multi_init();
-
-#if LIBCURL_VERSION_NUM >= 0x072b00  // Multiplex requires >= 7.43.0
-    curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
-#endif
-#if LIBCURL_VERSION_NUM >= 0x071e00  // Max connections requires >= 7.30.0
-    curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
-                      downloadSettings.httpConnections.get());
-#endif
-
-    wakeupPipe.create();
-    fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);
-
-    workerThread = std::thread([&]() { workerThreadEntry(); });
-  }
-
-  ~CurlDownloader() override {
-    stopWorkerThread();
-
-    workerThread.join();
-
-    if (curlm != nullptr) {
-      curl_multi_cleanup(curlm);
-    }
-  }
-
-  void stopWorkerThread() {
-    /* Signal the worker thread to exit. */
-    {
-      auto state(state_.lock());
-      state->quit = true;
-    }
-    writeFull(wakeupPipe.writeSide.get(), " ", false);
-  }
-
-  void workerThreadMain() {
-    /* Cause this thread to be notified on SIGINT. */
-    auto callback = createInterruptCallback([&]() { stopWorkerThread(); });
-
-    std::map<CURL*, std::shared_ptr<DownloadItem>> items;
-
-    bool quit = false;
-
-    std::chrono::steady_clock::time_point nextWakeup;
-
-    while (!quit) {
-      checkInterrupt();
-
-      /* Let curl do its thing. */
-      int running;
-      CURLMcode mc = curl_multi_perform(curlm, &running);
-      if (mc != CURLM_OK) {
-        throw nix::Error(
-            format("unexpected error from curl_multi_perform(): %s") %
-            curl_multi_strerror(mc));
-      }
-
-      /* Set the promises of any finished requests. */
-      CURLMsg* msg;
-      int left;
-      while ((msg = curl_multi_info_read(curlm, &left)) != nullptr) {
-        if (msg->msg == CURLMSG_DONE) {
-          auto i = items.find(msg->easy_handle);
-          assert(i != items.end());
-          i->second->finish(msg->data.result);
-          curl_multi_remove_handle(curlm, i->second->req);
-          i->second->active = false;
-          items.erase(i);
-        }
-      }
-
-      /* Wait for activity, including wakeup events. */
-      int numfds = 0;
-      struct curl_waitfd extraFDs[1];
-      extraFDs[0].fd = wakeupPipe.readSide.get();
-      extraFDs[0].events = CURL_WAIT_POLLIN;
-      extraFDs[0].revents = 0;
-      long maxSleepTimeMs = items.empty() ? 10000 : 100;
-      auto sleepTimeMs =
-          nextWakeup != std::chrono::steady_clock::time_point()
-              ? std::max(
-                    0,
-                    static_cast<int>(
-                        std::chrono::duration_cast<std::chrono::milliseconds>(
-                            nextWakeup - std::chrono::steady_clock::now())
-                            .count()))
-              : maxSleepTimeMs;
-      VLOG(2) << "download thread waiting for " << sleepTimeMs << " ms";
-      mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds);
-      if (mc != CURLM_OK) {
-        throw nix::Error(format("unexpected error from curl_multi_wait(): %s") %
-                         curl_multi_strerror(mc));
-      }
-
-      nextWakeup = std::chrono::steady_clock::time_point();
-
-      /* Add new curl requests from the incoming requests queue,
-         except for requests that are embargoed (waiting for a
-         retry timeout to expire). */
-      if ((extraFDs[0].revents & CURL_WAIT_POLLIN) != 0) {
-        char buf[1024];
-        auto res = read(extraFDs[0].fd, buf, sizeof(buf));
-        if (res == -1 && errno != EINTR) {
-          throw SysError("reading curl wakeup socket");
-        }
-      }
-
-      std::vector<std::shared_ptr<DownloadItem>> incoming;
-      auto now = std::chrono::steady_clock::now();
-
-      {
-        auto state(state_.lock());
-        while (!state->incoming.empty()) {
-          auto item = state->incoming.top();
-          if (item->embargo <= now) {
-            incoming.push_back(item);
-            state->incoming.pop();
-          } else {
-            if (nextWakeup == std::chrono::steady_clock::time_point() ||
-                item->embargo < nextWakeup) {
-              nextWakeup = item->embargo;
-            }
-            break;
-          }
-        }
-        quit = state->quit;
-      }
-
-      for (auto& item : incoming) {
-        DLOG(INFO) << "starting " << item->request.verb() << " of "
-                   << item->request.uri;
-        item->init();
-        curl_multi_add_handle(curlm, item->req);
-        item->active = true;
-        items[item->req] = item;
-      }
-    }
-
-    DLOG(INFO) << "download thread shutting down";
-  }
-
-  void workerThreadEntry() {
-    try {
-      workerThreadMain();
-    } catch (nix::Interrupted& e) {
-    } catch (std::exception& e) {
-      LOG(ERROR) << "unexpected error in download thread: " << e.what();
-    }
-
-    {
-      auto state(state_.lock());
-      while (!state->incoming.empty()) {
-        state->incoming.pop();
-      }
-      state->quit = true;
-    }
-  }
-
-  void enqueueItem(const std::shared_ptr<DownloadItem>& item) {
-    if (item->request.data && !absl::StartsWith(item->request.uri, "http://") &&
-        !absl::StartsWith(item->request.uri, "https://")) {
-      throw nix::Error("uploading to '%s' is not supported", item->request.uri);
-    }
-
-    {
-      auto state(state_.lock());
-      if (state->quit) {
-        throw nix::Error(
-            "cannot enqueue download request because the download thread is "
-            "shutting down");
-      }
-      state->incoming.push(item);
-    }
-    writeFull(wakeupPipe.writeSide.get(), " ");
-  }
-
-#ifdef ENABLE_S3
-  std::tuple<std::string, std::string, Store::Params> parseS3Uri(
-      std::string uri) {
-    auto [path, params] = splitUriAndParams(uri);
-
-    auto slash = path.find('/', 5);  // 5 is the length of "s3://" prefix
-    if (slash == std::string::npos) {
-      throw nix::Error("bad S3 URI '%s'", path);
-    }
-
-    std::string bucketName(path, 5, slash - 5);
-    std::string key(path, slash + 1);
-
-    return {bucketName, key, params};
-  }
-#endif
-
-  void enqueueDownload(const DownloadRequest& request,
-                       Callback<DownloadResult> callback) override {
-    /* Ugly hack to support s3:// URIs. */
-    if (absl::StartsWith(request.uri, "s3://")) {
-      // FIXME: do this on a worker thread
-      try {
-#ifdef ENABLE_S3
-        auto [bucketName, key, params] = parseS3Uri(request.uri);
-
-        std::string profile = get(params, "profile", "");
-        std::string region = get(params, "region", Aws::Region::US_EAST_1);
-        std::string scheme = get(params, "scheme", "");
-        std::string endpoint = get(params, "endpoint", "");
-
-        S3Helper s3Helper(profile, region, scheme, endpoint);
-
-        // FIXME: implement ETag
-        auto s3Res = s3Helper.getObject(bucketName, key);
-        DownloadResult res;
-        if (!s3Res.data)
-          throw DownloadError(
-              NotFound, fmt("S3 object '%s' does not exist", request.uri));
-        res.data = s3Res.data;
-        callback(std::move(res));
-#else
-        throw nix::Error(
-            "cannot download '%s' because Nix is not built with S3 support",
-            request.uri);
-#endif
-      } catch (...) {
-        callback.rethrow();
-      }
-      return;
-    }
-
-    enqueueItem(
-        std::make_shared<DownloadItem>(*this, request, std::move(callback)));
-  }
-};
-
-ref<Downloader> getDownloader() {
-  static ref<Downloader> downloader = makeDownloader();
-  return downloader;
-}
-
-ref<Downloader> makeDownloader() { return make_ref<CurlDownloader>(); }
-
-std::future<DownloadResult> Downloader::enqueueDownload(
-    const DownloadRequest& request) {
-  auto promise = std::make_shared<std::promise<DownloadResult>>();
-  enqueueDownload(
-      request,
-      Callback<DownloadResult>([promise](std::future<DownloadResult> fut) {
-        try {
-          promise->set_value(fut.get());
-        } catch (...) {
-          promise->set_exception(std::current_exception());
-        }
-      }));
-  return promise->get_future();
-}
-
-DownloadResult Downloader::download(const DownloadRequest& request) {
-  return enqueueDownload(request).get();
-}
-
-void Downloader::download(DownloadRequest&& request, Sink& sink) {
-  /* Note: we can't call 'sink' via request.dataCallback, because
-     that would cause the sink to execute on the downloader
-     thread. If 'sink' is a coroutine, this will fail. Also, if the
-     sink is expensive (e.g. one that does decompression and writing
-     to the Nix store), it would stall the download thread too much.
-     Therefore we use a buffer to communicate data between the
-     download thread and the calling thread. */
-
-  struct State {
-    bool quit = false;
-    std::exception_ptr exc;
-    std::string data;
-    std::condition_variable avail, request;
-  };
-
-  auto _state = std::make_shared<Sync<State>>();
-
-  /* In case of an exception, wake up the download thread. FIXME:
-     abort the download request. */
-  Finally finally([&]() {
-    auto state(_state->lock());
-    state->quit = true;
-    state->request.notify_one();
-  });
-
-  request.dataCallback = [_state](char* buf, size_t len) {
-    auto state(_state->lock());
-
-    if (state->quit) {
-      return;
-    }
-
-    /* If the buffer is full, then go to sleep until the calling
-       thread wakes us up (i.e. when it has removed data from the
-       buffer). We don't wait forever to prevent stalling the
-       download thread. (Hopefully sleeping will throttle the
-       sender.) */
-    if (state->data.size() > 1024 * 1024) {
-      DLOG(INFO) << "download buffer is full; going to sleep";
-      state.wait_for(state->request, std::chrono::seconds(10));
-    }
-
-    /* Append data to the buffer and wake up the calling
-       thread. */
-    state->data.append(buf, len);
-    state->avail.notify_one();
-  };
-
-  enqueueDownload(request, Callback<DownloadResult>(
-                               [_state](std::future<DownloadResult> fut) {
-                                 auto state(_state->lock());
-                                 state->quit = true;
-                                 try {
-                                   fut.get();
-                                 } catch (...) {
-                                   state->exc = std::current_exception();
-                                 }
-                                 state->avail.notify_one();
-                                 state->request.notify_one();
-                               }));
-
-  while (true) {
-    checkInterrupt();
-
-    std::string chunk;
-
-    /* Grab data if available, otherwise wait for the download
-       thread to wake us up. */
-    {
-      auto state(_state->lock());
-
-      while (state->data.empty()) {
-        if (state->quit) {
-          if (state->exc) {
-            std::rethrow_exception(state->exc);
-          }
-          return;
-        }
-
-        state.wait(state->avail);
-      }
-
-      chunk = std::move(state->data);
-      state->data = std::string();
-
-      state->request.notify_one();
-    }
-
-    /* Flush the data to the sink and wake up the download thread
-       if it's blocked on a full buffer. We don't hold the state
-       lock while doing this to prevent blocking the download
-       thread if sink() takes a long time. */
-    sink(reinterpret_cast<unsigned char*>(chunk.data()), chunk.size());
-  }
-}
-
-CachedDownloadResult Downloader::downloadCached(
-    const ref<Store>& store, const CachedDownloadRequest& request) {
-  auto url = resolveUri(request.uri);
-
-  auto name = request.name;
-  if (name.empty()) {
-    auto p = url.rfind('/');
-    if (p != std::string::npos) {
-      name = std::string(url, p + 1);
-    }
-  }
-
-  Path expectedStorePath;
-  if (request.expectedHash) {
-    expectedStorePath =
-        store->makeFixedOutputPath(request.unpack, request.expectedHash, name);
-    if (store->isValidPath(expectedStorePath)) {
-      CachedDownloadResult result;
-      result.storePath = expectedStorePath;
-      result.path = store->toRealPath(expectedStorePath);
-      return result;
-    }
-  }
-
-  Path cacheDir = getCacheDir() + "/nix/tarballs";
-  createDirs(cacheDir);
-
-  std::string urlHash = hashString(htSHA256, name + std::string("\0"s) + url)
-                            .to_string(Base32, false);
-
-  Path dataFile = cacheDir + "/" + urlHash + ".info";
-  Path fileLink = cacheDir + "/" + urlHash + "-file";
-
-  PathLocks lock({fileLink}, fmt("waiting for lock on '%1%'...", fileLink));
-
-  Path storePath;
-
-  std::string expectedETag;
-
-  bool skip = false;
-
-  CachedDownloadResult result;
-
-  if (pathExists(fileLink) && pathExists(dataFile)) {
-    storePath = readLink(fileLink);
-    store->addTempRoot(storePath);
-    if (store->isValidPath(storePath)) {
-      std::vector<std::string> ss = absl::StrSplit(
-          readFile(dataFile), absl::ByChar('\n'), absl::SkipEmpty());
-      if (ss.size() >= 3 && ss[0] == url) {
-        time_t lastChecked;
-        if (absl::SimpleAtoi(ss[2], &lastChecked) &&
-            static_cast<uint64_t>(lastChecked) + request.ttl >=
-                static_cast<uint64_t>(time(nullptr))) {
-          skip = true;
-          result.effectiveUri = request.uri;
-          result.etag = ss[1];
-        } else if (!ss[1].empty()) {
-          DLOG(INFO) << "verifying previous ETag: " << ss[1];
-          expectedETag = ss[1];
-        }
-      }
-    } else {
-      storePath = "";
-    }
-  }
-
-  if (!skip) {
-    try {
-      DownloadRequest request2(url);
-      request2.expectedETag = expectedETag;
-      auto res = download(request2);
-      result.effectiveUri = res.effectiveUri;
-      result.etag = res.etag;
-
-      if (!res.cached) {
-        ValidPathInfo info;
-        StringSink sink;
-        dumpString(*res.data, sink);
-        Hash hash = hashString(
-            request.expectedHash ? request.expectedHash.type : htSHA256,
-            *res.data);
-        info.path = store->makeFixedOutputPath(false, hash, name);
-        info.narHash = hashString(htSHA256, *sink.s);
-        info.narSize = sink.s->size();
-        info.ca = makeFixedOutputCA(false, hash);
-        store->addToStore(info, sink.s, NoRepair, NoCheckSigs);
-        storePath = info.path;
-      }
-
-      assert(!storePath.empty());
-      replaceSymlink(storePath, fileLink);
-
-      writeFile(dataFile, url + "\n" + res.etag + "\n" +
-                              std::to_string(time(nullptr)) + "\n");
-    } catch (DownloadError& e) {
-      if (storePath.empty()) {
-        throw;
-      }
-      LOG(WARNING) << e.msg() << "; using cached result";
-      result.etag = expectedETag;
-    }
-  }
-
-  if (request.unpack) {
-    Path unpackedLink = cacheDir + "/" + baseNameOf(storePath) + "-unpacked";
-    PathLocks lock2({unpackedLink},
-                    fmt("waiting for lock on '%1%'...", unpackedLink));
-    Path unpackedStorePath;
-    if (pathExists(unpackedLink)) {
-      unpackedStorePath = readLink(unpackedLink);
-      store->addTempRoot(unpackedStorePath);
-      if (!store->isValidPath(unpackedStorePath)) {
-        unpackedStorePath = "";
-      }
-    }
-    if (unpackedStorePath.empty()) {
-      LOG(INFO) << "unpacking '" << url << "' ...";
-      Path tmpDir = createTempDir();
-      AutoDelete autoDelete(tmpDir, true);
-      // FIXME: this requires GNU tar for decompression.
-      runProgram("tar", true,
-                 {"xf", store->toRealPath(storePath), "-C", tmpDir,
-                  "--strip-components", "1"});
-      unpackedStorePath = store->addToStore(name, tmpDir, true, htSHA256,
-                                            defaultPathFilter, NoRepair);
-    }
-    replaceSymlink(unpackedStorePath, unpackedLink);
-    storePath = unpackedStorePath;
-  }
-
-  if (!expectedStorePath.empty() && storePath != expectedStorePath) {
-    unsigned int statusCode = 102;
-    Hash gotHash =
-        request.unpack
-            ? hashPath(request.expectedHash.type, store->toRealPath(storePath))
-                  .first
-            : hashFile(request.expectedHash.type, store->toRealPath(storePath));
-    throw nix::Error(statusCode,
-                     "hash mismatch in file downloaded from '%s':\n  wanted: "
-                     "%s\n  got:    %s",
-                     url, request.expectedHash.to_string(),
-                     gotHash.to_string());
-  }
-
-  result.storePath = storePath;
-  result.path = store->toRealPath(storePath);
-  return result;
-}
-
-bool isUri(const std::string& s) {
-  if (s.compare(0, 8, "channel:") == 0) {
-    return true;
-  }
-  size_t pos = s.find("://");
-  if (pos == std::string::npos) {
-    return false;
-  }
-  std::string scheme(s, 0, pos);
-  return scheme == "http" || scheme == "https" || scheme == "file" ||
-         scheme == "channel" || scheme == "git" || scheme == "s3" ||
-         scheme == "ssh";
-}
-
-}  // namespace nix