root / tags / 1.0.8 / xapian-core / backends / flint / flint_table.cc

Revision 11154, 59.9 kB (checked in by olly, 4 months ago)

Backport change from trunk:
backends/flint/flint_table.cc,backends/flint/flint_table.h:
Eliminate other_base_letter member of FlintTable - its value can
always be easily determined from base_letter.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* flint_table.cc: Btree implementation
2 *
3 * Copyright 1999,2000,2001 BrightStation PLC
4 * Copyright 2002 Ananova Ltd
5 * Copyright 2002,2003,2004,2005,2006,2007 Olly Betts
6 *
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation; either version 2 of the
10 * License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
20 * USA
21 */
22
23#include <config.h>
24
25#include <xapian/error.h>
26
27#include "safeerrno.h"
28#ifdef __WIN32__
29# include "msvc_posix_wrapper.h"
30#endif
31
32#include "omassert.h"
33#include "stringutils.h" // For STRINGIZE().
34
35// Define to use "dangerous" mode - in this mode we write modified btree
36// blocks back in place.  This is somewhat faster (especially once we're
37// I/O bound) but the database can't be safely searched during indexing
38// and if indexing is terminated uncleanly, the database may be corrupt.
39//
40// Despite the risks, it's useful for speeding up a full rebuild.
41//
42// FIXME: make this mode run-time selectable, and record that it is currently
43// in use somewhere on disk, so readers can check and refuse to open the
44// database.
45//
46// #define DANGEROUS
47
48#include <sys/types.h>
49
50// Trying to include the correct headers with the correct defines set to
51// get pread() and pwrite() prototyped on every platform without breaking any
52// other platform is a real can of worms.  So instead we probe for what
53// prototypes (if any) are required in configure and put them into
54// PREAD_PROTOTYPE and PWRITE_PROTOTYPE.
55#if defined HAVE_PREAD && defined PREAD_PROTOTYPE
56PREAD_PROTOTYPE
57#endif
58#if defined HAVE_PWRITE && defined PWRITE_PROTOTYPE
59PWRITE_PROTOTYPE
60#endif
61
62#include <stdio.h>
63#include <string.h>   /* for memmove */
64#include <limits.h>   /* for CHAR_BIT */
65
66#include "flint_io.h"
67#include "flint_table.h"
68#include "flint_btreeutil.h"
69#include "flint_btreebase.h"
70#include "flint_cursor.h"
71#include "flint_utils.h"
72
73#include "omassert.h"
74#include "omdebug.h"
75#include <xapian/error.h>
76#include "utils.h"
77
78#include <algorithm>  // for std::min()
79#include <string>
80#include <vector>
81
82using namespace std;
83
84// Only try to compress tags longer than this many bytes.
85const size_t COMPRESS_MIN = 4;
86
87//#define BTREE_DEBUG_FULL 1
88#undef BTREE_DEBUG_FULL
89
90#ifdef BTREE_DEBUG_FULL
91/*------debugging aids from here--------*/
92
93static void print_key(const byte * p, int c, int j);
94static void print_tag(const byte * p, int c, int j);
95
96/*
97static void report_cursor(int N, Btree * B, Cursor_ * C)
98{
99    int i;
100    printf("%d)\n", N);
101    for (i = 0; i <= B->level; i++)
102        printf("p=%d, c=%d, n=[%d], rewrite=%d\n",
103                C[i].p, C[i].c, C[i].n, C[i].rewrite);
104}
105*/
106
107/*------to here--------*/
108#endif /* BTREE_DEBUG_FULL */
109
110/* Input/output is defined with calls to the basic Unix system interface: */
111
112static void sys_unlink(const string &filename)
113{
114#ifdef __WIN32__
115    if (msvc_posix_unlink(filename.c_str()) == -1) {
116#else
117    if (unlink(filename) == -1) {
118#endif
119        string message = "Failed to unlink ";
120        message += filename;
121        message += ": ";
122        message += strerror(errno);
123        throw Xapian::DatabaseCorruptError(message);
124    }
125}
126
127static inline byte *zeroed_new(size_t size)
128{
129    byte *temp = new byte[size];
130    // No need to check if temp is NULL, new throws std::bad_alloc if
131    // allocation fails.
132    Assert(temp);
133    memset(temp, 0, size);
134    return temp;
135}
136
137/* A B-tree comprises (a) a base file, containing essential information (Block
138   size, number of the B-tree root block etc), (b) a bitmap, the Nth bit of the
139   bitmap being set if the Nth block of the B-tree file is in use, and (c) a
140   file DB containing the B-tree proper. The DB file is divided into a sequence
141   of equal sized blocks, numbered 0, 1, 2 ... some of which are free, some in
142   use. Those in use are arranged in a tree.
143
144   Each block, b, has a structure like this:
145
146     R L M T D o1 o2 o3 ... oN <gap> [item] .. [item] .. [item] ...
147     <---------- D ----------> <-M->
148
149   And then,
150
151   R = REVISION(b)  is the revision number the B-tree had when the block was
152                    written into the DB file.
153   L = GET_LEVEL(b) is the level of the block, which is the number of levels
154                    towards the root of the B-tree structure. So leaf blocks
155                    have level 0 and the one root block has the highest level
156                    equal to the number of levels in the B-tree.
157   M = MAX_FREE(b)  is the size of the gap between the end of the directory and
158                    the first item of data. (It is not necessarily the maximum
159                    size among the bits of space that are free, but I can't
160                    think of a better name.)
161   T = TOTAL_FREE(b)is the total amount of free space left in b.
162   D = DIR_END(b)   gives the offset to the end of the directory.
163
164   o1, o2 ... oN are a directory of offsets to the N items held in the block.
165   The items are key-tag pairs, and as they occur in the directory are ordered
166   by the keys.
167
168   An item has this form:
169
170           I K key x C tag
171             <--K-->
172           <------I------>
173
174   A long tag presented through the API is split up into C tags small enough to
175   be accommodated in the blocks of the B-tree. The key is extended to include
176   a counter, x, which runs from 1 to C. The key is preceded by a length, K,
177   and the whole item with a length, I, as depicted above.
178
179   Here are the corresponding definitions:
180
181*/
182
183#define REVISION(b)      static_cast<unsigned int>(getint4(b, 0))
184#define GET_LEVEL(b)     getint1(b, 4)
185#define MAX_FREE(b)      getint2(b, 5)
186#define TOTAL_FREE(b)    getint2(b, 7)
187#define DIR_END(b)       getint2(b, 9)
188#define DIR_START        11
189
190#define SET_REVISION(b, x)      setint4(b, 0, x)
191#define SET_LEVEL(b, x)         setint1(b, 4, x)
192#define SET_MAX_FREE(b, x)      setint2(b, 5, x)
193#define SET_TOTAL_FREE(b, x)    setint2(b, 7, x)
194#define SET_DIR_END(b, x)       setint2(b, 9, x)
195
196/** Flip to sequential addition block-splitting after this number of observed
197 *  sequential additions (in negated form). */
198#define SEQ_START_POINT (-10)
199
200/** Even for items of at maximum size, it must be possible to get this number of
201 *  items in a block */
202#define BLOCK_CAPACITY 4
203
204
205
206/* There are two bit maps in bit_map0 and bit_map. The nth bit of bitmap is 0
207   if the nth block is free, otherwise 1. bit_map0 is the initial state of
208   bitmap at the start of the current transaction.
209
210   Note use of the limits.h values:
211   UCHAR_MAX = 255, an unsigned with all bits set, and
212   CHAR_BIT = 8, the number of bits per byte
213
214   BYTE_PAIR_RANGE below is the smallest +ve number that can't be held in two
215   bytes -- 64K effectively.
216*/
217
218#define BYTE_PAIR_RANGE (1 << 2 * CHAR_BIT)
219
220/// read_block(n, p) reads block n of the DB file to address p.
221void
222FlintTable::read_block(uint4 n, byte * p) const
223{
224    // Log the value of p, not the contents of the block it points to...
225    DEBUGCALL(DB, void, "FlintTable::read_block", n << ", " << (void*)p);
226    /* Use the base bit_map_size not the bitmap's size, because
227     * the latter is uninitialised in readonly mode.
228     */
229    Assert(n / CHAR_BIT < base.get_bit_map_size());
230
231#ifdef HAVE_PREAD
232    off_t offset = off_t(block_size) * n;
233    int m = block_size;
234    while (true) {
235        ssize_t bytes_read = pread(handle, reinterpret_cast<char *>(p), m,
236                                   offset);
237        // normal case - read succeeded, so return.
238        if (bytes_read == m) return;
239        if (bytes_read == -1) {
240            if (errno == EINTR) continue;
241            string message = "Error reading block " + om_tostring(n) + ": ";
242            message += strerror(errno);
243            throw Xapian::DatabaseError(message);
244        } else if (bytes_read == 0) {
245            string message = "Error reading block " + om_tostring(n) + ": got end of file";
246            throw Xapian::DatabaseError(message);
247        } else if (bytes_read < m) {
248            /* Read part of the block, which is not an error.  We should
249             * continue reading the rest of the block.
250             */
251            m -= bytes_read;
252            p += bytes_read;
253            offset += bytes_read;
254        }
255    }
256#else
257    if (lseek(handle, off_t(block_size) * n, SEEK_SET) == -1) {
258        string message = "Error seeking to block: ";
259        message += strerror(errno);
260        throw Xapian::DatabaseError(message);
261    }
262
263    flint_io_read(handle, reinterpret_cast<char *>(p), block_size, block_size);
264#endif
265}
266
267/** write_block(n, p) writes block n in the DB file from address p.
268 *  When writing we check to see if the DB file has already been
269 *  modified. If not (so this is the first write) the old base is
270 *  deleted. This prevents the possibility of it being opened
271 *  subsequently as an invalid base.
272 */
273void
274FlintTable::write_block(uint4 n, const byte * p) const
275{
276    DEBUGCALL(DB, void, "FlintTable::write_block", n << ", " << p);
277    Assert(writable);
278    /* Check that n is in range. */
279    Assert(n / CHAR_BIT < base.get_bit_map_size());
280
281    /* don't write to non-free */;
282    AssertParanoid(base.block_free_at_start(n));
283
284    /* write revision is okay */
285    AssertEqParanoid(REVISION(p), latest_revision_number + 1);
286
287    if (both_bases) {
288        // Delete the old base before modifying the database.
289        sys_unlink(name + "base" + other_base_letter());
290        both_bases = false;
291        latest_revision_number = revision_number;
292    }
293
294#ifdef HAVE_PWRITE
295    off_t offset = off_t(block_size) * n;
296    int m = block_size;
297    while (true) {
298        ssize_t bytes_written = pwrite(handle, p, m, offset);
299        if (bytes_written == m) {
300            // normal case - write succeeded, so return.
301            return;
302        } else if (bytes_written == -1) {
303            if (errno == EINTR) continue;
304            string message = "Error writing block: ";
305            message += strerror(errno);
306            throw Xapian::DatabaseError(message);
307        } else if (bytes_written == 0) {
308            string message = "Error writing block: wrote no data";
309            throw Xapian::DatabaseError(message);
310        } else if (bytes_written < m) {
311            /* Wrote part of the block, which is not an error.  We should
312             * continue writing the rest of the block.
313             */
314            m -= bytes_written;
315            p += bytes_written;
316            offset += bytes_written;
317        }
318    }
319#else
320    if (lseek(handle, (off_t)block_size * n, SEEK_SET) == -1) {
321        string message = "Error seeking to block: ";
322        message += strerror(errno);
323        throw Xapian::DatabaseError(message);
324    }
325
326    flint_io_write(handle, reinterpret_cast<const char *>(p), block_size);
327#endif
328}
329
330
331/* A note on cursors:
332
333   Each B-tree level has a corresponding array element C[j] in a
334   cursor, C. C[0] is the leaf (or data) level, and C[B->level] is the
335   root block level. Within a level j,
336
337       C[j].p  addresses the block
338       C[j].c  is the offset into the directory entry in the block
339       C[j].n  is the number of the block at C[j].p
340
341   A look up in the B-tree causes navigation of the blocks starting
342   from the root. In each block, p, we find an offset, c, to an item
343   which gives the number, n, of the block for the next level. This
344   leads to an array of values p,c,n which are held inside the cursor.
345
346   Any Btree object B has a built-in cursor, at B->C. But other cursors may
347   be created.  If BC is a created cursor, BC->C is the cursor in the
348   sense given above, and BC->B is the handle for the B-tree again.
349*/
350
351
352void
353FlintTable::set_overwritten() const
354{
355    DEBUGCALL(DB, void, "FlintTable::set_overwritten", "");
356    // If we're writable, there shouldn't be another writer who could cause
357    // overwritten to be flagged, so that's a DatabaseCorruptError.
358    if (writable)
359        throw Xapian::DatabaseCorruptError("Db block overwritten - are there multiple writers?");
360    throw Xapian::DatabaseModifiedError("The revision being read has been discarded - you should call Xapian::Database::reopen() and retry the operation");
361}
362
363/* block_to_cursor(C, j, n) puts block n into position C[j] of cursor
364   C, writing the block currently at C[j] back to disk if necessary.
365   Note that
366
367       C[j].rewrite
368
369   is true iff C[j].n is different from block n in file DB. If it is
370   false no rewriting is necessary.
371*/
372
373void
374FlintTable::block_to_cursor(Cursor_ * C_, int j, uint4 n) const
375{
376    DEBUGCALL(DB, void, "FlintTable::block_to_cursor", (void*)C_ << ", " << j << ", " << n);
377    if (n == C_[j].n) return;
378    byte * p = C_[j].p;
379    Assert(p);
380
381    // FIXME: only needs to be done in write mode
382    if (C_[j].rewrite) {
383        Assert(writable);
384        Assert(C == C_);
385        write_block(C_[j].n, p);
386        C_[j].rewrite = false;
387    }
388    // Check if the block is in the built-in cursor (potentially in
389    // modified form).
390    if (writable && n == C[j].n) {
391        memcpy(p, C[j].p, block_size);
392    } else {
393        read_block(n, p);
394    }
395
396    C_[j].n = n;
397    if (j < level) {
398        /* unsigned comparison */
399        if (REVISION(p) > REVISION(C_[j + 1].p)) {
400            set_overwritten();
401            return;
402        }
403    }
404    AssertEq(j, GET_LEVEL(p));
405}
406
407/** Btree::alter(); is called when the B-tree is to be altered.
408
409   It causes new blocks to be forced for the current set of blocks in
410   the cursor.
411
412   The point is that if a block at level 0 is to be altered it may get
413   a new number. Then the pointer to this block from level 1 will need
414   changing. So the block at level 1 needs altering and may get a new
415   block number. Then the pointer to this block from level 2 will need
416   changing ... and so on back to the root.
417
418   The clever bit here is spotting the cases when we can make an early
419   exit from this process. If C[j].rewrite is true, C[j+k].rewrite
420   will be true for k = 1,2 ... We have been through all this before,
421   and there is no need to do it again. If C[j].n was free at the
422   start of the transaction, we can copy it back to the same place
423   without violating the integrity of the B-tree. We don't then need a
424   new n and can return. The corresponding C[j].rewrite may be true or
425   false in that case.
426*/
427
428void
429FlintTable::alter()
430{
431    DEBUGCALL(DB, void, "FlintTable::alter", "");
432    Assert(writable);
433#ifdef DANGEROUS
434    C[0].rewrite = true;
435#else
436    int j = 0;
437    byte * p = C[j].p;
438    while (true) {
439        if (C[j].rewrite) return; /* all new, so return */
440        C[j].rewrite = true;
441
442        uint4 n = C[j].n;
443        if (base.block_free_at_start(n)) {
444            Assert(REVISION(p) == latest_revision_number + 1);
445            return;
446        }
447        Assert(REVISION(p) < latest_revision_number + 1);
448        base.free_block(n);
449        n = base.next_free_block();
450        C[j].n = n;
451        SET_REVISION(p, latest_revision_number + 1);
452
453        if (j == level) return;
454        j++;
455        p = C[j].p;
456        Item_wr_(p, C[j].c).set_block_given_by(n);
457    }
458#endif
459}
460
461/** find_in_block(p, key, leaf, c) searches for the key in the block at p.
462
463   leaf is true for a data block, and false for an index block (when the
464   first key is dummy and never needs to be tested). What we get is the
465   directory entry to the last key <= the key being searched for.
466
467   The lookup is by binary chop, with i and j set to the left and
468   right ends of the search area. In sequential addition, c will often
469   be the answer, so we test the keys round c and move i and j towards
470   c if possible.
471*/
472
473int FlintTable::find_in_block(const byte * p, Key_ key, bool leaf, int c)
474{
475    DEBUGCALL_STATIC(DB, int, "FlintTable::find_in_block", reinterpret_cast<const void*>(p) << ", " << reinterpret_cast<const void*>(key.get_address()) << ", " << leaf << ", " << c);
476    int i = DIR_START;
477    if (leaf) i -= D2;
478    int j = DIR_END(p);
479
480    if (c != -1) {
481        if (c < j && i < c && Item_(p, c).key() <= key)
482            i = c;
483        c += D2;
484        if (c < j && i < c && key < Item_(p, c).key())
485            j = c;
486    }
487
488    while (j - i > D2) {
489        int k = i + ((j - i)/(D2 * 2))*D2; /* mid way */
490        if (key < Item_(p, k).key()) j = k; else i = k;
491    }
492    RETURN(i);
493}
494
495/** find(C_) searches for the key of B->kt in the B-tree.
496
497   Result is true if found, false otherwise.  When false, the B_tree
498   cursor is positioned at the last key in the B-tree <= the search
499   key.  Goes to first (null) item in B-tree when key length == 0.
500*/
501
502bool
503FlintTable::find(Cursor_ * C_) const
504{
505    DEBUGCALL(DB, bool, "FlintTable::find", (void*)C_);
506    // Note: the parameter is needed when we're called by FlintCursor
507    const byte * p;
508    int c;
509    Key_ key = kt.key();
510    for (int j = level; j > 0; --j) {
511        p = C_[j].p;
512        c = find_in_block(p, key, false, C_[j].c);
513#ifdef BTREE_DEBUG_FULL
514        printf("Block in FlintTable:find - code position 1");
515        report_block_full(j, C_[j].n, p);
516#endif /* BTREE_DEBUG_FULL */
517        C_[j].c = c;
518        block_to_cursor(C_, j - 1, Item_(p, c).block_given_by());
519    }
520    p = C_[0].p;
521    c = find_in_block(p, key, true, C_[0].c);
522#ifdef BTREE_DEBUG_FULL
523    printf("Block in FlintTable:find - code position 2");
524    report_block_full(0, C_[0].n, p);
525#endif /* BTREE_DEBUG_FULL */
526    C_[0].c = c;
527    if (c < DIR_START) RETURN(false);
528    RETURN(Item_(p, c).key() == key);
529}
530
531/** compact(p) compact the block at p by shuffling all the items up to the end.
532
533   MAX_FREE(p) is then maximized, and is equal to TOTAL_FREE(p).
534*/
535
536void
537FlintTable::compact(byte * p)
538{
539    DEBUGCALL(DB, void, "FlintTable::compact", (void*)p);
540    Assert(writable);
541    int e = block_size;
542    byte * b = buffer;
543    int dir_end = DIR_END(p);
544    for (int c = DIR_START; c < dir_end; c += D2) {
545        Item_ item(p, c);
546        int l = item.size();
547        e -= l;
548        memmove(b + e, item.get_address(), l);
549        setD(p, c, e);  /* reform in b */
550    }
551    memmove(p + e, b + e, block_size - e);  /* copy back */
552    e -= dir_end;
553    SET_TOTAL_FREE(p, e);
554    SET_MAX_FREE(p, e);
555}
556
557/** Btree needs to gain a new level to insert more items: so split root block
558 *  and construct a new one.
559 */
560void
561FlintTable::split_root(uint4 split_n)
562{
563    DEBUGCALL(DB, void, "FlintTable::split_root", split_n);
564    /* gain a level */
565    ++level;
566
567    /* check level overflow - this isn't something that should ever happen
568     * but deserves more than an Assert()... */
569    if (level == BTREE_CURSOR_LEVELS) {
570        throw Xapian::DatabaseCorruptError("Btree has grown impossibly large ("STRINGIZE(BTREE_CURSOR_LEVELS)" levels)");
571    }
572
573    byte * q = zeroed_new(block_size);
574    C[level].p = q;
575    C[level].c = DIR_START;
576    C[level].n = base.next_free_block();
577    C[level].rewrite = true;
578    SET_REVISION(q, latest_revision_number + 1);
579    SET_LEVEL(q, level);
580    SET_DIR_END(q, DIR_START);
581    compact(q);   /* to reset TOTAL_FREE, MAX_FREE */
582
583    /* form a null key in b with a pointer to the old root */
584    byte b[10]; /* 7 is exact */
585    Item_wr_ item(b);
586    item.form_null_key(split_n);
587    add_item(item, level);
588}
589
590/** enter_key(j, prevkey, newkey) is called after a block split.
591
592   It enters in the block at level C[j] a separating key for the block
593   at level C[j - 1]. The key itself is newkey. prevkey is the
594   preceding key, and at level 1 newkey can be trimmed down to the
595   first point of difference to prevkey for entry in C[j].
596
597   This code looks longer than it really is. If j exceeds the number
598   of B-tree levels the root block has split and we have to construct
599   a new one, but this is a rare event.
600
601   The key is constructed in b, with block number C[j - 1].n as tag,
602   and this is added in with add_item. add_item may itself cause a
603   block split, with a further call to enter_key. Hence the recursion.
604*/
605void
606FlintTable::enter_key(int j, Key_ prevkey, Key_ newkey)
607{
608    Assert(writable);
609    Assert(prevkey < newkey);
610    Assert(j >= 1);
611
612    uint4 blocknumber = C[j - 1].n;
613
614    // FIXME update to use Key_
615    // Keys are truncated here: but don't truncate the count at the end away.
616    const int newkey_len = newkey.length();
617    int i;
618
619    if (j == 1) {
620        // Truncate the key to the minimal key which differs from prevkey,
621        // the preceding key in the block.
622        i = 0;
623        const int min_len = min(newkey_len, prevkey.length());
624        while (i < min_len && prevkey[i] == newkey[i]) {
625            i++;
626        }
627
628        // Want one byte of difference.
629        if (i < newkey_len) i++;
630    } else {
631        /* Can't truncate between branch levels, since the separated keys
632         * are in at the leaf level, and truncating again will change the
633         * branch point.
634         */
635        i = newkey_len;
636    }
637
638    byte b[UCHAR_MAX + 6];
639    Item_wr_ item(b);
640    Assert(i <= 256 - I2 - C2);
641    Assert(i <= (int)sizeof(b) - I2 - C2 - 4);
642    item.set_key_and_block(newkey, i, blocknumber);
643
644    // When j > 1 we can make the first key of block p null.  This is probably
645    // worthwhile as it trades a small amount of CPU and RAM use for a small
646    // saving in disk use.  Other redundant keys will still creep in though.
647    if (j > 1) {
648        byte * p = C[j - 1].p;
649        uint4 n = getint4(newkey.get_address(), newkey_len + K1 + C2);
650        int new_total_free = TOTAL_FREE(p) + newkey_len + C2;
651        // FIXME: incredibly icky going from key to item like this...
652        Item_wr_(const_cast<byte*>(newkey.get_address()) - I2).form_null_key(n);
653        SET_TOTAL_FREE(p, new_total_free);
654    }
655
656    C[j].c = find_in_block(C[j].p, item.key(), false, 0) + D2;
657    C[j].rewrite = true; /* a subtle point: this *is* required. */
658    add_item(item, j);
659}
660
661/** mid_point(p) finds the directory entry in c that determines the
662   approximate mid point of the data in the block at p.
663 */
664
665int
666FlintTable::mid_point(byte * p)
667{
668    int n = 0;
669    int dir_end = DIR_END(p);
670    int size = block_size - TOTAL_FREE(p) - dir_end;
671    for (int c = DIR_START; c < dir_end; c += D2) {
672        int l = Item_(p, c).size();
673        n += 2 * l;
674        if (n >= size) {
675            if (l < n - size) return c;
676            return c + D2;
677        }
678    }
679
680    /* falling out of mid_point */
681    Assert(false);
682    return 0; /* Stop compiler complaining about end of method. */
683}
684
685/** add_item_to_block(p, kt_, c) adds item kt_ to the block at p.
686
687   c is the offset in the directory that needs to be expanded to
688   accommodate the new entry for the item. We know before this is
689   called that there is enough room, so it's just a matter of byte
690   shuffling.
691*/
692
693void
694FlintTable::add_item_to_block(byte * p, Item_wr_ kt_, int c)
695{
696    Assert(writable);
697    int dir_end = DIR_END(p);
698    int kt_len = kt_.size();
699    int needed = kt_len + D2;
700    int new_total = TOTAL_FREE(p) - needed;
701    int new_max = MAX_FREE(p) - needed;
702
703    Assert(new_total >= 0);
704
705    if (new_max < 0) {
706        compact(p);
707        new_max = MAX_FREE(p) - needed;
708        Assert(new_max >= 0);
709    }
710    Assert(dir_end >= c);
711
712    memmove(p + c + D2, p + c, dir_end - c);
713    dir_end += D2;
714    SET_DIR_END(p, dir_end);
715
716    int o = dir_end + new_max;
717    setD(p, c, o);
718    memmove(p + o, kt_.get_address(), kt_len);
719
720    SET_MAX_FREE(p, new_max);
721
722    SET_TOTAL_FREE(p, new_total);
723}
724
725/** FlintTable::add_item(kt_, j) adds item kt_ to the block at cursor level C[j].
726 *
727 *  If there is not enough room the block splits and the item is then
728 *  added to the appropriate half.
729 */
730void
731FlintTable::add_item(Item_wr_ kt_, int j)
732{
733    Assert(writable);
734    byte * p = C[j].p;
735    int c = C[j].c;
736    uint4 n;
737
738    int needed = kt_.size() + D2;
739    if (TOTAL_FREE(p) < needed) {
740        int m;
741        // Prepare to split p. After splitting, the block is in two halves, the
742        // lower half is split_p, the upper half p again. add_to_upper_half
743        // becomes true when the item gets added to p, false when it gets added
744        // to split_p.
745
746        if (seq_count < 0) {
747            // If we're not in sequential mode, we split at the mid point
748            // of the node.
749            m = mid_point(p);
750        } else {
751            // During sequential addition, split at the insert point
752            m = c;
753        }
754
755        uint4 split_n = C[j].n;
756        C[j].n = base.next_free_block();
757
758        memcpy(split_p, p, block_size);  // replicate the whole block in split_p
759        SET_DIR_END(split_p, m);
760        compact(split_p);      /* to reset TOTAL_FREE, MAX_FREE */
761
762        {
763            int residue = DIR_END(p) - m;
764            int new_dir_end = DIR_START + residue;
765            memmove(p + DIR_START, p + m, residue);
766            SET_DIR_END(p, new_dir_end);
767        }
768
769        compact(p);      /* to reset TOTAL_FREE, MAX_FREE */
770
771        bool add_to_upper_half;
772        if (seq_count < 0) {
773            add_to_upper_half = (c >= m);
774        } else {
775            // And add item to lower half if split_p has room, otherwise upper
776            // half
777            add_to_upper_half = (TOTAL_FREE(split_p) < needed);
778        }
779
780        if (add_to_upper_half) {
781            c -= (m - DIR_START);
782            Assert(seq_count < 0 || c <= DIR_START + D2);
783            Assert(c >= DIR_START);
784            Assert(c <= DIR_END(p));
785            add_item_to_block(p, kt_, c);
786            n = C[j].n;
787        } else {
788            Assert(c >= DIR_START);
789            Assert(c <= DIR_END(split_p));
790            add_item_to_block(split_p, kt_, c);
791            n = split_n;
792        }
793        write_block(split_n, split_p);
794
795        // Check if we're splitting the root block.
796        if (j == level) split_root(split_n);
797
798        /* Enter a separating key at level j + 1 between */
799        /* the last key of block split_p, and the first key of block p */
800        enter_key(j + 1,
801                  Item_(split_p, DIR_END(split_p) - D2).key(),
802                  Item_(p, DIR_START).key());
803    } else {
804        add_item_to_block(p, kt_, c);
805        n = C[j].n;
806    }
807    if (j == 0) {
808        changed_n = n;
809        changed_c = c;
810    }
811}
812
813/** FlintTable::delete_item(j, repeatedly) is (almost) the converse of add_item.
814 *
815 * If repeatedly is true, the process repeats at the next level when a
816 * block has been completely emptied, freeing the block and taking out
817 * the pointer to it.  Emptied root blocks are also removed, which
818 * reduces the number of levels in the B-tree.
819 */
820void
821FlintTable::delete_item(int j, bool repeatedly)
822{
823    Assert(writable);
824    byte * p = C[j].p;
825    int c = C[j].c;
826    int kt_len = Item_(p, c).size(); /* size of the item to be deleted */
827    int dir_end = DIR_END(p) - D2;   /* directory length will go down by 2 bytes */
828
829    memmove(p + c, p + c + D2, dir_end - c);
830    SET_DIR_END(p, dir_end);
831    SET_MAX_FREE(p, MAX_FREE(p) + D2);
832    SET_TOTAL_FREE(p, TOTAL_FREE(p) + kt_len + D2);
833
834    if (!repeatedly) return;
835    if (j < level) {
836        if (dir_end == DIR_START) {
837            base.free_block(C[j].n);
838            C[j].rewrite = false;
839            C[j].n = BLK_UNUSED;
840            C[j + 1].rewrite = true;  /* *is* necessary */
841            delete_item(j + 1, true);
842        }
843    } else {
844        Assert(j == level);
845        while (dir_end == DIR_START + D2 && level > 0) {
846            /* single item in the root block, so lose a level */
847            uint4 new_root = Item_(p, DIR_START).block_given_by();
848            delete [] p;
849            C[level].p = 0;
850            base.free_block(C[level].n);
851            C[level].rewrite = false;
852            C[level].n = BLK_UNUSED;
853            level--;
854
855            block_to_cursor(C, level, new_root);
856
857            p = C[level].p;
858            dir_end = DIR_END(p); /* prepare for the loop */
859        }
860    }
861}
862
863/* debugging aid:
864static addcount = 0;
865*/
866
867/** add_kt(found) adds the item (key-tag pair) at B->kt into the
868   B-tree, using cursor C.
869
870   found == find() is handed over as a parameter from Btree::add.
871   Btree::alter() prepares for the alteration to the B-tree. Then
872   there are a number of cases to consider:
873
874     If an item with the same key is in the B-tree (found is true),
875     the new kt replaces it.
876
877     If then kt is smaller, or the same size as, the item it replaces,
878     kt is put in the same place as the item it replaces, and the
879     TOTAL_FREE measure is reduced.
880
881     If kt is larger than the item it replaces it is put in the
882     MAX_FREE space if there is room, and the directory entry and
883     space counts are adjusted accordingly.
884
885     - But if there is not room we do it the long way: the old item is
886     deleted with delete_item and kt is added in with add_item.
887
888     If the key of kt is not in the B-tree (found is false), the new
889     kt is added in with add_item.
890*/
891
892int
893FlintTable::add_kt(bool found)
894{
895    Assert(writable);
896    int components = 0;
897
898    /*
899    {
900        printf("%d) %s ", addcount++, (found ? "replacing" : "adding"));
901        print_bytes(kt[I2] - K