diff options
author | Eelco Dolstra <edolstra@gmail.com> | 2016-10-19T14·37+0200 |
---|---|---|
committer | Eelco Dolstra <edolstra@gmail.com> | 2016-10-19T14·37+0200 |
commit | 307cc8c33d1dc4abaaf15d8f4ec64c02d5bb81aa (patch) | |
tree | d522e2c62241997d2cf45b8d0c5988f4f3e8ddfa /src/libstore | |
parent | 795d9b866881871419651081eb112f23b2f5bff5 (diff) | |
parent | efb938468c0428703addcd4ac7635c8b91b5dafc (diff) |
Merge branch 'priorityqueue' of https://github.com/groxxda/nix
Diffstat (limited to 'src/libstore')
-rw-r--r-- | src/libstore/download.cc | 29 |
1 files changed, 17 insertions, 12 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 16a0bec67ea1..c01ba63ef3c2 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -10,6 +10,7 @@ #include <curl/curl.h> +#include <queue> #include <iostream> #include <thread> #include <cmath> @@ -281,8 +282,13 @@ struct CurlDownloader : public Downloader struct State { + struct EmbargoComparator { + bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) { + return i1->embargo > i2->embargo; + } + }; bool quit = false; - std::vector<std::shared_ptr<DownloadItem>> incoming; + std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming; }; Sync<State> state_; @@ -380,9 +386,7 @@ struct CurlDownloader : public Downloader /* Add new curl requests from the incoming requests queue, except for requests that are embargoed (waiting for a - retry timeout to expire). FIXME: should use a priority - queue for the embargoed items to prevent repeated O(n) - checks. */ + retry timeout to expire). */ if (extraFDs[0].revents & CURL_WAIT_POLLIN) { char buf[1024]; auto res = read(extraFDs[0].fd, buf, sizeof(buf)); @@ -390,22 +394,23 @@ struct CurlDownloader : public Downloader throw SysError("reading curl wakeup socket"); } - std::vector<std::shared_ptr<DownloadItem>> incoming, embargoed; + std::vector<std::shared_ptr<DownloadItem>> incoming; auto now = std::chrono::steady_clock::now(); { auto state(state_.lock()); - for (auto & item: state->incoming) { - if (item->embargo <= now) + while (!state->incoming.empty()) { + auto item = state->incoming.top(); + if (item->embargo <= now) { incoming.push_back(item); - else { - embargoed.push_back(item); + state->incoming.pop(); + } else { if (nextWakeup == std::chrono::steady_clock::time_point() || item->embargo < nextWakeup) nextWakeup = item->embargo; + break; } } - state->incoming = embargoed; quit = state->quit; } @@ -432,7 +437,7 @@ struct CurlDownloader : public Downloader { auto state(state_.lock()); - state->incoming.clear(); + while (!state->incoming.empty()) state->incoming.pop(); state->quit = true; } } @@ -443,7 +448,7 @@ struct CurlDownloader : public Downloader auto state(state_.lock()); if (state->quit) throw nix::Error("cannot enqueue download request because the download thread is shutting down"); - state->incoming.push_back(item); + state->incoming.push(item); } writeFull(wakeupPipe.writeSide.get(), " "); } |