about summary refs log tree commit diff
path: root/third_party/nix/src/libstore/s3-binary-cache-store.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/nix/src/libstore/s3-binary-cache-store.cc')
-rw-r--r--third_party/nix/src/libstore/s3-binary-cache-store.cc667
1 files changed, 327 insertions, 340 deletions
diff --git a/third_party/nix/src/libstore/s3-binary-cache-store.cc b/third_party/nix/src/libstore/s3-binary-cache-store.cc
index cd547a9648..8730c2dd4d 100644
--- a/third_party/nix/src/libstore/s3-binary-cache-store.cc
+++ b/third_party/nix/src/libstore/s3-binary-cache-store.cc
@@ -1,14 +1,6 @@
 #if ENABLE_S3
 
-#include "s3.hh"
 #include "s3-binary-cache-store.hh"
-#include "nar-info.hh"
-#include "nar-info-disk-cache.hh"
-#include "globals.hh"
-#include "compression.hh"
-#include "download.hh"
-#include "istringstream_nocopy.hh"
-
 #include <aws/core/Aws.h>
 #include <aws/core/VersionConfig.h>
 #include <aws/core/auth/AWSCredentialsProvider.h>
@@ -24,408 +16,403 @@
 #include <aws/s3/model/ListObjectsRequest.h>
 #include <aws/s3/model/PutObjectRequest.h>
 #include <aws/transfer/TransferManager.h>
+#include "compression.hh"
+#include "download.hh"
+#include "globals.hh"
+#include "istringstream_nocopy.hh"
+#include "nar-info-disk-cache.hh"
+#include "nar-info.hh"
+#include "s3.hh"
 
 using namespace Aws::Transfer;
 
 namespace nix {
 
-struct S3Error : public Error
-{
-    Aws::S3::S3Errors err;
-    S3Error(Aws::S3::S3Errors err, const FormatOrString & fs)
-        : Error(fs), err(err) { };
+struct S3Error : public Error {
+  Aws::S3::S3Errors err;
+  S3Error(Aws::S3::S3Errors err, const FormatOrString& fs)
+      : Error(fs), err(err){};
 };
 
 /* Helper: given an Outcome<R, E>, return R in case of success, or
    throw an exception in case of an error. */
-template<typename R, typename E>
-R && checkAws(const FormatOrString & fs, Aws::Utils::Outcome<R, E> && outcome)
-{
-    if (!outcome.IsSuccess())
-        throw S3Error(
-            outcome.GetError().GetErrorType(),
-            fs.s + ": " + outcome.GetError().GetMessage());
-    return outcome.GetResultWithOwnership();
+template <typename R, typename E>
+R&& checkAws(const FormatOrString& fs, Aws::Utils::Outcome<R, E>&& outcome) {
+  if (!outcome.IsSuccess())
+    throw S3Error(outcome.GetError().GetErrorType(),
+                  fs.s + ": " + outcome.GetError().GetMessage());
+  return outcome.GetResultWithOwnership();
 }
 
-class AwsLogger : public Aws::Utils::Logging::FormattedLogSystem
-{
-    using Aws::Utils::Logging::FormattedLogSystem::FormattedLogSystem;
+class AwsLogger : public Aws::Utils::Logging::FormattedLogSystem {
+  using Aws::Utils::Logging::FormattedLogSystem::FormattedLogSystem;
 
-    void ProcessFormattedStatement(Aws::String && statement) override
-    {
-        debug("AWS: %s", chomp(statement));
-    }
+  void ProcessFormattedStatement(Aws::String&& statement) override {
+    debug("AWS: %s", chomp(statement));
+  }
 };
 
-static void initAWS()
-{
-    static std::once_flag flag;
-    std::call_once(flag, []() {
-        Aws::SDKOptions options;
-
-        /* We install our own OpenSSL locking function (see
-           shared.cc), so don't let aws-sdk-cpp override it. */
-        options.cryptoOptions.initAndCleanupOpenSSL = false;
-
-        if (verbosity >= lvlDebug) {
-            options.loggingOptions.logLevel =
-                verbosity == lvlDebug
-                ? Aws::Utils::Logging::LogLevel::Debug
-                : Aws::Utils::Logging::LogLevel::Trace;
-            options.loggingOptions.logger_create_fn = [options]() {
-                return std::make_shared<AwsLogger>(options.loggingOptions.logLevel);
-            };
-        }
+static void initAWS() {
+  static std::once_flag flag;
+  std::call_once(flag, []() {
+    Aws::SDKOptions options;
+
+    /* We install our own OpenSSL locking function (see
+       shared.cc), so don't let aws-sdk-cpp override it. */
+    options.cryptoOptions.initAndCleanupOpenSSL = false;
+
+    if (verbosity >= lvlDebug) {
+      options.loggingOptions.logLevel =
+          verbosity == lvlDebug ? Aws::Utils::Logging::LogLevel::Debug
+                                : Aws::Utils::Logging::LogLevel::Trace;
+      options.loggingOptions.logger_create_fn = [options]() {
+        return std::make_shared<AwsLogger>(options.loggingOptions.logLevel);
+      };
+    }
 
-        Aws::InitAPI(options);
-    });
+    Aws::InitAPI(options);
+  });
 }
 
-S3Helper::S3Helper(const string & profile, const string & region, const string & scheme, const string & endpoint)
-    : config(makeConfig(region, scheme, endpoint))
-    , client(make_ref<Aws::S3::S3Client>(
-            profile == ""
-            ? std::dynamic_pointer_cast<Aws::Auth::AWSCredentialsProvider>(
-                std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>())
-            : std::dynamic_pointer_cast<Aws::Auth::AWSCredentialsProvider>(
-                std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(profile.c_str())),
-            *config,
-            // FIXME: https://github.com/aws/aws-sdk-cpp/issues/759
+S3Helper::S3Helper(const string& profile, const string& region,
+                   const string& scheme, const string& endpoint)
+    : config(makeConfig(region, scheme, endpoint)),
+      client(make_ref<Aws::S3::S3Client>(
+          profile == ""
+              ? std::dynamic_pointer_cast<Aws::Auth::AWSCredentialsProvider>(
+                    std::make_shared<
+                        Aws::Auth::DefaultAWSCredentialsProviderChain>())
+              : std::dynamic_pointer_cast<Aws::Auth::AWSCredentialsProvider>(
+                    std::make_shared<
+                        Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(
+                        profile.c_str())),
+          *config,
+// FIXME: https://github.com/aws/aws-sdk-cpp/issues/759
 #if AWS_VERSION_MAJOR == 1 && AWS_VERSION_MINOR < 3
-            false,
+          false,
 #else
-            Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
+          Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
 #endif
-            endpoint.empty()))
-{
+          endpoint.empty())) {
 }
 
 /* Log AWS retries. */
-class RetryStrategy : public Aws::Client::DefaultRetryStrategy
-{
-    bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override
-    {
-        auto retry = Aws::Client::DefaultRetryStrategy::ShouldRetry(error, attemptedRetries);
-        if (retry)
-            printError("AWS error '%s' (%s), will retry in %d ms",
-                error.GetExceptionName(), error.GetMessage(), CalculateDelayBeforeNextRetry(error, attemptedRetries));
-        return retry;
-    }
+class RetryStrategy : public Aws::Client::DefaultRetryStrategy {
+  bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
+                   long attemptedRetries) const override {
+    auto retry =
+        Aws::Client::DefaultRetryStrategy::ShouldRetry(error, attemptedRetries);
+    if (retry)
+      printError("AWS error '%s' (%s), will retry in %d ms",
+                 error.GetExceptionName(), error.GetMessage(),
+                 CalculateDelayBeforeNextRetry(error, attemptedRetries));
+    return retry;
+  }
 };
 
-ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region, const string & scheme, const string & endpoint)
-{
-    initAWS();
-    auto res = make_ref<Aws::Client::ClientConfiguration>();
-    res->region = region;
-    if (!scheme.empty()) {
-        res->scheme = Aws::Http::SchemeMapper::FromString(scheme.c_str());
-    }
-    if (!endpoint.empty()) {
-        res->endpointOverride = endpoint;
-    }
-    res->requestTimeoutMs = 600 * 1000;
-    res->connectTimeoutMs = 5 * 1000;
-    res->retryStrategy = std::make_shared<RetryStrategy>();
-    res->caFile = settings.caFile;
-    return res;
+ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(
+    const string& region, const string& scheme, const string& endpoint) {
+  initAWS();
+  auto res = make_ref<Aws::Client::ClientConfiguration>();
+  res->region = region;
+  if (!scheme.empty()) {
+    res->scheme = Aws::Http::SchemeMapper::FromString(scheme.c_str());
+  }
+  if (!endpoint.empty()) {
+    res->endpointOverride = endpoint;
+  }
+  res->requestTimeoutMs = 600 * 1000;
+  res->connectTimeoutMs = 5 * 1000;
+  res->retryStrategy = std::make_shared<RetryStrategy>();
+  res->caFile = settings.caFile;
+  return res;
 }
 
-S3Helper::DownloadResult S3Helper::getObject(
-    const std::string & bucketName, const std::string & key)
-{
-    debug("fetching 's3://%s/%s'...", bucketName, key);
+S3Helper::DownloadResult S3Helper::getObject(const std::string& bucketName,
+                                             const std::string& key) {
+  debug("fetching 's3://%s/%s'...", bucketName, key);
 
-    auto request =
-        Aws::S3::Model::GetObjectRequest()
-        .WithBucket(bucketName)
-        .WithKey(key);
-
-    request.SetResponseStreamFactory([&]() {
-        return Aws::New<std::stringstream>("STRINGSTREAM");
-    });
+  auto request =
+      Aws::S3::Model::GetObjectRequest().WithBucket(bucketName).WithKey(key);
 
-    DownloadResult res;
+  request.SetResponseStreamFactory(
+      [&]() { return Aws::New<std::stringstream>("STRINGSTREAM"); });
 
-    auto now1 = std::chrono::steady_clock::now();
+  DownloadResult res;
 
-    try {
+  auto now1 = std::chrono::steady_clock::now();
 
-        auto result = checkAws(fmt("AWS error fetching '%s'", key),
-            client->GetObject(request));
+  try {
+    auto result = checkAws(fmt("AWS error fetching '%s'", key),
+                           client->GetObject(request));
 
-        res.data = decompress(result.GetContentEncoding(),
-            dynamic_cast<std::stringstream &>(result.GetBody()).str());
+    res.data =
+        decompress(result.GetContentEncoding(),
+                   dynamic_cast<std::stringstream&>(result.GetBody()).str());
 
-    } catch (S3Error & e) {
-        if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
-    }
+  } catch (S3Error& e) {
+    if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
+  }
 
-    auto now2 = std::chrono::steady_clock::now();
+  auto now2 = std::chrono::steady_clock::now();
 
-    res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+  res.durationMs =
+      std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1)
+          .count();
 
-    return res;
+  return res;
 }
 
-struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
-{
-    const Setting<std::string> profile{this, "", "profile", "The name of the AWS configuration profile to use."};
-    const Setting<std::string> region{this, Aws::Region::US_EAST_1, "region", {"aws-region"}};
-    const Setting<std::string> scheme{this, "", "scheme", "The scheme to use for S3 requests, https by default."};
-    const Setting<std::string> endpoint{this, "", "endpoint", "An optional override of the endpoint to use when talking to S3."};
-    const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
-    const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
-    const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
-    const Setting<bool> multipartUpload{
-        this, false, "multipart-upload", "whether to use multi-part uploads"};
-    const Setting<uint64_t> bufferSize{
-        this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads"};
-
-    std::string bucketName;
-
-    Stats stats;
-
-    S3Helper s3Helper;
-
-    S3BinaryCacheStoreImpl(
-        const Params & params, const std::string & bucketName)
-        : S3BinaryCacheStore(params)
-        , bucketName(bucketName)
-        , s3Helper(profile, region, scheme, endpoint)
-    {
-        diskCache = getNarInfoDiskCache();
-    }
-
-    std::string getUri() override
-    {
-        return "s3://" + bucketName;
-    }
-
-    void init() override
-    {
-        if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) {
-
-            BinaryCacheStore::init();
-
-            diskCache->createCache(getUri(), storeDir, wantMassQuery_, priority);
-        }
+struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore {
+  const Setting<std::string> profile{
+      this, "", "profile", "The name of the AWS configuration profile to use."};
+  const Setting<std::string> region{
+      this, Aws::Region::US_EAST_1, "region", {"aws-region"}};
+  const Setting<std::string> scheme{
+      this, "", "scheme",
+      "The scheme to use for S3 requests, https by default."};
+  const Setting<std::string> endpoint{
+      this, "", "endpoint",
+      "An optional override of the endpoint to use when talking to S3."};
+  const Setting<std::string> narinfoCompression{
+      this, "", "narinfo-compression", "compression method for .narinfo files"};
+  const Setting<std::string> lsCompression{this, "", "ls-compression",
+                                           "compression method for .ls files"};
+  const Setting<std::string> logCompression{
+      this, "", "log-compression", "compression method for log/* files"};
+  const Setting<bool> multipartUpload{this, false, "multipart-upload",
+                                      "whether to use multi-part uploads"};
+  const Setting<uint64_t> bufferSize{
+      this, 5 * 1024 * 1024, "buffer-size",
+      "size (in bytes) of each part in multi-part uploads"};
+
+  std::string bucketName;
+
+  Stats stats;
+
+  S3Helper s3Helper;
+
+  S3BinaryCacheStoreImpl(const Params& params, const std::string& bucketName)
+      : S3BinaryCacheStore(params),
+        bucketName(bucketName),
+        s3Helper(profile, region, scheme, endpoint) {
+    diskCache = getNarInfoDiskCache();
+  }
+
+  std::string getUri() override { return "s3://" + bucketName; }
+
+  void init() override {
+    if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) {
+      BinaryCacheStore::init();
+
+      diskCache->createCache(getUri(), storeDir, wantMassQuery_, priority);
     }
+  }
 
-    const Stats & getS3Stats() override
-    {
-        return stats;
-    }
+  const Stats& getS3Stats() override { return stats; }
 
-    /* This is a specialisation of isValidPath() that optimistically
-       fetches the .narinfo file, rather than first checking for its
-       existence via a HEAD request. Since .narinfos are small, doing
-       a GET is unlikely to be slower than HEAD. */
-    bool isValidPathUncached(const Path & storePath) override
-    {
-        try {
-            queryPathInfo(storePath);
-            return true;
-        } catch (InvalidPath & e) {
-            return false;
-        }
+  /* This is a specialisation of isValidPath() that optimistically
+     fetches the .narinfo file, rather than first checking for its
+     existence via a HEAD request. Since .narinfos are small, doing
+     a GET is unlikely to be slower than HEAD. */
+  bool isValidPathUncached(const Path& storePath) override {
+    try {
+      queryPathInfo(storePath);
+      return true;
+    } catch (InvalidPath& e) {
+      return false;
     }
-
-    bool fileExists(const std::string & path) override
-    {
-        stats.head++;
-
-        auto res = s3Helper.client->HeadObject(
-            Aws::S3::Model::HeadObjectRequest()
-            .WithBucket(bucketName)
-            .WithKey(path));
-
-        if (!res.IsSuccess()) {
-            auto & error = res.GetError();
-            if (error.GetErrorType() == Aws::S3::S3Errors::RESOURCE_NOT_FOUND
-                || error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY
-                // If bucket listing is disabled, 404s turn into 403s
-                || error.GetErrorType() == Aws::S3::S3Errors::ACCESS_DENIED)
-                return false;
-            throw Error(format("AWS error fetching '%s': %s") % path % error.GetMessage());
-        }
-
-        return true;
+  }
+
+  bool fileExists(const std::string& path) override {
+    stats.head++;
+
+    auto res = s3Helper.client->HeadObject(Aws::S3::Model::HeadObjectRequest()
+                                               .WithBucket(bucketName)
+                                               .WithKey(path));
+
+    if (!res.IsSuccess()) {
+      auto& error = res.GetError();
+      if (error.GetErrorType() == Aws::S3::S3Errors::RESOURCE_NOT_FOUND ||
+          error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY
+          // If bucket listing is disabled, 404s turn into 403s
+          || error.GetErrorType() == Aws::S3::S3Errors::ACCESS_DENIED)
+        return false;
+      throw Error(format("AWS error fetching '%s': %s") % path %
+                  error.GetMessage());
     }
 
-    std::shared_ptr<TransferManager> transferManager;
-    std::once_flag transferManagerCreated;
-
-    void uploadFile(const std::string & path, const std::string & data,
-        const std::string & mimeType,
-        const std::string & contentEncoding)
-    {
-        auto stream = std::make_shared<istringstream_nocopy>(data);
-
-        auto maxThreads = std::thread::hardware_concurrency();
-
-        static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
-            executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);
-
-        std::call_once(transferManagerCreated, [&]()
-        {
-            if (multipartUpload) {
-                TransferManagerConfiguration transferConfig(executor.get());
-
-                transferConfig.s3Client = s3Helper.client;
-                transferConfig.bufferSize = bufferSize;
-
-                transferConfig.uploadProgressCallback =
-                    [](const TransferManager *transferManager,
-                        const std::shared_ptr<const TransferHandle>
-                        &transferHandle)
-                    {
-                        //FIXME: find a way to properly abort the multipart upload.
-                        //checkInterrupt();
-                        debug("upload progress ('%s'): '%d' of '%d' bytes",
-                            transferHandle->GetKey(),
-                            transferHandle->GetBytesTransferred(),
-                            transferHandle->GetBytesTotalSize());
-                    };
-
-                transferManager = TransferManager::Create(transferConfig);
-            }
-        });
-
-        auto now1 = std::chrono::steady_clock::now();
-
-        if (transferManager) {
-
-            if (contentEncoding != "")
-                throw Error("setting a content encoding is not supported with S3 multi-part uploads");
-
-            std::shared_ptr<TransferHandle> transferHandle =
-                transferManager->UploadFile(
-                    stream, bucketName, path, mimeType,
-                    Aws::Map<Aws::String, Aws::String>(),
-                    nullptr /*, contentEncoding */);
+    return true;
+  }
 
-            transferHandle->WaitUntilFinished();
+  std::shared_ptr<TransferManager> transferManager;
+  std::once_flag transferManagerCreated;
 
-            if (transferHandle->GetStatus() == TransferStatus::FAILED)
-                throw Error("AWS error: failed to upload 's3://%s/%s': %s",
-                    bucketName, path, transferHandle->GetLastError().GetMessage());
+  void uploadFile(const std::string& path, const std::string& data,
+                  const std::string& mimeType,
+                  const std::string& contentEncoding) {
+    auto stream = std::make_shared<istringstream_nocopy>(data);
 
-            if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
-                throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
-                    bucketName, path);
+    auto maxThreads = std::thread::hardware_concurrency();
 
-        } else {
+    static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
+        executor =
+            std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(
+                maxThreads);
 
-            auto request =
-                Aws::S3::Model::PutObjectRequest()
-                .WithBucket(bucketName)
-                .WithKey(path);
+    std::call_once(transferManagerCreated, [&]() {
+      if (multipartUpload) {
+        TransferManagerConfiguration transferConfig(executor.get());
 
-            request.SetContentType(mimeType);
+        transferConfig.s3Client = s3Helper.client;
+        transferConfig.bufferSize = bufferSize;
 
-            if (contentEncoding != "")
-                request.SetContentEncoding(contentEncoding);
+        transferConfig.uploadProgressCallback =
+            [](const TransferManager* transferManager,
+               const std::shared_ptr<const TransferHandle>& transferHandle) {
+              // FIXME: find a way to properly abort the multipart upload.
+              // checkInterrupt();
+              debug("upload progress ('%s'): '%d' of '%d' bytes",
+                    transferHandle->GetKey(),
+                    transferHandle->GetBytesTransferred(),
+                    transferHandle->GetBytesTotalSize());
+            };
 
-            auto stream = std::make_shared<istringstream_nocopy>(data);
+        transferManager = TransferManager::Create(transferConfig);
+      }
+    });
 
-            request.SetBody(stream);
+    auto now1 = std::chrono::steady_clock::now();
 
-            auto result = checkAws(fmt("AWS error uploading '%s'", path),
-                s3Helper.client->PutObject(request));
-        }
+    if (transferManager) {
+      if (contentEncoding != "")
+        throw Error(
+            "setting a content encoding is not supported with S3 multi-part "
+            "uploads");
 
-        auto now2 = std::chrono::steady_clock::now();
+      std::shared_ptr<TransferHandle> transferHandle =
+          transferManager->UploadFile(stream, bucketName, path, mimeType,
+                                      Aws::Map<Aws::String, Aws::String>(),
+                                      nullptr /*, contentEncoding */);
 
-        auto duration =
-            std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1)
-                .count();
+      transferHandle->WaitUntilFinished();
 
-        printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") %
-                  bucketName % path % data.size() % duration);
+      if (transferHandle->GetStatus() == TransferStatus::FAILED)
+        throw Error("AWS error: failed to upload 's3://%s/%s': %s", bucketName,
+                    path, transferHandle->GetLastError().GetMessage());
 
-        stats.putTimeMs += duration;
-        stats.putBytes += data.size();
-        stats.put++;
-    }
+      if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
+        throw Error(
+            "AWS error: transfer status of 's3://%s/%s' in unexpected state",
+            bucketName, path);
 
-    void upsertFile(const std::string & path, const std::string & data,
-        const std::string & mimeType) override
-    {
-        if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
-            uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression);
-        else if (lsCompression != "" && hasSuffix(path, ".ls"))
-            uploadFile(path, *compress(lsCompression, data), mimeType, lsCompression);
-        else if (logCompression != "" && hasPrefix(path, "log/"))
-            uploadFile(path, *compress(logCompression, data), mimeType, logCompression);
-        else
-            uploadFile(path, data, mimeType, "");
-    }
+    } else {
+      auto request = Aws::S3::Model::PutObjectRequest()
+                         .WithBucket(bucketName)
+                         .WithKey(path);
 
-    void getFile(const std::string & path, Sink & sink) override
-    {
-        stats.get++;
+      request.SetContentType(mimeType);
 
-        // FIXME: stream output to sink.
-        auto res = s3Helper.getObject(bucketName, path);
+      if (contentEncoding != "") request.SetContentEncoding(contentEncoding);
 
-        stats.getBytes += res.data ? res.data->size() : 0;
-        stats.getTimeMs += res.durationMs;
+      auto stream = std::make_shared<istringstream_nocopy>(data);
 
-        if (res.data) {
-            printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms",
-                bucketName, path, res.data->size(), res.durationMs);
+      request.SetBody(stream);
 
-            sink((unsigned char *) res.data->data(), res.data->size());
-        } else
-            throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
+      auto result = checkAws(fmt("AWS error uploading '%s'", path),
+                             s3Helper.client->PutObject(request));
     }
 
-    PathSet queryAllValidPaths() override
-    {
-        PathSet paths;
-        std::string marker;
-
-        do {
-            debug(format("listing bucket 's3://%s' from key '%s'...") % bucketName % marker);
-
-            auto res = checkAws(format("AWS error listing bucket '%s'") % bucketName,
-                s3Helper.client->ListObjects(
-                    Aws::S3::Model::ListObjectsRequest()
-                    .WithBucket(bucketName)
-                    .WithDelimiter("/")
-                    .WithMarker(marker)));
-
-            auto & contents = res.GetContents();
-
-            debug(format("got %d keys, next marker '%s'")
-                % contents.size() % res.GetNextMarker());
-
-            for (auto object : contents) {
-                auto & key = object.GetKey();
-                if (key.size() != 40 || !hasSuffix(key, ".narinfo")) continue;
-                paths.insert(storeDir + "/" + key.substr(0, key.size() - 8));
-            }
-
-            marker = res.GetNextMarker();
-        } while (!marker.empty());
-
-        return paths;
-    }
+    auto now2 = std::chrono::steady_clock::now();
 
+    auto duration =
+        std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1)
+            .count();
+
+    printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") %
+              bucketName % path % data.size() % duration);
+
+    stats.putTimeMs += duration;
+    stats.putBytes += data.size();
+    stats.put++;
+  }
+
+  void upsertFile(const std::string& path, const std::string& data,
+                  const std::string& mimeType) override {
+    if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
+      uploadFile(path, *compress(narinfoCompression, data), mimeType,
+                 narinfoCompression);
+    else if (lsCompression != "" && hasSuffix(path, ".ls"))
+      uploadFile(path, *compress(lsCompression, data), mimeType, lsCompression);
+    else if (logCompression != "" && hasPrefix(path, "log/"))
+      uploadFile(path, *compress(logCompression, data), mimeType,
+                 logCompression);
+    else
+      uploadFile(path, data, mimeType, "");
+  }
+
+  void getFile(const std::string& path, Sink& sink) override {
+    stats.get++;
+
+    // FIXME: stream output to sink.
+    auto res = s3Helper.getObject(bucketName, path);
+
+    stats.getBytes += res.data ? res.data->size() : 0;
+    stats.getTimeMs += res.durationMs;
+
+    if (res.data) {
+      printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms", bucketName,
+                     path, res.data->size(), res.durationMs);
+
+      sink((unsigned char*)res.data->data(), res.data->size());
+    } else
+      throw NoSuchBinaryCacheFile(
+          "file '%s' does not exist in binary cache '%s'", path, getUri());
+  }
+
+  PathSet queryAllValidPaths() override {
+    PathSet paths;
+    std::string marker;
+
+    do {
+      debug(format("listing bucket 's3://%s' from key '%s'...") % bucketName %
+            marker);
+
+      auto res = checkAws(
+          format("AWS error listing bucket '%s'") % bucketName,
+          s3Helper.client->ListObjects(Aws::S3::Model::ListObjectsRequest()
+                                           .WithBucket(bucketName)
+                                           .WithDelimiter("/")
+                                           .WithMarker(marker)));
+
+      auto& contents = res.GetContents();
+
+      debug(format("got %d keys, next marker '%s'") % contents.size() %
+            res.GetNextMarker());
+
+      for (auto object : contents) {
+        auto& key = object.GetKey();
+        if (key.size() != 40 || !hasSuffix(key, ".narinfo")) continue;
+        paths.insert(storeDir + "/" + key.substr(0, key.size() - 8));
+      }
+
+      marker = res.GetNextMarker();
+    } while (!marker.empty());
+
+    return paths;
+  }
 };
 
-static RegisterStoreImplementation regStore([](
-    const std::string & uri, const Store::Params & params)
-    -> std::shared_ptr<Store>
-{
-    if (std::string(uri, 0, 5) != "s3://") return 0;
-    auto store = std::make_shared<S3BinaryCacheStoreImpl>(params, std::string(uri, 5));
-    store->init();
-    return store;
-});
+static RegisterStoreImplementation regStore(
+    [](const std::string& uri,
+       const Store::Params& params) -> std::shared_ptr<Store> {
+      if (std::string(uri, 0, 5) != "s3://") return 0;
+      auto store =
+          std::make_shared<S3BinaryCacheStoreImpl>(params, std::string(uri, 5));
+      store->init();
+      return store;
+    });
 
-}
+}  // namespace nix
 
 #endif