diff options
author | Eelco Dolstra <edolstra@gmail.com> | 2018-08-06T13·40+0200 |
---|---|---|
committer | Eelco Dolstra <edolstra@gmail.com> | 2018-08-06T13·40+0200 |
commit | d3761f5f8bce1e4c8dcfdff3fa77c173157c0346 (patch) | |
tree | 1dae65410f1977d58ad7aa7741121cbe74c33da3 /src/libstore | |
parent | fa4def3d4675c8b2d6aacb56959dbbf9e52df66a (diff) |
Fix Brotli decompression in 'nix log'
This didn't work anymore since decompression was only done in the non-coroutine case. Decompressors are now sinks, just like compressors. Also fixed a bug in bzip2 API handling (we have to handle BZ_RUN_OK rather than BZ_OK), which we didn't notice because there was a missing 'throw': if (ret != BZ_OK) CompressionError("error while compressing bzip2 file");
Diffstat (limited to 'src/libstore')
-rw-r--r-- | src/libstore/binary-cache-store.cc | 23 | ||||
-rw-r--r-- | src/libstore/builtins/fetchurl.cc | 21 | ||||
-rw-r--r-- | src/libstore/download.cc | 68 | ||||
-rw-r--r-- | src/libstore/download.hh | 3 | ||||
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 6 |
5 files changed, 65 insertions, 56 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 76c0a1a891b8..9c75c85993f9 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -217,17 +217,6 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) { auto info = queryPathInfo(storePath).cast<const NarInfo>(); - auto source = sinkToSource([this, url{info->url}](Sink & sink) { - try { - getFile(url, sink); - } catch (NoSuchBinaryCacheFile & e) { - throw SubstituteGone(e.what()); - } - }); - - stats.narRead++; - //stats.narReadCompressedBytes += nar->size(); // FIXME - uint64_t narSize = 0; LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { @@ -235,8 +224,18 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) narSize += len; }); - decompress(info->compression, *source, wrapperSink); + auto decompressor = makeDecompressionSink(info->compression, wrapperSink); + try { + getFile(info->url, *decompressor); + } catch (NoSuchBinaryCacheFile & e) { + throw SubstituteGone(e.what()); + } + + decompressor->flush(); + + stats.narRead++; + //stats.narReadCompressedBytes += nar->size(); // FIXME stats.narReadBytes += narSize; } diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc index 1f4abd374f54..b4dcb35f951a 100644 --- a/src/libstore/builtins/fetchurl.cc +++ b/src/libstore/builtins/fetchurl.cc @@ -39,21 +39,16 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData) request.verifyTLS = false; request.decompress = false; - downloader->download(std::move(request), sink); + auto decompressor = makeDecompressionSink( + hasSuffix(mainUrl, ".xz") ? "xz" : "none", sink); + downloader->download(std::move(request), *decompressor); + decompressor->finish(); }); - if (get(drv.env, "unpack", "") == "1") { - - if (hasSuffix(mainUrl, ".xz")) { - auto source2 = sinkToSource([&](Sink & sink) { - decompress("xz", *source, sink); - }); - restorePath(storePath, *source2); - } else - restorePath(storePath, *source); - - } else - writeFile(storePath, *source); + if (get(drv.env, "unpack", "") == "1") + restorePath(storePath, *source); + else + writeFile(storePath, *source); auto executable = drv.env.find("executable"); if (executable != drv.env.end() && executable->second == "1") { diff --git a/src/libstore/download.cc b/src/libstore/download.cc index f0ea1995ae73..973fca0b130f 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -58,16 +58,6 @@ std::string resolveUri(const std::string & uri) return uri; } -ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data) -{ - if (encoding == "") - return data; - else if (encoding == "br") - return decompress(encoding, *data); - else - throw Error("unsupported Content-Encoding '%s'", encoding); -} - struct CurlDownloader : public Downloader { CURLM * curlm = 0; @@ -106,6 +96,12 @@ struct CurlDownloader : public Downloader fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri), {request.uri}, request.parentAct) , callback(callback) + , finalSink([this](const unsigned char * data, size_t len) { + if (this->request.dataCallback) + this->request.dataCallback((char *) data, len); + else + this->result.data->append((char *) data, len); + }) { if (!request.expectedETag.empty()) requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str()); @@ -129,23 +125,40 @@ struct CurlDownloader : public Downloader } } - template<class T> - void fail(const T & e) + void failEx(std::exception_ptr ex) { assert(!done); done = true; - callback.rethrow(std::make_exception_ptr(e)); + callback.rethrow(ex); } + template<class T> + void fail(const T & e) + { + failEx(std::make_exception_ptr(e)); + } + + LambdaSink finalSink; + std::shared_ptr<CompressionSink> decompressionSink; + + std::exception_ptr writeException; + size_t writeCallback(void * contents, size_t size, size_t nmemb) { - size_t realSize = size * nmemb; - result.bodySize += realSize; - if (request.dataCallback) - request.dataCallback((char *) contents, realSize); - else - result.data->append((char *) contents, realSize); - return realSize; + try { + size_t realSize = size * nmemb; + result.bodySize += realSize; + + if (!decompressionSink) + decompressionSink = makeDecompressionSink(encoding, finalSink); + + (*decompressionSink)((unsigned char *) contents, realSize); + + return realSize; + } catch (...) { + writeException = std::current_exception(); + return 0; + } } static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp) @@ -314,27 +327,33 @@ struct CurlDownloader : public Downloader debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes", request.verb(), request.uri, code, httpStatus, result.bodySize); + if (decompressionSink) + decompressionSink->finish(); + if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) { code = CURLE_OK; httpStatus = 304; } - if (code == CURLE_OK && + if (writeException) + failEx(writeException); + + else if (code == CURLE_OK && (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) { result.cached = httpStatus == 304; done = true; try { - if (request.decompress) - result.data = decodeContent(encoding, ref<std::string>(result.data)); act.progress(result.data->size(), result.data->size()); callback(std::move(result)); } catch (...) { done = true; callback.rethrow(); } - } else { + } + + else { // We treat most errors as transient, but won't retry when hopeless Error err = Transient; @@ -369,6 +388,7 @@ struct CurlDownloader : public Downloader case CURLE_UNKNOWN_OPTION: case CURLE_SSL_CACERT_BADFILE: case CURLE_TOO_MANY_REDIRECTS: + case CURLE_WRITE_ERROR: err = Misc; break; default: // Shut up warnings diff --git a/src/libstore/download.hh b/src/libstore/download.hh index ff38a2870cc0..f0228f7d053a 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -88,7 +88,4 @@ public: bool isUri(const string & s); -/* Decode data according to the Content-Encoding header. */ -ref<std::string> decodeContent(const std::string & encoding, ref<std::string> data); - } diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 2f18e3f38c6e..660583d31fe0 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -153,10 +153,8 @@ S3Helper::DownloadResult S3Helper::getObject( auto result = checkAws(fmt("AWS error fetching '%s'", key), client->GetObject(request)); - res.data = decodeContent( - result.GetContentEncoding(), - make_ref<std::string>( - dynamic_cast<std::stringstream &>(result.GetBody()).str())); + res.data = decompress(result.GetContentEncoding(), + dynamic_cast<std::stringstream &>(result.GetBody()).str()); } catch (S3Error & e) { if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw; |