Ticket #546: read-timeout-1.diff

File read-timeout-1.diff, 9.7 KB (added by nkvoll, 13 years ago)

patch to add timeout support to xapian-replicate

  • xapian-core/api/replication.cc

    diff --git a/xapian-core/api/replication.cc b/xapian-core/api/replication.cc
    index f77e3b1..f92be86 100644
    a b class DatabaseReplica::Internal : public Xapian::Internal::RefCntBase {  
    212212
    213213    /// Read and apply the next changeset.
    214214    bool apply_next_changeset(ReplicationInfo * info,
    215                               double reader_close_time);
     215                              double reader_close_time,
     216                              double read_timeout);
    216217
    217218    /// Return a string describing this object.
    218219    string get_description() const { return path; }
    DatabaseReplica::set_read_fd(int fd)  
    270271
    271272bool
    272273DatabaseReplica::apply_next_changeset(ReplicationInfo * info,
    273                                       double reader_close_time)
     274                                      double reader_close_time,
     275                                      double read_timeout)
    274276{
    275     LOGCALL(REPLICA, bool, "DatabaseReplica::apply_next_changeset", info | reader_close_time);
     277    LOGCALL(REPLICA, bool, "DatabaseReplica::apply_next_changeset", info | reader_close_time | read_timeout);
    276278    if (info != NULL)
    277279        info->clear();
    278280    if (internal.get() == NULL)
    279281        throw Xapian::InvalidOperationError("Attempt to call DatabaseReplica::apply_next_changeset on a closed replica.");
    280     RETURN(internal->apply_next_changeset(info, reader_close_time));
     282    RETURN(internal->apply_next_changeset(info, reader_close_time, read_timeout));
    281283}
    282284
    283285void
    DatabaseReplica::Internal::set_read_fd(int fd)  
    506508
    507509bool
    508510DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info,
    509                                                 double reader_close_time)
     511                                                double reader_close_time,
     512                                                double read_timeout)
    510513{
    511     LOGCALL(REPLICA, bool, "DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time);
     514    LOGCALL(REPLICA, bool, "DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time | read_timeout);
    512515    if (live_db.internal.empty())
    513516        live_db = WritableDatabase(get_replica_path(live_id), Xapian::DB_OPEN);
    514517    if (live_db.internal.size() != 1)
    515518        throw Xapian::InvalidOperationError("DatabaseReplica needs to be pointed at exactly one subdatabase");
    516519
    517520    while (true) {
    518         char type = conn->sniff_next_message_type(0.0);
     521        char type = conn->sniff_next_message_type(RealTime::end_time(read_timeout));
    519522        switch (type) {
    520523            case REPL_REPLY_END_OF_CHANGES: {
    521524                string buf;
    522                 (void)conn->get_message(buf, 0.0);
     525                (void)conn->get_message(buf, RealTime::end_time(read_timeout));
    523526                RETURN(false);
    524527            }
    525528            case REPL_REPLY_DB_HEADER:
    526529                // Apply the copy - remove offline db in case of any error.
    527530                try {
    528                     apply_db_copy(0.0);
     531                    apply_db_copy(RealTime::end_time(read_timeout));
    529532                    if (info != NULL)
    530533                        ++(info->fullcopy_count);
    531534                    string replica_uuid;
    DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info,  
    580583                        // Ignore the returned revision number, since we are
    581584                        // live so the changeset must be safe to apply to a
    582585                        // live DB.
    583                         replicator->apply_changeset_from_conn(*conn, 0.0, true);
     586                        replicator->apply_changeset_from_conn(*conn, RealTime::end_time(read_timeout), true);
    584587                    }
    585588                    last_live_changeset_time = RealTime::now();
    586589
    DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info,  
    598601                            DatabaseReplicator::open(get_replica_path(live_id ^ 1)));
    599602
    600603                    offline_revision = replicator->
    601                             apply_changeset_from_conn(*conn, 0.0, false);
     604                            apply_changeset_from_conn(*conn, RealTime::end_time(read_timeout), false);
    602605
    603606                    if (info != NULL) {
    604607                        ++(info->changeset_count);
    DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info,  
    611614                RETURN(true);
    612615            case REPL_REPLY_FAIL: {
    613616                string buf;
    614                 (void)conn->get_message(buf, 0.0);
     617                (void)conn->get_message(buf, RealTime::end_time(read_timeout));
    615618                throw NetworkError("Unable to fully synchronise: " + buf);
    616619            }
    617620            default:
  • xapian-core/bin/xapian-replicate.cc

    diff --git a/xapian-core/bin/xapian-replicate.cc b/xapian-core/bin/xapian-replicate.cc
    index ad8307f..bfe8d85 100644
    a b using namespace std;  
    4545// Number of seconds before we assume that a reader will be closed.
    4646#define READER_CLOSE_TIME 30
    4747
     48// Number of seconds before a read on a socket should time out.
     49#define READ_TIMEOUT 0
     50
    4851static void show_usage() {
    4952    cout << "Usage: "PROG_NAME" [OPTIONS] DATABASE\n\n"
    5053"Options:\n"
    51 "  -h, --host=HOST     host to connect to (required)\n"
    52 "  -p, --port=PORT     port to connect to (required)\n"
    53 "  -m, --master=DB     replicate database DB from the master (default: DATABASE)\n"
    54 "  -i, --interval=N    wait N seconds between each connection to the master\n"
    55 "                      (default: "STRINGIZE(DEFAULT_INTERVAL)")\n"
    56 "  -r, --reader-time=N wait N seconds to allow readers time to close before\n"
    57 "                      applying repeated changesets (default: "STRINGIZE(READER_CLOSE_TIME)")\n"
    58 "  -o, --one-shot      replicate only once and then exit\n"
    59 "  -v, --verbose       be more verbose\n"
    60 "  --help              display this help and exit\n"
    61 "  --version           output version information and exit" << endl;
     54"  -h, --host=HOST      host to connect to (required)\n"
     55"  -p, --port=PORT      port to connect to (required)\n"
     56"  -m, --master=DB      replicate database DB from the master (default: DATABASE)\n"
     57"  -i, --interval=N     wait N seconds between each connection to the master\n"
     58"                       (default: "STRINGIZE(DEFAULT_INTERVAL)")\n"
     59"  -r, --reader-time=N  wait N seconds to allow readers time to close before\n"
     60"                       applying repeated changesets (default: "STRINGIZE(READER_CLOSE_TIME)")\n"
     61"  -t, --read-timeout=N number of seconds before a read on a socket should time out.\n"
     62"                       (default: "STRINGIZE(READ_TIMEOUT)", which means no timeout)\n"
     63"  -o, --one-shot       replicate only once and then exit\n"
     64"  -v, --verbose        be more verbose\n"
     65"  --help               display this help and exit\n"
     66"  --version            output version information and exit" << endl;
    6267}
    6368
    6469int
    6570main(int argc, char **argv)
    6671{
    67     const char * opts = "h:p:m:i:r:ov";
     72    const char * opts = "h:p:m:i:r:t:ov";
    6873    const struct option long_opts[] = {
    6974        {"host",        required_argument,      0, 'h'},
    7075        {"port",        required_argument,      0, 'p'},
    7176        {"master",      required_argument,      0, 'm'},
    7277        {"interval",    required_argument,      0, 'i'},
    7378        {"reader-time", required_argument,      0, 'r'},
     79        {"read-timeout",required_argument,      0, 't'},
    7480        {"one-shot",    no_argument,            0, 'o'},
    7581        {"verbose",     no_argument,            0, 'v'},
    7682        {"help",        no_argument, 0, OPT_HELP},
    main(int argc, char **argv)  
    8692    bool verbose = false;
    8793    int reader_close_time = READER_CLOSE_TIME;
    8894
     95    double read_timeout = READ_TIMEOUT;
     96
    8997    int c;
    9098    while ((c = gnu_getopt_long(argc, argv, opts, long_opts, 0)) != -1) {
    9199        switch (c) {
    main(int argc, char **argv)  
    104112            case 'r':
    105113                reader_close_time = atoi(optarg);
    106114                break;
     115            case 't':
     116                read_timeout = atof(optarg);
     117                break;
    107118            case 'o':
    108119                one_shot = true;
    109120                break;
    main(int argc, char **argv)  
    158169            }
    159170            Xapian::ReplicationInfo info;
    160171            client.update_from_master(dbpath, masterdb, info,
    161                                       reader_close_time);
     172                                      reader_close_time, read_timeout);
    162173            if (verbose) {
    163174                cout << "Update complete: " <<
    164175                        info.fullcopy_count << " copies, " <<
  • xapian-core/common/replicatetcpclient.h

    diff --git a/xapian-core/common/replicatetcpclient.h b/xapian-core/common/replicatetcpclient.h
    index 56f900d..d225764 100644
    a b class XAPIAN_VISIBILITY_DEFAULT ReplicateTcpClient SOCKET_INITIALIZER_MIXIN {  
    7373    void update_from_master(const std::string & path,
    7474                            const std::string & remotedb,
    7575                            Xapian::ReplicationInfo & info,
    76                             double reader_close_time);
     76                            double reader_close_time,
     77                            double read_timeout);
    7778
    7879    /** Destructor. */
    7980    ~ReplicateTcpClient();
  • xapian-core/common/replication.h

    diff --git a/xapian-core/common/replication.h b/xapian-core/common/replication.h
    index 8b57202..df8615c 100644
    a b class XAPIAN_VISIBILITY_DEFAULT DatabaseReplica {  
    195195     *  descriptor, false otherwise.
    196196     */
    197197    bool apply_next_changeset(ReplicationInfo * info,
    198                               double reader_close_time);
     198                              double reader_close_time,
     199                              double read_timeout);
    199200
    200201    /** Close the DatabaseReplica.
    201202     *
  • xapian-core/net/replicatetcpclient.cc

    diff --git a/xapian-core/net/replicatetcpclient.cc b/xapian-core/net/replicatetcpclient.cc
    index 4fb17ee..bf17632 100644
    a b  
    2626
    2727#include "tcpclient.h"
    2828#include "utils.h"
     29#include "realtime.h"
    2930
    3031using namespace std;
    3132
    void  
    4748ReplicateTcpClient::update_from_master(const std::string & path,
    4849                                       const std::string & masterdb,
    4950                                       Xapian::ReplicationInfo & info,
    50                                        double reader_close_time)
     51                                       double reader_close_time,
     52                                       double read_timeout)
    5153{
    5254    Xapian::DatabaseReplica replica(path);
    53     remconn.send_message('R', replica.get_revision_info(), 0.0);
    54     remconn.send_message('D', masterdb, 0.0);
     55    remconn.send_message('R', replica.get_revision_info(), RealTime::end_time(read_timeout));
     56    remconn.send_message('D', masterdb, RealTime::end_time(read_timeout));
    5557    replica.set_read_fd(socket);
    5658    info.clear();
    5759    bool more;
    5860    do {
    5961        Xapian::ReplicationInfo subinfo;
    60         more = replica.apply_next_changeset(&subinfo, reader_close_time);
     62        more = replica.apply_next_changeset(&subinfo, reader_close_time, read_timeout);
    6163        info.changeset_count += subinfo.changeset_count;
    6264        info.fullcopy_count += subinfo.fullcopy_count;
    6365        if (subinfo.changed)