Ticket #141: multi_writer_lock.2.patch

File multi_writer_lock.2.patch, 15.8 KB (added by Mark Hammond, 17 years ago)

updated patch

  • bin/xapian-tcpsrv.cc

     
    150150    }
    151151
    152152    try {
     153        vector<string> dbnames;
    153154        if (writable) {
     155            // Try to open the database just to give the user a
     156            // reasonable error now instead of waiting for the first
     157            // connection.
    154158            Xapian::WritableDatabase db(argv[optind], Xapian::DB_CREATE_OR_OPEN);
    155 
    156             if (verbose) {
    157                 if (host.empty())
    158                     cout << "Starting writable server on port ";
    159                 else
    160                     cout << "Starting writable server on host " << host << ", port ";
    161                 cout << port << endl;
    162             }
    163 
    164             TcpServer server(db, host, port, msecs_active_timeout,
    165                              msecs_idle_timeout, verbose);
    166 
    167             if (verbose)
    168                 cout << "Listening..." << endl;
    169 
    170             register_user_weighting_schemes(server);
    171 
    172             if (one_shot) {
    173                 server.run_once();
    174             } else {
    175                 server.run();
    176             }
     159            dbnames.push_back(argv[optind]);
    177160        } else {
    178             Xapian::Database db;
    179161            while (argv[optind]) {
    180                 db.add_database(Xapian::Database(argv[optind++]));
     162                dbnames.push_back(argv[optind]);
     163                Xapian::Database db(argv[optind++]);
    181164            }
     165        }
    182166
    183             if (verbose) {
    184                 if (host.empty())
    185                     cout << "Starting server on port ";
    186                 else
    187                     cout << "Starting server on host " << host << ", port ";
    188                 cout << port << endl;
    189             }
     167        if (verbose) {
     168            cout << "Starting";
     169            if (writable)
     170                cout << " writable";
     171            cout << " server on";
     172            if (!host.empty())
     173                cout << " host " << host << ",";
     174            cout << " port " << port << endl;
     175        }
    190176
    191             TcpServer server(db, host, port, msecs_active_timeout,
    192                              msecs_idle_timeout, verbose);
     177        TcpServer server(dbnames, host, port, msecs_active_timeout,
     178                         msecs_idle_timeout, writable, verbose);
    193179
    194             if (verbose)
    195                 cout << "Listening..." << endl;
     180        if (verbose)
     181            cout << "Listening..." << endl;
    196182
    197             register_user_weighting_schemes(server);
     183        register_user_weighting_schemes(server);
    198184
    199             if (one_shot) {
    200                 server.run_once();
    201             } else {
    202                 server.run();
    203             }
     185        if (one_shot) {
     186            server.run_once();
     187        } else {
     188            server.run();
    204189        }
    205190    } catch (const Xapian::Error &e) {
    206191        cerr << e.get_type() << ": " << e.get_msg();
  • bin/xapian-progsrv.cc

     
    5757
    5858int main(int argc, char **argv) {
    5959    /* variables needed in both try/catch blocks */
    60     Xapian::WritableDatabase wdb;
    61     Xapian::Database dbs;
    6260    unsigned int timeout = 60000;
    6361    bool writable = false;
    6462    bool syntax_error = false;
     
    9492        exit(1);
    9593    }
    9694
    97     /* Trap exceptions related to setting up the database */
    98     try {
    99         if (writable) {
    100             wdb = Xapian::WritableDatabase(argv[optind], Xapian::DB_CREATE_OR_OPEN);
    101         } else {
    102             while (argv[optind]) {
    103                 dbs.add_database(Xapian::Database(argv[optind++]));
    104             }
    105         }
    106     } catch (const Xapian::Error &e) {
    107         // FIXME: we shouldn't build messages by hand here.
    108         string msg = serialise_error(e);
    109         cout << char(REPLY_EXCEPTION) << encode_length(msg.size()) << msg << flush;
    110     } catch (...) {
    111         // FIXME: we shouldn't build messages by hand here.
    112         cout << char(REPLY_EXCEPTION) << encode_length(0) << flush;
    113     }
     95    vector<string> dbnames;
     96    /* Unlike the tcp server, a prog server only has a single 'connection'
     97     * which is established immediately.  Thus, there is no need to
     98     * attempt opening the database for convenience - if it can't be opened
     99     * they will immediately see the problem anyway - so just get to it!
     100     */
    114101
    115102    /* Catch exceptions from running the server, but don't pass them
    116103     * on to the remote end, as the RemoteServer will do that itself.
    117104     */
    118105    try {
    119         if (writable) {
    120             RemoteServer server(&wdb, 0, 1, timeout, timeout);
    121             // If you have defined your own weighting scheme, register it here
    122             // like so:
    123             // server.register_weighting_scheme(FooWeight());
     106        RemoteServer server(dbnames, 0, 1, timeout, timeout, writable);
     107        // If you have defined your own weighting scheme, register it here
     108        // like so:
     109        // server.register_weighting_scheme(FooWeight());
    124110
    125             server.run();
    126         } else {
    127             RemoteServer server(&dbs, 0, 1, timeout, timeout);
    128             // If you have defined your own weighting scheme, register it here
    129             // like so:
    130             // server.register_weighting_scheme(FooWeight());
    131 
    132             server.run();
    133         }
     111        server.run();
    134112    } catch (...) {
    135113    }
    136114}
  • net/tcpserver.cc

     
    5454#endif
    5555
    5656/// The TcpServer constructor, taking a database and a listening port.
    57 TcpServer::TcpServer(Xapian::Database db_, const std::string & host, int port,
     57TcpServer::TcpServer(const vector<std::string> &dbpaths_, const std::string & host, int port,
    5858                     int msecs_active_timeout_,
    5959                     int msecs_idle_timeout_,
     60                     bool writable_,
    6061                     bool verbose_)
    61         : writable(false), db(db_), listen_socket(get_listening_socket(host, port)),
     62        : writable(writable_), listen_socket(get_listening_socket(host, port)),
    6263          msecs_active_timeout(msecs_active_timeout_),
    6364          msecs_idle_timeout(msecs_idle_timeout_),
     65          dbpaths(dbpaths_),
    6466          verbose(verbose_)
    6567{
    66 
    6768}
    6869
    69 TcpServer::TcpServer(Xapian::WritableDatabase wdb_, const std::string & host, int port,
    70                      int msecs_active_timeout_,
    71                      int msecs_idle_timeout_,
    72                      bool verbose_)
    73         : writable(true), wdb(wdb_), listen_socket(get_listening_socket(host, port)),
    74           msecs_active_timeout(msecs_active_timeout_),
    75           msecs_idle_timeout(msecs_idle_timeout_),
    76           verbose(verbose_)
    77 {
    78 
    79 }
    80 
    8170int
    8271TcpServer::get_listening_socket(const std::string & host, int port)
    8372{
     
    224213        // child code
    225214        close(listen_socket);
    226215        try {
    227             if (writable) {
    228                 RemoteServer sserv(&wdb, connected_socket, connected_socket,
    229                                    msecs_active_timeout,
    230                                    msecs_idle_timeout);
    231                 sserv.run();
    232             } else {
    233                 RemoteServer sserv(&db, connected_socket, connected_socket,
    234                                    msecs_active_timeout,
    235                                    msecs_idle_timeout);
    236                 sserv.run();
     216            RemoteServer sserv(dbpaths, connected_socket, connected_socket,
     217                               msecs_active_timeout,
     218                               msecs_idle_timeout,
     219                               writable);
    237220            }
    238221        } catch (const Xapian::Error &err) {
    239222            cerr << "Got exception " << err.get_type()
     
    291274    while (true) {
    292275        try {
    293276            run_once();
    294         } catch (const Xapian::DatabaseModifiedError &) {
    295             cerr << "Database modified - calling db.reopen()" << endl;
    296             db.reopen();
    297277        } catch (const Xapian::Error &err) {
    298278            // FIXME: better error handling.
    299279            cerr << "Caught " << err.get_type()
     
    367347TcpServer::handle_one_request(int connected_socket)
    368348{
    369349    try {
    370         if (writable) {
    371             RemoteServer sserv(&wdb, connected_socket, connected_socket,
    372                                msecs_active_timeout,
    373                                msecs_idle_timeout);
    374             sserv.run();
    375         } else {
    376             RemoteServer sserv(&db, connected_socket, connected_socket,
    377                                msecs_active_timeout,
    378                                msecs_idle_timeout);
    379             sserv.run();
    380         }
     350        RemoteServer sserv(dbpaths, connected_socket, connected_socket,
     351                           msecs_active_timeout,
     352                           msecs_idle_timeout,
     353                           writable);
     354        sserv.run();
    381355    } catch (const Xapian::Error &err) {
    382356        cerr << "Got exception " << err.get_type()
    383357             << ": " << err.get_msg() << endl;
     
    441415            // really matter...
    442416            CloseHandle(hthread);
    443417       
    444         } catch (const Xapian::DatabaseModifiedError &) {
    445             cerr << "Database modified - calling db.reopen()" << endl;
    446             db.reopen();
    447418        } catch (const Xapian::Error &err) {
    448419            // FIXME: better error handling.
    449420            cerr << "Caught " << err.get_type()
  • net/remoteserver.cc

     
    4141/// Class to throw when we receive the connection closing message.
    4242struct ConnectionClosed { };
    4343
    44 RemoteServer::RemoteServer(Xapian::Database * db_,
     44RemoteServer::RemoteServer(const std::vector<std::string> &dbpaths,
    4545                           int fdin_, int fdout_,
    4646                           Xapian::timeout active_timeout_,
    47                            Xapian::timeout idle_timeout_)
    48     : RemoteConnection(fdin_, fdout_, db_->get_description()),
    49       db(db_), wdb(NULL),
     47                           Xapian::timeout idle_timeout_,
     48                           bool writable)
     49    : RemoteConnection(fdin_, fdout_, ""),
     50      db(NULL), wdb(NULL),
    5051      active_timeout(active_timeout_), idle_timeout(idle_timeout_)
    5152{
    52     initialise();
    53 }
     53    // Catch errors opening the database and serialize back to other end
     54    try {
     55        if (writable) {
     56            Assert(dbpaths.size()==1); // expecting exactly 1 DB
     57            wdb = new Xapian::WritableDatabase(dbpaths[0], Xapian::DB_CREATE_OR_OPEN);
     58            db = wdb;
     59        } else {
     60            db = new Xapian::Database();
     61            vector<std::string>::const_iterator i;
     62            for (i = dbpaths.begin(); i != dbpaths.end(); ++i)
     63                db->add_database(Xapian::Database(*i));
     64        }
     65        // build a better description than db.get_description() gives.
     66        context = dbpaths[0];
     67        vector<std::string>::const_iterator i(dbpaths.begin());
     68        for (++i; i != dbpaths.end(); ++i) {
     69            context += ' ';
     70            context += *i;
     71        }
    5472
    55 RemoteServer::RemoteServer(Xapian::WritableDatabase * wdb_,
    56                            int fdin_, int fdout_,
    57                            Xapian::timeout active_timeout_,
    58                            Xapian::timeout idle_timeout_)
    59     : RemoteConnection(fdin_, fdout_, wdb_->get_description()),
    60       db(wdb_), wdb(wdb_),
    61       active_timeout(active_timeout_), idle_timeout(idle_timeout_)
    62 {
    63     initialise();
    64 }
    65 
    66 void
    67 RemoteServer::initialise()
    68 {
     73    } catch (const Xapian::Error &err) {
     74        // Propagate the exception across the connection and get out-of-here!
     75        send_message(REPLY_EXCEPTION, serialise_error(err));
     76        throw;
     77    }
     78    /* Initialize things */
    6979#ifndef __WIN32__
    7080    // It's simplest to just ignore SIGPIPE.  We'll still know if the
    7181    // connection dies because we'll get EPIPE back from write().
     
    94104
    95105RemoteServer::~RemoteServer()
    96106{
     107    delete db; // wdb is either null or db - so it's ok
    97108    map<string, Xapian::Weight*>::const_iterator i;
    98109    for (i = wtschemes.begin(); i != wtschemes.end(); ++i) {
    99110        delete i->second;
  • common/remoteconnection.h

     
    105105    /// The file descriptor used for writing.
    106106    int fdout;
    107107
    108     /// The context to report with errors
     108protected:
     109    /// The context to report with errors (which subclasses are allowed
     110    /// to manage)
    109111    std::string context;
     112private:
    110113
    111114    /// Buffer to hold unprocessed input.
    112115    std::string buffer;
  • common/tcpserver.h

     
    4242    /// Don't allow copying.
    4343    TcpServer(const TcpServer &);
    4444
     45    // path to the DBs we will open - this should contain exactly 1
     46    // entry if writable, and at least one if not.
     47    const std::vector<std::string> dbpaths;
     48
    4549    /** Is this a WritableDatabase?
    4650     *
    47      *  If true, the wdb member is used.  If false, the db member is.
    4851     */
    4952    bool writable;
    5053
    51     /** If writable is false, this is the database we're using. */
    52     Xapian::Database db;
    53    
    54     /** If writable is true, this is the database we're using. */
    55     Xapian::WritableDatabase wdb;
    56 
    5754    /** The socket we're listening on. */
    5855    int listen_socket;
    5956
     
    8178    /** Construct a TcpServer for a Database and start listening for
    8279     *  connections.
    8380     *
    84      *  @param db_      The Database to provide remote access to.
     81     *  @param dbpaths_ The locations of the databases we should open.
    8582     *  @param host     The hostname or address for the interface to listen on
    8683     *                  (or "" to listen on all interfaces).
    8784     *  @port           The TCP port number to listen on.
     
    9087     *                                  (default 10000).
    9188     *  @param msecs_idle_timeout       Timeout between operations (in
    9289     *                                  milliseconds) (default 60000).
     90     *  @param writable_                Should we open the DB for writing? (default
     91     *                          false).
    9392     *  @param verbose_         Should we produce output when connections are
    9493     *                          made or lost? (default true).
    9594     */
    96     TcpServer(Xapian::Database db_, const std::string &host, int port,
     95    TcpServer(const std::vector<std::string> &dbpaths_, const std::string &host, int port,
    9796              int msecs_normal_timeout_ = 10000,
    9897              int msecs_idle_timeout_ = 60000,
     98              bool writable_ = false,
    9999              bool verbose_ = true);
    100100
    101     /** Construct a TcpServer for a WritableDatabase and start listening for
    102      *  connections.
    103      *
    104      *  @param db_      The WritableDatabase to provide remote access to.
    105      *  @param host     The hostname or address for the interface to listen on
    106      *                  (or "" to listen on all interfaces).
    107      *  @port           The TCP port number to listen on.
    108      *  @param msecs_active_timeout     Timeout between messages during a
    109      *                                  single operation (in milliseconds)
    110      *                                  (default 10000).
    111      *  @param msecs_idle_timeout       Timeout between operations (in
    112      *                                  milliseconds) (default 60000).
    113      *  @param verbose_         Should we produce output when connections are
    114      *                          made or lost? (default true).
    115      */
    116     TcpServer(Xapian::WritableDatabase db_, const std::string &host, int port,
    117               int msecs_normal_timeout_ = 10000,
    118               int msecs_idle_timeout_ = 60000,
    119               bool verbose_ = true);
    120 
    121101    /** Destructor. */
    122102    ~TcpServer();
    123103
  • common/remoteserver.h

     
    6868    /// Registered weighting schemes.
    6969    map<string, Xapian::Weight *> wtschemes;
    7070
    71     /// Initialisation code needed by both ctors.
    72     void initialise();
    73 
    7471    /// Accept a message from the client.
    7572    message_type get_message(Xapian::timeout timeout, string & result,
    7673                             message_type required_type = MSG_MAX);
     
    139136    void msg_replacedocumentterm(const std::string & message);
    140137
    141138  public:
    142     /** Construct a read-only RemoteServer.
     139    /** Construct a RemoteServer.
    143140     *
    144      *  @param db       The Xapian::Database to use.
     141     *  @param dbpaths  The paths to the xapian databases to use.
    145142     *  @param fdin     The file descriptor to read from.
    146143     *  @param fdout    The file descriptor to write to (fdin and fdout may be
    147144     *                  the same).
     
    149146     *                  (specified in milliseconds).
    150147     *  @param idle_timeout_    Timeout while waiting for a new action from
    151148     *                  the client (specified in milliseconds).
     149     *  @param writable Should the DB be opened for writing?
    152150     */
    153     RemoteServer(Xapian::Database * db, int fdin, int fdout,
     151    RemoteServer(const std::vector<std::string> &dbpaths,
     152                 int fdin, int fdout,
    154153                 Xapian::timeout active_timeout_,
    155                  Xapian::timeout idle_timeout_);
     154                 Xapian::timeout idle_timeout_,
     155                 bool writable = false);
    156156
    157     /** Construct a writable RemoteServer.
    158      *
    159      *  @param wdb      The Xapian::WritableDatabase to use.
    160      *  @param fdin     The file descriptor to read from.
    161      *  @param fdout    The file descriptor to write to (fdin and fdout may be
    162      *                  the same).
    163      *  @param active_timeout_  Timeout for actions during a conversation
    164      *                  (specified in milliseconds).
    165      *  @param idle_timeout_    Timeout while waiting for a new action from
    166      *                  the client (specified in milliseconds).
    167      */
    168     RemoteServer(Xapian::WritableDatabase * wdb, int fdin, int fdout,
    169                  Xapian::timeout active_timeout_,
    170                  Xapian::timeout idle_timeout_);
    171 
    172157    /// Destructor.
    173158    ~RemoteServer();
    174159