Ticket #348: compressed_cc.diff

File compressed_cc.diff, 16.8 KB (added by Dan, 12 years ago)
  • xapian-core/backends/brass/brass_databasereplicator.cc

    diff --git a/xapian-core/backends/brass/brass_databasereplicator.cc b/xapian-core/backends/brass/brass_databasereplicator.cc
    index 002df61..a8b1875 100644
    a b  
    2020 * USA
    2121 */
    2222
     23#include <iostream>
    2324#include <config.h>
    2425
    2526#include "brass_databasereplicator.h"
     
    3435#include "debuglog.h"
    3536#include "fd.h"
    3637#include "filetests.h"
     38#include "internaltypes.h"
    3739#include "io_utils.h"
    3840#include "pack.h"
    3941#include "net/remoteconnection.h"
     
    4244#include "str.h"
    4345#include "stringutils.h"
    4446
     47
    4548#ifdef __WIN32__
    4649# include "msvc_posix_wrapper.h"
    4750#endif
    using namespace std;  
    5255using namespace Xapian;
    5356
    5457BrassDatabaseReplicator::BrassDatabaseReplicator(const string & db_dir_)
    55         : db_dir(db_dir_)
     58    : db_dir(db_dir_),
     59      comp_stream(Z_DEFAULT_STRATEGY)
    5660{
    5761}
    5862
    BrassDatabaseReplicator::process_changeset_chunk_blocks(const string & tablename  
    191195    }
    192196    {
    193197        FD closer(fd);
    194 
     198        Bytef out[8192];
    195199        while (true) {
    196200            conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
    197201            ptr = buf.data();
    198202            end = ptr + buf.size();
    199 
    200203            uint4 block_number;
    201204            if (!unpack_uint(&ptr, end, &block_number))
    202205                throw NetworkError("Invalid block number in changeset");
    BrassDatabaseReplicator::process_changeset_chunk_blocks(const string & tablename  
    205208                break;
    206209            --block_number;
    207210
    208             conn.get_message_chunk(buf, changeset_blocksize, end_time);
     211            conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time);
     212            ptr = buf.data();
     213            end = ptr + buf.size();
     214            unsigned int compressed_block_size;
     215            if(!unpack_uint(&ptr, end, &compressed_block_size))
     216                throw NetworkError("Invalid v2 cphangeset");
     217            buf.erase(0, ptr - buf.data());
     218
     219            if (compressed_block_size > 0) {
     220                string ubuf;
     221                ubuf.reserve(changeset_blocksize);
     222                conn.get_message_chunk(buf, changeset_blocksize, end_time);
     223                comp_stream.lazy_alloc_inflate_zstream();
     224                comp_stream.inflate_zstream->next_in = (Bytef*)const_cast<char *>(buf.data());
     225                comp_stream.inflate_zstream->avail_in = (uInt)(buf.size());
     226                comp_stream.inflate_zstream->next_out = out;
     227                comp_stream.inflate_zstream->avail_out = (uInt)sizeof(out);
     228                int err = inflate(comp_stream.inflate_zstream, Z_FINISH);
     229                if (err == Z_STREAM_END) {
     230                    std::cout << comp_stream.inflate_zstream->next_out - out << std::endl;
     231                    std::cout << out << std::endl;
     232                    ubuf.append(reinterpret_cast<const char *>(out),
     233                                comp_stream.inflate_zstream->next_out - out);
     234                    swap(buf, ubuf);
     235                }
     236            } else {
     237                conn.get_message_chunk(buf, changeset_blocksize, end_time);
     238            }
    209239            if (buf.size() < changeset_blocksize)
    210240                throw NetworkError("Incomplete block in changeset");
    211241
    BrassDatabaseReplicator::apply_changeset_from_conn(RemoteConnection & conn,  
    255285    buf.erase(0, 12);
    256286    const char *ptr = buf.data();
    257287    const char *end = ptr + buf.size();
    258 
    259288    unsigned int changes_version;
    260289    if (!unpack_uint(&ptr, end, &changes_version))
    261290        throw NetworkError("Couldn't read a valid version number from changeset");
  • xapian-core/backends/brass/brass_databasereplicator.h

    diff --git a/xapian-core/backends/brass/brass_databasereplicator.h b/xapian-core/backends/brass/brass_databasereplicator.h
    index 3176589..915fb1f 100644
    a b  
    2424#define XAPIAN_INCLUDED_BRASS_DATABASEREPLICATOR_H
    2525
    2626#include "backends/databasereplicator.h"
     27#include "zlib_utils.h"
    2728
    2829class BrassDatabaseReplicator : public Xapian::DatabaseReplicator {
    2930    private:
    class BrassDatabaseReplicator : public Xapian::DatabaseReplicator {  
    3132         */
    3233        std::string db_dir;
    3334
     35        CompressionStream comp_stream;
     36
    3437        /** Process a chunk which holds a base block.
    3538         */
    3639        void process_changeset_chunk_base(const std::string & tablename,
  • xapian-core/backends/brass/brass_replicate_internal.h

    diff --git a/xapian-core/backends/brass/brass_replicate_internal.h b/xapian-core/backends/brass/brass_replicate_internal.h
    index d108710..24d140d 100644
    a b  
    2828
    2929// The current version of changeset files.
    3030// 1  - initial implementation
    31 #define CHANGES_VERSION 1u
     31// 2  - compressed changesets
     32#define CHANGES_VERSION 2u
    3233
    3334// Must be big enough to ensure that the start of the changeset (up to the new
    3435// revision number) will fit in this much space.
  • xapian-core/backends/brass/brass_table.cc

    diff --git a/xapian-core/backends/brass/brass_table.cc b/xapian-core/backends/brass/brass_table.cc
    index f6c9527..e77312b 100644
    a b BrassTable::write_changed_blocks(int changes_fd, bool compressed)  
    18181818    pack_string(buf, tablename);
    18191819    pack_uint(buf, block_size);
    18201820
    1821     if (compressed) {
    1822         comp_stream.lazy_alloc_deflate_zstream();
    1823 
    1824         comp_stream.deflate_zstream->next_in = (Bytef *)const_cast<char *>(buf.data());
    1825         comp_stream.deflate_zstream->avail_in = (uInt)buf.size();
    1826         int err = deflate(comp_stream.deflate_zstream, Z_FINISH);
    1827         if (err == Z_STREAM_END) {
    1828             io_write(changes_fd, reinterpret_cast<const char *>(buf.data()),
    1829                      comp_stream.deflate_zstream->total_out);
    1830         } else {
    1831             // The deflate failed, try to write data uncompressed
    1832             io_write(changes_fd, buf.data(), buf.size());
    1833         }
    1834     }
    1835     else {
    1836         io_write(changes_fd, buf.data(), buf.size());
    1837     }
     1821    // Write the table name and block size to the file
     1822    io_write(changes_fd, buf.data(), buf.size());
    18381823       
    18391824    // Compare the old and new bitmaps to find blocks which have changed, and
    18401825    // write them to the file descriptor.
    BrassTable::write_changed_blocks(int changes_fd, bool compressed)  
    18451830        while (base.find_changed_block(&n)) {
    18461831            buf.resize(0);
    18471832            pack_uint(buf, n + 1);
    1848             if (compressed) {
    1849                 comp_stream.lazy_alloc_deflate_zstream();
    1850 
    1851                 comp_stream.deflate_zstream->next_in = (Bytef *)const_cast<char *>(buf.data());
    1852                 comp_stream.deflate_zstream->avail_in = (uInt)buf.size();
    1853                 int err = deflate(comp_stream.deflate_zstream, Z_FINISH);
    1854                 if (err == Z_STREAM_END) {
    1855                     io_write(changes_fd, reinterpret_cast<const char *>(buf.data()),
    1856                              comp_stream.deflate_zstream->total_out);
    1857                 } else {
    1858                     // The deflate failed, try to write data uncompressed
    1859                     io_write(changes_fd, buf.data(), buf.size());
    1860                 }
    1861             }
    1862             else {
    1863                 io_write(changes_fd, buf.data(), buf.size());
    1864             }
     1833            // Write the block number to the file
     1834            io_write(changes_fd, buf.data(), buf.size());
    18651835
    18661836            // Read block n.
    18671837            read_block(n, p);
    BrassTable::write_changed_blocks(int changes_fd, bool compressed)  
    18691839            // Write block n to the file.
    18701840            if (compressed) {
    18711841                comp_stream.lazy_alloc_deflate_zstream();
    1872 
    1873                 comp_stream.deflate_zstream->next_in = (Bytef *)(p);
    1874                 comp_stream.deflate_zstream->avail_in = (uInt)block_size;
    1875                 int err = deflate(comp_stream.deflate_zstream, Z_FINISH);
    1876                 if (err == Z_STREAM_END) {
    1877                     io_write(changes_fd, reinterpret_cast<const char *>(p),
     1842                comp_stream.compress(p, block_size);
     1843                if (comp_stream.zerr == Z_STREAM_END) {
     1844                    buf.resize(0);
     1845                    pack_uint(buf, comp_stream.deflate_zstream->total_out);
     1846                    io_write(changes_fd, buf.data(), buf.size());
     1847                    io_write(changes_fd, reinterpret_cast<const char *>(comp_stream.out),
    18781848                             comp_stream.deflate_zstream->total_out);
    18791849                } else {
    18801850                    // The deflate failed, try to write data uncompressed
     1851                    buf.resize(0);
     1852                    pack_uint(buf, 0u);
     1853                    io_write(changes_fd, buf.data(), buf.size());
    18811854                    io_write(changes_fd, reinterpret_cast<const char *>(p), block_size);
    18821855                }
    18831856            }
    18841857            else {
     1858                buf.resize(0);
     1859                pack_uint(buf, 0u);
     1860                io_write(changes_fd, buf.data(), buf.size());
    18851861                io_write(changes_fd, reinterpret_cast<const char *>(p), block_size);
    18861862            }
    18871863            ++n;
    BrassTable::write_changed_blocks(int changes_fd, bool compressed)  
    18941870    }
    18951871    buf.resize(0);
    18961872    pack_uint(buf, 0u);
    1897 
    1898     if (compressed) {
    1899         comp_stream.lazy_alloc_deflate_zstream();
    1900 
    1901         comp_stream.deflate_zstream->next_in = (Bytef *)const_cast<char *>(buf.data());
    1902         comp_stream.deflate_zstream->avail_in = (uInt)buf.size();
    1903         int err = deflate(comp_stream.deflate_zstream, Z_FINISH);
    1904         if (err == Z_STREAM_END) {
    1905             io_write(changes_fd, reinterpret_cast<const char *>(buf.data()),
    1906                      comp_stream.deflate_zstream->total_out);
    1907         } else {
    1908             // The deflate failed, try to write data uncompressed
    1909             io_write(changes_fd, buf.data(), buf.size());
    1910         }
    1911     }
    1912     else {
    1913         io_write(changes_fd, buf.data(), buf.size());
    1914     }
     1873    // Write 0 for end of blocks
     1874    io_write(changes_fd, buf.data(), buf.size());
    19151875}
    19161876
    19171877void
  • xapian-core/backends/brass/brass_table.h

    diff --git a/xapian-core/backends/brass/brass_table.h b/xapian-core/backends/brass/brass_table.h
    index 24d3f2a..a51df2f 100644
    a b  
    4141#include <algorithm>
    4242#include <string>
    4343
    44 #include <zlib.h>
    45 
    4644#define DONT_COMPRESS -1
    4745
    4846/** Even for items of at maximum size, it must be possible to get this number of
  • xapian-core/common/Makefile.mk

    diff --git a/xapian-core/common/Makefile.mk b/xapian-core/common/Makefile.mk
    index afd3e14..0211f8a 100644
    a b noinst_HEADERS +=\  
    3838        common/stringutils.h\
    3939        common/submatch.h\
    4040        common/unaligned.h\
    41         common/utils.h\
    42         common/valuelist.h\
    43         common/valuestats.h\
    44         common/vectortermlist.h\
    45         common/weightinternal.h\
    4641        common/zlib_utils.h
    4742        common/unaligned.h
    4843
    lib_src +=\  
    6762        common/serialise-double.cc\
    6863        common/socket_utils.cc\
    6964        common/str.cc\
    70         common/stringutils.cc
     65        common/stringutils.cc\
    7166        common/zlib_utils.cc
    7267
    7368
  • xapian-core/common/zlib_utils.cc

    diff --git a/xapian-core/common/zlib_utils.cc b/xapian-core/common/zlib_utils.cc
    index 4f6af90..8761f48 100644
    a b  
    66
    77CompressionStream::CompressionStream(int compress_strategy_)
    88    : compress_strategy(compress_strategy_),
     9      zerr(0),
     10      out_len(0),
     11      out(NULL),
    912      deflate_zstream(NULL),
    1013      inflate_zstream(NULL)
    1114{
    CompressionStream::~CompressionStream() {  
    2629        (void) inflateEnd(inflate_zstream);
    2730        delete inflate_zstream;
    2831    }
     32   
     33    if (out) {
     34        delete [] out;
     35    }
     36}
     37
     38
     39void
     40CompressionStream::compress(string & buf) {
     41    out_len = buf.size() - 1;
     42    out = new unsigned char[out_len];
     43    deflate_zstream->avail_in = (uInt)buf.size();
     44    deflate_zstream->next_in = (Bytef *)const_cast<char *>(buf.data());
     45    deflate_zstream->next_out = out;
     46    deflate_zstream->avail_out = (uInt)out_len;
     47    zerr = deflate(deflate_zstream, Z_FINISH);
    2948}
    3049
     50
     51void
     52CompressionStream::compress(byte * buf, int size) {
     53    out_len = size - 1;
     54    out = new unsigned char[out_len];
     55    deflate_zstream->avail_in = (uInt)size;
     56    deflate_zstream->next_in = (Bytef *)(buf);
     57    deflate_zstream->next_out = out;
     58    deflate_zstream->avail_out = (uInt)out_len;
     59    zerr = deflate(deflate_zstream, Z_FINISH);
     60}
     61
     62// void
     63// CompressionStream::decompress(string & buf) {
     64//     inflate_zstream->next_in = (Bytef*)const_cast<char *>(tag->data());
     65//     inflate_zstream->avail_in = (uInt)tag->size();
     66//     int err = Z_OK;
     67//     while (err != Z_STREAM_END) {
     68//      comp_stream.inflate_zstream->next_out = buf;
     69//      comp_stream.inflate_zstream->avail_out = (uInt)sizeof(buf);
     70//      err = inflate(comp_stream.inflate_zstream, Z_SYNC_FLUSH);
     71//      if (err == Z_BUF_ERROR && comp_stream.inflate_zstream->avail_in == 0) {
     72//          LOGLINE(DB, "Z_BUF_ERROR - faking checksum of " << comp_stream.inflate_zstream->adler);
     73//          Bytef header2[4];
     74//          setint4(header2, 0, comp_stream.inflate_zstream->adler);
     75//          comp_stream.inflate_zstream->next_in = header2;
     76//          comp_stream.inflate_zstream->avail_in = 4;
     77//          err = inflate(comp_stream.inflate_zstream, Z_SYNC_FLUSH);
     78//          if (err == Z_STREAM_END) break;
     79//      }
     80
     81//      if (err != Z_OK && err != Z_STREAM_END) {
     82//          if (err == Z_MEM_ERROR) throw std::bad_alloc();
     83//          string msg = "inflate failed";
     84//          if (comp_stream.inflate_zstream->msg) {
     85//              msg += " (";
     86//              msg += comp_stream.inflate_zstream->msg;
     87//              msg += ')';
     88//          }
     89//          throw Xapian::DatabaseError(msg);
     90//      }
     91
     92//      utag.append(reinterpret_cast<const char *>(buf),
     93//                  comp_stream.inflate_zstream->next_out - buf);
     94//     }
     95//     if (utag.size() != comp_stream.inflate_zstream->total_out) {
     96//      string msg = "compressed tag didn't expand to the expected size: ";
     97//      msg += str(utag.size());
     98//      msg += " != ";
     99//      // OpenBSD's zlib.h uses off_t instead of uLong for total_out.
     100//      msg += str((size_t)comp_stream.inflate_zstream->total_out);
     101//      throw Xapian::DatabaseCorruptError(msg);
     102//     }
     103// }
     104
     105
    31106void
    32107CompressionStream::lazy_alloc_deflate_zstream() const {
    33108    if (usual(deflate_zstream)) {
    CompressionStream::lazy_alloc_inflate_zstream() const {  
    102177        throw Xapian::DatabaseError(msg);
    103178    }
    104179}
    105 
    106 // void
    107 // BrassTable::lazy_alloc_deflate_zstream() const {
    108 //     if (usual(deflate_zstream)) {
    109 //      if (usual(deflateReset(deflate_zstream) == Z_OK)) return;
    110 //      // Try to recover by deleting the stream and starting from scratch.
    111 //      delete deflate_zstream;
    112 //     }
    113 
    114 //     deflate_zstream = new z_stream;
    115 
    116 //     deflate_zstream->zalloc = reinterpret_cast<alloc_func>(0);
    117 //     deflate_zstream->zfree = reinterpret_cast<free_func>(0);
    118 //     deflate_zstream->opaque = (voidpf)0;
    119 
    120 //     // -15 means raw deflate with 32K LZ77 window (largest)
    121 //     // memLevel 9 is the highest (8 is default)
    122 //     int err;
    123 //     err = deflateInit2(deflate_zstream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
    124 //                     -15, 9, compress_strategy);
    125 //     if (rare(err != Z_OK)) {
    126 //      if (err == Z_MEM_ERROR) {
    127 //          delete deflate_zstream;
    128 //          deflate_zstream = 0;
    129 //          throw std::bad_alloc();
    130 //      }
    131 //      string msg = "deflateInit2 failed (";
    132 //      if (deflate_zstream->msg) {
    133 //          msg += deflate_zstream->msg;
    134 //      } else {
    135 //          msg += str(err);
    136 //      }
    137 //      msg += ')';
    138 //      delete deflate_zstream;
    139 //      deflate_zstream = 0;
    140 //      throw Xapian::DatabaseError(msg);
    141 //     }
    142 // }
    143 
    144 // void
    145 // BrassTable::lazy_alloc_inflate_zstream() const {
    146 //     if (usual(inflate_zstream)) {
    147 //      if (usual(inflateReset(inflate_zstream) == Z_OK)) return;
    148 //      // Try to recover by deleting the stream and starting from scratch.
    149 //      delete inflate_zstream;
    150 //     }
    151 
    152 //     inflate_zstream = new z_stream;
    153 
    154 //     inflate_zstream->zalloc = reinterpret_cast<alloc_func>(0);
    155 //     inflate_zstream->zfree = reinterpret_cast<free_func>(0);
    156 
    157 //     inflate_zstream->next_in = Z_NULL;
    158 //     inflate_zstream->avail_in = 0;
    159 
    160 //     int err = inflateInit2(inflate_zstream, -15);
    161 //     if (rare(err != Z_OK)) {
    162 //      if (err == Z_MEM_ERROR) {
    163 //          delete inflate_zstream;
    164 //          inflate_zstream = 0;
    165 //          throw std::bad_alloc();
    166 //      }
    167 //      string msg = "inflateInit2 failed (";
    168 //      if (inflate_zstream->msg) {
    169 //          msg += inflate_zstream->msg;
    170 //      } else {
    171 //          msg += str(err);
    172 //      }
    173 //      msg += ')';
    174 //      delete inflate_zstream;
    175 //      inflate_zstream = 0;
    176 //      throw Xapian::DatabaseError(msg);
    177 //     }
    178 // }
  • xapian-core/common/zlib_utils.h

    diff --git a/xapian-core/common/zlib_utils.h b/xapian-core/common/zlib_utils.h
    index 5ce4329..6fbcee8 100644
    a b  
     1#ifndef XAPIAN_INCLUDED_ZLIB_UTILS_H
     2#define XAPIAN_INCLUDED_ZLIB_UTILS_H
    13
    2 #include <zlib.h>
     4#include "debuglog.h"
     5
     6#include "internaltypes.h"
    37
    48#include "xapian/error.h"
    59
    6 #include "debuglog.h"
    7 #include "utils.h"
     10#include <zlib.h>
    811
    912using namespace std;
    1013
    class CompressionStream {  
    1922
    2023    int compress_strategy;
    2124
     25    int zerr;
     26
     27    unsigned long out_len;
     28   
     29    unsigned char * out;
     30
    2231    /// Zlib state object for deflating
    2332    mutable z_stream *deflate_zstream;
    2433
    class CompressionStream {  
    3039
    3140    /// Allocate the zstream for inflating, if not already allocated.
    3241    void lazy_alloc_inflate_zstream() const;
     42
     43    void compress(string &);
     44    void compress(byte *, int);
    3345};
     46
     47#endif // XAPIAN_INCLUDED_ZLIB_UTILS_H
  • xapian-core/tests/api_query.cc

    diff --git a/xapian-core/tests/api_query.cc b/xapian-core/tests/api_query.cc
    index 0d8a8e2..64bd6c9 100644
    a b DEFINE_TESTCASE(overload1, !backend) {  
    6666    Xapian::Query q;
    6767    q = Xapian::Query("foo") & Xapian::Query("bar");
    6868    TEST_STRINGS_EQUAL(q.get_description(), "Query((foo AND bar))");
    69     q = Xapian::Query("foo") &~ Xapian::Query("bar");
     69    q = Xapian::Query("foo") & (~Xapian::Query("bar"));
    7070    TEST_STRINGS_EQUAL(q.get_description(), "Query((foo AND_NOT bar))");
    7171    q = ~Xapian::Query("bar");
    7272    TEST_STRINGS_EQUAL(q.get_description(), "Query((<alldocuments> AND_NOT bar))");