#include "download.hh"

#include <absl/strings/ascii.h>

#include "archive.hh"
#include "compression.hh"
#include "finally.hh"
#include "globals.hh"
#include "hash.hh"
#include "pathlocks.hh"
#include "s3.hh"
#include "store-api.hh"
#include "util.hh"

#ifdef ENABLE_S3
#include <aws/core/client/ClientConfiguration.h>
#endif

#include <algorithm>
#include <cmath>
#include <cstring>
#include <iostream>
#include <queue>
#include <random>
#include <thread>

#include <curl/curl.h>
#include <fcntl.h>
#include <glog/logging.h>
#include <unistd.h>

using namespace std::string_literals;

namespace nix {

DownloadSettings downloadSettings;

static GlobalConfig::Register r1(&downloadSettings);

std::string resolveUri(const std::string& uri) {
  if (uri.compare(0, 8, "channel:") == 0) {
    return "https://nixos.org/channels/" + std::string(uri, 8) +
           "/nixexprs.tar.xz";
  }
  return uri;
}

struct CurlDownloader : public Downloader {
  CURLM* curlm = nullptr;

  std::random_device rd;
  std::mt19937 mt19937;

  struct DownloadItem : public std::enable_shared_from_this<DownloadItem> {
    CurlDownloader& downloader;
    DownloadRequest request;
    DownloadResult result;
    bool done = false;  // whether either the success or failure function has
                        // been called
    Callback<DownloadResult> callback;
    CURL* req = nullptr;
    bool active =
        false;  // whether the handle has been added to the multi object
    std::string status;

    unsigned int attempt = 0;

    /* Don't start this download until the specified time point
       has been reached. */
    std::chrono::steady_clock::time_point embargo;

    struct curl_slist* requestHeaders = nullptr;

    std::string encoding;

    bool acceptRanges = false;

    curl_off_t writtenToSink = 0;

    DownloadItem(CurlDownloader& downloader, const DownloadRequest& request,
                 Callback<DownloadResult>&& callback)
        : downloader(downloader),
          request(request),
          callback(std::move(callback)),
          finalSink([this](const unsigned char* data, size_t len) {
            if (this->request.dataCallback) {
              long httpStatus = 0;
              curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);

              /* Only write data to the sink if this is a
                 successful response. */
              if (httpStatus == 0 || httpStatus == 200 || httpStatus == 201 ||
                  httpStatus == 206) {
                writtenToSink += len;
                this->request.dataCallback((char*)data, len);
              }
            } else {
              this->result.data->append((char*)data, len);
            }
          }) {
      LOG(INFO) << (request.data ? "uploading '" : "downloading '")
                << request.uri << "'";

      if (!request.expectedETag.empty()) {
        requestHeaders = curl_slist_append(
            requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
      }
      if (!request.mimeType.empty()) {
        requestHeaders = curl_slist_append(
            requestHeaders, ("Content-Type: " + request.mimeType).c_str());
      }
    }

    ~DownloadItem() {
      if (req != nullptr) {
        if (active) {
          curl_multi_remove_handle(downloader.curlm, req);
        }
        curl_easy_cleanup(req);
      }
      if (requestHeaders != nullptr) {
        curl_slist_free_all(requestHeaders);
      }
      try {
        if (!done) {
          fail(DownloadError(
              Interrupted,
              format("download of '%s' was interrupted") % request.uri));
        }
      } catch (...) {
        ignoreException();
      }
    }

    void failEx(const std::exception_ptr& ex) {
      assert(!done);
      done = true;
      callback.rethrow(ex);
    }

    template <class T>
    void fail(const T& e) {
      failEx(std::make_exception_ptr(e));
    }

    LambdaSink finalSink;
    std::shared_ptr<CompressionSink> decompressionSink;

    std::exception_ptr writeException;

    size_t writeCallback(void* contents, size_t size, size_t nmemb) {
      try {
        size_t realSize = size * nmemb;
        result.bodySize += realSize;

        if (!decompressionSink) {
          decompressionSink = makeDecompressionSink(encoding, finalSink);
        }

        (*decompressionSink)((unsigned char*)contents, realSize);

        return realSize;
      } catch (...) {
        writeException = std::current_exception();
        return 0;
      }
    }

    static size_t writeCallbackWrapper(void* contents, size_t size,
                                       size_t nmemb, void* userp) {
      return ((DownloadItem*)userp)->writeCallback(contents, size, nmemb);
    }

    size_t headerCallback(void* contents, size_t size, size_t nmemb) {
      size_t realSize = size * nmemb;
      std::string line((char*)contents, realSize);
      DLOG(INFO) << "got header for '" << request.uri << "': " << trim(line);
      if (line.compare(0, 5, "HTTP/") == 0) {  // new response starts
        result.etag = "";
        auto ss = tokenizeString<std::vector<std::string>>(line, " ");
        status = ss.size() >= 2 ? ss[1] : "";
        result.data = std::make_shared<std::string>();
        result.bodySize = 0;
        acceptRanges = false;
        encoding = "";
      } else {
        auto i = line.find(':');
        if (i != std::string::npos) {
          std::string name = toLower(trim(std::string(line, 0, i)));
          if (name == "etag") {
            result.etag = trim(std::string(line, i + 1));
            /* Hack to work around a GitHub bug: it sends
               ETags, but ignores If-None-Match. So if we get
               the expected ETag on a 200 response, then shut
               down the connection because we already have the
               data. */
            if (result.etag == request.expectedETag && status == "200") {
              DLOG(INFO)
                  << "shutting down on 200 HTTP response with expected ETag";
              return 0;
            }
          } else if (name == "content-encoding") {
            encoding = trim(std::string(line, i + 1));
          } else if (name == "accept-ranges" &&
                     toLower(trim(std::string(line, i + 1))) == "bytes") {
            acceptRanges = true;
          }
        }
      }
      return realSize;
    }

    static size_t headerCallbackWrapper(void* contents, size_t size,
                                        size_t nmemb, void* userp) {
      return ((DownloadItem*)userp)->headerCallback(contents, size, nmemb);
    }

    static int debugCallback(CURL* handle, curl_infotype type, char* data,
                             size_t size, void* userptr) {
      if (type == CURLINFO_TEXT) {
        DLOG(INFO) << "curl: "
                   << absl::StripTrailingAsciiWhitespace(
                          std::string(data, size));
      }
      return 0;
    }

    size_t readOffset = 0;
    size_t readCallback(char* buffer, size_t size, size_t nitems) {
      if (readOffset == request.data->length()) {
        return 0;
      }
      auto count = std::min(size * nitems, request.data->length() - readOffset);
      assert(count);
      memcpy(buffer, request.data->data() + readOffset, count);
      readOffset += count;
      return count;
    }

    static size_t readCallbackWrapper(char* buffer, size_t size, size_t nitems,
                                      void* userp) {
      return ((DownloadItem*)userp)->readCallback(buffer, size, nitems);
    }

    void init() {
      if (req == nullptr) {
        req = curl_easy_init();
      }

      curl_easy_reset(req);

      // TODO(tazjin): Add an Abseil flag for this
      // if (verbosity >= lvlVomit) {
      //   curl_easy_setopt(req, CURLOPT_VERBOSE, 1);
      //   curl_easy_setopt(req, CURLOPT_DEBUGFUNCTION,
      //                    DownloadItem::debugCallback);
      // }

      curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str());
      curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
      curl_easy_setopt(req, CURLOPT_MAXREDIRS, 10);
      curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
      curl_easy_setopt(req, CURLOPT_USERAGENT,
                       ("curl/" LIBCURL_VERSION " Nix/" + nixVersion +
                        (downloadSettings.userAgentSuffix != ""
                             ? " " + downloadSettings.userAgentSuffix.get()
                             : ""))
                           .c_str());
#if LIBCURL_VERSION_NUM >= 0x072b00
      curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
#endif
#if LIBCURL_VERSION_NUM >= 0x072f00
      if (downloadSettings.enableHttp2) {
        curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
      } else {
        curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
      }
#endif
      curl_easy_setopt(req, CURLOPT_WRITEFUNCTION,
                       DownloadItem::writeCallbackWrapper);
      curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
      curl_easy_setopt(req, CURLOPT_HEADERFUNCTION,
                       DownloadItem::headerCallbackWrapper);
      curl_easy_setopt(req, CURLOPT_HEADERDATA, this);

      curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders);

      if (request.head) {
        curl_easy_setopt(req, CURLOPT_NOBODY, 1);
      }

      if (request.data) {
        curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
        curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
        curl_easy_setopt(req, CURLOPT_READDATA, this);
        curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE,
                         (curl_off_t)request.data->length());
      }

      if (request.verifyTLS) {
        if (!settings.caFile.empty()) {
          curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.c_str());
        }
      } else {
        curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0);
        curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
      }

      curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT,
                       downloadSettings.connectTimeout.get());

      curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L);
      curl_easy_setopt(req, CURLOPT_LOW_SPEED_TIME,
                       downloadSettings.stalledDownloadTimeout.get());

      /* If no file exist in the specified path, curl continues to work
         anyway as if netrc support was disabled. */
      curl_easy_setopt(req, CURLOPT_NETRC_FILE,
                       settings.netrcFile.get().c_str());
      curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);

      if (writtenToSink != 0) {
        curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink);
      }

      result.data = std::make_shared<std::string>();
      result.bodySize = 0;
    }

    void finish(CURLcode code) {
      long httpStatus = 0;
      curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);

      char* effectiveUriCStr;
      curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUriCStr);
      if (effectiveUriCStr != nullptr) {
        result.effectiveUri = effectiveUriCStr;
      }

      DLOG(INFO) << "finished " << request.verb() << " of " << request.uri
                 << "; curl status = " << code
                 << ", HTTP status = " << httpStatus
                 << ", body = " << result.bodySize << " bytes";

      if (decompressionSink) {
        try {
          decompressionSink->finish();
        } catch (...) {
          writeException = std::current_exception();
        }
      }

      if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
        code = CURLE_OK;
        httpStatus = 304;
      }

      if (writeException) {
        failEx(writeException);

      } else if (code == CURLE_OK &&
                 (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 ||
                  httpStatus == 206 || httpStatus == 304 ||
                  httpStatus == 226 /* FTP */ ||
                  httpStatus == 0 /* other protocol */)) {
        result.cached = httpStatus == 304;
        done = true;
        callback(std::move(result));
      }

      else {
        // We treat most errors as transient, but won't retry when hopeless
        Error err = Transient;

        if (httpStatus == 404 || httpStatus == 410 ||
            code == CURLE_FILE_COULDNT_READ_FILE) {
          // The file is definitely not there
          err = NotFound;
        } else if (httpStatus == 401 || httpStatus == 403 ||
                   httpStatus == 407) {
          // Don't retry on authentication/authorization failures
          err = Forbidden;
        } else if (httpStatus >= 400 && httpStatus < 500 && httpStatus != 408 &&
                   httpStatus != 429) {
          // Most 4xx errors are client errors and are probably not worth
          // retrying:
          //   * 408 means the server timed out waiting for us, so we try again
          //   * 429 means too many requests, so we retry (with a delay)
          err = Misc;
        } else if (httpStatus == 501 || httpStatus == 505 ||
                   httpStatus == 511) {
          // Let's treat most 5xx (server) errors as transient, except for a
          // handful:
          //   * 501 not implemented
          //   * 505 http version not supported
          //   * 511 we're behind a captive portal
          err = Misc;
        } else {
          // Don't bother retrying on certain cURL errors either
          switch (code) {
            case CURLE_FAILED_INIT:
            case CURLE_URL_MALFORMAT:
            case CURLE_NOT_BUILT_IN:
            case CURLE_REMOTE_ACCESS_DENIED:
            case CURLE_FILE_COULDNT_READ_FILE:
            case CURLE_FUNCTION_NOT_FOUND:
            case CURLE_ABORTED_BY_CALLBACK:
            case CURLE_BAD_FUNCTION_ARGUMENT:
            case CURLE_INTERFACE_FAILED:
            case CURLE_UNKNOWN_OPTION:
            case CURLE_SSL_CACERT_BADFILE:
            case CURLE_TOO_MANY_REDIRECTS:
            case CURLE_WRITE_ERROR:
            case CURLE_UNSUPPORTED_PROTOCOL:
              err = Misc;
              break;
            default:  // Shut up warnings
              break;
          }
        }

        attempt++;

        auto exc =
            code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
                ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted",
                                                 request.verb(), request.uri))
                : httpStatus != 0
                      ? DownloadError(
                            err, fmt("unable to %s '%s': HTTP error %d",
                                     request.verb(), request.uri, httpStatus) +
                                     (code == CURLE_OK
                                          ? ""
                                          : fmt(" (curl error: %s)",
                                                curl_easy_strerror(code))))
                      : DownloadError(err, fmt("unable to %s '%s': %s (%d)",
                                               request.verb(), request.uri,
                                               curl_easy_strerror(code), code));

        /* If this is a transient error, then maybe retry the
           download after a while. If we're writing to a
           sink, we can only retry if the server supports
           ranged requests. */
        if (err == Transient && attempt < request.tries &&
            (!this->request.dataCallback || writtenToSink == 0 ||
             (acceptRanges && encoding.empty()))) {
          int ms = request.baseRetryTimeMs *
                   std::pow(2.0F, attempt - 1 +
                                      std::uniform_real_distribution<>(
                                          0.0, 0.5)(downloader.mt19937));
          if (writtenToSink != 0) {
            LOG(WARNING) << exc.what() << "; retrying from offset "
                         << writtenToSink << " in " << ms << "ms";
          } else {
            LOG(WARNING) << exc.what() << "; retrying in " << ms << "ms";
          }
          embargo =
              std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
          downloader.enqueueItem(shared_from_this());
        } else {
          fail(exc);
        }
      }
    }
  };

  struct State {
    struct EmbargoComparator {
      bool operator()(const std::shared_ptr<DownloadItem>& i1,
                      const std::shared_ptr<DownloadItem>& i2) {
        return i1->embargo > i2->embargo;
      }
    };
    bool quit = false;
    std::priority_queue<std::shared_ptr<DownloadItem>,
                        std::vector<std::shared_ptr<DownloadItem>>,
                        EmbargoComparator>
        incoming;
  };

  Sync<State> state_;

  /* We can't use a std::condition_variable to wake up the curl
     thread, because it only monitors file descriptors. So use a
     pipe instead. */
  Pipe wakeupPipe;

  std::thread workerThread;

  CurlDownloader() : mt19937(rd()) {
    static std::once_flag globalInit;
    std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);

    curlm = curl_multi_init();

#if LIBCURL_VERSION_NUM >= 0x072b00  // Multiplex requires >= 7.43.0
    curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
#endif
#if LIBCURL_VERSION_NUM >= 0x071e00  // Max connections requires >= 7.30.0
    curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
                      downloadSettings.httpConnections.get());
#endif

    wakeupPipe.create();
    fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);

    workerThread = std::thread([&]() { workerThreadEntry(); });
  }

  ~CurlDownloader() override {
    stopWorkerThread();

    workerThread.join();

    if (curlm != nullptr) {
      curl_multi_cleanup(curlm);
    }
  }

  void stopWorkerThread() {
    /* Signal the worker thread to exit. */
    {
      auto state(state_.lock());
      state->quit = true;
    }
    writeFull(wakeupPipe.writeSide.get(), " ", false);
  }

  void workerThreadMain() {
    /* Cause this thread to be notified on SIGINT. */
    auto callback = createInterruptCallback([&]() { stopWorkerThread(); });

    std::map<CURL*, std::shared_ptr<DownloadItem>> items;

    bool quit = false;

    std::chrono::steady_clock::time_point nextWakeup;

    while (!quit) {
      checkInterrupt();

      /* Let curl do its thing. */
      int running;
      CURLMcode mc = curl_multi_perform(curlm, &running);
      if (mc != CURLM_OK) {
        throw nix::Error(
            format("unexpected error from curl_multi_perform(): %s") %
            curl_multi_strerror(mc));
      }

      /* Set the promises of any finished requests. */
      CURLMsg* msg;
      int left;
      while ((msg = curl_multi_info_read(curlm, &left)) != nullptr) {
        if (msg->msg == CURLMSG_DONE) {
          auto i = items.find(msg->easy_handle);
          assert(i != items.end());
          i->second->finish(msg->data.result);
          curl_multi_remove_handle(curlm, i->second->req);
          i->second->active = false;
          items.erase(i);
        }
      }

      /* Wait for activity, including wakeup events. */
      int numfds = 0;
      struct curl_waitfd extraFDs[1];
      extraFDs[0].fd = wakeupPipe.readSide.get();
      extraFDs[0].events = CURL_WAIT_POLLIN;
      extraFDs[0].revents = 0;
      long maxSleepTimeMs = items.empty() ? 10000 : 100;
      auto sleepTimeMs =
          nextWakeup != std::chrono::steady_clock::time_point()
              ? std::max(
                    0,
                    (int)std::chrono::duration_cast<std::chrono::milliseconds>(
                        nextWakeup - std::chrono::steady_clock::now())
                        .count())
              : maxSleepTimeMs;
      DLOG(INFO) << "download thread waiting for " << sleepTimeMs << " ms";
      mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds);
      if (mc != CURLM_OK) {
        throw nix::Error(format("unexpected error from curl_multi_wait(): %s") %
                         curl_multi_strerror(mc));
      }

      nextWakeup = std::chrono::steady_clock::time_point();

      /* Add new curl requests from the incoming requests queue,
         except for requests that are embargoed (waiting for a
         retry timeout to expire). */
      if ((extraFDs[0].revents & CURL_WAIT_POLLIN) != 0) {
        char buf[1024];
        auto res = read(extraFDs[0].fd, buf, sizeof(buf));
        if (res == -1 && errno != EINTR) {
          throw SysError("reading curl wakeup socket");
        }
      }

      std::vector<std::shared_ptr<DownloadItem>> incoming;
      auto now = std::chrono::steady_clock::now();

      {
        auto state(state_.lock());
        while (!state->incoming.empty()) {
          auto item = state->incoming.top();
          if (item->embargo <= now) {
            incoming.push_back(item);
            state->incoming.pop();
          } else {
            if (nextWakeup == std::chrono::steady_clock::time_point() ||
                item->embargo < nextWakeup) {
              nextWakeup = item->embargo;
            }
            break;
          }
        }
        quit = state->quit;
      }

      for (auto& item : incoming) {
        DLOG(INFO) << "starting " << item->request.verb() << " of "
                   << item->request.uri;
        item->init();
        curl_multi_add_handle(curlm, item->req);
        item->active = true;
        items[item->req] = item;
      }
    }

    DLOG(INFO) << "download thread shutting down";
  }

  void workerThreadEntry() {
    try {
      workerThreadMain();
    } catch (nix::Interrupted& e) {
    } catch (std::exception& e) {
      LOG(ERROR) << "unexpected error in download thread: " << e.what();
    }

    {
      auto state(state_.lock());
      while (!state->incoming.empty()) {
        state->incoming.pop();
      }
      state->quit = true;
    }
  }

  void enqueueItem(const std::shared_ptr<DownloadItem>& item) {
    if (item->request.data && !hasPrefix(item->request.uri, "http://") &&
        !hasPrefix(item->request.uri, "https://")) {
      throw nix::Error("uploading to '%s' is not supported", item->request.uri);
    }

    {
      auto state(state_.lock());
      if (state->quit) {
        throw nix::Error(
            "cannot enqueue download request because the download thread is "
            "shutting down");
      }
      state->incoming.push(item);
    }
    writeFull(wakeupPipe.writeSide.get(), " ");
  }

#ifdef ENABLE_S3
  std::tuple<std::string, std::string, Store::Params> parseS3Uri(
      std::string uri) {
    auto [path, params] = splitUriAndParams(uri);

    auto slash = path.find('/', 5);  // 5 is the length of "s3://" prefix
    if (slash == std::string::npos) {
      throw nix::Error("bad S3 URI '%s'", path);
    }

    std::string bucketName(path, 5, slash - 5);
    std::string key(path, slash + 1);

    return {bucketName, key, params};
  }
#endif

  void enqueueDownload(const DownloadRequest& request,
                       Callback<DownloadResult> callback) override {
    /* Ugly hack to support s3:// URIs. */
    if (hasPrefix(request.uri, "s3://")) {
      // FIXME: do this on a worker thread
      try {
#ifdef ENABLE_S3
        auto [bucketName, key, params] = parseS3Uri(request.uri);

        std::string profile = get(params, "profile", "");
        std::string region = get(params, "region", Aws::Region::US_EAST_1);
        std::string scheme = get(params, "scheme", "");
        std::string endpoint = get(params, "endpoint", "");

        S3Helper s3Helper(profile, region, scheme, endpoint);

        // FIXME: implement ETag
        auto s3Res = s3Helper.getObject(bucketName, key);
        DownloadResult res;
        if (!s3Res.data)
          throw DownloadError(
              NotFound, fmt("S3 object '%s' does not exist", request.uri));
        res.data = s3Res.data;
        callback(std::move(res));
#else
        throw nix::Error(
            "cannot download '%s' because Nix is not built with S3 support",
            request.uri);
#endif
      } catch (...) {
        callback.rethrow();
      }
      return;
    }

    enqueueItem(
        std::make_shared<DownloadItem>(*this, request, std::move(callback)));
  }
};

ref<Downloader> getDownloader() {
  static ref<Downloader> downloader = makeDownloader();
  return downloader;
}

ref<Downloader> makeDownloader() { return make_ref<CurlDownloader>(); }

std::future<DownloadResult> Downloader::enqueueDownload(
    const DownloadRequest& request) {
  auto promise = std::make_shared<std::promise<DownloadResult>>();
  enqueueDownload(request, {[promise](std::future<DownloadResult> fut) {
                    try {
                      promise->set_value(fut.get());
                    } catch (...) {
                      promise->set_exception(std::current_exception());
                    }
                  }});
  return promise->get_future();
}

DownloadResult Downloader::download(const DownloadRequest& request) {
  return enqueueDownload(request).get();
}

void Downloader::download(DownloadRequest&& request, Sink& sink) {
  /* Note: we can't call 'sink' via request.dataCallback, because
     that would cause the sink to execute on the downloader
     thread. If 'sink' is a coroutine, this will fail. Also, if the
     sink is expensive (e.g. one that does decompression and writing
     to the Nix store), it would stall the download thread too much.
     Therefore we use a buffer to communicate data between the
     download thread and the calling thread. */

  struct State {
    bool quit = false;
    std::exception_ptr exc;
    std::string data;
    std::condition_variable avail, request;
  };

  auto _state = std::make_shared<Sync<State>>();

  /* In case of an exception, wake up the download thread. FIXME:
     abort the download request. */
  Finally finally([&]() {
    auto state(_state->lock());
    state->quit = true;
    state->request.notify_one();
  });

  request.dataCallback = [_state](char* buf, size_t len) {
    auto state(_state->lock());

    if (state->quit) {
      return;
    }

    /* If the buffer is full, then go to sleep until the calling
       thread wakes us up (i.e. when it has removed data from the
       buffer). We don't wait forever to prevent stalling the
       download thread. (Hopefully sleeping will throttle the
       sender.) */
    if (state->data.size() > 1024 * 1024) {
      DLOG(INFO) << "download buffer is full; going to sleep";
      state.wait_for(state->request, std::chrono::seconds(10));
    }

    /* Append data to the buffer and wake up the calling
       thread. */
    state->data.append(buf, len);
    state->avail.notify_one();
  };

  enqueueDownload(request, {[_state](std::future<DownloadResult> fut) {
                    auto state(_state->lock());
                    state->quit = true;
                    try {
                      fut.get();
                    } catch (...) {
                      state->exc = std::current_exception();
                    }
                    state->avail.notify_one();
                    state->request.notify_one();
                  }});

  while (true) {
    checkInterrupt();

    std::string chunk;

    /* Grab data if available, otherwise wait for the download
       thread to wake us up. */
    {
      auto state(_state->lock());

      while (state->data.empty()) {
        if (state->quit) {
          if (state->exc) {
            std::rethrow_exception(state->exc);
          }
          return;
        }

        state.wait(state->avail);
      }

      chunk = std::move(state->data);

      state->request.notify_one();
    }

    /* Flush the data to the sink and wake up the download thread
       if it's blocked on a full buffer. We don't hold the state
       lock while doing this to prevent blocking the download
       thread if sink() takes a long time. */
    sink((unsigned char*)chunk.data(), chunk.size());
  }
}

CachedDownloadResult Downloader::downloadCached(
    const ref<Store>& store, const CachedDownloadRequest& request) {
  auto url = resolveUri(request.uri);

  auto name = request.name;
  if (name.empty()) {
    auto p = url.rfind('/');
    if (p != std::string::npos) {
      name = std::string(url, p + 1);
    }
  }

  Path expectedStorePath;
  if (request.expectedHash) {
    expectedStorePath =
        store->makeFixedOutputPath(request.unpack, request.expectedHash, name);
    if (store->isValidPath(expectedStorePath)) {
      CachedDownloadResult result;
      result.storePath = expectedStorePath;
      result.path = store->toRealPath(expectedStorePath);
      return result;
    }
  }

  Path cacheDir = getCacheDir() + "/nix/tarballs";
  createDirs(cacheDir);

  std::string urlHash = hashString(htSHA256, name + std::string("\0"s) + url)
                            .to_string(Base32, false);

  Path dataFile = cacheDir + "/" + urlHash + ".info";
  Path fileLink = cacheDir + "/" + urlHash + "-file";

  PathLocks lock({fileLink}, fmt("waiting for lock on '%1%'...", fileLink));

  Path storePath;

  std::string expectedETag;

  bool skip = false;

  CachedDownloadResult result;

  if (pathExists(fileLink) && pathExists(dataFile)) {
    storePath = readLink(fileLink);
    store->addTempRoot(storePath);
    if (store->isValidPath(storePath)) {
      auto ss =
          tokenizeString<std::vector<std::string>>(readFile(dataFile), "\n");
      if (ss.size() >= 3 && ss[0] == url) {
        time_t lastChecked;
        if (string2Int(ss[2], lastChecked) &&
            (uint64_t)lastChecked + request.ttl >= (uint64_t)time(nullptr)) {
          skip = true;
          result.effectiveUri = request.uri;
          result.etag = ss[1];
        } else if (!ss[1].empty()) {
          DLOG(INFO) << "verifying previous ETag: " << ss[1];
          expectedETag = ss[1];
        }
      }
    } else {
      storePath = "";
    }
  }

  if (!skip) {
    try {
      DownloadRequest request2(url);
      request2.expectedETag = expectedETag;
      auto res = download(request2);
      result.effectiveUri = res.effectiveUri;
      result.etag = res.etag;

      if (!res.cached) {
        ValidPathInfo info;
        StringSink sink;
        dumpString(*res.data, sink);
        Hash hash = hashString(
            request.expectedHash ? request.expectedHash.type : htSHA256,
            *res.data);
        info.path = store->makeFixedOutputPath(false, hash, name);
        info.narHash = hashString(htSHA256, *sink.s);
        info.narSize = sink.s->size();
        info.ca = makeFixedOutputCA(false, hash);
        store->addToStore(info, sink.s, NoRepair, NoCheckSigs);
        storePath = info.path;
      }

      assert(!storePath.empty());
      replaceSymlink(storePath, fileLink);

      writeFile(dataFile, url + "\n" + res.etag + "\n" +
                              std::to_string(time(nullptr)) + "\n");
    } catch (DownloadError& e) {
      if (storePath.empty()) {
        throw;
      }
      LOG(WARNING) << e.msg() << "; using cached result";
      result.etag = expectedETag;
    }
  }

  if (request.unpack) {
    Path unpackedLink = cacheDir + "/" + baseNameOf(storePath) + "-unpacked";
    PathLocks lock2({unpackedLink},
                    fmt("waiting for lock on '%1%'...", unpackedLink));
    Path unpackedStorePath;
    if (pathExists(unpackedLink)) {
      unpackedStorePath = readLink(unpackedLink);
      store->addTempRoot(unpackedStorePath);
      if (!store->isValidPath(unpackedStorePath)) {
        unpackedStorePath = "";
      }
    }
    if (unpackedStorePath.empty()) {
      LOG(INFO) << "unpacking '" << url << "' ...";
      Path tmpDir = createTempDir();
      AutoDelete autoDelete(tmpDir, true);
      // FIXME: this requires GNU tar for decompression.
      runProgram("tar", true,
                 {"xf", store->toRealPath(storePath), "-C", tmpDir,
                  "--strip-components", "1"});
      unpackedStorePath = store->addToStore(name, tmpDir, true, htSHA256,
                                            defaultPathFilter, NoRepair);
    }
    replaceSymlink(unpackedStorePath, unpackedLink);
    storePath = unpackedStorePath;
  }

  if (!expectedStorePath.empty() && storePath != expectedStorePath) {
    unsigned int statusCode = 102;
    Hash gotHash =
        request.unpack
            ? hashPath(request.expectedHash.type, store->toRealPath(storePath))
                  .first
            : hashFile(request.expectedHash.type, store->toRealPath(storePath));
    throw nix::Error(statusCode,
                     "hash mismatch in file downloaded from '%s':\n  wanted: "
                     "%s\n  got:    %s",
                     url, request.expectedHash.to_string(),
                     gotHash.to_string());
  }

  result.storePath = storePath;
  result.path = store->toRealPath(storePath);
  return result;
}

bool isUri(const std::string& s) {
  if (s.compare(0, 8, "channel:") == 0) {
    return true;
  }
  size_t pos = s.find("://");
  if (pos == std::string::npos) {
    return false;
  }
  std::string scheme(s, 0, pos);
  return scheme == "http" || scheme == "https" || scheme == "file" ||
         scheme == "channel" || scheme == "git" || scheme == "s3" ||
         scheme == "ssh";
}

}  // namespace nix