about summary refs log blame commit diff
path: root/src/nix-worker/main.cc
blob: 666f615723ae72e23bfb50d69de06c1c79cdd22c (plain) (tree)
1
2
3
4
5
6
7
8


                         
                       
                             
                     
 
                   


                   
 


                    
                                        






                                 
                                            







                                                                    



                                                                



                                                                   
                                                                 
 












                                                              
 
 













                                                                      




                                                                  

                                             



























                                                                      
 
 
 



                                                               


                                                                     

                                          
    


                                
 

















































                                                                
        


                                         
 




                                                                             
 






                                                                
 







                                            
 





                                        
 





                                        
 




                            
 



                                                          
 
 


                                                              
 

                                                                  
 
                                 
 
                                
    


                                 
 
                      
 



                                               
 






                                         
         
 
                    

                                                                  










                                                                 
                                             

     






                                                                                     


                                      





                                                                    

                                        





                                                                      


 

                      

                
                                                            


 
                                
#include "shared.hh"
#include "local-store.hh"
#include "util.hh"
#include "serialise.hh"
#include "worker-protocol.hh"
#include "archive.hh"

#include <iostream>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>

using namespace nix;


/* Read one string from `from', run it through assertStorePath(), and
   return it.  NOTE(review): assertStorePath() presumably throws for
   anything that is not a store path -- confirm in its definition. */
static Path readStorePath(Source & from)
{
    Path result = readString(from);
    assertStorePath(result);
    return result;
}


/* Read a string set from `from' and run assertStorePath() on every
   element.  The set is returned only after all elements have been
   checked. */
static PathSet readStorePaths(Source & from)
{
    PathSet result = readStringSet(from);

    PathSet::iterator cur = result.begin();
    while (cur != result.end()) {
        assertStorePath(*cur);
        ++cur;
    }

    return result;
}


/* Sink through which tunnelStderr(), stopWork() and the error path
   reach the client; assigned in processConnection(). */
static Sink * _to; /* !!! should make writeToStderr an object */
/* True while stderr output may be forwarded to the client: set by
   startWork(), cleared by stopWork(), by the SIGIO handler, and when
   forwarding fails. */
bool canSendStderr;


/* This function is called anytime we want to write something to
   stderr.  If we're in a state where the protocol allows it (i.e.,
   when canSendStderr), send the message to the client over the
   socket. */
static void tunnelStderr(const unsigned char * buf, size_t count)
{
    writeFull(STDERR_FILENO, buf, count);
    if (canSendStderr) {
        try {
            writeInt(STDERR_NEXT, *_to);
            writeString(string((char *) buf, count), *_to);
        } catch (...) {
            /* Write failed; that means that the other side is
               gone. */
            canSendStderr = false;
            throw;
        }
    }
}


/* A SIGIO signal is received when data is available on the client
   communication socket, or when the client has closed its side of
   the socket.  This handler is only installed (by startWork()) while
   we are doing work and the client is supposed to be quiet.  Thus,
   if we get a SIGIO signal, it means that the client has quit, so we
   flag an interrupt and stop forwarding stderr. */
static void sigioHandler(int sigNo)
{
    static const char notice[] = "SIGIO\n";

    _isInterrupted = 1;
    canSendStderr = false;
    /* sizeof includes the terminating NUL, which is not written. */
    write(STDERR_FILENO, notice, sizeof notice - 1);
}


/* Perform a zero-timeout select() on stdin and report whether it is
   readable.  Throws SysError if select() itself fails. */
static bool stdinReadable()
{
    fd_set readFds;
    FD_ZERO(&readFds);
    FD_SET(STDIN_FILENO, &readFds);

    struct timeval noWait;
    noWait.tv_sec = 0;
    noWait.tv_usec = 0;

    if (select(STDIN_FILENO + 1, &readFds, 0, 0, &noWait) == -1)
        throw SysError("select()");

    return FD_ISSET(STDIN_FILENO, &readFds) != 0;
}


/* startWork() marks the beginning of an operation during which
   stderr output is forwarded to the client. */
static void startWork()
{
    canSendStderr = true;

    /* From now on, client death is noticed asynchronously. */
    if (signal(SIGIO, sigioHandler) == SIG_ERR)
        throw SysError("setting handler for SIGIO");

    /* There is a race: the socket may have been closed after our
       last read/write on it but before the SIGIO handler was
       installed, in which case no signal will arrive.  So poll the
       socket once without blocking.  The client is supposed to be
       quiet now, so any pending input has to be the 0-byte read that
       indicates that the socket was closed. */
    if (!stdinReadable()) return;

    char dummy;
    if (read(STDIN_FILENO, &dummy, 1) != 0)
        throw Error("EOF expected (protocol error?)");

    _isInterrupted = 1;
    checkInterrupt();
}


/* stopWork() marks the end of an operation: stderr is no longer
   forwarded, and the client is told so with a STDERR_LAST record. */
static void stopWork()
{
    /* Asynchronous detection of client death is no longer needed.
       From here on we are either sending to or receiving from the
       client, so its death will be noticed anyway. */
    const bool ignoreFailed = signal(SIGIO, SIG_IGN) == SIG_ERR;
    if (ignoreFailed)
        throw SysError("ignoring SIGIO");

    canSendStderr = false;

    writeInt(STDERR_LAST, *_to);
}


static void performOp(Source & from, Sink & to, unsigned int op)
{
    switch (op) {

#if 0        
    case wopQuit: {
        /* Close the database. */
        store.reset((StoreAPI *) 0);
        writeInt(1, to);
        break;
    }
#endif

    case wopIsValidPath: {
        Path path = readStorePath(from);
        writeInt(store->isValidPath(path), to);
        break;
    }

    case wopHasSubstitutes: {
        Path path = readStorePath(from);
        writeInt(store->hasSubstitutes(path), to);
        break;
    }

    case wopQueryPathHash: {
        Path path = readStorePath(from);
        writeString(printHash(store->queryPathHash(path)), to);
        break;
    }

    case wopQueryReferences:
    case wopQueryReferrers: {
        Path path = readStorePath(from);
        PathSet paths;
        if (op == wopQueryReferences)
            store->queryReferences(path, paths);
        else
            store->queryReferrers(path, paths);
        writeStringSet(paths, to);
        break;
    }

    case wopAddToStore: {
        /* !!! uberquick hack */
        string baseName = readString(from);
        bool fixed = readInt(from) == 1;
        bool recursive = readInt(from) == 1;
        string hashAlgo = readString(from);
        
        Path tmp = createTempDir();
        Path tmp2 = tmp + "/" + baseName;
        restorePath(tmp2, from);

        writeString(store->addToStore(tmp2, fixed, recursive, hashAlgo), to);
            
        deletePath(tmp);
        break;
    }

    case wopAddTextToStore: {
        string suffix = readString(from);
        string s = readString(from);
        PathSet refs = readStorePaths(from);
        writeString(store->addTextToStore(suffix, s, refs), to);
        break;
    }

    case wopBuildDerivations: {
        PathSet drvs = readStorePaths(from);
        startWork();
        store->buildDerivations(drvs);
        stopWork();
        writeInt(1, to);
        break;
    }

    case wopEnsurePath: {
        Path path = readStorePath(from);
        store->ensurePath(path);
        writeInt(1, to);
        break;
    }

    case wopAddTempRoot: {
        Path path = readStorePath(from);
        store->addTempRoot(path);
        writeInt(1, to);
        break;
    }

    case wopSyncWithGC: {
        store->syncWithGC();
        writeInt(1, to);
        break;
    }

    default:
        throw Error(format("invalid operation %1%") % op);
    }
}


/* Serve one client connection: open the local store, exchange the
   protocol greeting (WORKER_MAGIC_1 from the client, WORKER_MAGIC_2
   in reply), install the stderr tunnel, and then execute operations
   read from `from' until an exception other than Error leaves the
   loop.  An Error raised by an operation is reported to the client
   as a STDERR_ERROR record followed by the message.  Throws Error
   on a greeting mismatch. */
static void processConnection(Source & from, Sink & to)
{
    store = boost::shared_ptr<StoreAPI>(new LocalStore(true));

    unsigned int magic = readInt(from);
    if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");

    writeInt(WORKER_MAGIC_2, to);

    debug("greeting exchanged");

    _to = &to;
    canSendStderr = false;
    writeToStderr = tunnelStderr;

    /* Nothing in this function sets `quit'; NOTE(review): the
       wopQuit operation is compiled out in performOp(), so the loop
       only ends through an exception -- confirm this is intended. */
    bool quit = false;

    unsigned int opCount = 0;

    do {
        WorkerOp op = (WorkerOp) readInt(from);

        opCount++;

        try {
            performOp(from, to, op);
        } catch (Error & e) {
            /* The operation may have failed between startWork() and
               stopWork().  In that case the SIGIO handler is still
               installed and stderr tunnelling is still enabled, so
               the client's next request would be taken for client
               death and log output could end up in the middle of a
               reply.  Return to the quiescent state before
               reporting the error. */
            if (signal(SIGIO, SIG_IGN) == SIG_ERR)
                throw SysError("ignoring SIGIO");
            canSendStderr = false;

            writeInt(STDERR_ERROR, to);
            writeString(e.msg(), to);
        }

    } while (!quit);

    printMsg(lvlError, format("%1% worker operations") % opCount);
}


/* Program entry point.  Recognises `--slave' and `--daemon'; other
   arguments are ignored.  In slave mode the client is served over
   stdin/stdout.  Throws SysError if signal/fcntl/setsid setup fails,
   and Error if no mode was given or daemon mode was requested. */
void run(Strings args)
{
    bool slave = false;
    bool daemon = false;

    for (Strings::iterator i = args.begin(); i != args.end(); ++i) {
        if (*i == "--slave") slave = true;
        if (*i == "--daemon") daemon = true;
    }

    /* Allow us to receive SIGIO for events on the client socket.
       The signal stays ignored until startWork() installs the real
       handler. */
    if (signal(SIGIO, SIG_IGN) == SIG_ERR)
        throw SysError("ignoring SIGIO");
    if (fcntl(STDIN_FILENO, F_SETOWN, getpid()) == -1)
        throw SysError("F_SETOWN");

    /* Check F_GETFL separately: a -1 error result must not be OR'ed
       into the flags passed to F_SETFL. */
    int flags = fcntl(STDIN_FILENO, F_GETFL, 0);
    if (flags == -1)
        throw SysError("F_GETFL");
    if (fcntl(STDIN_FILENO, F_SETFL, flags | FASYNC) == -1)
        throw SysError("F_SETFL");

    if (slave) {
        FdSource source(STDIN_FILENO);
        FdSink sink(STDOUT_FILENO);

        /* This prevents us from receiving signals from the terminal
           when we're running in setuid mode. */
        if (setsid() == -1)
            throw SysError(format("creating a new session"));

        processConnection(source, sink);
    }

    else if (daemon)
        throw Error("daemon mode not implemented");

    else
        throw Error("must be run in either --slave or --daemon mode");
}


#include "help.txt.hh"

void printHelp()
{
    std::cout << string((char *) helpText, sizeof helpText);
}


/* Name of this program.  NOTE(review): presumably read by the common
   startup code declared in shared.hh -- confirm. */
string programId = "nix-worker";