root / tags / 1.0.8 / xapian-core / net / remoteserver.cc

Revision 10757, 18.5 kB (checked in by olly, 6 months ago)

Backport change from trunk:
net/remoteserver.cc: Just delete the Database * pointer db. The
Database dtor is virtual, so it's fine to delete a WritableDatabase
via a Database * pointer.

  • Property svn:eol-style set to native
Line 
1/** @file remoteserver.cc
2 *  @brief Xapian remote backend server base class
3 */
4/* Copyright (C) 2006,2007,2008 Olly Betts
5 * Copyright (C) 2006,2007 Lemur Consulting Ltd
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
20 */
21
22#include <config.h>
23#include "remoteserver.h"
24
25#include <xapian/database.h>
26#include <xapian/enquire.h>
27#include <xapian/error.h>
28#include <xapian/valueiterator.h>
29
30#include "safeerrno.h"
31#include <signal.h>
32#include <stdlib.h>
33
34#include "autoptr.h"
35#include "multimatch.h"
36#include "omassert.h"
37#include "omtime.h"
38#include "serialise.h"
39#include "serialise-double.h"
40#include "stats.h"
41#include "utils.h"
42
/** Marker type thrown by RemoteServer::get_message() when the
 *  "shutdown connection" message arrives; RemoteServer::run() catches it
 *  and returns.
 */
struct ConnectionClosed {
};
45
46RemoteServer::RemoteServer(const std::vector<std::string> &dbpaths,
47                           int fdin_, int fdout_,
48                           Xapian::timeout active_timeout_,
49                           Xapian::timeout idle_timeout_,
50                           bool writable)
51    : RemoteConnection(fdin_, fdout_, ""),
52      db(NULL), wdb(NULL),
53      active_timeout(active_timeout_), idle_timeout(idle_timeout_)
54{
55    // Catch errors opening the database and propagate them to the client.
56    try {
57        if (writable) {
58            AssertEq(dbpaths.size(), 1); // Expecting exactly one database.
59            wdb = new Xapian::WritableDatabase(dbpaths[0], Xapian::DB_CREATE_OR_OPEN);
60            db = wdb;
61        } else {
62            db = new Xapian::Database;
63            vector<std::string>::const_iterator i;
64            for (i = dbpaths.begin(); i != dbpaths.end(); ++i)
65                db->add_database(Xapian::Database(*i));
66        }
67
68        // Build a better description than Database::get_description() gives.
69        // FIXME: improve Database::get_description() and then just use that
70        // instead.
71        context = dbpaths[0];
72        vector<std::string>::const_iterator i(dbpaths.begin());
73        for (++i; i != dbpaths.end(); ++i) {
74            context += ' ';
75            context += *i;
76        }
77    } catch (const Xapian::Error &err) {
78        // Propagate the exception to the client.
79        send_message(REPLY_EXCEPTION, serialise_error(err));
80        // And rethrow it so our caller can log it and close the connection.
81        throw;
82    }
83
84#ifndef __WIN32__
85    // It's simplest to just ignore SIGPIPE.  We'll still know if the
86    // connection dies because we'll get EPIPE back from write().
87    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
88        throw Xapian::NetworkError("Couldn't set SIGPIPE to SIG_IGN", errno);
89#endif
90
91    // Send greeting message.
92    string message;
93    message += char(XAPIAN_REMOTE_PROTOCOL_MAJOR_VERSION);
94    message += char(XAPIAN_REMOTE_PROTOCOL_MINOR_VERSION);
95    message += encode_length(db->get_doccount());
96    message += encode_length(db->get_lastdocid());
97    message += (db->has_positions() ? '1' : '0');
98    message += serialise_double(db->get_avlength());
99    send_message(REPLY_GREETING, message);
100
101    // Register weighting schemes.
102    Xapian::Weight * weight;
103    weight = new Xapian::BM25Weight();
104    wtschemes[weight->name()] = weight;
105    weight = new Xapian::BoolWeight();
106    wtschemes[weight->name()] = weight;
107    weight = new Xapian::TradWeight();
108    wtschemes[weight->name()] = weight;
109}
110
111RemoteServer::~RemoteServer()
112{
113    delete db;
114    // wdb is either NULL or equal to db, so we shouldn't delete it too!
115
116    map<string, Xapian::Weight*>::const_iterator i;
117    for (i = wtschemes.begin(); i != wtschemes.end(); ++i) {
118        delete i->second;
119    }
120}
121
122message_type
123RemoteServer::get_message(Xapian::timeout timeout, string & result,
124                          message_type required_type)
125{
126    unsigned int type;
127    OmTime end_time;
128    if (timeout)
129        end_time = OmTime::now() + timeout;
130    type = RemoteConnection::get_message(result, end_time);
131
132    // Handle "shutdown connection" message here.
133    if (type == MSG_SHUTDOWN) throw ConnectionClosed();
134    if (type >= MSG_MAX) {
135        string errmsg("Invalid message type ");
136        errmsg += om_tostring(type);
137        throw Xapian::NetworkError(errmsg);
138    }
139    if (required_type != MSG_MAX && type != unsigned(required_type)) {
140        string errmsg("Expecting message type ");
141        errmsg += om_tostring(required_type);
142        errmsg += ", got ";
143        errmsg += om_tostring(type);
144        throw Xapian::NetworkError(errmsg);
145    }
146    return static_cast<message_type>(type);
147}
148
149void
150RemoteServer::send_message(reply_type type, const string &message)
151{
152    OmTime end_time;
153    if (active_timeout)
154        end_time = OmTime::now() + active_timeout;
155    unsigned char type_as_char = static_cast<unsigned char>(type);
156    RemoteConnection::send_message(type_as_char, message, end_time);
157}
158
159typedef void (RemoteServer::* dispatch_func)(const string &);
160
161void
162RemoteServer::run()
163{
164    while (true) {
165        try {
166            /* This list needs to be kept in the same order as the list of
167             * message types in "remoteprotocol.h". Note that messages at the
168             * end of the list in "remoteprotocol.h" can be omitted if they
169             * don't correspond to dispatch actions.
170             */
171            static const dispatch_func dispatch[] = {
172                &RemoteServer::msg_allterms,
173                &RemoteServer::msg_collfreq,
174                &RemoteServer::msg_document,
175                &RemoteServer::msg_termexists,
176                &RemoteServer::msg_termfreq,
177                &RemoteServer::msg_keepalive,
178                &RemoteServer::msg_doclength,
179                &RemoteServer::msg_query,
180                &RemoteServer::msg_termlist,
181                &RemoteServer::msg_positionlist,
182                &RemoteServer::msg_postlist,
183                &RemoteServer::msg_reopen,
184                &RemoteServer::msg_update,
185                &RemoteServer::msg_adddocument,
186                &RemoteServer::msg_cancel,
187                &RemoteServer::msg_deletedocument_pre_30_2,
188                &RemoteServer::msg_deletedocumentterm,
189                &RemoteServer::msg_flush,
190                &RemoteServer::msg_replacedocument,
191                &RemoteServer::msg_replacedocumentterm,
192                NULL, // MSG_GETMSET - used during a conversation.
193                NULL, // MSG_SHUTDOWN - handled by get_message().
194                &RemoteServer::msg_deletedocument
195            };
196
197            string message;
198            size_t type = get_message(idle_timeout, message);
199            if (type >= sizeof(dispatch)/sizeof(dispatch[0]) ||
200                dispatch[type] == NULL) {
201                string errmsg("Unexpected message type ");
202                errmsg += om_tostring(type);
203                throw Xapian::InvalidArgumentError(errmsg);
204            }
205            (this->*(dispatch[type]))(message);
206        } catch (const Xapian::NetworkTimeoutError & e) {
207            try {
208                // We've had a timeout, so the client may not be listening, so
209                // if we can't send the message right away, just exit and the
210                // client will cope.
211                RemoteConnection::send_message(REPLY_EXCEPTION, serialise_error(e), OmTime::now());
212            } catch (...) {
213            }
214            // And rethrow it so our caller can log it and close the
215            // connection.
216            throw;
217        } catch (const Xapian::NetworkError) {
218            // All other network errors mean we are fatally confused and are
219            // unlikely to be able to communicate further across this
220            // connection.  So we don't try to propagate the error to the
221            // client, but instead just rethrow the exception so our caller can
222            // log it and close the connection.
223            throw;
224        } catch (const Xapian::Error &e) {
225            // Propagate the exception to the client, then return to the main
226            // message handling loop.
227            send_message(REPLY_EXCEPTION, serialise_error(e));
228        } catch (ConnectionClosed &) {
229            return;
230        } catch (...) {
231            // Propagate an unknown exception to the client.
232            send_message(REPLY_EXCEPTION, "");
233            // And rethrow it so our caller can log it and close the
234            // connection.
235            throw;
236        }
237    }
238}
239
240void
241RemoteServer::msg_allterms(const string &message)
242{
243    const char *p = message.data();
244    const char *p_end = p + message.size();
245    string prefix(p, p_end - p);
246
247    const Xapian::TermIterator end = db->allterms_end(prefix);
248    for (Xapian::TermIterator t = db->allterms_begin(prefix); t != end; ++t) {
249        string item = encode_length(t.get_termfreq());
250        item += *t;
251        send_message(REPLY_ALLTERMS, item);
252    }
253
254    send_message(REPLY_DONE, "");
255}
256
257void
258RemoteServer::msg_termlist(const string &message)
259{
260    const char *p = message.data();
261    const char *p_end = p + message.size();
262    Xapian::docid did = decode_length(&p, p_end, false);
263
264    send_message(REPLY_DOCLENGTH, serialise_double(db->get_doclength(did)));
265    const Xapian::TermIterator end = db->termlist_end(did);
266    for (Xapian::TermIterator t = db->termlist_begin(did); t != end; ++t) {
267        string item = encode_length(t.get_wdf());
268        item += encode_length(t.get_termfreq());
269        item += *t;
270        send_message(REPLY_TERMLIST, item);
271    }
272
273    send_message(REPLY_DONE, "");
274}
275
276void
277RemoteServer::msg_positionlist(const string &message)
278{
279    const char *p = message.data();
280    const char *p_end = p + message.size();
281    Xapian::docid did = decode_length(&p, p_end, false);
282    string term(p, p_end - p);
283
284    Xapian::termpos lastpos = static_cast<Xapian::termpos>(-1);
285    const Xapian::PositionIterator end = db->positionlist_end(did, term);
286    for (Xapian::PositionIterator i = db->positionlist_begin(did, term);
287         i != end; ++i) {
288        Xapian::termpos pos = *i;
289        send_message(REPLY_POSITIONLIST, encode_length(pos - lastpos - 1));
290        lastpos = pos;
291    }
292
293    send_message(REPLY_DONE, "");
294}
295
296void
297RemoteServer::msg_postlist(const string &message)
298{
299    const char *p = message.data();
300    const char *p_end = p + message.size();
301    string term(p, p_end - p);
302
303    Xapian::doccount termfreq = db->get_termfreq(term);
304    Xapian::termcount collfreq = db->get_collection_freq(term);
305    send_message(REPLY_POSTLISTSTART, encode_length(termfreq) + encode_length(collfreq));
306
307    Xapian::docid lastdocid = 0;
308    const Xapian::PostingIterator end = db->postlist_end(term);
309    for (Xapian::PostingIterator i = db->postlist_begin(term);
310         i != end; ++i) {
311
312        Xapian::docid newdocid = *i;
313        string reply = encode_length(newdocid - lastdocid - 1);
314        reply += encode_length(i.get_wdf());
315        // FIXME: get_doclength should always return an integer value, but
316        // Xapian::doclength is a double.  We could improve the compression
317        // here by casting to an int and serialising that instead, but it's
318        // probably not worth doing since the plan is to stop storing the
319        // document length in the posting lists anyway, at which point the
320        // remote protocol should stop passing it since it will be more
321        // expensive to do so.
322        reply += serialise_double(i.get_doclength());
323
324        send_message(REPLY_POSTLISTITEM, reply);
325        lastdocid = newdocid;
326    }
327
328    send_message(REPLY_DONE, "");
329}
330
331void
332RemoteServer::msg_reopen(const string & msg)
333{
334    db->reopen();
335    msg_update(msg);
336}
337
338void
339RemoteServer::msg_update(const string &)
340{
341    // reopen() doesn't do anything for a WritableDatabase, so there's
342    // no harm in calling it unconditionally.
343    db->reopen();
344
345    string message = encode_length(db->get_doccount());
346    message += encode_length(db->get_lastdocid());
347    message += (db->has_positions() ? '1' : '0');
348    message += serialise_double(db->get_avlength());
349    send_message(REPLY_UPDATE, message);
350}
351
352void
353RemoteServer::msg_query(const string &message_in)
354{
355    const char *p = message_in.c_str();
356    const char *p_end = p + message_in.size();
357    size_t len;
358
359    // Unserialise the Query.
360    len = decode_length(&p, p_end, true);
361    AutoPtr<Xapian::Query::Internal> query(Xapian::Query::Internal::unserialise(string(p, len)));
362    p += len;
363
364    // Unserialise assorted Enquire settings.
365    Xapian::termcount qlen = decode_length(&p, p_end, false);
366
367    Xapian::valueno collapse_key = decode_length(&p, p_end, false);
368
369    if (p_end - p < 4 || *p < '0' || *p > '2') {
370        throw Xapian::NetworkError("bad message (docid_order)");
371    }
372    Xapian::Enquire::docid_order order;
373    order = static_cast<Xapian::Enquire::docid_order>(*p++ - '0');
374
375    Xapian::valueno sort_key = decode_length(&p, p_end, false);
376
377    if (*p < '0' || *p > '3') {
378        throw Xapian::NetworkError("bad message (sort_by)");
379    }
380    Xapian::Enquire::Internal::sort_setting sort_by;
381    sort_by = static_cast<Xapian::Enquire::Internal::sort_setting>(*p++ - '0');
382
383    if (*p < '0' || *p > '1') {
384        throw Xapian::NetworkError("bad message (sort_value_forward)");
385    }
386    bool sort_value_forward(*p++ != '0');
387
388    int percent_cutoff = *p++;
389    if (percent_cutoff < 0 || percent_cutoff > 100) {
390        throw Xapian::NetworkError("bad message (percent_cutoff)");
391    }
392
393    Xapian::weight weight_cutoff = unserialise_double(&p, p_end);
394    if (weight_cutoff < 0) {
395        throw Xapian::NetworkError("bad message (weight_cutoff)");
396    }
397
398    // Unserialise the Weight object.
399    len = decode_length(&p, p_end, true);
400    map<string, Xapian::Weight *>::const_iterator i;
401    i = wtschemes.find(string(p, len));
402    if (i == wtschemes.end()) {
403        throw Xapian::InvalidArgumentError("Weighting scheme " + string(p, len) + " not registered");
404    }
405    p += len;
406
407    len = decode_length(&p, p_end, true);
408    AutoPtr<Xapian::Weight> wt(i->second->unserialise(string(p, len)));
409    p += len;
410
411    // Unserialise the RSet object.
412    Xapian::RSet rset = unserialise_rset(string(p, p_end - p));
413
414    Stats local_stats;
415    MultiMatch match(*db, query.get(), qlen, &rset, collapse_key,
416                     percent_cutoff, weight_cutoff, order,
417                     sort_key, sort_by, sort_value_forward, NULL,
418                     NULL, local_stats, wt.get());
419
420    send_message(REPLY_STATS, serialise_stats(local_stats));
421
422    string message;
423#if 0 // Reinstate this when major protocol version increases to 31.
424    get_message(active_timeout, message, MSG_GETMSET);
425#else
426    char type = get_message(active_timeout, message);
427    if (rare(type != MSG_GETMSET)) {
428        if (type != MSG_GETMSET_PRE_30_5 && type != MSG_GETMSET_PRE_30_3) {
429            string errmsg("Expecting message type ");
430            errmsg += om_tostring(MSG_GETMSET_PRE_30_3);
431            errmsg += " or ";
432            errmsg += om_tostring(MSG_GETMSET_PRE_30_5);
433            errmsg += " or ";
434            errmsg += om_tostring(MSG_GETMSET);
435            errmsg += ", got ";
436            errmsg += om_tostring(type);
437            throw Xapian::NetworkError(errmsg);
438        }
439    }
440#endif
441    p = message.c_str();
442    p_end = p + message.size();
443
444    Xapian::termcount first = decode_length(&p, p_end, false);
445    Xapian::termcount maxitems = decode_length(&p, p_end, false);
446
447    Xapian::termcount check_at_least = 0;
448    if (type != MSG_GETMSET_PRE_30_3) {
449        check_at_least = decode_length(&p, p_end, false);
450    }
451
452    message.erase(0, message.size() - (p_end - p));
453    Stats total_stats(unserialise_stats(message));
454
455    Xapian::MSet mset;
456    match.get_mset(first, maxitems, check_at_least, mset, total_stats, 0, 0);
457
458    if (type == MSG_GETMSET_PRE_30_3 || type == MSG_GETMSET_PRE_30_5) {
459        send_message(REPLY_RESULTS_PRE_30_5, serialise_mset_pre_30_5(mset));
460    } else {
461        send_message(REPLY_RESULTS, serialise_mset(mset));
462    }
463}
464
465void
466RemoteServer::msg_document(const string &message)
467{
468    const char *p = message.data();
469    const char *p_end = p + message.size();
470    Xapian::docid did = decode_length(&p, p_end, false);
471
472    Xapian::Document doc = db->get_document(did);
473
474    send_message(REPLY_DOCDATA, doc.get_data());
475
476    Xapian::ValueIterator i;
477    for (i = doc.values_begin(); i != doc.values_end(); ++i) {
478        string item = encode_length(i.get_valueno());
479        item += *i;
480        send_message(REPLY_VALUE, item);
481    }
482    send_message(REPLY_DONE, "");
483}
484
485void
486RemoteServer::msg_keepalive(const string &)
487{
488    // Ensure *our* database stays alive, as it may contain remote databases!
489    db->keep_alive();
490    send_message(REPLY_DONE, "");
491}
492
493void
494RemoteServer::msg_termexists(const string &term)
495{
496    send_message((db->term_exists(term) ? REPLY_TERMEXISTS : REPLY_TERMDOESNTEXIST), "");
497}
498
499void
500RemoteServer::msg_collfreq(const string &term)
501{
502    send_message(REPLY_COLLFREQ, encode_length(db->get_collection_freq(term)));
503}
504
505void
506RemoteServer::msg_termfreq(const string &term)
507{
508    send_message(REPLY_TERMFREQ, encode_length(db->get_termfreq(term)));
509}
510
511void
512RemoteServer::msg_doclength(const string &message)
513{
514    const char *p = message.data();
515    const char *p_end = p + message.size();
516    Xapian::docid did = decode_length(&p, p_end, false);
517    // FIXME: get_doclength should always return an integer, but
518    // Xapian::doclength is a double...
519    send_message(REPLY_DOCLENGTH, serialise_double(db->get_doclength(did)));
520}
521
522void
523RemoteServer::msg_flush(const string &)
524{
525    if (!wdb)
526        throw Xapian::InvalidOperationError("Server is read-only");
527
528    wdb->flush();
529
530    send_message(REPLY_DONE, "");
531}
532
533void
534RemoteServer::msg_cancel(const string &)
535{
536    if (!wdb)
537        throw Xapian::InvalidOperationError("Server is read-only");
538
539    // We can't call cancel since that's an internal method, but this
540    // has the same effect with minimal additional overhead.
541    wdb->begin_transaction(false);
542    wdb->cancel_transaction();
543}
544
545void
546RemoteServer::msg_adddocument(const string & message)
547{
548    if (!wdb)
549        throw Xapian::InvalidOperationError("Server is read-only");
550
551    Xapian::docid did = wdb->add_document(unserialise_document(message));
552
553    send_message(REPLY_ADDDOCUMENT, encode_length(did));
554}
555
556// FIXME: eliminate this method when we move to remote major 31.
557void
558RemoteServer::msg_deletedocument_pre_30_2(const string & message)
559{
560    if (!wdb)
561        throw Xapian::InvalidOperationError("Server is read-only");
562
563    const char *p = message.data();
564    const char *p_end = p + message.size();
565    Xapian::docid did = decode_length(&p, p_end, false);
566
567    wdb->delete_document(did);
568}
569
570void
571RemoteServer::msg_deletedocument(const string & message)
572{
573    msg_deletedocument_pre_30_2(message);
574
575    send_message(REPLY_DONE, "");
576}
577
578void
579RemoteServer::msg_deletedocumentterm(const string & message)
580{
581    if (!wdb)
582        throw Xapian::InvalidOperationError("Server is read-only");
583
584    wdb->delete_document(message);
585}
586
587void
588RemoteServer::msg_replacedocument(const string & message)
589{
590    if (!wdb)
591        throw Xapian::InvalidOperationError("Server is read-only");
592
593    const char *p = message.data();
594    const char *p_end = p + message.size();
595    Xapian::docid did = decode_length(&p, p_end, false);
596
597    wdb->replace_document(did, unserialise_document(string(p, p_end)));
598}
599
600void
601RemoteServer::msg_replacedocumentterm(const string & message)
602{
603    if (!wdb)
604        throw Xapian::InvalidOperationError("Server is read-only");
605
606    const char *p = message.data();
607    const char *p_end = p + message.size();
608    size_t len = decode_length(&p, p_end, true);
609    string unique_term(p, len);
610    p += len;
611
612    Xapian::docid did = wdb->replace_document(unique_term, unserialise_document(string(p, p_end)));
613
614    send_message(REPLY_ADDDOCUMENT, encode_length(did));
615}
Note: See TracBrowser for help on using the browser.