diff options
author | Eelco Dolstra <edolstra@gmail.com> | 2018-10-30T13·25+0100 |
---|---|---|
committer | Eelco Dolstra <edolstra@gmail.com> | 2018-10-30T13·25+0100 |
commit | 9f99d62480cf7c58c0a110b180f2096b7d25adab (patch) | |
tree | e99f4a412eb90a7f23db294a74ff95f2bee57d63 | |
parent | 0163e8928c624251456adacb669a94a4adf230ff (diff) |
S3BinaryCacheStore: Allow disabling multipart uploads
The use of TransferManager has several issues, including that it doesn't allow setting a Content-Encoding without a patch, and it doesn't handle exceptions in worker threads (causing termination on memory allocation failure). Fixes #2493.
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 88 |
1 files changed, 57 insertions, 31 deletions
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 13ee257ba253..c5c6b89b1977 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -173,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"}; const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"}; const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"}; + const Setting<bool> multipartUpload{ + this, false, "multipart-upload", "whether to use multi-part uploads"}; const Setting<uint64_t> bufferSize{ this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads"}; @@ -261,46 +263,70 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads); - std::call_once(transferManagerCreated, [&]() { + std::call_once(transferManagerCreated, [&]() + { + if (multipartUpload) { + TransferManagerConfiguration transferConfig(executor.get()); + + transferConfig.s3Client = s3Helper.client; + transferConfig.bufferSize = bufferSize; + + transferConfig.uploadProgressCallback = + [](const TransferManager *transferManager, + const std::shared_ptr<const TransferHandle> + &transferHandle) + { + //FIXME: find a way to properly abort the multipart upload. + //checkInterrupt(); + debug("upload progress ('%s'): '%d' of '%d' bytes", + transferHandle->GetKey(), + transferHandle->GetBytesTransferred(), + transferHandle->GetBytesTotalSize()); + }; + + transferManager = TransferManager::Create(transferConfig); + } + }); - TransferManagerConfiguration transferConfig(executor.get()); + auto now1 = std::chrono::steady_clock::now(); - transferConfig.s3Client = s3Helper.client; - transferConfig.bufferSize = bufferSize; + if (transferManager) { - transferConfig.uploadProgressCallback = - [](const TransferManager *transferManager, - const std::shared_ptr<const TransferHandle> - &transferHandle) - { - //FIXME: find a way to properly abort the multipart upload. - //checkInterrupt(); - debug("upload progress ('%s'): '%d' of '%d' bytes", - transferHandle->GetKey(), - transferHandle->GetBytesTransferred(), - transferHandle->GetBytesTotalSize()); - }; + std::shared_ptr<TransferHandle> transferHandle = + transferManager->UploadFile( + stream, bucketName, path, mimeType, + Aws::Map<Aws::String, Aws::String>(), + nullptr, contentEncoding); - transferManager = TransferManager::Create(transferConfig); - }); + transferHandle->WaitUntilFinished(); - auto now1 = std::chrono::steady_clock::now(); + if (transferHandle->GetStatus() == TransferStatus::FAILED) + throw Error("AWS error: failed to upload 's3://%s/%s': %s", + bucketName, path, transferHandle->GetLastError().GetMessage()); - std::shared_ptr<TransferHandle> transferHandle = - transferManager->UploadFile( - stream, bucketName, path, mimeType, - Aws::Map<Aws::String, Aws::String>(), - nullptr, contentEncoding); + if (transferHandle->GetStatus() != TransferStatus::COMPLETED) + throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state", + bucketName, path); - transferHandle->WaitUntilFinished(); + } else { - if (transferHandle->GetStatus() == TransferStatus::FAILED) - throw Error("AWS error: failed to upload 's3://%s/%s': %s", - bucketName, path, transferHandle->GetLastError().GetMessage()); + auto request = + Aws::S3::Model::PutObjectRequest() + .WithBucket(bucketName) + .WithKey(path); - if (transferHandle->GetStatus() != TransferStatus::COMPLETED) - throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state", - bucketName, path); + request.SetContentType(mimeType); + + if (contentEncoding != "") + request.SetContentEncoding(contentEncoding); + + auto stream = std::make_shared<istringstream_nocopy>(data); + + request.SetBody(stream); + + auto result = checkAws(fmt("AWS error uploading '%s'", path), + s3Helper.client->PutObject(request)); + } printTalkative("upload of '%s' completed", path); |