diff options
Diffstat (limited to 'src/libstore/s3-binary-cache-store.cc')
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 166 |
1 files changed, 80 insertions, 86 deletions
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 26144ccb40cc..4f1e23198ffe 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -19,8 +19,6 @@ #include <aws/core/utils/logging/LogMacros.h> #include <aws/core/utils/threading/Executor.h> #include <aws/s3/S3Client.h> -#include <aws/s3/model/CreateBucketRequest.h> -#include <aws/s3/model/GetBucketLocationRequest.h> #include <aws/s3/model/GetObjectRequest.h> #include <aws/s3/model/HeadObjectRequest.h> #include <aws/s3/model/ListObjectsRequest.h> @@ -84,8 +82,8 @@ static void initAWS() }); } -S3Helper::S3Helper(const std::string & profile, const std::string & region) - : config(makeConfig(region)) +S3Helper::S3Helper(const std::string & profile, const std::string & region, const std::string & endpoint) + : config(makeConfig(region, endpoint)) , client(make_ref<Aws::S3::S3Client>( profile == "" ? std::dynamic_pointer_cast<Aws::Auth::AWSCredentialsProvider>( @@ -99,7 +97,7 @@ S3Helper::S3Helper(const std::string & profile, const std::string & region) #else Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, #endif - false)) + endpoint.empty())) { } @@ -116,11 +114,14 @@ class RetryStrategy : public Aws::Client::DefaultRetryStrategy } }; -ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region) +ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region, const string & endpoint) { initAWS(); auto res = make_ref<Aws::Client::ClientConfiguration>(); res->region = region; + if (!endpoint.empty()) { + res->endpointOverride = endpoint; + } res->requestTimeoutMs = 600 * 1000; res->retryStrategy = std::make_shared<RetryStrategy>(); res->caFile = settings.caFile; @@ -150,10 +151,8 @@ S3Helper::DownloadResult S3Helper::getObject( auto result = checkAws(fmt("AWS error fetching '%s'", key), client->GetObject(request)); - res.data = decodeContent( - result.GetContentEncoding(), - make_ref<std::string>( - dynamic_cast<std::stringstream &>(result.GetBody()).str())); + res.data = decompress(result.GetContentEncoding(), + dynamic_cast<std::stringstream &>(result.GetBody()).str()); } catch (S3Error & e) { if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw; @@ -170,9 +169,12 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore { const Setting<std::string> profile{this, "", "profile", "The name of the AWS configuration profile to use."}; const Setting<std::string> region{this, Aws::Region::US_EAST_1, "region", {"aws-region"}}; + const Setting<std::string> endpoint{this, "", "endpoint", "An optional override of the endpoint to use when talking to S3."}; const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"}; const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"}; const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"}; + const Setting<bool> multipartUpload{ + this, false, "multipart-upload", "whether to use multi-part uploads"}; const Setting<uint64_t> bufferSize{ this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads"}; @@ -186,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore const Params & params, const std::string & bucketName) : S3BinaryCacheStore(params) , bucketName(bucketName) - , s3Helper(profile, region) + , s3Helper(profile, region, endpoint) { diskCache = getNarInfoDiskCache(); } @@ -200,32 +202,6 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore { if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) { - /* Create the bucket if it doesn't already exists. */ - // FIXME: HeadBucket would be more appropriate, but doesn't return - // an easily parsed 404 message. - auto res = s3Helper.client->GetBucketLocation( - Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName)); - - if (!res.IsSuccess()) { - if (res.GetError().GetErrorType() != Aws::S3::S3Errors::NO_SUCH_BUCKET) - throw Error(format("AWS error checking bucket '%s': %s") % bucketName % res.GetError().GetMessage()); - - printInfo("creating S3 bucket '%s'...", bucketName); - - // Stupid S3 bucket locations. - auto bucketConfig = Aws::S3::Model::CreateBucketConfiguration(); - if (s3Helper.config->region != "us-east-1") - bucketConfig.SetLocationConstraint( - Aws::S3::Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName( - s3Helper.config->region)); - - checkAws(format("AWS error creating bucket '%s'") % bucketName, - s3Helper.client->CreateBucket( - Aws::S3::Model::CreateBucketRequest() - .WithBucket(bucketName) - .WithCreateBucketConfiguration(bucketConfig))); - } - BinaryCacheStore::init(); diskCache->createCache(getUri(), storeDir, wantMassQuery_, priority); @@ -273,6 +249,9 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore return true; } + std::shared_ptr<TransferManager> transferManager; + std::once_flag transferManagerCreated; + void uploadFile(const std::string & path, const std::string & data, const std::string & mimeType, const std::string & contentEncoding) @@ -284,60 +263,73 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads); - TransferManagerConfiguration transferConfig(executor.get()); - - transferConfig.s3Client = s3Helper.client; - transferConfig.bufferSize = bufferSize; - - if (contentEncoding != "") - transferConfig.createMultipartUploadTemplate.SetContentEncoding( - contentEncoding); - - transferConfig.uploadProgressCallback = - [&](const TransferManager *transferManager, - const std::shared_ptr<const TransferHandle> - &transferHandle) { - //FIXME: find a way to properly abort the multipart upload. - checkInterrupt(); - debug("upload progress ('%s'): '%d' of '%d' bytes", - path, - transferHandle->GetBytesTransferred(), - transferHandle->GetBytesTotalSize()); - }; + std::call_once(transferManagerCreated, [&]() + { + if (multipartUpload) { + TransferManagerConfiguration transferConfig(executor.get()); + + transferConfig.s3Client = s3Helper.client; + transferConfig.bufferSize = bufferSize; + + transferConfig.uploadProgressCallback = + [](const TransferManager *transferManager, + const std::shared_ptr<const TransferHandle> + &transferHandle) + { + //FIXME: find a way to properly abort the multipart upload. + //checkInterrupt(); + debug("upload progress ('%s'): '%d' of '%d' bytes", + transferHandle->GetKey(), + transferHandle->GetBytesTransferred(), + transferHandle->GetBytesTotalSize()); + }; + + transferManager = TransferManager::Create(transferConfig); + } + }); - transferConfig.transferStatusUpdatedCallback = - [&](const TransferManager *, - const std::shared_ptr<const TransferHandle> - &transferHandle) { - switch (transferHandle->GetStatus()) { - case TransferStatus::COMPLETED: - printTalkative("upload of '%s' completed", path); - stats.put++; - stats.putBytes += data.size(); - break; - case TransferStatus::IN_PROGRESS: - break; - case TransferStatus::FAILED: - throw Error("AWS error: failed to upload 's3://%s/%s'", - bucketName, path); - break; - default: - throw Error("AWS error: transfer status of 's3://%s/%s' " - "in unexpected state", - bucketName, path); - }; - }; + auto now1 = std::chrono::steady_clock::now(); - std::shared_ptr<TransferManager> transferManager = - TransferManager::Create(transferConfig); + if (transferManager) { - auto now1 = std::chrono::steady_clock::now(); + if (contentEncoding != "") + throw Error("setting a content encoding is not supported with S3 multi-part uploads"); + + std::shared_ptr<TransferHandle> transferHandle = + transferManager->UploadFile( + stream, bucketName, path, mimeType, + Aws::Map<Aws::String, Aws::String>(), + nullptr /*, contentEncoding */); - std::shared_ptr<TransferHandle> transferHandle = - transferManager->UploadFile(stream, bucketName, path, mimeType, - Aws::Map<Aws::String, Aws::String>()); + transferHandle->WaitUntilFinished(); - transferHandle->WaitUntilFinished(); + if (transferHandle->GetStatus() == TransferStatus::FAILED) + throw Error("AWS error: failed to upload 's3://%s/%s': %s", + bucketName, path, transferHandle->GetLastError().GetMessage()); + + if (transferHandle->GetStatus() != TransferStatus::COMPLETED) + throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state", + bucketName, path); + + } else { + + auto request = + Aws::S3::Model::PutObjectRequest() + .WithBucket(bucketName) + .WithKey(path); + + request.SetContentType(mimeType); + + if (contentEncoding != "") + request.SetContentEncoding(contentEncoding); + + auto stream = std::make_shared<istringstream_nocopy>(data); + + request.SetBody(stream); + + auto result = checkAws(fmt("AWS error uploading '%s'", path), + s3Helper.client->PutObject(request)); + } auto now2 = std::chrono::steady_clock::now(); @@ -349,6 +341,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore bucketName % path % data.size() % duration); stats.putTimeMs += duration; + stats.putBytes += data.size(); + stats.put++; } void upsertFile(const std::string & path, const std::string & data, |