diff options
Diffstat (limited to 'src/libstore/s3-binary-cache-store.cc')
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 143 |
1 files changed, 80 insertions, 63 deletions
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index ccb71f1eefe5..ac083410b353 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -1,8 +1,6 @@ -#include "config.h" - #if ENABLE_S3 -#if __linux__ +#include "s3.hh" #include "s3-binary-cache-store.hh" #include "nar-info.hh" #include "nar-info-disk-cache.hh" @@ -20,15 +18,6 @@ namespace nix { -struct istringstream_nocopy : public std::stringstream -{ - istringstream_nocopy(const std::string & s) - { - rdbuf()->pubsetbuf( - (char *) s.data(), s.size()); - } -}; - struct S3Error : public Error { Aws::S3::S3Errors err; @@ -62,21 +51,81 @@ static void initAWS() }); } +S3Helper::S3Helper() + : config(makeConfig()) + , client(make_ref<Aws::S3::S3Client>(*config)) +{ +} + +ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig() +{ + initAWS(); + auto res = make_ref<Aws::Client::ClientConfiguration>(); + res->region = Aws::Region::US_EAST_1; // FIXME: make configurable + res->requestTimeoutMs = 600 * 1000; + return res; +} + +S3Helper::DownloadResult S3Helper::getObject( + const std::string & bucketName, const std::string & key) +{ + debug("fetching ‘s3://%s/%s’...", bucketName, key); + + auto request = + Aws::S3::Model::GetObjectRequest() + .WithBucket(bucketName) + .WithKey(key); + + request.SetResponseStreamFactory([&]() { + return Aws::New<std::stringstream>("STRINGSTREAM"); + }); + + DownloadResult res; + + auto now1 = std::chrono::steady_clock::now(); + + try { + + auto result = checkAws(fmt("AWS error fetching ‘%s’", key), + client->GetObject(request)); + + res.data = std::make_shared<std::string>( + dynamic_cast<std::stringstream &>(result.GetBody()).str()); + + } catch (S3Error & e) { + if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw; + } + + auto now2 = std::chrono::steady_clock::now(); + + res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + + return res; +} + +#if __linux__ + +struct istringstream_nocopy : public std::stringstream +{ + istringstream_nocopy(const std::string & s) + { + rdbuf()->pubsetbuf( + (char *) s.data(), s.size()); + } +}; + struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore { std::string bucketName; - ref<Aws::Client::ClientConfiguration> config; - ref<Aws::S3::S3Client> client; - Stats stats; + S3Helper s3Helper; + S3BinaryCacheStoreImpl( const Params & params, const std::string & bucketName) : S3BinaryCacheStore(params) , bucketName(bucketName) - , config(makeConfig()) - , client(make_ref<Aws::S3::S3Client>(*config)) { diskCache = getNarInfoDiskCache(); } @@ -86,15 +135,6 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore return "s3://" + bucketName; } - ref<Aws::Client::ClientConfiguration> makeConfig() - { - initAWS(); - auto res = make_ref<Aws::Client::ClientConfiguration>(); - res->region = Aws::Region::US_EAST_1; // FIXME: make configurable - res->requestTimeoutMs = 600 * 1000; - return res; - } - void init() override { if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) { @@ -102,7 +142,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore /* Create the bucket if it doesn't already exists. */ // FIXME: HeadBucket would be more appropriate, but doesn't return // an easily parsed 404 message. - auto res = client->GetBucketLocation( + auto res = s3Helper.client->GetBucketLocation( Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName)); if (!res.IsSuccess()) { @@ -110,7 +150,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage()); checkAws(format("AWS error creating bucket ‘%s’") % bucketName, - client->CreateBucket( + s3Helper.client->CreateBucket( Aws::S3::Model::CreateBucketRequest() .WithBucket(bucketName) .WithCreateBucketConfiguration( @@ -148,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore { stats.head++; - auto res = client->HeadObject( + auto res = s3Helper.client->HeadObject( Aws::S3::Model::HeadObjectRequest() .WithBucket(bucketName) .WithKey(path)); @@ -181,7 +221,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore auto now1 = std::chrono::steady_clock::now(); auto result = checkAws(format("AWS error uploading ‘%s’") % path, - client->PutObject(request)); + s3Helper.client->PutObject(request)); auto now2 = std::chrono::steady_clock::now(); @@ -200,42 +240,18 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path); - auto request = - Aws::S3::Model::GetObjectRequest() - .WithBucket(bucketName) - .WithKey(path); - - request.SetResponseStreamFactory([&]() { - return Aws::New<std::stringstream>("STRINGSTREAM"); - }); - stats.get++; - try { - - auto now1 = std::chrono::steady_clock::now(); - - auto result = checkAws(format("AWS error fetching ‘%s’") % path, - client->GetObject(request)); - - auto now2 = std::chrono::steady_clock::now(); + auto res = s3Helper.getObject(bucketName, path); - auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str(); + stats.getBytes += res.data ? res.data->size() : 0; + stats.getTimeMs += res.durationMs; - auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + if (res.data) + printTalkative("downloaded ‘s3://%s/%s’ (%d bytes) in %d ms", + bucketName, path, res.data->size(), res.durationMs); - printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") - % bucketName % path % res.size() % duration); - - stats.getBytes += res.size(); - stats.getTimeMs += duration; - - return std::make_shared<std::string>(res); - - } catch (S3Error & e) { - if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>(); - throw; - } + return res.data; }); } @@ -248,7 +264,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore debug(format("listing bucket ‘s3://%s’ from key ‘%s’...") % bucketName % marker); auto res = checkAws(format("AWS error listing bucket ‘%s’") % bucketName, - client->ListObjects( + s3Helper.client->ListObjects( Aws::S3::Model::ListObjectsRequest() .WithBucket(bucketName) .WithDelimiter("/") @@ -283,7 +299,8 @@ static RegisterStoreImplementation regStore([]( return store; }); +#endif + } #endif -#endif |