about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libstore/download.cc29
1 files changed, 17 insertions, 12 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 97e9b0b2fd66..317ec37d16af 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -10,6 +10,7 @@
 
 #include <curl/curl.h>
 
+#include <queue>
 #include <iostream>
 #include <thread>
 #include <cmath>
@@ -281,8 +282,13 @@ struct CurlDownloader : public Downloader
 
     struct State
     {
+        struct EmbargoComparator {
+            bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
+                return i1->embargo > i2->embargo;
+            }
+        };
         bool quit = false;
-        std::vector<std::shared_ptr<DownloadItem>> incoming;
+        std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
     };
 
     Sync<State> state_;
@@ -380,9 +386,7 @@ struct CurlDownloader : public Downloader
 
             /* Add new curl requests from the incoming requests queue,
                except for requests that are embargoed (waiting for a
-               retry timeout to expire). FIXME: should use a priority
-               queue for the embargoed items to prevent repeated O(n)
-               checks. */
+               retry timeout to expire). */
             if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
                 char buf[1024];
                 auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@@ -390,22 +394,23 @@ struct CurlDownloader : public Downloader
                     throw SysError("reading curl wakeup socket");
             }
 
-            std::vector<std::shared_ptr<DownloadItem>> incoming, embargoed;
+            std::vector<std::shared_ptr<DownloadItem>> incoming;
             auto now = std::chrono::steady_clock::now();
 
             {
                 auto state(state_.lock());
-                for (auto & item: state->incoming) {
-                    if (item->embargo <= now)
+                while (!state->incoming.empty()) {
+                    auto item = state->incoming.top();
+                    if (item->embargo <= now) {
                         incoming.push_back(item);
-                    else {
-                        embargoed.push_back(item);
+                        state->incoming.pop();
+                    } else {
                         if (nextWakeup == std::chrono::steady_clock::time_point()
                             || item->embargo < nextWakeup)
                             nextWakeup = item->embargo;
+                        break;
                     }
                 }
-                state->incoming = embargoed;
                 quit = state->quit;
             }
 
@@ -432,7 +437,7 @@ struct CurlDownloader : public Downloader
 
         {
             auto state(state_.lock());
-            state->incoming.clear();
+            while (!state->incoming.empty()) state->incoming.pop();
             state->quit = true;
         }
     }
@@ -443,7 +448,7 @@ struct CurlDownloader : public Downloader
             auto state(state_.lock());
             if (state->quit)
                 throw nix::Error("cannot enqueue download request because the download thread is shutting down");
-            state->incoming.push_back(item);
+            state->incoming.push(item);
         }
         writeFull(wakeupPipe.writeSide.get(), " ");
     }