root / tags / 1.0.8 / xapian-core / backends / flint / flint_database.cc

Revision 10955, 34.3 kB (checked in by olly, 6 months ago)

Backport change from trunk:
backends/flint/flint_database.cc,backends/flint/flint_values.cc,
backends/flint/flint_values.h: Fix WritableDatabase::add_document()
and replace_document() not to be O(n*n) in the number of values in
the new document.
tests/api_wrdb.cc: Add testcase bigoaddvalue to make sure we don't
regress to O(n*n) (or worse!)

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* flint_database.cc: flint database
2 *
3 * Copyright 1999,2000,2001 BrightStation PLC
4 * Copyright 2001 Hein Ragas
5 * Copyright 2002 Ananova Ltd
6 * Copyright 2002,2003,2004,2005,2006,2007 Olly Betts
7 * Copyright 2006 Richard Boulton
8 *
9 * This program is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU General Public License as
11 * published by the Free Software Foundation; either version 2 of the
12 * License, or (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
22 * USA
23 */
24
25#include <config.h>
26
27#include <xapian/error.h>
28
29#include "safeerrno.h"
30
31#include "flint_database.h"
32#include "utils.h"
33#include "omdebug.h"
34#include "autoptr.h"
35#include <xapian/error.h>
36#include <xapian/valueiterator.h>
37
38#include "contiguousalldocspostlist.h"
39#include "flint_modifiedpostlist.h"
40#include "flint_postlist.h"
41#include "flint_alldocspostlist.h"
42#include "flint_termlist.h"
43#include "flint_positionlist.h"
44#include "flint_utils.h"
45#include "flint_record.h"
46#include "flint_values.h"
47#include "flint_document.h"
48#include "flint_alltermslist.h"
49#include "flint_lock.h"
50#include "flint_spellingwordslist.h"
51#include "stringutils.h"
52
53#include <sys/types.h>
54#include "safesysstat.h"
55
56#include <list>
57#include <string>
58
59using namespace std;
60using namespace Xapian;
61
// Longest term we can safely index.  The postlist table imposes the limit:
// its keys hold the term, then "\x00\x00", then a length byte, then up to
// 4 bytes of docid, and the Btree manager caps keys at 252 bytes.  Hence
// 252 - 2 - 1 - 4 = 245 bytes.  Each zero byte contained in the term lowers
// the limit by one.
#define MAX_SAFE_TERM_LENGTH 245

// The postlist table entry stored under this magic key (which corresponds to
// an invalid docid) holds the last used docid and the total length of all
// documents.  The key itself is a single zero byte.
static const string METAINFO_KEY(1, '\0');
73
74/* This finds the tables, opens them at consistent revisions, manages
75 * determining the current and next revision numbers, and stores handles
76 * to the tables.
77 */
78FlintDatabase::FlintDatabase(const string &flint_dir, int action,
79                             unsigned int block_size)
80        : db_dir(flint_dir),
81          readonly(action == XAPIAN_DB_READONLY),
82          version_file(db_dir),
83          postlist_table(db_dir, readonly),
84          position_table(db_dir, readonly),
85          termlist_table(db_dir, readonly),
86          value_table(db_dir, readonly),
87          synonym_table(db_dir, readonly),
88          spelling_table(db_dir, readonly),
89          record_table(db_dir, readonly),
90          lock(db_dir + "/flintlock"),
91          total_length(0),
92          lastdocid(0)
93{
94    DEBUGCALL(DB, void, "FlintDatabase", flint_dir << ", " << action <<
95              ", " << block_size);
96
97    if (action == XAPIAN_DB_READONLY) {
98        open_tables_consistent();
99        return;
100    }
101
102    if (action != Xapian::DB_OPEN && !database_exists()) {
103        // FIXME: if we allow Xapian::DB_OVERWRITE, check it here
104
105        // Create the directory for the database, if it doesn't exist
106        // already.
107        bool fail = false;
108        struct stat statbuf;
109        if (stat(db_dir, &statbuf) == 0) {
110            if (!S_ISDIR(statbuf.st_mode)) fail = true;
111        } else if (errno != ENOENT || mkdir(db_dir, 0755) == -1) {
112            fail = true;
113        }
114        if (fail) {
115            throw Xapian::DatabaseCreateError("Cannot create directory `" +
116                                              db_dir + "'", errno);
117        }
118        get_database_write_lock();
119
120        create_and_open_tables(block_size);
121        return;
122    }
123
124    if (action == Xapian::DB_CREATE) {
125        throw Xapian::DatabaseCreateError("Can't create new database at `" +
126                                          db_dir + "': a database already exists and I was told "
127                                          "not to overwrite it");
128    }
129
130    get_database_write_lock();
131    // if we're overwriting, pretend the db doesn't exist
132    // FIXME: if we allow Xapian::DB_OVERWRITE, check it here
133    if (action == Xapian::DB_CREATE_OR_OVERWRITE) {
134        create_and_open_tables(block_size);
135        return;
136    }
137
138    // Get latest consistent version
139    open_tables_consistent();
140
141    // Check that there are no more recent versions of tables.  If there
142    // are, perform recovery by writing a new revision number to all
143    // tables.
144    if (record_table.get_open_revision_number() !=
145        postlist_table.get_latest_revision_number()) {
146        flint_revision_number_t new_revision = get_next_revision_number();
147
148        set_revision_number(new_revision);
149    }
150}
151
152FlintDatabase::~FlintDatabase()
153{
154    DEBUGCALL(DB, void, "~FlintDatabase", "");
155}
156
157void
158FlintDatabase::read_metainfo()
159{
160    DEBUGCALL(DB, void, "FlintDatabase::read_metainfo", "");
161
162    string tag;
163    if (!postlist_table.get_exact_entry(METAINFO_KEY, tag)) {
164        lastdocid = 0;
165        total_length = 0;
166        return;
167    }
168
169    const char * data = tag.data();
170    const char * end = data + tag.size();
171    if (!unpack_uint(&data, end, &lastdocid) ||
172        !unpack_uint_last(&data, end, &total_length)) {
173        throw Xapian::DatabaseCorruptError("Meta information is corrupt.");
174    }
175}
176
177bool
178FlintDatabase::database_exists() {
179    DEBUGCALL(DB, bool, "FlintDatabase::database_exists", "");
180    RETURN(record_table.exists() &&
181           postlist_table.exists() &&
182           termlist_table.exists());
183}
184
185void
186FlintDatabase::create_and_open_tables(unsigned int block_size)
187{
188    DEBUGCALL(DB, void, "FlintDatabase::create_and_open_tables", "");
189    // The caller is expected to create the database directory if it doesn't
190    // already exist.
191
192    // Create postlist_table first, and record_table last.  Existence of
193    // record_table is considered to imply existence of the database.
194    version_file.create();
195    postlist_table.create_and_open(block_size);
196    // The position table is created lazily, but erase it in case we're
197    // overwriting an existing database and it already exists.
198    position_table.erase();
199    position_table.set_block_size(block_size);
200
201    termlist_table.create_and_open(block_size);
202    // The value table is created lazily, but erase it in case we're
203    // overwriting an existing database and it already exists.
204    value_table.erase();
205    value_table.set_block_size(block_size);
206
207    synonym_table.create_and_open(block_size);
208    spelling_table.create_and_open(block_size);
209    record_table.create_and_open(block_size);
210
211    Assert(database_exists());
212
213    // Check consistency
214    flint_revision_number_t revision = record_table.get_open_revision_number();
215    if (revision != termlist_table.get_open_revision_number() ||
216        revision != postlist_table.get_open_revision_number()) {
217        throw Xapian::DatabaseCreateError("Newly created tables are not in consistent state");
218    }
219
220    total_length = 0;
221    lastdocid = 0;
222}
223
224void
225FlintDatabase::open_tables_consistent()
226{
227    DEBUGCALL(DB, void, "FlintDatabase::open_tables_consistent", "");
228    // Open record_table first, since it's the last to be written to,
229    // and hence if a revision is available in it, it should be available
230    // in all the other tables (unless they've moved on already).
231    //
232    // If we find that a table can't open the desired revision, we
233    // go back and open record_table again, until record_table has
234    // the same revision as the last time we opened it.
235
236    flint_revision_number_t cur_rev = record_table.get_open_revision_number();
237
238    // Check the version file unless we're reopening.
239    if (cur_rev == 0) version_file.read_and_check(readonly);
240
241    record_table.open();
242    flint_revision_number_t revision = record_table.get_open_revision_number();
243
244    if (cur_rev && cur_rev == revision) {
245        // We're reopening a database and the revision hasn't changed so we
246        // don't need to do anything.
247        return;
248    }
249
250    // In case the position, value, synonym, and/or spelling tables don't
251    // exist yet.
252    unsigned int block_size = record_table.get_block_size();
253    position_table.set_block_size(block_size);
254    value_table.set_block_size(block_size);
255    synonym_table.set_block_size(block_size);
256    spelling_table.set_block_size(block_size);
257
258    bool fully_opened = false;
259    int tries = 100;
260    int tries_left = tries;
261    while (!fully_opened && (tries_left--) > 0) {
262        if (spelling_table.open(revision) &&
263            synonym_table.open(revision) &&
264            value_table.open(revision) &&
265            termlist_table.open(revision) &&
266            position_table.open(revision) &&
267            postlist_table.open(revision)) {
268            // Everything now open at the same revision.
269            fully_opened = true;
270        } else {
271            // Couldn't open consistent revision: two cases possible:
272            // i)   An update has completed and a second one has begun since
273            //      record was opened.  This leaves a consistent revision
274            //      available, but not the one we were trying to open.
275            // ii)  Tables have become corrupt / have no consistent revision
276            //      available.  In this case, updates must have ceased.
277            //
278            // So, we reopen the record table, and check its revision number,
279            // if it's changed we try the opening again, otherwise we give up.
280            //
281            record_table.open();
282            flint_revision_number_t newrevision =
283                    record_table.get_open_revision_number();
284            if (revision == newrevision) {
285                // Revision number hasn't changed - therefore a second index
286                // sweep hasn't begun and the system must have failed.  Database
287                // is inconsistent.
288                throw Xapian::DatabaseCorruptError("Cannot open tables at consistent revisions");
289            }
290            revision = newrevision;
291        }
292    }
293
294    if (!fully_opened) {
295        throw Xapian::DatabaseModifiedError("Cannot open tables at stable revision - changing too fast");
296    }
297
298    read_metainfo();
299}
300
301void
302FlintDatabase::open_tables(flint_revision_number_t revision)
303{
304    DEBUGCALL(DB, void, "FlintDatabase::open_tables", revision);
305    version_file.read_and_check(readonly);
306    record_table.open(revision);
307
308    // In case the position, value, synonym, and/or spelling tables don't
309    // exist yet.
310    unsigned int block_size = record_table.get_block_size();
311    position_table.set_block_size(block_size);
312    value_table.set_block_size(block_size);
313    synonym_table.set_block_size(block_size);
314    spelling_table.set_block_size(block_size);
315
316    spelling_table.open(revision);
317    synonym_table.open(revision);
318    value_table.open(revision);
319    termlist_table.open(revision);
320    position_table.open(revision);
321    postlist_table.open(revision);
322}
323
324flint_revision_number_t
325FlintDatabase::get_revision_number() const
326{
327    DEBUGCALL(DB, flint_revision_number_t, "FlintDatabase::get_revision_number", "");
328    // We could use any table here, theoretically.
329    RETURN(postlist_table.get_open_revision_number());
330}
331
332flint_revision_number_t
333FlintDatabase::get_next_revision_number() const
334{
335    DEBUGCALL(DB, flint_revision_number_t, "FlintDatabase::get_next_revision_number", "");
336    /* We _must_ use postlist_table here, since it is always the first
337     * to be written, and hence will have the greatest available revision
338     * number.
339     */
340    flint_revision_number_t new_revision =
341            postlist_table.get_latest_revision_number();
342    ++new_revision;
343    RETURN(new_revision);
344}
345
346void
347FlintDatabase::set_revision_number(flint_revision_number_t new_revision)
348{
349    DEBUGCALL(DB, void, "FlintDatabase::set_revision_number", new_revision);
350    postlist_table.commit(new_revision);
351    position_table.commit(new_revision);
352    termlist_table.commit(new_revision);
353    value_table.commit(new_revision);
354    synonym_table.commit(new_revision);
355    spelling_table.commit(new_revision);
356    record_table.commit(new_revision);
357}
358
359void
360FlintDatabase::reopen()
361{
362    DEBUGCALL(DB, void, "FlintDatabase::reopen", "");
363    if (readonly) {
364        open_tables_consistent();
365    }
366}
367
368void
369FlintDatabase::get_database_write_lock()
370{
371    DEBUGCALL(DB, void, "FlintDatabase::get_database_write_lock", "");
372    FlintLock::reason why = lock.lock(true);
373    if (why != FlintLock::SUCCESS) {
374        if (why == FlintLock::UNKNOWN && !database_exists()) {
375            string msg("No flint database found at path `");
376            msg += db_dir;
377            msg += '\'';
378            throw Xapian::DatabaseOpeningError(msg);
379        }
380        string msg("Unable to acquire database write lock on ");
381        msg += db_dir;
382        if (why == FlintLock::INUSE) {
383            msg += ": already locked";
384        } else if (why == FlintLock::UNSUPPORTED) {
385            msg += ": locking probably not supported by this FS";
386        }
387        throw Xapian::DatabaseLockError(msg);
388    }
389}
390
391void
392FlintDatabase::apply()
393{
394    DEBUGCALL(DB, void, "FlintDatabase::apply", "");
395    if (!postlist_table.is_modified() &&
396        !position_table.is_modified() &&
397        !termlist_table.is_modified() &&
398        !value_table.is_modified() &&
399        !synonym_table.is_modified() &&
400        !spelling_table.is_modified() &&
401        !record_table.is_modified()) {
402        return;
403    }
404
405    flint_revision_number_t old_revision = get_revision_number();
406    flint_revision_number_t new_revision = get_next_revision_number();
407
408    try {
409        set_revision_number(new_revision);
410    } catch (...) {
411        // Modifications failed.  Wipe all the modifications from memory.
412        try {
413            // Discard any buffered changes and reinitialised cached values
414            // from the table.
415            cancel();
416
417            // Reopen tables with old revision number.
418            open_tables(old_revision);
419
420            // Increase revision numbers to new revision number plus one,
421            // writing increased numbers to all tables.
422            ++new_revision;
423            set_revision_number(new_revision);
424        } catch (const Xapian::Error &e) {
425            throw Xapian::DatabaseError("Modifications failed, and cannot set consistent table revision numbers: " + e.get_msg());
426        }
427        throw;
428    }
429}
430
431void
432FlintDatabase::cancel()
433{
434    DEBUGCALL(DB, void, "FlintDatabase::cancel", "");
435    postlist_table.cancel();
436    position_table.cancel();
437    termlist_table.cancel();
438    value_table.cancel();
439    synonym_table.cancel();
440    spelling_table.cancel();
441    record_table.cancel();
442}
443
444Xapian::doccount
445FlintDatabase::get_doccount() const
446{
447    DEBUGCALL(DB, Xapian::doccount, "FlintDatabase::get_doccount", "");
448    RETURN(record_table.get_doccount());
449}
450
451Xapian::docid
452FlintDatabase::get_lastdocid() const
453{
454    DEBUGCALL(DB, Xapian::docid, "FlintDatabase::get_lastdocid", "");
455    RETURN(lastdocid);
456}
457
458Xapian::doclength
459FlintDatabase::get_avlength() const
460{
461    DEBUGCALL(DB, Xapian::doclength, "FlintDatabase::get_avlength", "");
462    Xapian::doccount doccount = record_table.get_doccount();
463    if (doccount == 0) {
464        // Avoid dividing by zero when there are no documents.
465        RETURN(0);
466    }
467    RETURN(double(total_length) / doccount);
468}
469
470Xapian::doclength
471FlintDatabase::get_doclength(Xapian::docid did) const
472{
473    DEBUGCALL(DB, Xapian::doclength, "FlintDatabase::get_doclength", did);
474    Assert(did != 0);
475    RETURN(termlist_table.get_doclength(did));
476}
477
478Xapian::doccount
479FlintDatabase::get_termfreq(const string & term) const
480{
481    DEBUGCALL(DB, Xapian::doccount, "FlintDatabase::get_termfreq", term);
482    Assert(!term.empty());
483    RETURN(postlist_table.get_termfreq(term));
484}
485
486Xapian::termcount
487FlintDatabase::get_collection_freq(const string & term) const
488{
489    DEBUGCALL(DB, Xapian::termcount, "FlintDatabase::get_collection_freq", term);
490    Assert(!term.empty());
491    RETURN(postlist_table.get_collection_freq(term));
492}
493
494bool
495FlintDatabase::term_exists(const string & term) const
496{
497    DEBUGCALL(DB, bool, "FlintDatabase::term_exists", term);
498    Assert(!term.empty());
499    return postlist_table.term_exists(term);
500}
501
502bool
503FlintDatabase::has_positions() const
504{
505    return position_table.get_entry_count() > 0;
506}
507
508LeafPostList *
509FlintDatabase::open_post_list(const string& term) const
510{
511    DEBUGCALL(DB, LeafPostList *, "FlintDatabase::open_post_list", term);
512    Xapian::Internal::RefCntPtr<const FlintDatabase> ptrtothis(this);
513
514    if (term.empty()) {
515        Xapian::doccount doccount = get_doccount();
516        if (lastdocid == doccount) {
517            RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
518        }
519        RETURN(new FlintAllDocsPostList(ptrtothis, doccount));
520    }
521
522    RETURN(new FlintPostList(ptrtothis, term));
523}
524
525TermList *
526FlintDatabase::open_term_list(Xapian::docid did) const
527{
528    DEBUGCALL(DB, TermList *, "FlintDatabase::open_term_list", did);
529    Assert(did != 0);
530
531    Xapian::Internal::RefCntPtr<const FlintDatabase> ptrtothis(this);
532    RETURN(new FlintTermList(ptrtothis, did));
533}
534
535Xapian::Document::Internal *
536FlintDatabase::open_document(Xapian::docid did, bool lazy) const
537{
538    DEBUGCALL(DB, Xapian::Document::Internal *, "FlintDatabase::open_document",
539              did << ", " << lazy);
540    Assert(did != 0);
541
542    Xapian::Internal::RefCntPtr<const FlintDatabase> ptrtothis(this);
543    RETURN(new FlintDocument(ptrtothis,
544                              &value_table,
545                              &record_table,
546                              did, lazy));
547}
548
549PositionList *
550FlintDatabase::open_position_list(Xapian::docid did, const string & term) const
551{
552    Assert(did != 0);
553
554    AutoPtr<FlintPositionList> poslist(new FlintPositionList());
555    if (!poslist->read_data(&position_table, did, term)) {
556        // Check that term / document combination exists.
557        // If the doc doesn't exist, this will throw Xapian::DocNotFoundError:
558        AutoPtr<TermList> tl(open_term_list(did));
559        tl->skip_to(term);
560        if (tl->at_end() || tl->get_termname() != term)
561            throw Xapian::RangeError("Can't open position list: requested term is not present in document.");
562        // FIXME: For 1.2.0, change this to just return an empty termlist.
563        // If the user really needs to know, they can check themselves.
564    }
565
566    return poslist.release();
567}
568
569TermList *
570FlintDatabase::open_allterms(const string & prefix) const
571{
572    DEBUGCALL(DB, TermList *, "FlintDatabase::open_allterms", "");
573    RETURN(new FlintAllTermsList(Xapian::Internal::RefCntPtr<const FlintDatabase>(this),
574                                 prefix));
575}
576
577TermList *
578FlintDatabase::open_spelling_termlist(const string & word) const
579{
580    return spelling_table.open_termlist(word);
581}
582
583TermList *
584FlintDatabase::open_spelling_wordlist() const
585{
586    FlintCursor * cursor = spelling_table.cursor_get();
587    if (!cursor) return NULL;
588    return new FlintSpellingWordsList(Xapian::Internal::RefCntPtr<const FlintDatabase>(this),
589                                      cursor);
590}
591
592Xapian::doccount
593FlintDatabase::get_spelling_frequency(const string & word) const
594{
595    return spelling_table.get_word_frequency(word);
596}
597
598TermList *
599FlintDatabase::open_synonym_termlist(const string & term) const
600{
601    return synonym_table.open_termlist(term);
602}
603
604TermList *
605FlintDatabase::open_synonym_keylist(const string & prefix) const
606{
607    FlintCursor * cursor = synonym_table.cursor_get();
608    if (!cursor) return NULL;
609    return new FlintSynonymTermList(Xapian::Internal::RefCntPtr<const FlintDatabase>(this),
610                                    cursor, synonym_table.get_entry_count(),
611                                    prefix);
612}
613
614string
615FlintDatabase::get_metadata(const string & key) const
616{
617    DEBUGCALL(DB, string, "FlintDatabase::get_metadata", key);
618    string btree_key("\x00\xc0", 2);
619    btree_key += key;
620    string tag;
621    (void)postlist_table.get_exact_entry(btree_key, tag);
622    RETURN(tag);
623}
624
625///////////////////////////////////////////////////////////////////////////
626
627FlintWritableDatabase::FlintWritableDatabase(const string &dir, int action,
628                                               int block_size)
629        : FlintDatabase(dir, action, block_size),
630          freq_deltas(),
631          doclens(),
632          mod_plists(),
633          change_count(0),
634          flush_threshold(0)
635{
636    DEBUGCALL(DB, void, "FlintWritableDatabase", dir << ", " << action << ", "
637              << block_size);
638
639    const char *p = getenv("XAPIAN_FLUSH_THRESHOLD");
640    if (p)
641        flush_threshold = atoi(p);
642    if (flush_threshold == 0)
643        flush_threshold = 10000;
644}
645
646FlintWritableDatabase::~FlintWritableDatabase()
647{
648    DEBUGCALL(DB, void, "~FlintWritableDatabase", "");
649    dtor_called();
650}
651
652void
653FlintWritableDatabase::flush()
654{
655    if (transaction_active())
656        throw Xapian::InvalidOperationError("Can't flush during a transaction");
657    if (change_count) flush_postlist_changes();
658    apply();
659}
660
661void
662FlintWritableDatabase::flush_postlist_changes() const
663{
664    postlist_table.merge_changes(mod_plists, doclens, freq_deltas);
665
666    // Update the total document length and last used docid.
667    string tag = pack_uint(lastdocid);
668    tag += pack_uint_last(total_length);
669    postlist_table.add(METAINFO_KEY, tag);
670
671    freq_deltas.clear();
672    doclens.clear();
673    mod_plists.clear();
674    change_count = 0;
675}
676
677Xapian::docid
678FlintWritableDatabase::add_document(const Xapian::Document & document)
679{
680    DEBUGCALL(DB, Xapian::docid,
681              "FlintWritableDatabase::add_document", document);
682    // Make sure the docid counter doesn't overflow.
683    if (lastdocid == Xapian::docid(-1))
684        throw Xapian::DatabaseError("Run out of docids - you'll have to use copydatabase to eliminate any gaps before you can add more documents");
685    // Use the next unused document ID.
686    RETURN(add_document_(++lastdocid, document));
687}
688
689Xapian::docid
690FlintWritableDatabase::add_document_(Xapian::docid did,
691                                     const Xapian::Document & document)
692{
693    DEBUGCALL(DB, Xapian::docid,
694              "FlintWritableDatabase::add_document_", did << ", " << document);
695    Assert(did != 0);
696    try {
697        // Add the record using that document ID.
698        record_table.replace_record(document.get_data(), did);
699
700        // Set the values.
701        {
702            Xapian::ValueIterator value = document.values_begin();
703            Xapian::ValueIterator value_end = document.values_end();
704            string s;
705            value_table.encode_values(s, value, value_end);
706            value_table.set_encoded_values(did, s);
707        }
708
709        flint_doclen_t new_doclen = 0;
710        {
711            Xapian::TermIterator term = document.termlist_begin();
712            Xapian::TermIterator term_end = document.termlist_end();
713            for ( ; term != term_end; ++term) {
714                termcount wdf = term.get_wdf();
715                // Calculate the new document length
716                new_doclen += wdf;
717
718                string tname = *term;
719                if (tname.size() > MAX_SAFE_TERM_LENGTH)
720                    throw Xapian::InvalidArgumentError("Term too long (> "STRINGIZE(MAX_SAFE_TERM_LENGTH)"): " + tname);
721                map<string, pair<termcount_diff, termcount_diff> >::iterator i;
722                i = freq_deltas.find(tname);
723                if (i == freq_deltas.end()) {
724                    freq_deltas.insert(make_pair(tname, make_pair(1, termcount_diff(wdf))));
725                } else {
726                    ++i->second.first;
727                    i->second.second += wdf;
728                }
729
730                // Add did to tname's postlist
731                map<string, map<docid, pair<char, termcount> > >::iterator j;
732                j = mod_plists.find(tname);
733                if (j == mod_plists.end()) {
734                    map<docid, pair<char, termcount> > m;
735                    j = mod_plists.insert(make_pair(tname, m)).first;
736                }
737                Assert(j->second.find(did) == j->second.end());
738                j->second.insert(make_pair(did, make_pair('A', wdf)));
739
740                if (term.positionlist_begin() != term.positionlist_end()) {
741                    position_table.set_positionlist(
742                        did, tname,
743                        term.positionlist_begin(), term.positionlist_end());
744                }
745            }
746        }
747        DEBUGLINE(DB, "Calculated doclen for new document " << did << " as " << new_doclen);
748
749        // Set the termlist
750        termlist_table.set_termlist(did, document, new_doclen);
751
752        // Set the new document length
753        Assert(doclens.find(did) == doclens.end());
754        doclens[did] = new_doclen;
755        total_length += new_doclen;
756    } catch (...) {
757        // If an error occurs while adding a document, or doing any other
758        // transaction, the modifications so far must be cleared before
759        // returning control to the user - otherwise partial modifications will
760        // persist in memory, and eventually get written to disk.
761        cancel();
762        throw;
763    }
764
765    // FIXME: this should be done by checking memory usage, not the number of
766    // changes.
767    // We could also look at:
768    // * mod_plists.size()
769    // * doclens.size()
770    // * freq_deltas.size()
771    //
772    // cout << "+++ mod_plists.size() " << mod_plists.size() <<
773    //     ", doclens.size() " << doclens.size() <<
774    //     ", freq_deltas.size() " << freq_deltas.size() << endl;
775    if (++change_count >= flush_threshold) {
776        flush_postlist_changes();
777        if (!transaction_active()) apply();
778    }
779
780    RETURN(did);
781}
782
783void
784FlintWritableDatabase::delete_document(Xapian::docid did)
785{
786    DEBUGCALL(DB, void, "FlintWritableDatabase::delete_document", did);
787    Assert(did != 0);
788
789    // Remove the record.  If this fails, just propagate the exception since
790    // the state should still be consistent (most likely it's
791    // DocNotFoundError).
792    record_table.delete_record(did);
793
794    try {
795        // Remove the values
796        value_table.delete_all_values(did);
797
798        // OK, now add entries to remove the postings in the underlying record.
799        Xapian::Internal::RefCntPtr<const FlintWritableDatabase> ptrtothis(this);
800        FlintTermList termlist(ptrtothis, did);
801
802        total_length -= termlist.get_doclength();
803
804        termlist.next();
805        while (!termlist.at_end()) {
806            string tname = termlist.get_termname();
807            position_table.delete_positionlist(did, tname);
808            termcount wdf = termlist.get_wdf();
809
810            map<string, pair<termcount_diff, termcount_diff> >::iterator i;
811            i = freq_deltas.find(tname);
812            if (i == freq_deltas.end()) {
813                freq_deltas.insert(make_pair(tname, make_pair(-1, -termcount_diff(wdf))));
814            } else {
815                --i->second.first;
816                i->second.second -= wdf;
817            }
818
819            // Remove did from tname's postlist
820            map<string, map<docid, pair<char, termcount> > >::iterator j;
821            j = mod_plists.find(tname);
822            if (j == mod_plists.end()) {
823                map<docid, pair<char, termcount> > m;
824                j = mod_plists.insert(make_pair(tname, m)).first;
825            }
826
827            map<docid, pair<char, termcount> >::iterator k;
828            k = j->second.find(did);
829            if (k == j->second.end()) {
830                j->second.insert(make_pair(did, make_pair('D', 0u)));
831            } else {
832                // Deleting a document we added/modified since the last flush.
833                k->second = make_pair('D', 0u);
834            }
835
836            termlist.next();
837        }
838
839        // Remove the termlist.
840        termlist_table.delete_termlist(did);
841
842        // Remove the new doclength.
843        doclens.erase(did);
844    } catch (...) {
845        // If an error occurs while deleting a document, or doing any other
846        // transaction, the modifications so far must be cleared before
847        // returning control to the user - otherwise partial modifications will
848        // persist in memory, and eventually get written to disk.
849        cancel();
850        throw;
851    }
852
853    if (++change_count >= flush_threshold) {
854        flush_postlist_changes();
855        if (!transaction_active()) apply();
856    }
857}
858
859void
860FlintWritableDatabase::replace_document(Xapian::docid did,
861                                        const Xapian::Document & document)
862{
863    DEBUGCALL(DB, void, "FlintWritableDatabase::replace_document", did << ", " << document);
864    Assert(did != 0);
865
866    try {
867        if (did > lastdocid) {
868            lastdocid = did;
869            // If this docid is above the highwatermark, then we can't be
870            // replacing an existing document.
871            (void)add_document_(did, document);
872            return;
873        }
874
875        // OK, now add entries to remove the postings in the underlying record.
876        Xapian::Internal::RefCntPtr<const FlintWritableDatabase> ptrtothis(this);
877        FlintTermList termlist(ptrtothis, did);
878
879        termlist.next();
880        while (!termlist.at_end()) {
881            string tname = termlist.get_termname();
882            termcount wdf = termlist.get_wdf();
883
884            map<string, pair<termcount_diff, termcount_diff> >::iterator i;
885            i = freq_deltas.find(tname);
886            if (i == freq_deltas.end()) {
887                freq_deltas.insert(make_pair(tname, make_pair(-1, -termcount_diff(wdf))));
888            } else {
889                --i->second.first;
890                i->second.second -= wdf;
891            }
892
893            // Remove did from tname's postlist
894            map<string, map<docid, pair<char, termcount> > >::iterator j;
895            j = mod_plists.find(tname);
896            if (j == mod_plists.end()) {
897                map<docid, pair<char, termcount> > m;
898                j = mod_plists.insert(make_pair(tname, m)).first;
899            }
900
901            map<docid, pair<char, termcount> >::iterator k;
902            k = j->second.find(did);
903            if (k == j->second.end()) {
904                j->second.insert(make_pair(did, make_pair('D', 0u)));
905            } else {
906                // Modifying a document we added/modified since the last flush.
907                k->second = make_pair('D', 0u);
908            }
909
910            termlist.next();
911        }
912
913        total_length -= termlist.get_doclength();
914
915        // Replace the record
916      Â