diff options
author | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2014-07-10T12·15+0200 |
---|---|---|
committer | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2014-07-10T12·15+0200 |
commit | 1114c7bd57bcab16255d5db5e6f66ae8dece7b1e (patch) | |
tree | 59c96d86ae08301382bbc93f1fea282b43d79df8 | |
parent | 7911e4c27a0020a61ace13cfdc44de4af02f315e (diff) |
nix-copy-closure: Restore compression and the progress viewer
-rw-r--r-- | perl/lib/Nix/CopyClosure.pm | 52 | ||||
-rwxr-xr-x | scripts/nix-copy-closure.in | 4 | ||||
-rw-r--r-- | src/nix-store/nix-store.cc | 49 |
3 files changed, 89 insertions, 16 deletions
diff --git a/perl/lib/Nix/CopyClosure.pm b/perl/lib/Nix/CopyClosure.pm index cba365aa1745..5085ec075b96 100644 --- a/perl/lib/Nix/CopyClosure.pm +++ b/perl/lib/Nix/CopyClosure.pm @@ -15,6 +15,16 @@ sub readInt { } +sub writeString { + my ($s, $to) = @_; + my $len = length $s; + my $req .= pack("L<x4", $len); + $req .= $s; + $req .= "\000" x (8 - $len % 8) if $len % 8; + syswrite($to, $req) or die; +} + + sub copyTo { my ($sshHost, $sshOpts, $storePaths, $compressor, $decompressor, $includeOutputs, $dryRun, $sign, $progressViewer, $useSubstitutes) = @_; @@ -49,16 +59,10 @@ sub copyTo { } # Send the "query valid paths" command with the "lock" option - # enabled. This prevens a race where the remote host + # enabled. This prevents a race where the remote host # garbage-collect paths that are already there. - my $req = pack("L<x4L<x4L<x4", 1, 1, scalar @closure); - for my $s (@closure) { - my $len = length $s; - $req .= pack("L<x4", $len); - $req .= $s; - $req .= "\000" x (8 - $len % 8) if $len % 8; - } - syswrite($to, $req) or die; + syswrite($to, pack("L<x4L<x4L<x4", 1, 1, scalar @closure)) or die; + writeString($_, $to) foreach @closure; # Get back the set of paths that are already valid on the remote host. my %present; @@ -76,11 +80,35 @@ sub copyTo { # Send the "import paths" command. syswrite($to, pack("L<x4", 4)) or die; - exportPaths(fileno($to), $sign, @missing); - readInt($from) == 1 or die; + writeString($compressor, $to); + + if ($compressor || $progressViewer) { + + # Compute the size of the closure for the progress viewer. + if ($progressViewer) { + my $missingSize = 0; + $missingSize += (queryPathInfo($_, 1))[3] foreach @missing; + $progressViewer = "$progressViewer -s $missingSize"; + } + + # Start the compressor and/or progress viewer in between us + # and the remote host. + my $to_; + my $pid2 = open2(">&" . fileno($to), $to_, + $progressViewer && $compressor ? "$progressViewer | $compressor" : $progressViewer || $compressor); + close $to; + exportPaths(fileno($to_), $sign, @missing); + close $to_; + waitpid $pid2, 0; + + } else { + exportPaths(fileno($to), $sign, @missing); + close $to; + } + + readInt($from) == 1 or die "remote machine \`$sshHost' failed to import closure\n"; # Shut down the server process. - close $to; waitpid $pid, 0; } diff --git a/scripts/nix-copy-closure.in b/scripts/nix-copy-closure.in index 23d5619519a4..abd3760fc138 100755 --- a/scripts/nix-copy-closure.in +++ b/scripts/nix-copy-closure.in @@ -42,11 +42,11 @@ while (@ARGV) { } elsif ($arg eq "--gzip") { $compressor = "gzip"; - $decompressor = "gunzip"; + $decompressor = "gzip -d"; } elsif ($arg eq "--bzip2") { $compressor = "bzip2"; - $decompressor = "bunzip2"; + $decompressor = "bzip2 -d"; } elsif ($arg eq "--xz") { $compressor = "xz"; diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc index 849cb7e8a77b..bb5a9e2e0ba2 100644 --- a/src/nix-store/nix-store.cc +++ b/src/nix-store/nix-store.cc @@ -928,12 +928,57 @@ static void opServe(Strings opFlags, Strings opArgs) dumpPath(readStorePath(in), out); out.flush(); break; - case cmdImportPaths: + case cmdImportPaths: { if (!writeAllowed) throw Error("importing paths not allowed"); - store->importPaths(false, in); + string compression = readString(in); + + if (compression != "") { + if (compression != "gzip" && compression != "bzip2" && compression != "xz") + throw Error(format("unsupported compression method `%1%'") % compression); + + Pipe fromDecompressor; + fromDecompressor.create(); + + Pid pid; + pid = fork(); + + switch (pid) { + + case -1: + throw SysError("unable to fork"); + + case 0: /* child */ + try { + fromDecompressor.readSide.close(); + if (dup2(fromDecompressor.writeSide, STDOUT_FILENO) == -1) + throw SysError("dupping stdout"); + // FIXME: use absolute path. + execlp(compression.c_str(), compression.c_str(), "-d", NULL); + throw SysError(format("executing `%1%'") % compression); + } catch (std::exception & e) { + std::cerr << "error: " << e.what() << std::endl; + } + _exit(1); + } + + fromDecompressor.writeSide.close(); + + FdSource fromDecompressor_(fromDecompressor.readSide); + store->importPaths(false, fromDecompressor_); + + pid.wait(true); + } else + store->importPaths(false, in); + writeInt(1, out); // indicate success out.flush(); + + /* The decompressor will have left stdin in an + undefined state, so we can't continue. */ + if (compression != "") return; + break; + } default: throw Error(format("unknown serve command %1%") % cmd); } |