From 7951c3c5460324c652d42f5f92bcae44e0a0b9c7 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Sun, 3 Dec 2006 02:08:13 +0000 Subject: * Some hackery to propagate the worker's stderr and exceptions to the client. --- src/nix-worker/main.cc | 245 ++++++++++++++++++++++++++++++------------------- 1 file changed, 150 insertions(+), 95 deletions(-) (limited to 'src/nix-worker/main.cc') diff --git a/src/nix-worker/main.cc b/src/nix-worker/main.cc index cf550895e4b2..17e892c648fc 100644 --- a/src/nix-worker/main.cc +++ b/src/nix-worker/main.cc @@ -10,7 +10,7 @@ using namespace nix; -Path readStorePath(Source & from) +static Path readStorePath(Source & from) { Path path = readString(from); assertStorePath(path); @@ -18,7 +18,7 @@ Path readStorePath(Source & from) } -PathSet readStorePaths(Source & from) +static PathSet readStorePaths(Source & from) { PathSet paths = readStringSet(from); for (PathSet::iterator i = paths.begin(); i != paths.end(); ++i) @@ -27,123 +27,178 @@ PathSet readStorePaths(Source & from) } -void processConnection(Source & from, Sink & to) +static Sink * _to; /* !!! should make writeToStderr an object */ +bool canSendStderr; + + +static void tunnelStderr(const unsigned char * buf, size_t count) { - store = boost::shared_ptr(new LocalStore(true)); + writeFull(STDERR_FILENO, buf, count); + if (canSendStderr) { + try { + writeInt(STDERR_NEXT, *_to); + writeString(string((char *) buf, count), *_to); + } catch (...) { + /* Write failed; that means that the other side is + gone. */ + canSendStderr = false; + throw; + } + } +} - unsigned int magic = readInt(from); - if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch"); - writeInt(WORKER_MAGIC_2, to); +/* startWork() means that we're starting an operation for which we + want to send out stderr to the client. */ +static void startWork() +{ + canSendStderr = true; +} - debug("greeting exchanged"); - bool quit = false; +/* stopWork() means that we're done; stop sending stderr to the + client. */ +static void stopWork() +{ + canSendStderr = false; + writeInt(STDERR_LAST, *_to); +} - unsigned int opCount = 0; - - do { + +static void performOp(Source & from, Sink & to, unsigned int op) +{ + switch (op) { + +#if 0 + case wopQuit: { + /* Close the database. */ + store.reset((StoreAPI *) 0); + writeInt(1, to); + break; + } +#endif + + case wopIsValidPath: { + Path path = readStorePath(from); + writeInt(store->isValidPath(path), to); + break; + } + + case wopHasSubstitutes: { + Path path = readStorePath(from); + writeInt(store->hasSubstitutes(path), to); + break; + } + + case wopQueryPathHash: { + Path path = readStorePath(from); + writeString(printHash(store->queryPathHash(path)), to); + break; + } + + case wopQueryReferences: + case wopQueryReferrers: { + Path path = readStorePath(from); + PathSet paths; + if (op == wopQueryReferences) + store->queryReferences(path, paths); + else + store->queryReferrers(path, paths); + writeStringSet(paths, to); + break; + } + + case wopAddToStore: { + /* !!! uberquick hack */ + string baseName = readString(from); + bool fixed = readInt(from) == 1; + bool recursive = readInt(from) == 1; + string hashAlgo = readString(from); - WorkerOp op = (WorkerOp) readInt(from); + Path tmp = createTempDir(); + Path tmp2 = tmp + "/" + baseName; + restorePath(tmp2, from); - opCount++; + writeString(store->addToStore(tmp2, fixed, recursive, hashAlgo), to); + + deletePath(tmp); + break; + } - switch (op) { + case wopAddTextToStore: { + string suffix = readString(from); + string s = readString(from); + PathSet refs = readStorePaths(from); + writeString(store->addTextToStore(suffix, s, refs), to); + break; + } - case wopQuit: { - /* Close the database. */ - store.reset((StoreAPI *) 0); - writeInt(1, to); - quit = true; - break; - } + case wopBuildDerivations: { + PathSet drvs = readStorePaths(from); + startWork(); + store->buildDerivations(drvs); + stopWork(); + writeInt(1, to); + break; + } - case wopIsValidPath: { - Path path = readStorePath(from); - writeInt(store->isValidPath(path), to); - break; - } + case wopEnsurePath: { + Path path = readStorePath(from); + store->ensurePath(path); + writeInt(1, to); + break; + } - case wopHasSubstitutes: { - Path path = readStorePath(from); - writeInt(store->hasSubstitutes(path), to); - break; - } + case wopAddTempRoot: { + Path path = readStorePath(from); + store->addTempRoot(path); + writeInt(1, to); + break; + } - case wopQueryPathHash: { - Path path = readStorePath(from); - writeString(printHash(store->queryPathHash(path)), to); - break; - } + case wopSyncWithGC: { + store->syncWithGC(); + writeInt(1, to); + break; + } - case wopQueryReferences: - case wopQueryReferrers: { - Path path = readStorePath(from); - PathSet paths; - if (op == wopQueryReferences) - store->queryReferences(path, paths); - else - store->queryReferrers(path, paths); - writeStringSet(paths, to); - break; - } + default: + throw Error(format("invalid operation %1%") % op); + } +} - case wopAddToStore: { - /* !!! uberquick hack */ - string baseName = readString(from); - bool fixed = readInt(from) == 1; - bool recursive = readInt(from) == 1; - string hashAlgo = readString(from); - Path tmp = createTempDir(); - Path tmp2 = tmp + "/" + baseName; - restorePath(tmp2, from); +static void processConnection(Source & from, Sink & to) +{ + store = boost::shared_ptr(new LocalStore(true)); - writeString(store->addToStore(tmp2, fixed, recursive, hashAlgo), to); - - deletePath(tmp); - break; - } + unsigned int magic = readInt(from); + if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch"); - case wopAddTextToStore: { - string suffix = readString(from); - string s = readString(from); - PathSet refs = readStorePaths(from); - writeString(store->addTextToStore(suffix, s, refs), to); - break; - } + writeInt(WORKER_MAGIC_2, to); - case wopBuildDerivations: { - PathSet drvs = readStorePaths(from); - store->buildDerivations(drvs); - writeInt(1, to); - break; - } + debug("greeting exchanged"); - case wopEnsurePath: { - Path path = readStorePath(from); - store->ensurePath(path); - writeInt(1, to); - break; - } + _to = &to; + canSendStderr = false; + writeToStderr = tunnelStderr; - case wopAddTempRoot: { - Path path = readStorePath(from); - store->addTempRoot(path); - writeInt(1, to); - break; - } + bool quit = false; - case wopSyncWithGC: { - store->syncWithGC(); - writeInt(1, to); - break; - } + unsigned int opCount = 0; + + do { + WorkerOp op = (WorkerOp) readInt(from); - default: - throw Error(format("invalid operation %1%") % op); + opCount++; + + try { + performOp(from, to, op); + } catch (Error & e) { + writeInt(STDERR_ERROR, *_to); + writeString(e.msg(), to); } - + } while (!quit); printMsg(lvlError, format("%1% worker operations") % opCount); -- cgit 1.4.1