about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2017-02-14T13·20+0100
committerEelco Dolstra <edolstra@gmail.com>2017-02-14T13·20+0100
commit9ff9c3f2f80ba4108e9c945bbfda2c64735f987b (patch)
tree7ce169da649ca6e0a0cd88d6b85544d5c6038e95
parent62ff5ad424547630e70f35406da85fbb5ec3445a (diff)
Add support for s3:// URIs
This adds support for s3:// URIs in all places where Nix allows URIs,
e.g. in builtins.fetchurl, builtins.fetchTarball, <nix/fetchurl.nix>
and NIX_PATH. It allows fetching resources from private S3 buckets,
using credentials obtained from the standard places (i.e. AWS_*
environment variables, ~/.aws/credentials and the EC2 metadata
server). This may not be super-useful in general, but since we already
depend on aws-sdk-cpp, it's a cheap feature to add.
-rw-r--r--src/libstore/download.cc28
-rw-r--r--src/libstore/download.hh2
-rw-r--r--src/libstore/s3-binary-cache-store.cc141
-rw-r--r--src/libstore/s3.hh33
-rw-r--r--src/libutil/logging.hh1
5 files changed, 142 insertions, 63 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 074e0ca6642a..85215439a0fa 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -4,6 +4,7 @@
 #include "hash.hh"
 #include "store-api.hh"
 #include "archive.hh"
+#include "s3.hh"
 
 #include <unistd.h>
 #include <fcntl.h>
@@ -480,6 +481,31 @@ struct CurlDownloader : public Downloader
         std::function<void(const DownloadResult &)> success,
         std::function<void(std::exception_ptr exc)> failure) override
     {
+        /* Ugly hack to support s3:// URIs. */
+        if (hasPrefix(request.uri, "s3://")) {
+            // FIXME: do this on a worker thread
+            sync2async<DownloadResult>(success, failure, [&]() {
+#ifdef ENABLE_S3
+                S3Helper s3Helper;
+                auto slash = request.uri.find('/', 5);
+                if (slash == std::string::npos)
+                    throw nix::Error("bad S3 URI ‘%s’", request.uri);
+                std::string bucketName(request.uri, 5, slash - 5);
+                std::string key(request.uri, slash + 1);
+                // FIXME: implement ETag
+                auto s3Res = s3Helper.getObject(bucketName, key);
+                DownloadResult res;
+                if (!s3Res.data)
+                    throw DownloadError(NotFound, fmt("S3 object ‘%s’ does not exist", request.uri));
+                res.data = s3Res.data;
+                return res;
+#else
+                throw nix::Error("cannot download ‘%s’ because Nix is not built with S3 support", request.uri);
+#endif
+            });
+            return;
+        }
+
         auto item = std::make_shared<DownloadItem>(*this, request);
         item->success = success;
         item->failure = failure;
@@ -629,7 +655,7 @@ bool isUri(const string & s)
     size_t pos = s.find("://");
     if (pos == string::npos) return false;
     string scheme(s, 0, pos);
-    return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git";
+    return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3";
 }
 
 
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index 82b5d641fde9..bdb5011e7830 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -23,7 +23,7 @@ struct DownloadRequest
 
 struct DownloadResult
 {
-    bool cached;
+    bool cached = false;
     std::string etag;
     std::string effectiveUrl;
     std::shared_ptr<std::string> data;
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index cc1b33104200..ac083410b353 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -1,6 +1,6 @@
 #if ENABLE_S3
-#if __linux__
 
+#include "s3.hh"
 #include "s3-binary-cache-store.hh"
 #include "nar-info.hh"
 #include "nar-info-disk-cache.hh"
@@ -18,15 +18,6 @@
 
 namespace nix {
 
-struct istringstream_nocopy : public std::stringstream
-{
-    istringstream_nocopy(const std::string & s)
-    {
-        rdbuf()->pubsetbuf(
-            (char *) s.data(), s.size());
-    }
-};
-
 struct S3Error : public Error
 {
     Aws::S3::S3Errors err;
@@ -60,21 +51,81 @@ static void initAWS()
     });
 }
 
+S3Helper::S3Helper()
+    : config(makeConfig())
+    , client(make_ref<Aws::S3::S3Client>(*config))
+{
+}
+
+ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig()
+{
+    initAWS();
+    auto res = make_ref<Aws::Client::ClientConfiguration>();
+    res->region = Aws::Region::US_EAST_1; // FIXME: make configurable
+    res->requestTimeoutMs = 600 * 1000;
+    return res;
+}
+
+S3Helper::DownloadResult S3Helper::getObject(
+    const std::string & bucketName, const std::string & key)
+{
+    debug("fetching ‘s3://%s/%s’...", bucketName, key);
+
+    auto request =
+        Aws::S3::Model::GetObjectRequest()
+        .WithBucket(bucketName)
+        .WithKey(key);
+
+    request.SetResponseStreamFactory([&]() {
+        return Aws::New<std::stringstream>("STRINGSTREAM");
+    });
+
+    DownloadResult res;
+
+    auto now1 = std::chrono::steady_clock::now();
+
+    try {
+
+        auto result = checkAws(fmt("AWS error fetching ‘%s’", key),
+            client->GetObject(request));
+
+        res.data = std::make_shared<std::string>(
+            dynamic_cast<std::stringstream &>(result.GetBody()).str());
+
+    } catch (S3Error & e) {
+        if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
+    }
+
+    auto now2 = std::chrono::steady_clock::now();
+
+    res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+
+    return res;
+}
+
+#if __linux__
+
+struct istringstream_nocopy : public std::stringstream
+{
+    istringstream_nocopy(const std::string & s)
+    {
+        rdbuf()->pubsetbuf(
+            (char *) s.data(), s.size());
+    }
+};
+
 struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
 {
     std::string bucketName;
 
-    ref<Aws::Client::ClientConfiguration> config;
-    ref<Aws::S3::S3Client> client;
-
     Stats stats;
 
+    S3Helper s3Helper;
+
     S3BinaryCacheStoreImpl(
         const Params & params, const std::string & bucketName)
         : S3BinaryCacheStore(params)
         , bucketName(bucketName)
-        , config(makeConfig())
-        , client(make_ref<Aws::S3::S3Client>(*config))
     {
         diskCache = getNarInfoDiskCache();
     }
@@ -84,15 +135,6 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         return "s3://" + bucketName;
     }
 
-    ref<Aws::Client::ClientConfiguration> makeConfig()
-    {
-        initAWS();
-        auto res = make_ref<Aws::Client::ClientConfiguration>();
-        res->region = Aws::Region::US_EAST_1; // FIXME: make configurable
-        res->requestTimeoutMs = 600 * 1000;
-        return res;
-    }
-
     void init() override
     {
         if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) {
@@ -100,7 +142,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
             /* Create the bucket if it doesn't already exists. */
             // FIXME: HeadBucket would be more appropriate, but doesn't return
             // an easily parsed 404 message.
-            auto res = client->GetBucketLocation(
+            auto res = s3Helper.client->GetBucketLocation(
                 Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName));
 
             if (!res.IsSuccess()) {
@@ -108,7 +150,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
                     throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage());
 
                 checkAws(format("AWS error creating bucket ‘%s’") % bucketName,
-                    client->CreateBucket(
+                    s3Helper.client->CreateBucket(
                         Aws::S3::Model::CreateBucketRequest()
                         .WithBucket(bucketName)
                         .WithCreateBucketConfiguration(
@@ -146,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
     {
         stats.head++;
 
-        auto res = client->HeadObject(
+        auto res = s3Helper.client->HeadObject(
             Aws::S3::Model::HeadObjectRequest()
             .WithBucket(bucketName)
             .WithKey(path));
@@ -179,7 +221,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         auto now1 = std::chrono::steady_clock::now();
 
         auto result = checkAws(format("AWS error uploading ‘%s’") % path,
-            client->PutObject(request));
+            s3Helper.client->PutObject(request));
 
         auto now2 = std::chrono::steady_clock::now();
 
@@ -198,42 +240,18 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
             debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path);
 
-            auto request =
-                Aws::S3::Model::GetObjectRequest()
-                .WithBucket(bucketName)
-                .WithKey(path);
-
-            request.SetResponseStreamFactory([&]() {
-                return Aws::New<std::stringstream>("STRINGSTREAM");
-            });
-
             stats.get++;
 
-            try {
-
-                auto now1 = std::chrono::steady_clock::now();
-
-                auto result = checkAws(format("AWS error fetching ‘%s’") % path,
-                    client->GetObject(request));
-
-                auto now2 = std::chrono::steady_clock::now();
+            auto res = s3Helper.getObject(bucketName, path);
 
-                auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
+            stats.getBytes += res.data ? res.data->size() : 0;
+            stats.getTimeMs += res.durationMs;
 
-                auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+            if (res.data)
+                printTalkative("downloaded ‘s3://%s/%s’ (%d bytes) in %d ms",
+                    bucketName, path, res.data->size(), res.durationMs);
 
-                printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms")
-                    % bucketName % path % res.size() % duration);
-
-                stats.getBytes += res.size();
-                stats.getTimeMs += duration;
-
-                return std::make_shared<std::string>(res);
-
-            } catch (S3Error & e) {
-                if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>();
-                throw;
-            }
+            return res.data;
         });
     }
 
@@ -246,7 +264,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
             debug(format("listing bucket ‘s3://%s’ from key ‘%s’...") % bucketName % marker);
 
             auto res = checkAws(format("AWS error listing bucket ‘%s’") % bucketName,
-                client->ListObjects(
+                s3Helper.client->ListObjects(
                     Aws::S3::Model::ListObjectsRequest()
                     .WithBucket(bucketName)
                     .WithDelimiter("/")
@@ -281,7 +299,8 @@ static RegisterStoreImplementation regStore([](
     return store;
 });
 
+#endif
+
 }
 
 #endif
-#endif
diff --git a/src/libstore/s3.hh b/src/libstore/s3.hh
new file mode 100644
index 000000000000..5d5d3475c449
--- /dev/null
+++ b/src/libstore/s3.hh
@@ -0,0 +1,33 @@
+#pragma once
+
+#if ENABLE_S3
+
+#include "ref.hh"
+
+namespace Aws { namespace Client { class ClientConfiguration; } }
+namespace Aws { namespace S3 { class S3Client; } }
+
+namespace nix {
+
+struct S3Helper
+{
+    ref<Aws::Client::ClientConfiguration> config;
+    ref<Aws::S3::S3Client> client;
+
+    S3Helper();
+
+    ref<Aws::Client::ClientConfiguration> makeConfig();
+
+    struct DownloadResult
+    {
+        std::shared_ptr<std::string> data;
+        unsigned int durationMs;
+    };
+
+    DownloadResult getObject(
+        const std::string & bucketName, const std::string & key);
+};
+
+}
+
+#endif
diff --git a/src/libutil/logging.hh b/src/libutil/logging.hh
index 3e6c4b54853c..3f83664794f7 100644
--- a/src/libutil/logging.hh
+++ b/src/libutil/logging.hh
@@ -78,6 +78,7 @@ extern Verbosity verbosity; /* suppress msgs > this */
 
 #define printError(args...) printMsg(lvlError, args)
 #define printInfo(args...) printMsg(lvlInfo, args)
+#define printTalkative(args...) printMsg(lvlTalkative, args)
 #define debug(args...) printMsg(lvlDebug, args)
 #define vomit(args...) printMsg(lvlVomit, args)