diff options
Diffstat (limited to 'src/libutil')
-rw-r--r-- | src/libutil/args.cc | 7 | ||||
-rw-r--r-- | src/libutil/args.hh | 3 | ||||
-rw-r--r-- | src/libutil/compression.cc | 178 | ||||
-rw-r--r-- | src/libutil/compression.hh | 7 | ||||
-rw-r--r-- | src/libutil/finally.hh | 12 | ||||
-rw-r--r-- | src/libutil/hash.cc | 32 | ||||
-rw-r--r-- | src/libutil/hash.hh | 12 | ||||
-rw-r--r-- | src/libutil/local.mk | 2 | ||||
-rw-r--r-- | src/libutil/logging.cc | 79 | ||||
-rw-r--r-- | src/libutil/logging.hh | 82 | ||||
-rw-r--r-- | src/libutil/lru-cache.hh | 6 | ||||
-rw-r--r-- | src/libutil/ref.hh | 6 | ||||
-rw-r--r-- | src/libutil/thread-pool.cc | 130 | ||||
-rw-r--r-- | src/libutil/thread-pool.hh | 73 | ||||
-rw-r--r-- | src/libutil/types.hh | 10 | ||||
-rw-r--r-- | src/libutil/util.cc | 131 | ||||
-rw-r--r-- | src/libutil/util.hh | 56 |
17 files changed, 548 insertions, 278 deletions
diff --git a/src/libutil/args.cc b/src/libutil/args.cc index 6e4b82a279ce..115484f9e6c7 100644 --- a/src/libutil/args.cc +++ b/src/libutil/args.cc @@ -71,10 +71,11 @@ void Args::printHelp(const string & programName, std::ostream & out) void Args::printFlags(std::ostream & out) { Table2 table; - for (auto & flags : longFlags) + for (auto & flag : longFlags) table.push_back(std::make_pair( - "--" + flags.first + renderLabels(flags.second.labels), - flags.second.description)); + (flag.second.shortName ? std::string("-") + flag.second.shortName + ", " : " ") + + "--" + flag.first + renderLabels(flag.second.labels), + flag.second.description)); printTable(out, table); } diff --git a/src/libutil/args.hh b/src/libutil/args.hh index 4469a046d28a..6aa08aacac9e 100644 --- a/src/libutil/args.hh +++ b/src/libutil/args.hh @@ -29,6 +29,7 @@ protected: /* Flags. */ struct Flag { + char shortName; std::string description; Strings labels; size_t arity; @@ -63,7 +64,7 @@ public: const Strings & labels, const std::string & description, size_t arity, std::function<void(Strings)> handler) { - auto flag = Flag{description, labels, arity, handler}; + auto flag = Flag{shortName, description, labels, arity, handler}; if (shortName) shortFlags[shortName] = flag; longFlags[longName] = flag; } diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc index b047d305cfa6..4d15d2acdd4e 100644 --- a/src/libutil/compression.cc +++ b/src/libutil/compression.cc @@ -1,88 +1,88 @@ #include "compression.hh" -#include "types.hh" +#include "util.hh" +#include "finally.hh" #include <lzma.h> +#include <bzlib.h> #include <cstdio> +#include <cstring> namespace nix { -/* RAII wrapper around lzma_stream. */ -struct LzmaStream +static ref<std::string> compressXZ(const std::string & in) { lzma_stream strm; - LzmaStream() : strm(LZMA_STREAM_INIT) { }; - ~LzmaStream() { lzma_end(&strm); }; - lzma_stream & operator()() { return strm; } -}; - -std::string compressXZ(const std::string & in) -{ - LzmaStream strm; // FIXME: apply the x86 BCJ filter? lzma_ret ret = lzma_easy_encoder( - &strm(), 6, LZMA_CHECK_CRC64); + &strm, 6, LZMA_CHECK_CRC64); if (ret != LZMA_OK) throw Error("unable to initialise lzma encoder"); + Finally free([&]() { lzma_end(&strm); }); + lzma_action action = LZMA_RUN; uint8_t outbuf[BUFSIZ]; - string res; - strm().next_in = (uint8_t *) in.c_str(); - strm().avail_in = in.size(); - strm().next_out = outbuf; - strm().avail_out = sizeof(outbuf); + ref<std::string> res = make_ref<std::string>(); + strm.next_in = (uint8_t *) in.c_str(); + strm.avail_in = in.size(); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); while (true) { + checkInterrupt(); - if (strm().avail_in == 0) + if (strm.avail_in == 0) action = LZMA_FINISH; - lzma_ret ret = lzma_code(&strm(), action); + lzma_ret ret = lzma_code(&strm, action); - if (strm().avail_out == 0 || ret == LZMA_STREAM_END) { - res.append((char *) outbuf, sizeof(outbuf) - strm().avail_out); - strm().next_out = outbuf; - strm().avail_out = sizeof(outbuf); + if (strm.avail_out == 0 || ret == LZMA_STREAM_END) { + res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); } if (ret == LZMA_STREAM_END) return res; if (ret != LZMA_OK) - throw Error("error while decompressing xz file"); + throw Error("error while compressing xz file"); } } -ref<std::string> decompressXZ(const std::string & in) +static ref<std::string> decompressXZ(const std::string & in) { - LzmaStream strm; + lzma_stream strm; lzma_ret ret = lzma_stream_decoder( - &strm(), UINT64_MAX, LZMA_CONCATENATED); + &strm, UINT64_MAX, LZMA_CONCATENATED); if (ret != LZMA_OK) throw Error("unable to initialise lzma decoder"); + Finally free([&]() { lzma_end(&strm); }); + lzma_action action = LZMA_RUN; uint8_t outbuf[BUFSIZ]; ref<std::string> res = make_ref<std::string>(); - strm().next_in = (uint8_t *) in.c_str(); - strm().avail_in = in.size(); - strm().next_out = outbuf; - strm().avail_out = sizeof(outbuf); + strm.next_in = (uint8_t *) in.c_str(); + strm.avail_in = in.size(); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); while (true) { + checkInterrupt(); - if (strm().avail_in == 0) + if (strm.avail_in == 0) action = LZMA_FINISH; - lzma_ret ret = lzma_code(&strm(), action); + lzma_ret ret = lzma_code(&strm, action); - if (strm().avail_out == 0 || ret == LZMA_STREAM_END) { - res->append((char *) outbuf, sizeof(outbuf) - strm().avail_out); - strm().next_out = outbuf; - strm().avail_out = sizeof(outbuf); + if (strm.avail_out == 0 || ret == LZMA_STREAM_END) { + res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); } if (ret == LZMA_STREAM_END) @@ -93,4 +93,108 @@ ref<std::string> decompressXZ(const std::string & in) } } +static ref<std::string> compressBzip2(const std::string & in) +{ + bz_stream strm; + memset(&strm, 0, sizeof(strm)); + + int ret = BZ2_bzCompressInit(&strm, 9, 0, 30); + if (ret != BZ_OK) + throw Error("unable to initialise bzip2 encoder"); + + Finally free([&]() { BZ2_bzCompressEnd(&strm); }); + + int action = BZ_RUN; + char outbuf[BUFSIZ]; + ref<std::string> res = make_ref<std::string>(); + strm.next_in = (char *) in.c_str(); + strm.avail_in = in.size(); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); + + while (true) { + checkInterrupt(); + + if (strm.avail_in == 0) + action = BZ_FINISH; + + int ret = BZ2_bzCompress(&strm, action); + + if (strm.avail_out == 0 || ret == BZ_STREAM_END) { + res->append(outbuf, sizeof(outbuf) - strm.avail_out); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); + } + + if (ret == BZ_STREAM_END) + return res; + + if (ret != BZ_OK && ret != BZ_FINISH_OK) + Error("error while compressing bzip2 file"); + } + + return res; +} + +static ref<std::string> decompressBzip2(const std::string & in) +{ + bz_stream strm; + memset(&strm, 0, sizeof(strm)); + + int ret = BZ2_bzDecompressInit(&strm, 0, 0); + if (ret != BZ_OK) + throw Error("unable to initialise bzip2 decoder"); + + Finally free([&]() { BZ2_bzDecompressEnd(&strm); }); + + char outbuf[BUFSIZ]; + ref<std::string> res = make_ref<std::string>(); + strm.next_in = (char *) in.c_str(); + strm.avail_in = in.size(); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); + + while (true) { + checkInterrupt(); + + int ret = BZ2_bzDecompress(&strm); + + if (strm.avail_out == 0 || ret == BZ_STREAM_END) { + res->append(outbuf, sizeof(outbuf) - strm.avail_out); + strm.next_out = outbuf; + strm.avail_out = sizeof(outbuf); + } + + if (ret == BZ_STREAM_END) + return res; + + if (ret != BZ_OK) + throw Error("error while decompressing bzip2 file"); + } +} + +ref<std::string> compress(const std::string & method, ref<std::string> in) +{ + if (method == "none") + return in; + else if (method == "xz") + return compressXZ(*in); + else if (method == "bzip2") + return compressBzip2(*in); + else + throw UnknownCompressionMethod(format("unknown compression method ‘%s’") % method); +} + +ref<std::string> decompress(const std::string & method, ref<std::string> in) +{ + if (method == "none") + return in; + else if (method == "xz") + return decompressXZ(*in); + else if (method == "bzip2") + return decompressBzip2(*in); + else + throw UnknownCompressionMethod(format("unknown compression method ‘%s’") % method); +} + } diff --git a/src/libutil/compression.hh b/src/libutil/compression.hh index 79a796db7756..33c465df8455 100644 --- a/src/libutil/compression.hh +++ b/src/libutil/compression.hh @@ -1,13 +1,16 @@ #pragma once #include "ref.hh" +#include "types.hh" #include <string> namespace nix { -std::string compressXZ(const std::string & in); +ref<std::string> compress(const std::string & method, ref<std::string> in); -ref<std::string> decompressXZ(const std::string & in); +ref<std::string> decompress(const std::string & method, ref<std::string> in); + +MakeError(UnknownCompressionMethod, Error); } diff --git a/src/libutil/finally.hh b/src/libutil/finally.hh new file mode 100644 index 000000000000..47c64deaecea --- /dev/null +++ b/src/libutil/finally.hh @@ -0,0 +1,12 @@ +#pragma once + +/* A trivial class to run a function at the end of a scope. */ +class Finally +{ +private: + std::function<void()> fun; + +public: + Finally(std::function<void()> fun) : fun(fun) { } + ~Finally() { fun(); } +}; diff --git a/src/libutil/hash.cc b/src/libutil/hash.cc index 64739300302b..c17f1c4d5150 100644 --- a/src/libutil/hash.cc +++ b/src/libutil/hash.cc @@ -33,7 +33,7 @@ Hash::Hash(HashType type) else if (type == htSHA1) hashSize = sha1HashSize; else if (type == htSHA256) hashSize = sha256HashSize; else if (type == htSHA512) hashSize = sha512HashSize; - else throw Error("unknown hash type"); + else abort(); assert(hashSize <= maxHashSize); memset(hash, 0, maxHashSize); } @@ -64,6 +64,12 @@ bool Hash::operator < (const Hash & h) const } +std::string Hash::to_string(bool base32) const +{ + return printHashType(type) + ":" + (base32 ? printHash32(*this) : printHash(*this)); +} + + const string base16Chars = "0123456789abcdef"; @@ -78,15 +84,28 @@ string printHash(const Hash & hash) } +Hash parseHash(const string & s) +{ + string::size_type colon = s.find(':'); + if (colon == string::npos) + throw BadHash(format("invalid hash ‘%s’") % s); + string hts = string(s, 0, colon); + HashType ht = parseHashType(hts); + if (ht == htUnknown) + throw BadHash(format("unknown hash type ‘%s’") % hts); + return parseHash16or32(ht, string(s, colon + 1)); +} + + Hash parseHash(HashType ht, const string & s) { Hash hash(ht); if (s.length() != hash.hashSize * 2) - throw Error(format("invalid hash ‘%1%’") % s); + throw BadHash(format("invalid hash ‘%1%’") % s); for (unsigned int i = 0; i < hash.hashSize; i++) { string s2(s, i * 2, 2); if (!isxdigit(s2[0]) || !isxdigit(s2[1])) - throw Error(format("invalid hash ‘%1%’") % s); + throw BadHash(format("invalid hash ‘%1%’") % s); std::istringstream str(s2); int n; str >> std::hex >> n; @@ -103,6 +122,7 @@ const string base32Chars = "0123456789abcdfghijklmnpqrsvwxyz"; string printHash32(const Hash & hash) { size_t len = hash.base32Len(); + assert(len); string s; s.reserve(len); @@ -139,7 +159,7 @@ Hash parseHash32(HashType ht, const string & s) for (digit = 0; digit < base32Chars.size(); ++digit) /* !!! slow */ if (base32Chars[digit] == c) break; if (digit >= 32) - throw Error(format("invalid base-32 hash ‘%1%’") % s); + throw BadHash(format("invalid base-32 hash ‘%1%’") % s); unsigned int b = n * 5; unsigned int i = b / 8; unsigned int j = b % 8; @@ -161,7 +181,7 @@ Hash parseHash16or32(HashType ht, const string & s) /* base-32 representation */ hash = parseHash32(ht, s); else - throw Error(format("hash ‘%1%’ has wrong length for hash type ‘%2%’") + throw BadHash(format("hash ‘%1%’ has wrong length for hash type ‘%2%’") % s % printHashType(ht)); return hash; } @@ -322,7 +342,7 @@ string printHashType(HashType ht) else if (ht == htSHA1) return "sha1"; else if (ht == htSHA256) return "sha256"; else if (ht == htSHA512) return "sha512"; - else throw Error("cannot print unknown hash type"); + else abort(); } diff --git a/src/libutil/hash.hh b/src/libutil/hash.hh index bac2ebf2dcfa..02e213fc7b35 100644 --- a/src/libutil/hash.hh +++ b/src/libutil/hash.hh @@ -7,6 +7,9 @@ namespace nix { +MakeError(BadHash, Error); + + enum HashType : char { htUnknown, htMD5, htSHA1, htSHA256, htSHA512 }; @@ -26,12 +29,15 @@ struct Hash HashType type; - /* Create an unusable hash object. */ + /* Create an unset hash object. */ Hash(); /* Create a zero-filled hash object. */ Hash(HashType type); + /* Check whether a hash is set. */ + operator bool () const { return type != htUnknown; } + /* Check whether two hash are equal. */ bool operator == (const Hash & h2) const; @@ -52,12 +58,16 @@ struct Hash { return (hashSize * 8 - 1) / 5 + 1; } + + std::string to_string(bool base32 = true) const; }; /* Convert a hash to a hexadecimal representation. */ string printHash(const Hash & hash); +Hash parseHash(const string & s); + /* Parse a hexadecimal representation of a hash code. */ Hash parseHash(HashType ht, const string & s); diff --git a/src/libutil/local.mk b/src/libutil/local.mk index 2e5d2672e5f0..98cad00d6d95 100644 --- a/src/libutil/local.mk +++ b/src/libutil/local.mk @@ -6,6 +6,6 @@ libutil_DIR := $(d) libutil_SOURCES := $(wildcard $(d)/*.cc) -libutil_LDFLAGS = -llzma -pthread $(OPENSSL_LIBS) +libutil_LDFLAGS = -llzma -lbz2 -pthread $(OPENSSL_LIBS) libutil_LIBS = libformat diff --git a/src/libutil/logging.cc b/src/libutil/logging.cc new file mode 100644 index 000000000000..15bb1e175da6 --- /dev/null +++ b/src/libutil/logging.cc @@ -0,0 +1,79 @@ +#include "logging.hh" +#include "util.hh" + +namespace nix { + +Logger * logger = 0; + +class SimpleLogger : public Logger +{ +public: + + bool systemd, tty; + + SimpleLogger() + { + systemd = getEnv("IN_SYSTEMD") == "1"; + tty = isatty(STDERR_FILENO); + } + + void log(Verbosity lvl, const FormatOrString & fs) override + { + if (lvl > verbosity) return; + + std::string prefix; + + if (systemd) { + char c; + switch (lvl) { + case lvlError: c = '3'; break; + case lvlInfo: c = '5'; break; + case lvlTalkative: case lvlChatty: c = '6'; break; + default: c = '7'; + } + prefix = std::string("<") + c + ">"; + } + + writeToStderr(prefix + (tty ? fs.s : filterANSIEscapes(fs.s)) + "\n"); + } + + void startActivity(Activity & activity, Verbosity lvl, const FormatOrString & fs) override + { + log(lvl, fs); + } + + void stopActivity(Activity & activity) override + { + } +}; + +Verbosity verbosity = lvlInfo; + +void warnOnce(bool & haveWarned, const FormatOrString & fs) +{ + if (!haveWarned) { + printMsg(lvlError, format("warning: %1%") % fs.s); + haveWarned = true; + } +} + +void writeToStderr(const string & s) +{ + try { + writeFull(STDERR_FILENO, s); + } catch (SysError & e) { + /* Ignore failing writes to stderr if we're in an exception + handler, otherwise throw an exception. We need to ignore + write errors in exception handlers to ensure that cleanup + code runs to completion if the other side of stderr has + been closed unexpectedly. */ + if (!std::uncaught_exception()) throw; + } +} + +Logger * makeDefaultLogger() +{ + return new SimpleLogger(); +} + +} diff --git a/src/libutil/logging.hh b/src/libutil/logging.hh new file mode 100644 index 000000000000..277dff280053 --- /dev/null +++ b/src/libutil/logging.hh @@ -0,0 +1,82 @@ +#pragma once + +#include "types.hh" + +namespace nix { + +typedef enum { + lvlError = 0, + lvlInfo, + lvlTalkative, + lvlChatty, + lvlDebug, + lvlVomit +} Verbosity; + +class Activity; + +class Logger +{ + friend class Activity; + +public: + + virtual ~Logger() { } + + virtual void log(Verbosity lvl, const FormatOrString & fs) = 0; + + void log(const FormatOrString & fs) + { + log(lvlInfo, fs); + } + + virtual void setExpected(const std::string & label, uint64_t value = 1) { } + virtual void setProgress(const std::string & label, uint64_t value = 1) { } + virtual void incExpected(const std::string & label, uint64_t value = 1) { } + virtual void incProgress(const std::string & label, uint64_t value = 1) { } + +private: + + virtual void startActivity(Activity & activity, Verbosity lvl, const FormatOrString & fs) = 0; + + virtual void stopActivity(Activity & activity) = 0; + +}; + +class Activity +{ +public: + Logger & logger; + + Activity(Logger & logger, Verbosity lvl, const FormatOrString & fs) + : logger(logger) + { + logger.startActivity(*this, lvl, fs); + } + + ~Activity() + { + logger.stopActivity(*this); + } +}; + +extern Logger * logger; + +Logger * makeDefaultLogger(); + +extern Verbosity verbosity; /* suppress msgs > this */ + +#define printMsg(level, f) \ + do { \ + if (level <= nix::verbosity) { \ + logger->log(level, (f)); \ + } \ + } while (0) + +#define debug(f) printMsg(lvlDebug, f) + +void warnOnce(bool & haveWarned, const FormatOrString & fs); + +void writeToStderr(const string & s); + +} diff --git a/src/libutil/lru-cache.hh b/src/libutil/lru-cache.hh index 4344d6601bc8..35983aa2c918 100644 --- a/src/libutil/lru-cache.hh +++ b/src/libutil/lru-cache.hh @@ -79,6 +79,12 @@ public: { return data.size(); } + + void clear() + { + data.clear(); + lru.clear(); + } }; } diff --git a/src/libutil/ref.hh b/src/libutil/ref.hh index 349f24f7c488..9f5da09152c9 100644 --- a/src/libutil/ref.hh +++ b/src/libutil/ref.hh @@ -44,6 +44,12 @@ public: } template<typename T2> + ref<T2> cast() + { + return ref<T2>(std::dynamic_pointer_cast<T2>(p)); + } + + template<typename T2> operator ref<T2> () { return ref<T2>((std::shared_ptr<T2>) p); diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc index 743038b588a7..32363ecf0098 100644 --- a/src/libutil/thread-pool.cc +++ b/src/libutil/thread-pool.cc @@ -1,79 +1,99 @@ #include "thread-pool.hh" +#include "affinity.hh" namespace nix { -ThreadPool::ThreadPool(size_t _nrThreads) - : nrThreads(_nrThreads) +ThreadPool::ThreadPool(size_t _maxThreads) + : maxThreads(_maxThreads) { - if (!nrThreads) { - nrThreads = std::thread::hardware_concurrency(); - if (!nrThreads) nrThreads = 1; + restoreAffinity(); // FIXME + + if (!maxThreads) { + maxThreads = std::thread::hardware_concurrency(); + if (!maxThreads) maxThreads = 1; } + + debug(format("starting pool of %d threads") % maxThreads); +} + +ThreadPool::~ThreadPool() +{ + std::vector<std::thread> workers; + { + auto state(state_.lock()); + state->quit = true; + std::swap(workers, state->workers); + } + + debug(format("reaping %d worker threads") % workers.size()); + + work.notify_all(); + + for (auto & thr : workers) + thr.join(); } void ThreadPool::enqueue(const work_t & t) { - auto state_(state.lock()); - state_->left.push(t); - wakeup.notify_one(); + auto state(state_.lock()); + assert(!state->quit); + state->left.push(t); + if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads) + state->workers.emplace_back(&ThreadPool::workerEntry, this); + work.notify_one(); } void ThreadPool::process() { - printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads); - - std::vector<std::thread> workers; + while (true) { + auto state(state_.lock()); + if (state->exception) + std::rethrow_exception(state->exception); + if (state->left.empty() && !state->pending) break; + state.wait(done); + } +} - for (size_t n = 0; n < nrThreads; n++) - workers.push_back(std::thread([&]() { - bool first = true; +void ThreadPool::workerEntry() +{ + bool didWork = false; + while (true) { + work_t w; + { + auto state(state_.lock()); while (true) { - work_t work; - { - auto state_(state.lock()); - if (state_->exception) return; - if (!first) { - assert(state_->pending); - state_->pending--; - } - first = false; - while (state_->left.empty()) { - if (!state_->pending) { - wakeup.notify_all(); - return; - } - if (state_->exception) return; - state_.wait(wakeup); - } - work = state_->left.front(); - state_->left.pop(); - state_->pending++; - } - - try { - work(); - } catch (std::exception & e) { - auto state_(state.lock()); - if (state_->exception) { - if (!dynamic_cast<Interrupted*>(&e)) - printMsg(lvlError, format("error: %s") % e.what()); - } else { - state_->exception = std::current_exception(); - wakeup.notify_all(); - } + if (state->quit || state->exception) return; + if (didWork) { + assert(state->pending); + state->pending--; + didWork = false; } + if (!state->left.empty()) break; + if (!state->pending) + done.notify_all(); + state.wait(work); } + w = state->left.front(); + state->left.pop(); + state->pending++; + } - })); - - for (auto & thr : workers) - thr.join(); + try { + w(); + } catch (std::exception & e) { + auto state(state_.lock()); + if (state->exception) { + if (!dynamic_cast<Interrupted*>(&e)) + printMsg(lvlError, format("error: %s") % e.what()); + } else { + state->exception = std::current_exception(); + work.notify_all(); + done.notify_all(); + } + } - { - auto state_(state.lock()); - if (state_->exception) - std::rethrow_exception(state_->exception); + didWork = true; } } diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index 77641d88ba4e..78b63467d62e 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -6,6 +6,7 @@ #include <queue> #include <functional> #include <thread> +#include <map> namespace nix { @@ -15,7 +16,9 @@ class ThreadPool { public: - ThreadPool(size_t nrThreads = 0); + ThreadPool(size_t maxThreads = 0); + + ~ThreadPool(); // FIXME: use std::packaged_task? typedef std::function<void()> work_t; @@ -34,19 +37,81 @@ public: private: - size_t nrThreads; + size_t maxThreads; struct State { std::queue<work_t> left; size_t pending = 0; std::exception_ptr exception; + std::vector<std::thread> workers; + bool quit = false; }; - Sync<State> state; + Sync<State> state_; - std::condition_variable wakeup; + std::condition_variable work, done; + void workerEntry(); }; +/* Process in parallel a set of items of type T that have a partial + ordering between them. Thus, any item is only processed after all + its dependencies have been processed. */ +template<typename T> +void processGraph( + ThreadPool & pool, + const std::set<T> & nodes, + std::function<std::set<T>(const T &)> getEdges, + std::function<void(const T &)> processNode) +{ + struct Graph { + std::set<T> left; + std::map<T, std::set<T>> refs, rrefs; + std::function<void(T)> wrap; + }; + + ref<Sync<Graph>> graph_ = make_ref<Sync<Graph>>(); + + auto wrapWork = [&pool, graph_, processNode](const T & node) { + processNode(node); + + /* Enqueue work for all nodes that were waiting on this one. */ + { + auto graph(graph_->lock()); + graph->left.erase(node); + for (auto & rref : graph->rrefs[node]) { + auto & refs(graph->refs[rref]); + auto i = refs.find(node); + assert(i != refs.end()); + refs.erase(i); + if (refs.empty()) + pool.enqueue(std::bind(graph->wrap, rref)); + } + } + }; + + { + auto graph(graph_->lock()); + graph->left = nodes; + graph->wrap = wrapWork; + } + + /* Build the dependency graph; enqueue all nodes with no + dependencies. */ + for (auto & node : nodes) { + auto refs = getEdges(node); + { + auto graph(graph_->lock()); + for (auto & ref : refs) + if (ref != node && graph->left.count(ref)) { + graph->refs[node].insert(ref); + graph->rrefs[ref].insert(node); + } + if (graph->refs[node].empty()) + pool.enqueue(std::bind(graph->wrap, node)); + } + } +} + } diff --git a/src/libutil/types.hh b/src/libutil/types.hh index 33aaf5fc9c4d..bd192b8506b2 100644 --- a/src/libutil/types.hh +++ b/src/libutil/types.hh @@ -89,14 +89,4 @@ typedef list<Path> Paths; typedef set<Path> PathSet; -typedef enum { - lvlError = 0, - lvlInfo, - lvlTalkative, - lvlChatty, - lvlDebug, - lvlVomit -} Verbosity; - - } diff --git a/src/libutil/util.cc b/src/libutil/util.cc index 55d490992108..67558cc0b33c 100644 --- a/src/libutil/util.cc +++ b/src/libutil/util.cc @@ -356,8 +356,7 @@ void deletePath(const Path & path) void deletePath(const Path & path, unsigned long long & bytesFreed) { - startNest(nest, lvlDebug, - format("recursively deleting path ‘%1%’") % path); + Activity act(*logger, lvlDebug, format("recursively deleting path ‘%1%’") % path); bytesFreed = 0; _deletePath(path, bytesFreed); } @@ -403,6 +402,18 @@ Path createTempDir(const Path & tmpRoot, const Path & prefix, } +Path getCacheDir() +{ + Path cacheDir = getEnv("XDG_CACHE_HOME"); + if (cacheDir.empty()) { + Path homeDir = getEnv("HOME"); + if (homeDir.empty()) throw Error("$XDG_CACHE_HOME and $HOME are not set"); + cacheDir = homeDir + "/.cache"; + } + return cacheDir; +} + + Paths createDirs(const Path & path) { Paths created; @@ -444,113 +455,6 @@ void replaceSymlink(const Path & target, const Path & link) } -LogType logType = ltPretty; -Verbosity verbosity = lvlInfo; - -static int nestingLevel = 0; - - -Nest::Nest() -{ - nest = false; -} - - -Nest::~Nest() -{ - close(); -} - - -static string escVerbosity(Verbosity level) -{ - return std::to_string((int) level); -} - - -void Nest::open(Verbosity level, const FormatOrString & fs) -{ - if (level <= verbosity) { - if (logType == ltEscapes) - std::cerr << "\033[" << escVerbosity(level) << "p" - << fs.s << "\n"; - else - printMsg_(level, fs); - nest = true; - nestingLevel++; - } -} - - -void Nest::close() -{ - if (nest) { - nestingLevel--; - if (logType == ltEscapes) - std::cerr << "\033[q"; - nest = false; - } -} - - -void printMsg_(Verbosity level, const FormatOrString & fs) -{ - checkInterrupt(); - if (level > verbosity) return; - - string prefix; - if (logType == ltPretty) - for (int i = 0; i < nestingLevel; i++) - prefix += "| "; - else if (logType == ltEscapes && level != lvlInfo) - prefix = "\033[" + escVerbosity(level) + "s"; - else if (logType == ltSystemd) { - char c; - switch (level) { - case lvlError: c = '3'; break; - case lvlInfo: c = '5'; break; - case lvlTalkative: case lvlChatty: c = '6'; break; - default: c = '7'; - } - prefix = string("<") + c + ">"; - } - - string s = (format("%1%%2%\n") % prefix % fs.s).str(); - if (!isatty(STDERR_FILENO)) s = filterANSIEscapes(s); - writeToStderr(s); -} - - -void warnOnce(bool & haveWarned, const FormatOrString & fs) -{ - if (!haveWarned) { - printMsg(lvlError, format("warning: %1%") % fs.s); - haveWarned = true; - } -} - - -void writeToStderr(const string & s) -{ - try { - if (_writeToStderr) - _writeToStderr((const unsigned char *) s.data(), s.size()); - else - writeFull(STDERR_FILENO, s); - } catch (SysError & e) { - /* Ignore failing writes to stderr if we're in an exception - handler, otherwise throw an exception. We need to ignore - write errors in exception handlers to ensure that cleanup - code runs to completion if the other side of stderr has - been closed unexpectedly. */ - if (!std::uncaught_exception()) throw; - } -} - - -std::function<void(const unsigned char * buf, size_t count)> _writeToStderr; - - void readFull(int fd, unsigned char * buf, size_t count) { while (count) { @@ -941,7 +845,8 @@ static pid_t doFork(bool allowVfork, std::function<void()> fun) pid_t startProcess(std::function<void()> fun, const ProcessOptions & options) { auto wrapper = [&]() { - if (!options.allowVfork) _writeToStderr = 0; + if (!options.allowVfork) + logger = makeDefaultLogger(); try { #if __linux__ if (options.dieWithParent && prctl(PR_SET_PDEATHSIG, SIGKILL) == -1) @@ -1177,6 +1082,12 @@ bool statusOk(int status) } +bool hasPrefix(const string & s, const string & suffix) +{ + return s.compare(0, suffix.size(), suffix) == 0; +} + + bool hasSuffix(const string & s, const string & suffix) { return s.size() >= suffix.size() && string(s, s.size() - suffix.size()) == suffix; diff --git a/src/libutil/util.hh b/src/libutil/util.hh index 20bd62a0e752..f3f0f92a0aaa 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -1,6 +1,7 @@ #pragma once #include "types.hh" +#include "logging.hh" #include <sys/types.h> #include <sys/stat.h> @@ -102,6 +103,9 @@ void deletePath(const Path & path, unsigned long long & bytesFreed); Path createTempDir(const Path & tmpRoot = "", const Path & prefix = "nix", bool includePid = true, bool useGlobalCounter = true, mode_t mode = 0755); +/* Return the path to $XDG_CACHE_HOME/.cache. */ +Path getCacheDir(); + /* Create a directory and all its parents, if necessary. Returns the list of created directories, in order of creation. */ Paths createDirs(const Path & path); @@ -122,54 +126,6 @@ T singleton(const A & a) } -/* Messages. */ - - -typedef enum { - ltPretty, /* nice, nested output */ - ltEscapes, /* nesting indicated using escape codes (for log2xml) */ - ltFlat, /* no nesting */ - ltSystemd, /* use systemd severity prefixes */ -} LogType; - -extern LogType logType; -extern Verbosity verbosity; /* suppress msgs > this */ - -class Nest -{ -private: - bool nest; -public: - Nest(); - ~Nest(); - void open(Verbosity level, const FormatOrString & fs); - void close(); -}; - -void printMsg_(Verbosity level, const FormatOrString & fs); - -#define startNest(varName, level, f) \ - Nest varName; \ - if (level <= verbosity) { \ - varName.open(level, (f)); \ - } - -#define printMsg(level, f) \ - do { \ - if (level <= nix::verbosity) { \ - nix::printMsg_(level, (f)); \ - } \ - } while (0) - -#define debug(f) printMsg(lvlDebug, f) - -void warnOnce(bool & haveWarned, const FormatOrString & fs); - -void writeToStderr(const string & s); - -extern std::function<void(const unsigned char * buf, size_t count)> _writeToStderr; - - /* Wrappers arount read()/write() that read/write exactly the requested number of bytes. */ void readFull(int fd, unsigned char * buf, size_t count); @@ -377,6 +333,10 @@ template<class N> bool string2Float(const string & s, N & n) } +/* Return true iff `s' starts with `prefix'. */ +bool hasPrefix(const string & s, const string & prefix); + + /* Return true iff `s' ends in `suffix'. */ bool hasSuffix(const string & s, const string & suffix); |