Ticket #546: read-timeout-1.diff
File read-timeout-1.diff, 9.7 KB (added by , 13 years ago) |
---|
-
xapian-core/api/replication.cc
diff --git a/xapian-core/api/replication.cc b/xapian-core/api/replication.cc index f77e3b1..f92be86 100644
a b class DatabaseReplica::Internal : public Xapian::Internal::RefCntBase { 212 212 213 213 /// Read and apply the next changeset. 214 214 bool apply_next_changeset(ReplicationInfo * info, 215 double reader_close_time); 215 double reader_close_time, 216 double read_timeout); 216 217 217 218 /// Return a string describing this object. 218 219 string get_description() const { return path; } … … DatabaseReplica::set_read_fd(int fd) 270 271 271 272 bool 272 273 DatabaseReplica::apply_next_changeset(ReplicationInfo * info, 273 double reader_close_time) 274 double reader_close_time, 275 double read_timeout) 274 276 { 275 LOGCALL(REPLICA, bool, "DatabaseReplica::apply_next_changeset", info | reader_close_time );277 LOGCALL(REPLICA, bool, "DatabaseReplica::apply_next_changeset", info | reader_close_time | read_timeout); 276 278 if (info != NULL) 277 279 info->clear(); 278 280 if (internal.get() == NULL) 279 281 throw Xapian::InvalidOperationError("Attempt to call DatabaseReplica::apply_next_changeset on a closed replica."); 280 RETURN(internal->apply_next_changeset(info, reader_close_time ));282 RETURN(internal->apply_next_changeset(info, reader_close_time, read_timeout)); 281 283 } 282 284 283 285 void … … DatabaseReplica::Internal::set_read_fd(int fd) 506 508 507 509 bool 508 510 DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info, 509 double reader_close_time) 511 double reader_close_time, 512 double read_timeout) 510 513 { 511 LOGCALL(REPLICA, bool, "DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time );514 LOGCALL(REPLICA, bool, "DatabaseReplica::Internal::apply_next_changeset", info | reader_close_time | read_timeout); 512 515 if (live_db.internal.empty()) 513 516 live_db = WritableDatabase(get_replica_path(live_id), Xapian::DB_OPEN); 514 517 if (live_db.internal.size() != 1) 515 518 throw Xapian::InvalidOperationError("DatabaseReplica needs to be pointed at exactly one subdatabase"); 516 519 517 520 while (true) { 518 char type = conn->sniff_next_message_type( 0.0);521 char type = conn->sniff_next_message_type(RealTime::end_time(read_timeout)); 519 522 switch (type) { 520 523 case REPL_REPLY_END_OF_CHANGES: { 521 524 string buf; 522 (void)conn->get_message(buf, 0.0);525 (void)conn->get_message(buf, RealTime::end_time(read_timeout)); 523 526 RETURN(false); 524 527 } 525 528 case REPL_REPLY_DB_HEADER: 526 529 // Apply the copy - remove offline db in case of any error. 527 530 try { 528 apply_db_copy( 0.0);531 apply_db_copy(RealTime::end_time(read_timeout)); 529 532 if (info != NULL) 530 533 ++(info->fullcopy_count); 531 534 string replica_uuid; … … DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info, 580 583 // Ignore the returned revision number, since we are 581 584 // live so the changeset must be safe to apply to a 582 585 // live DB. 583 replicator->apply_changeset_from_conn(*conn, 0.0, true);586 replicator->apply_changeset_from_conn(*conn, RealTime::end_time(read_timeout), true); 584 587 } 585 588 last_live_changeset_time = RealTime::now(); 586 589 … … DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info, 598 601 DatabaseReplicator::open(get_replica_path(live_id ^ 1))); 599 602 600 603 offline_revision = replicator-> 601 apply_changeset_from_conn(*conn, 0.0, false);604 apply_changeset_from_conn(*conn, RealTime::end_time(read_timeout), false); 602 605 603 606 if (info != NULL) { 604 607 ++(info->changeset_count); … … DatabaseReplica::Internal::apply_next_changeset(ReplicationInfo * info, 611 614 RETURN(true); 612 615 case REPL_REPLY_FAIL: { 613 616 string buf; 614 (void)conn->get_message(buf, 0.0);617 (void)conn->get_message(buf, RealTime::end_time(read_timeout)); 615 618 throw NetworkError("Unable to fully synchronise: " + buf); 616 619 } 617 620 default: -
xapian-core/bin/xapian-replicate.cc
diff --git a/xapian-core/bin/xapian-replicate.cc b/xapian-core/bin/xapian-replicate.cc index ad8307f..bfe8d85 100644
a b using namespace std; 45 45 // Number of seconds before we assume that a reader will be closed. 46 46 #define READER_CLOSE_TIME 30 47 47 48 // Number of seconds before a read on a socket should time out. 49 #define READ_TIMEOUT 0 50 48 51 static void show_usage() { 49 52 cout << "Usage: "PROG_NAME" [OPTIONS] DATABASE\n\n" 50 53 "Options:\n" 51 " -h, --host=HOST host to connect to (required)\n" 52 " -p, --port=PORT port to connect to (required)\n" 53 " -m, --master=DB replicate database DB from the master (default: DATABASE)\n" 54 " -i, --interval=N wait N seconds between each connection to the master\n" 55 " (default: "STRINGIZE(DEFAULT_INTERVAL)")\n" 56 " -r, --reader-time=N wait N seconds to allow readers time to close before\n" 57 " applying repeated changesets (default: "STRINGIZE(READER_CLOSE_TIME)")\n" 58 " -o, --one-shot replicate only once and then exit\n" 59 " -v, --verbose be more verbose\n" 60 " --help display this help and exit\n" 61 " --version output version information and exit" << endl; 54 " -h, --host=HOST host to connect to (required)\n" 55 " -p, --port=PORT port to connect to (required)\n" 56 " -m, --master=DB replicate database DB from the master (default: DATABASE)\n" 57 " -i, --interval=N wait N seconds between each connection to the master\n" 58 " (default: "STRINGIZE(DEFAULT_INTERVAL)")\n" 59 " -r, --reader-time=N wait N seconds to allow readers time to close before\n" 60 " applying repeated changesets (default: "STRINGIZE(READER_CLOSE_TIME)")\n" 61 " -t, --read-timeout=N number of seconds before a read on a socket should time out.\n" 62 " (default: "STRINGIZE(READ_TIMEOUT)", which means no timeout)\n" 63 " -o, --one-shot replicate only once and then exit\n" 64 " -v, --verbose be more verbose\n" 65 " --help display this help and exit\n" 66 " --version output version information and exit" << endl; 62 67 } 63 68 64 69 int 65 70 main(int argc, char **argv) 66 71 { 67 const char * opts = "h:p:m:i:r: ov";72 const char * opts = "h:p:m:i:r:t:ov"; 68 73 const struct option long_opts[] = { 69 74 {"host", required_argument, 0, 'h'}, 70 75 {"port", required_argument, 0, 'p'}, 71 76 {"master", required_argument, 0, 'm'}, 72 77 {"interval", required_argument, 0, 'i'}, 73 78 {"reader-time", required_argument, 0, 'r'}, 79 {"read-timeout",required_argument, 0, 't'}, 74 80 {"one-shot", no_argument, 0, 'o'}, 75 81 {"verbose", no_argument, 0, 'v'}, 76 82 {"help", no_argument, 0, OPT_HELP}, … … main(int argc, char **argv) 86 92 bool verbose = false; 87 93 int reader_close_time = READER_CLOSE_TIME; 88 94 95 double read_timeout = READ_TIMEOUT; 96 89 97 int c; 90 98 while ((c = gnu_getopt_long(argc, argv, opts, long_opts, 0)) != -1) { 91 99 switch (c) { … … main(int argc, char **argv) 104 112 case 'r': 105 113 reader_close_time = atoi(optarg); 106 114 break; 115 case 't': 116 read_timeout = atof(optarg); 117 break; 107 118 case 'o': 108 119 one_shot = true; 109 120 break; … … main(int argc, char **argv) 158 169 } 159 170 Xapian::ReplicationInfo info; 160 171 client.update_from_master(dbpath, masterdb, info, 161 reader_close_time );172 reader_close_time, read_timeout); 162 173 if (verbose) { 163 174 cout << "Update complete: " << 164 175 info.fullcopy_count << " copies, " << -
xapian-core/common/replicatetcpclient.h
diff --git a/xapian-core/common/replicatetcpclient.h b/xapian-core/common/replicatetcpclient.h index 56f900d..d225764 100644
a b class XAPIAN_VISIBILITY_DEFAULT ReplicateTcpClient SOCKET_INITIALIZER_MIXIN { 73 73 void update_from_master(const std::string & path, 74 74 const std::string & remotedb, 75 75 Xapian::ReplicationInfo & info, 76 double reader_close_time); 76 double reader_close_time, 77 double read_timeout); 77 78 78 79 /** Destructor. */ 79 80 ~ReplicateTcpClient(); -
xapian-core/common/replication.h
diff --git a/xapian-core/common/replication.h b/xapian-core/common/replication.h index 8b57202..df8615c 100644
a b class XAPIAN_VISIBILITY_DEFAULT DatabaseReplica { 195 195 * descriptor, false otherwise. 196 196 */ 197 197 bool apply_next_changeset(ReplicationInfo * info, 198 double reader_close_time); 198 double reader_close_time, 199 double read_timeout); 199 200 200 201 /** Close the DatabaseReplica. 201 202 * -
xapian-core/net/replicatetcpclient.cc
diff --git a/xapian-core/net/replicatetcpclient.cc b/xapian-core/net/replicatetcpclient.cc index 4fb17ee..bf17632 100644
a b 26 26 27 27 #include "tcpclient.h" 28 28 #include "utils.h" 29 #include "realtime.h" 29 30 30 31 using namespace std; 31 32 … … void 47 48 ReplicateTcpClient::update_from_master(const std::string & path, 48 49 const std::string & masterdb, 49 50 Xapian::ReplicationInfo & info, 50 double reader_close_time) 51 double reader_close_time, 52 double read_timeout) 51 53 { 52 54 Xapian::DatabaseReplica replica(path); 53 remconn.send_message('R', replica.get_revision_info(), 0.0);54 remconn.send_message('D', masterdb, 0.0);55 remconn.send_message('R', replica.get_revision_info(), RealTime::end_time(read_timeout)); 56 remconn.send_message('D', masterdb, RealTime::end_time(read_timeout)); 55 57 replica.set_read_fd(socket); 56 58 info.clear(); 57 59 bool more; 58 60 do { 59 61 Xapian::ReplicationInfo subinfo; 60 more = replica.apply_next_changeset(&subinfo, reader_close_time );62 more = replica.apply_next_changeset(&subinfo, reader_close_time, read_timeout); 61 63 info.changeset_count += subinfo.changeset_count; 62 64 info.fullcopy_count += subinfo.fullcopy_count; 63 65 if (subinfo.changed)