diff options
author | Eelco Dolstra <edolstra@gmail.com> | 2018-05-10T10·06+0200 |
---|---|---|
committer | Eelco Dolstra <edolstra@gmail.com> | 2018-05-10T10·06+0200 |
commit | 38def176271c97a427febaf381a32bebe6b3790b (patch) | |
tree | f7b71b5ebb3521f5e6d5ae4953f8b34dd1159ce5 /src/libstore/s3-binary-cache-store.cc | |
parent | 39c4d7f5b0aee228d33e5a0d83b18cff8a4b49cc (diff) | |
parent | 854c0860f4ec68e81869473cf0708529afd0486e (diff) |
Merge branch 's3-multipart-uploads' of https://github.com/AmineChikhaoui/nix
Diffstat (limited to 'src/libstore/s3-binary-cache-store.cc')
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 80 |
1 files changed, 64 insertions, 16 deletions
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 23af452094cf..96673a5b0cc4 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -17,6 +17,7 @@ #include <aws/core/client/DefaultRetryStrategy.h> #include <aws/core/utils/logging/FormattedLogSystem.h> #include <aws/core/utils/logging/LogMacros.h> +#include <aws/core/utils/threading/Executor.h> #include <aws/s3/S3Client.h> #include <aws/s3/model/CreateBucketRequest.h> #include <aws/s3/model/GetBucketLocationRequest.h> @@ -24,6 +25,9 @@ #include <aws/s3/model/HeadObjectRequest.h> #include <aws/s3/model/ListObjectsRequest.h> #include <aws/s3/model/PutObjectRequest.h> +#include <aws/transfer/TransferManager.h> + +using namespace Aws::Transfer; namespace nix { @@ -169,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"}; const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"}; const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"}; + const Setting<uint64_t> bufferSize{ + this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads. defaults to 5Mb"}; std::string bucketName; @@ -271,34 +277,76 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore const std::string & mimeType, const std::string & contentEncoding) { - auto request = - Aws::S3::Model::PutObjectRequest() - .WithBucket(bucketName) - .WithKey(path); + auto stream = std::make_shared<istringstream_nocopy>(data); - request.SetContentType(mimeType); + auto maxThreads = std::thread::hardware_concurrency(); - if (contentEncoding != "") - request.SetContentEncoding(contentEncoding); + static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> + executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads); - auto stream = std::make_shared<istringstream_nocopy>(data); + TransferManagerConfiguration transferConfig(executor.get()); - request.SetBody(stream); + transferConfig.s3Client = s3Helper.client; + transferConfig.bufferSize = bufferSize; - stats.put++; - stats.putBytes += data.size(); + if (contentEncoding != "") + transferConfig.createMultipartUploadTemplate.SetContentEncoding( + contentEncoding); + + transferConfig.uploadProgressCallback = + [&](const TransferManager *transferManager, + const std::shared_ptr<const TransferHandle> + &transferHandle) { + //FIXME: find a way to properly abort the multipart upload. + checkInterrupt(); + printTalkative("upload progress ('%s'): '%d' of '%d' bytes", + path, + transferHandle->GetBytesTransferred(), + transferHandle->GetBytesTotalSize()); + }; + + transferConfig.transferStatusUpdatedCallback = + [&](const TransferManager *, + const std::shared_ptr<const TransferHandle> + &transferHandle) { + switch (transferHandle->GetStatus()) { + case TransferStatus::COMPLETED: + printTalkative("upload of '%s' completed", path); + stats.put++; + stats.putBytes += data.size(); + break; + case TransferStatus::IN_PROGRESS: + break; + case TransferStatus::FAILED: + throw Error("AWS error: failed to upload 's3://%s/%s'", + bucketName, path); + break; + default: + throw Error("AWS error: transfer status of 's3://%s/%s' " + "in unexpected state", + bucketName, path); + }; + }; + + std::shared_ptr<TransferManager> transferManager = + TransferManager::Create(transferConfig); auto now1 = std::chrono::steady_clock::now(); - auto result = checkAws(format("AWS error uploading '%s'") % path, - s3Helper.client->PutObject(request)); + std::shared_ptr<TransferHandle> transferHandle = + transferManager->UploadFile(stream, bucketName, path, mimeType, + Aws::Map<Aws::String, Aws::String>()); + + transferHandle->WaitUntilFinished(); auto now2 = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + auto duration = + std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1) + .count(); - printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") - % bucketName % path % data.size() % duration); + printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") % + bucketName % path % data.size() % duration); stats.putTimeMs += duration; } |