diff options
Diffstat (limited to 'src/libstore')
-rw-r--r-- | src/libstore/binary-cache-store.cc | 23 | ||||
-rw-r--r-- | src/libstore/builtins/fetchurl.cc | 21 | ||||
-rw-r--r-- | src/libstore/download.cc | 87 | ||||
-rw-r--r-- | src/libstore/download.hh | 4 | ||||
-rw-r--r-- | src/libstore/legacy-ssh-store.cc | 62 | ||||
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 30 | ||||
-rw-r--r-- | src/libstore/s3.hh | 4 | ||||
-rw-r--r-- | src/libstore/serve-protocol.hh | 3 | ||||
-rw-r--r-- | src/libstore/ssh.cc | 22 | ||||
-rw-r--r-- | src/libstore/ssh.hh | 1 | ||||
-rw-r--r-- | src/libstore/store-api.cc | 34 |
11 files changed, 189 insertions, 102 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 76c0a1a891b8..9c75c85993f9 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -217,17 +217,6 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) { auto info = queryPathInfo(storePath).cast<const NarInfo>(); - auto source = sinkToSource([this, url{info->url}](Sink & sink) { - try { - getFile(url, sink); - } catch (NoSuchBinaryCacheFile & e) { - throw SubstituteGone(e.what()); - } - }); - - stats.narRead++; - //stats.narReadCompressedBytes += nar->size(); // FIXME - uint64_t narSize = 0; LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { @@ -235,8 +224,18 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) narSize += len; }); - decompress(info->compression, *source, wrapperSink); + auto decompressor = makeDecompressionSink(info->compression, wrapperSink); + try { + getFile(info->url, *decompressor); + } catch (NoSuchBinaryCacheFile & e) { + throw SubstituteGone(e.what()); + } + + decompressor->flush(); + + stats.narRead++; + //stats.narReadCompressedBytes += nar->size(); // FIXME stats.narReadBytes += narSize; } diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc index 1f4abd374f54..b4dcb35f951a 100644 --- a/src/libstore/builtins/fetchurl.cc +++ b/src/libstore/builtins/fetchurl.cc @@ -39,21 +39,16 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData) request.verifyTLS = false; request.decompress = false; - downloader->download(std::move(request), sink); + auto decompressor = makeDecompressionSink( + hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink); + downloader->download(std::move(request), *decompressor); + decompressor->finish(); }); - if (get(drv.env, "unpack", "") == "1") { - - if (hasSuffix(mainUrl, ".xz")) { - auto source2 = sinkToSource([&](Sink & sink) { - decompress("xz", *source, sink); - }); - restorePath(storePath, *source2); - } else - restorePath(storePath, *source); - - } else - writeFile(storePath, *source); + if (get(drv.env, "unpack", "") == "1") + restorePath(storePath, *source); + else + writeFile(storePath, *source); auto executable = drv.env.find("executable"); if (executable != drv.env.end() && executable->second == "1") { diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 07acd5d0e004..973fca0b130f 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -58,16 +58,6 @@ std::string resolveUri(const std::string & uri) return uri; } -ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data) -{ - if (encoding == "") - return data; - else if (encoding == "br") - return decompress(encoding, *data); - else - throw Error("unsupported Content-Encoding '%s'", encoding); -} - struct CurlDownloader : public Downloader { CURLM * curlm = 0; @@ -106,6 +96,12 @@ struct CurlDownloader : public Downloader fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri), {request.uri}, request.parentAct) , callback(callback) + , finalSink([this](const unsigned char * data, size_t len) { + if (this->request.dataCallback) + this->request.dataCallback((char *) data, len); + else + this->result.data->append((char *) data, len); + }) { if (!request.expectedETag.empty()) requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str()); @@ -129,22 +125,40 @@ struct CurlDownloader : public Downloader } } - template<class T> - void fail(const T & e) + void failEx(std::exception_ptr ex) { assert(!done); done = true; - callback.rethrow(std::make_exception_ptr(e)); + callback.rethrow(ex); } + template<class T> + void fail(const T & e) + { + failEx(std::make_exception_ptr(e)); + } + + LambdaSink finalSink; + std::shared_ptr<CompressionSink> decompressionSink; + + std::exception_ptr writeException; + size_t writeCallback(void * contents, size_t size, size_t nmemb) { - size_t realSize = size * nmemb; - if (request.dataCallback) - request.dataCallback((char *) contents, realSize); - else - result.data->append((char *) contents, realSize); - return realSize; + try { + size_t realSize = size * nmemb; + result.bodySize += realSize; + + if (!decompressionSink) + decompressionSink = makeDecompressionSink(encoding, finalSink); + + (*decompressionSink)((unsigned char *) contents, realSize); + + return realSize; + } catch (...) { + writeException = std::current_exception(); + return 0; + } } static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) @@ -162,6 +176,7 @@ struct CurlDownloader : public Downloader auto ss = tokenizeString<vector<string>>(line, " "); status = ss.size() >= 2 ? ss[1] : ""; result.data = std::make_shared<std::string>(); + result.bodySize = 0; encoding = ""; } else { auto i = line.find(':'); @@ -296,6 +311,7 @@ struct CurlDownloader : public Downloader curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL); result.data = std::make_shared<std::string>(); + result.bodySize = 0; } void finish(CURLcode code) @@ -309,29 +325,35 @@ struct CurlDownloader : public Downloader result.effectiveUrl = effectiveUrlCStr; debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes", - request.verb(), request.uri, code, httpStatus, result.data ? result.data->size() : 0); + request.verb(), request.uri, code, httpStatus, result.bodySize); + + if (decompressionSink) + decompressionSink->finish(); if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) { code = CURLE_OK; httpStatus = 304; } - if (code == CURLE_OK && + if (writeException) + failEx(writeException); + + else if (code == CURLE_OK && (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) { result.cached = httpStatus == 304; done = true; try { - if (request.decompress) - result.data = decodeContent(encoding, ref<std::string>(result.data)); act.progress(result.data->size(), result.data->size()); callback(std::move(result)); } catch (...) { done = true; callback.rethrow(); } - } else { + } + + else { // We treat most errors as transient, but won't retry when hopeless Error err = Transient; @@ -366,6 +388,7 @@ struct CurlDownloader : public Downloader case CURLE_UNKNOWN_OPTION: case CURLE_SSL_CACERT_BADFILE: case CURLE_TOO_MANY_REDIRECTS: + case CURLE_WRITE_ERROR: err = Misc; break; default: // Shut up warnings @@ -598,7 +621,7 @@ struct CurlDownloader : public Downloader // FIXME: do this on a worker thread try { #ifdef ENABLE_S3 - S3Helper s3Helper("", Aws::Region::US_EAST_1); // FIXME: make configurable + S3Helper s3Helper("", Aws::Region::US_EAST_1, ""); // FIXME: make configurable auto slash = request.uri.find('/', 5); if (slash == std::string::npos) throw nix::Error("bad S3 URI '%s'", request.uri); @@ -718,15 +741,17 @@ void Downloader::download(DownloadRequest && request, Sink & sink) while (true) { checkInterrupt(); - if (state->quit) { - if (state->exc) std::rethrow_exception(state->exc); - break; - } - /* If no data is available, then wait for the download thread to wake us up. */ - if (state->data.empty()) + if (state->data.empty()) { + + if (state->quit) { + if (state->exc) std::rethrow_exception(state->exc); + break; + } + state.wait(state->avail); + } /* If data is available, then flush it to the sink and wake up the download thread if it's blocked on a full buffer. */ diff --git a/src/libstore/download.hh b/src/libstore/download.hh index da55df7a6e71..f0228f7d053a 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -38,6 +38,7 @@ struct DownloadResult std::string etag; std::string effectiveUrl; std::shared_ptr<std::string> data; + uint64_t bodySize = 0; }; class Store; @@ -87,7 +88,4 @@ public: bool isUri(const string & s); -/* Decode data according to the Content-Encoding header. */ -ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data); - } diff --git a/src/libstore/legacy-ssh-store.cc b/src/libstore/legacy-ssh-store.cc index 02d91ded04cd..7c214f09d6fb 100644 --- a/src/libstore/legacy-ssh-store.cc +++ b/src/libstore/legacy-ssh-store.cc @@ -17,6 +17,7 @@ struct LegacySSHStore : public Store const Setting<Path> sshKey{this, "", "ssh-key", "path to an SSH private key"}; const Setting<bool> compress{this, false, "compress", "whether to compress the connection"}; const Setting<Path> remoteProgram{this, "nix-store", "remote-program", "path to the nix-store executable on the remote system"}; + const Setting<std::string> remoteStore{this, "", "remote-store", "URI of the store on the remote system"}; // Hack for getting remote build log output. const Setting<int> logFD{this, -1, "log-fd", "file descriptor to which SSH's stderr is connected"}; @@ -27,6 +28,7 @@ struct LegacySSHStore : public Store FdSink to; FdSource from; int remoteVersion; + bool good = true; }; std::string host; @@ -41,7 +43,7 @@ struct LegacySSHStore : public Store , connections(make_ref<Pool<Connection>>( std::max(1, (int) maxConnections), [this]() { return openConnection(); }, - [](const ref<Connection> & r) { return true; } + [](const ref<Connection> & r) { return r->good; } )) , master( host, @@ -56,7 +58,9 @@ struct LegacySSHStore : public Store ref<Connection> openConnection() { auto conn = make_ref<Connection>(); - conn->sshConn = master.startCommand(fmt("%s --serve --write", remoteProgram)); + conn->sshConn = master.startCommand( + fmt("%s --serve --write", remoteProgram) + + (remoteStore.get() == "" ? "" : " --store " + shellEscape(remoteStore.get()))); conn->to = FdSink(conn->sshConn->in.get()); conn->from = FdSource(conn->sshConn->out.get()); @@ -127,18 +131,48 @@ struct LegacySSHStore : public Store auto conn(connections->get()); - conn->to - << cmdImportPaths - << 1; - copyNAR(source, conn->to); - conn->to - << exportMagic - << info.path - << info.references - << info.deriver - << 0 - << 0; - conn->to.flush(); + if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 4) { + + conn->to + << cmdAddToStoreNar + << info.path + << info.deriver + << info.narHash.to_string(Base16, false) + << info.references + << info.registrationTime + << info.narSize + << info.ultimate + << info.sigs + << info.ca; + try { + copyNAR(source, conn->to); + } catch (...) { + conn->good = false; + throw; + } + conn->to.flush(); + + } else { + + conn->to + << cmdImportPaths + << 1; + try { + copyNAR(source, conn->to); + } catch (...) { + conn->good = false; + throw; + } + conn->to + << exportMagic + << info.path + << info.references + << info.deriver + << 0 + << 0; + conn->to.flush(); + + } if (readInt(conn->from) != 1) throw Error("failed to add path '%s' to remote host '%s', info.path, host"); diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 26144ccb40cc..6d95c1fa8c65 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -84,8 +84,8 @@ static void initAWS() }); } -S3Helper::S3Helper(const std::string & profile, const std::string & region) - : config(makeConfig(region)) +S3Helper::S3Helper(const std::string & profile, const std::string & region, const std::string & endpoint) + : config(makeConfig(region, endpoint)) , client(make_ref<Aws::S3::S3Client>( profile == "" ? std::dynamic_pointer_cast<Aws::Auth::AWSCredentialsProvider>( @@ -99,7 +99,7 @@ S3Helper::S3Helper(const std::string & profile, const std::string & region) #else Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, #endif - false)) + endpoint.empty())) { } @@ -116,11 +116,14 @@ class RetryStrategy : public Aws::Client::DefaultRetryStrategy } }; -ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region) +ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region, const string & endpoint) { initAWS(); auto res = make_ref<Aws::Client::ClientConfiguration>(); res->region = region; + if (!endpoint.empty()) { + res->endpointOverride = endpoint; + } res->requestTimeoutMs = 600 * 1000; res->retryStrategy = std::make_shared<RetryStrategy>(); res->caFile = settings.caFile; @@ -150,10 +153,8 @@ S3Helper::DownloadResult S3Helper::getObject( auto result = checkAws(fmt("AWS error fetching '%s'", key), client->GetObject(request)); - res.data = decodeContent( - result.GetContentEncoding(), - make_ref<std::string>( - dynamic_cast<std::stringstream &>(result.GetBody()).str())); + res.data = decompress(result.GetContentEncoding(), + dynamic_cast<std::stringstream &>(result.GetBody()).str()); } catch (S3Error & e) { if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw; @@ -170,6 +171,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore { const Setting<std::string> profile{this, "", "profile", "The name of the AWS configuration profile to use."}; const Setting<std::string> region{this, Aws::Region::US_EAST_1, "region", {"aws-region"}}; + const Setting<std::string> endpoint{this, "", "endpoint", "An optional override of the endpoint to use when talking to S3."}; const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"}; const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"}; const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"}; @@ -186,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore const Params & params, const std::string & bucketName) : S3BinaryCacheStore(params) , bucketName(bucketName) - , s3Helper(profile, region) + , s3Helper(profile, region, endpoint) { diskCache = getNarInfoDiskCache(); } @@ -289,10 +291,6 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore transferConfig.s3Client = s3Helper.client; transferConfig.bufferSize = bufferSize; - if (contentEncoding != "") - transferConfig.createMultipartUploadTemplate.SetContentEncoding( - contentEncoding); - transferConfig.uploadProgressCallback = [&](const TransferManager *transferManager, const std::shared_ptr<const TransferHandle> @@ -334,8 +332,10 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore auto now1 = std::chrono::steady_clock::now(); std::shared_ptr<TransferHandle> transferHandle = - transferManager->UploadFile(stream, bucketName, path, mimeType, - Aws::Map<Aws::String, Aws::String>()); + transferManager->UploadFile( + stream, bucketName, path, mimeType, + Aws::Map<Aws::String, Aws::String>(), + nullptr, contentEncoding); transferHandle->WaitUntilFinished(); diff --git a/src/libstore/s3.hh b/src/libstore/s3.hh index 4f996400343c..95d612b66335 100644 --- a/src/libstore/s3.hh +++ b/src/libstore/s3.hh @@ -14,9 +14,9 @@ struct S3Helper ref<Aws::Client::ClientConfiguration> config; ref<Aws::S3::S3Client> client; - S3Helper(const std::string & profile, const std::string & region); + S3Helper(const std::string & profile, const std::string & region, const std::string & endpoint); - ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region); + ref<Aws::Client::ClientConfiguration> makeConfig(const std::string & region, const std::string & endpoint); struct DownloadResult { diff --git a/src/libstore/serve-protocol.hh b/src/libstore/serve-protocol.hh index f67d1e2580a5..9fae6d5349f1 100644 --- a/src/libstore/serve-protocol.hh +++ b/src/libstore/serve-protocol.hh @@ -5,7 +5,7 @@ namespace nix { #define SERVE_MAGIC_1 0x390c9deb #define SERVE_MAGIC_2 0x5452eecb -#define SERVE_PROTOCOL_VERSION 0x204 +#define SERVE_PROTOCOL_VERSION 0x205 #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) @@ -18,6 +18,7 @@ typedef enum { cmdBuildPaths = 6, cmdQueryClosure = 7, cmdBuildDerivation = 8, + cmdAddToStoreNar = 9, } ServeCommand; } diff --git a/src/libstore/ssh.cc b/src/libstore/ssh.cc index 033c580936ad..5e0e44935cca 100644 --- a/src/libstore/ssh.cc +++ b/src/libstore/ssh.cc @@ -4,8 +4,9 @@ namespace nix { SSHMaster::SSHMaster(const std::string & host, const std::string & keyFile, bool useMaster, bool compress, int logFD) : host(host) + , fakeSSH(host == "localhost") , keyFile(keyFile) - , useMaster(useMaster) + , useMaster(useMaster && !fakeSSH) , compress(compress) , logFD(logFD) { @@ -45,12 +46,19 @@ std::unique_ptr<SSHMaster::Connection> SSHMaster::startCommand(const std::string if (logFD != -1 && dup2(logFD, STDERR_FILENO) == -1) throw SysError("duping over stderr"); - Strings args = { "ssh", host.c_str(), "-x", "-a" }; - addCommonSSHOpts(args); - if (socketPath != "") - args.insert(args.end(), {"-S", socketPath}); - if (verbosity >= lvlChatty) - args.push_back("-v"); + Strings args; + + if (fakeSSH) { + args = { "bash", "-c" }; + } else { + args = { "ssh", host.c_str(), "-x", "-a" }; + addCommonSSHOpts(args); + if (socketPath != "") + args.insert(args.end(), {"-S", socketPath}); + if (verbosity >= lvlChatty) + args.push_back("-v"); + } + args.push_back(command); execvp(args.begin()->c_str(), stringsToCharPtrs(args).data()); diff --git a/src/libstore/ssh.hh b/src/libstore/ssh.hh index 1268e6d00054..4f0f0bd29f9f 100644 --- a/src/libstore/ssh.hh +++ b/src/libstore/ssh.hh @@ -10,6 +10,7 @@ class SSHMaster private: const std::string host; + bool fakeSSH; const std::string keyFile; const bool useMaster; const bool compress; diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 9b0b7d6327e0..7a4a5f5eb85d 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -629,11 +629,12 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const PathSet & storePa Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size())); std::atomic<size_t> nrDone{0}; + std::atomic<size_t> nrFailed{0}; std::atomic<uint64_t> bytesExpected{0}; std::atomic<uint64_t> nrRunning{0}; auto showProgress = [&]() { - act.progress(nrDone, missing.size(), nrRunning); + act.progress(nrDone, missing.size(), nrRunning, nrFailed); }; ThreadPool pool; @@ -662,7 +663,16 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const PathSet & storePa if (!dstStore->isValidPath(storePath)) { MaintainCount<decltype(nrRunning)> mc(nrRunning); showProgress(); - copyStorePath(srcStore, dstStore, storePath, repair, checkSigs); + try { + copyStorePath(srcStore, dstStore, storePath, repair, checkSigs); + } catch (Error &e) { + nrFailed++; + if (!settings.keepGoing) + throw e; + logger->log(lvlError, format("could not copy %s: %s") % storePath % e.what()); + showProgress(); + return; + } } nrDone++; @@ -834,8 +844,24 @@ ref<Store> openStore(const std::string & uri_, if (q != std::string::npos) { for (auto s : tokenizeString<Strings>(uri.substr(q + 1), "&")) { auto e = s.find('='); - if (e != std::string::npos) - params[s.substr(0, e)] = s.substr(e + 1); + if (e != std::string::npos) { + auto value = s.substr(e + 1); + std::string decoded; + for (size_t i = 0; i < value.size(); ) { + if (value[i] == '%') { + if (i + 2 >= value.size()) + throw Error("invalid URI parameter '%s'", value); + try { + decoded += std::stoul(std::string(value, i + 1, 2), 0, 16); + i += 3; + } catch (...) { + throw Error("invalid URI parameter '%s'", value); + } + } else + decoded += value[i++]; + } + params[s.substr(0, e)] = decoded; + } } uri = uri_.substr(0, q); } |