about summary refs log tree commit diff
path: root/src/libstore/s3-binary-cache-store.cc
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2018-10-30T13·25+0100
committerEelco Dolstra <edolstra@gmail.com>2018-10-30T13·25+0100
commit9f99d62480cf7c58c0a110b180f2096b7d25adab (patch)
treee99f4a412eb90a7f23db294a74ff95f2bee57d63 /src/libstore/s3-binary-cache-store.cc
parent0163e8928c624251456adacb669a94a4adf230ff (diff)
S3BinaryCacheStore: Allow disabling multipart uploads
The use of TransferManager has several issues, including that it
doesn't allow setting a Content-Encoding without a patch, and it
doesn't handle exceptions in worker threads (causing termination on
memory allocation failure).

Fixes #2493.
Diffstat (limited to 'src/libstore/s3-binary-cache-store.cc')
-rw-r--r--src/libstore/s3-binary-cache-store.cc88
1 files changed, 57 insertions, 31 deletions
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index 13ee257ba2..c5c6b89b19 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -173,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
     const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
     const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
     const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
+    const Setting<bool> multipartUpload{
+        this, false, "multipart-upload", "whether to use multi-part uploads"};
     const Setting<uint64_t> bufferSize{
         this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads"};
 
@@ -261,46 +263,70 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
         static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
             executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);
 
-        std::call_once(transferManagerCreated, [&]() {
+        std::call_once(transferManagerCreated, [&]()
+        {
+            if (multipartUpload) {
+                TransferManagerConfiguration transferConfig(executor.get());
+
+                transferConfig.s3Client = s3Helper.client;
+                transferConfig.bufferSize = bufferSize;
+
+                transferConfig.uploadProgressCallback =
+                    [](const TransferManager *transferManager,
+                        const std::shared_ptr<const TransferHandle>
+                        &transferHandle)
+                    {
+                        //FIXME: find a way to properly abort the multipart upload.
+                        //checkInterrupt();
+                        debug("upload progress ('%s'): '%d' of '%d' bytes",
+                            transferHandle->GetKey(),
+                            transferHandle->GetBytesTransferred(),
+                            transferHandle->GetBytesTotalSize());
+                    };
+
+                transferManager = TransferManager::Create(transferConfig);
+            }
+        });
 
-            TransferManagerConfiguration transferConfig(executor.get());
+        auto now1 = std::chrono::steady_clock::now();
 
-            transferConfig.s3Client = s3Helper.client;
-            transferConfig.bufferSize = bufferSize;
+        if (transferManager) {
 
-            transferConfig.uploadProgressCallback =
-                [](const TransferManager *transferManager,
-                   const std::shared_ptr<const TransferHandle>
-                   &transferHandle)
-                {
-                    //FIXME: find a way to properly abort the multipart upload.
-                    //checkInterrupt();
-                    debug("upload progress ('%s'): '%d' of '%d' bytes",
-                        transferHandle->GetKey(),
-                        transferHandle->GetBytesTransferred(),
-                        transferHandle->GetBytesTotalSize());
-                };
+            std::shared_ptr<TransferHandle> transferHandle =
+                transferManager->UploadFile(
+                    stream, bucketName, path, mimeType,
+                    Aws::Map<Aws::String, Aws::String>(),
+                    nullptr, contentEncoding);
 
-            transferManager = TransferManager::Create(transferConfig);
-        });
+            transferHandle->WaitUntilFinished();
 
-        auto now1 = std::chrono::steady_clock::now();
+            if (transferHandle->GetStatus() == TransferStatus::FAILED)
+                throw Error("AWS error: failed to upload 's3://%s/%s': %s",
+                    bucketName, path, transferHandle->GetLastError().GetMessage());
 
-        std::shared_ptr<TransferHandle> transferHandle =
-            transferManager->UploadFile(
-                stream, bucketName, path, mimeType,
-                Aws::Map<Aws::String, Aws::String>(),
-                nullptr, contentEncoding);
+            if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
+                throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
+                    bucketName, path);
 
-        transferHandle->WaitUntilFinished();
+        } else {
 
-        if (transferHandle->GetStatus() == TransferStatus::FAILED)
-            throw Error("AWS error: failed to upload 's3://%s/%s': %s",
-                bucketName, path, transferHandle->GetLastError().GetMessage());
+            auto request =
+                Aws::S3::Model::PutObjectRequest()
+                .WithBucket(bucketName)
+                .WithKey(path);
 
-        if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
-            throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
-                bucketName, path);
+            request.SetContentType(mimeType);
+
+            if (contentEncoding != "")
+                request.SetContentEncoding(contentEncoding);
+
+            auto stream = std::make_shared<istringstream_nocopy>(data);
+
+            request.SetBody(stream);
+
+            auto result = checkAws(fmt("AWS error uploading '%s'", path),
+                s3Helper.client->PutObject(request));
+        }
 
         printTalkative("upload of '%s' completed", path);