Ticket #348: compressed_cc.diff
File compressed_cc.diff, 16.8 KB (added by , 13 years ago) |
---|
-
xapian-core/backends/brass/brass_databasereplicator.cc
diff --git a/xapian-core/backends/brass/brass_databasereplicator.cc b/xapian-core/backends/brass/brass_databasereplicator.cc index 002df61..a8b1875 100644
a b 20 20 * USA 21 21 */ 22 22 23 #include <iostream> 23 24 #include <config.h> 24 25 25 26 #include "brass_databasereplicator.h" … … 34 35 #include "debuglog.h" 35 36 #include "fd.h" 36 37 #include "filetests.h" 38 #include "internaltypes.h" 37 39 #include "io_utils.h" 38 40 #include "pack.h" 39 41 #include "net/remoteconnection.h" … … 42 44 #include "str.h" 43 45 #include "stringutils.h" 44 46 47 45 48 #ifdef __WIN32__ 46 49 # include "msvc_posix_wrapper.h" 47 50 #endif … … using namespace std; 52 55 using namespace Xapian; 53 56 54 57 BrassDatabaseReplicator::BrassDatabaseReplicator(const string & db_dir_) 55 : db_dir(db_dir_) 58 : db_dir(db_dir_), 59 comp_stream(Z_DEFAULT_STRATEGY) 56 60 { 57 61 } 58 62 … … BrassDatabaseReplicator::process_changeset_chunk_blocks(const string & tablename 191 195 } 192 196 { 193 197 FD closer(fd); 194 198 Bytef out[8192]; 195 199 while (true) { 196 200 conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time); 197 201 ptr = buf.data(); 198 202 end = ptr + buf.size(); 199 200 203 uint4 block_number; 201 204 if (!unpack_uint(&ptr, end, &block_number)) 202 205 throw NetworkError("Invalid block number in changeset"); … … BrassDatabaseReplicator::process_changeset_chunk_blocks(const string & tablename 205 208 break; 206 209 --block_number; 207 210 208 conn.get_message_chunk(buf, changeset_blocksize, end_time); 211 conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time); 212 ptr = buf.data(); 213 end = ptr + buf.size(); 214 unsigned int compressed_block_size; 215 if(!unpack_uint(&ptr, end, &compressed_block_size)) 216 throw NetworkError("Invalid v2 cphangeset"); 217 buf.erase(0, ptr - buf.data()); 218 219 if (compressed_block_size > 0) { 220 string ubuf; 221 ubuf.reserve(changeset_blocksize); 222 conn.get_message_chunk(buf, changeset_blocksize, end_time); 223 comp_stream.lazy_alloc_inflate_zstream(); 224 comp_stream.inflate_zstream->next_in = (Bytef*)const_cast<char *>(buf.data()); 225 comp_stream.inflate_zstream->avail_in = (uInt)(buf.size()); 226 comp_stream.inflate_zstream->next_out = out; 227 comp_stream.inflate_zstream->avail_out = (uInt)sizeof(out); 228 int err = inflate(comp_stream.inflate_zstream, Z_FINISH); 229 if (err == Z_STREAM_END) { 230 std::cout << comp_stream.inflate_zstream->next_out - out << std::endl; 231 std::cout << out << std::endl; 232 ubuf.append(reinterpret_cast<const char *>(out), 233 comp_stream.inflate_zstream->next_out - out); 234 swap(buf, ubuf); 235 } 236 } else { 237 conn.get_message_chunk(buf, changeset_blocksize, end_time); 238 } 209 239 if (buf.size() < changeset_blocksize) 210 240 throw NetworkError("Incomplete block in changeset"); 211 241 … … BrassDatabaseReplicator::apply_changeset_from_conn(RemoteConnection & conn, 255 285 buf.erase(0, 12); 256 286 const char *ptr = buf.data(); 257 287 const char *end = ptr + buf.size(); 258 259 288 unsigned int changes_version; 260 289 if (!unpack_uint(&ptr, end, &changes_version)) 261 290 throw NetworkError("Couldn't read a valid version number from changeset"); -
xapian-core/backends/brass/brass_databasereplicator.h
diff --git a/xapian-core/backends/brass/brass_databasereplicator.h b/xapian-core/backends/brass/brass_databasereplicator.h index 3176589..915fb1f 100644
a b 24 24 #define XAPIAN_INCLUDED_BRASS_DATABASEREPLICATOR_H 25 25 26 26 #include "backends/databasereplicator.h" 27 #include "zlib_utils.h" 27 28 28 29 class BrassDatabaseReplicator : public Xapian::DatabaseReplicator { 29 30 private: … … class BrassDatabaseReplicator : public Xapian::DatabaseReplicator { 31 32 */ 32 33 std::string db_dir; 33 34 35 CompressionStream comp_stream; 36 34 37 /** Process a chunk which holds a base block. 35 38 */ 36 39 void process_changeset_chunk_base(const std::string & tablename, -
xapian-core/backends/brass/brass_replicate_internal.h
diff --git a/xapian-core/backends/brass/brass_replicate_internal.h b/xapian-core/backends/brass/brass_replicate_internal.h index d108710..24d140d 100644
a b 28 28 29 29 // The current version of changeset files. 30 30 // 1 - initial implementation 31 #define CHANGES_VERSION 1u 31 // 2 - compressed changesets 32 #define CHANGES_VERSION 2u 32 33 33 34 // Must be big enough to ensure that the start of the changeset (up to the new 34 35 // revision number) will fit in this much space. -
xapian-core/backends/brass/brass_table.cc
diff --git a/xapian-core/backends/brass/brass_table.cc b/xapian-core/backends/brass/brass_table.cc index f6c9527..e77312b 100644
a b BrassTable::write_changed_blocks(int changes_fd, bool compressed) 1818 1818 pack_string(buf, tablename); 1819 1819 pack_uint(buf, block_size); 1820 1820 1821 if (compressed) { 1822 comp_stream.lazy_alloc_deflate_zstream(); 1823 1824 comp_stream.deflate_zstream->next_in = (Bytef *)const_cast<char *>(buf.data()); 1825 comp_stream.deflate_zstream->avail_in = (uInt)buf.size(); 1826 int err = deflate(comp_stream.deflate_zstream, Z_FINISH); 1827 if (err == Z_STREAM_END) { 1828 io_write(changes_fd, reinterpret_cast<const char *>(buf.data()), 1829 comp_stream.deflate_zstream->total_out); 1830 } else { 1831 // The deflate failed, try to write data uncompressed 1832 io_write(changes_fd, buf.data(), buf.size()); 1833 } 1834 } 1835 else { 1836 io_write(changes_fd, buf.data(), buf.size()); 1837 } 1821 // Write the table name and block size to the file 1822 io_write(changes_fd, buf.data(), buf.size()); 1838 1823 1839 1824 // Compare the old and new bitmaps to find blocks which have changed, and 1840 1825 // write them to the file descriptor. … … BrassTable::write_changed_blocks(int changes_fd, bool compressed) 1845 1830 while (base.find_changed_block(&n)) { 1846 1831 buf.resize(0); 1847 1832 pack_uint(buf, n + 1); 1848 if (compressed) { 1849 comp_stream.lazy_alloc_deflate_zstream(); 1850 1851 comp_stream.deflate_zstream->next_in = (Bytef *)const_cast<char *>(buf.data()); 1852 comp_stream.deflate_zstream->avail_in = (uInt)buf.size(); 1853 int err = deflate(comp_stream.deflate_zstream, Z_FINISH); 1854 if (err == Z_STREAM_END) { 1855 io_write(changes_fd, reinterpret_cast<const char *>(buf.data()), 1856 comp_stream.deflate_zstream->total_out); 1857 } else { 1858 // The deflate failed, try to write data uncompressed 1859 io_write(changes_fd, buf.data(), buf.size()); 1860 } 1861 } 1862 else { 1863 io_write(changes_fd, buf.data(), buf.size()); 1864 } 1833 // Write the block number to the file 1834 io_write(changes_fd, buf.data(), buf.size()); 1865 1835 1866 1836 // Read block n. 1867 1837 read_block(n, p); … … BrassTable::write_changed_blocks(int changes_fd, bool compressed) 1869 1839 // Write block n to the file. 1870 1840 if (compressed) { 1871 1841 comp_stream.lazy_alloc_deflate_zstream(); 1872 1873 comp_stream.deflate_zstream->next_in = (Bytef *)(p);1874 comp_stream.deflate_zstream->avail_in = (uInt)block_size;1875 int err = deflate(comp_stream.deflate_zstream, Z_FINISH);1876 if (err == Z_STREAM_END) {1877 io_write(changes_fd, reinterpret_cast<const char *>( p),1842 comp_stream.compress(p, block_size); 1843 if (comp_stream.zerr == Z_STREAM_END) { 1844 buf.resize(0); 1845 pack_uint(buf, comp_stream.deflate_zstream->total_out); 1846 io_write(changes_fd, buf.data(), buf.size()); 1847 io_write(changes_fd, reinterpret_cast<const char *>(comp_stream.out), 1878 1848 comp_stream.deflate_zstream->total_out); 1879 1849 } else { 1880 1850 // The deflate failed, try to write data uncompressed 1851 buf.resize(0); 1852 pack_uint(buf, 0u); 1853 io_write(changes_fd, buf.data(), buf.size()); 1881 1854 io_write(changes_fd, reinterpret_cast<const char *>(p), block_size); 1882 1855 } 1883 1856 } 1884 1857 else { 1858 buf.resize(0); 1859 pack_uint(buf, 0u); 1860 io_write(changes_fd, buf.data(), buf.size()); 1885 1861 io_write(changes_fd, reinterpret_cast<const char *>(p), block_size); 1886 1862 } 1887 1863 ++n; … … BrassTable::write_changed_blocks(int changes_fd, bool compressed) 1894 1870 } 1895 1871 buf.resize(0); 1896 1872 pack_uint(buf, 0u); 1897 1898 if (compressed) { 1899 comp_stream.lazy_alloc_deflate_zstream(); 1900 1901 comp_stream.deflate_zstream->next_in = (Bytef *)const_cast<char *>(buf.data()); 1902 comp_stream.deflate_zstream->avail_in = (uInt)buf.size(); 1903 int err = deflate(comp_stream.deflate_zstream, Z_FINISH); 1904 if (err == Z_STREAM_END) { 1905 io_write(changes_fd, reinterpret_cast<const char *>(buf.data()), 1906 comp_stream.deflate_zstream->total_out); 1907 } else { 1908 // The deflate failed, try to write data uncompressed 1909 io_write(changes_fd, buf.data(), buf.size()); 1910 } 1911 } 1912 else { 1913 io_write(changes_fd, buf.data(), buf.size()); 1914 } 1873 // Write 0 for end of blocks 1874 io_write(changes_fd, buf.data(), buf.size()); 1915 1875 } 1916 1876 1917 1877 void -
xapian-core/backends/brass/brass_table.h
diff --git a/xapian-core/backends/brass/brass_table.h b/xapian-core/backends/brass/brass_table.h index 24d3f2a..a51df2f 100644
a b 41 41 #include <algorithm> 42 42 #include <string> 43 43 44 #include <zlib.h>45 46 44 #define DONT_COMPRESS -1 47 45 48 46 /** Even for items of at maximum size, it must be possible to get this number of -
xapian-core/common/Makefile.mk
diff --git a/xapian-core/common/Makefile.mk b/xapian-core/common/Makefile.mk index afd3e14..0211f8a 100644
a b noinst_HEADERS +=\ 38 38 common/stringutils.h\ 39 39 common/submatch.h\ 40 40 common/unaligned.h\ 41 common/utils.h\42 common/valuelist.h\43 common/valuestats.h\44 common/vectortermlist.h\45 common/weightinternal.h\46 41 common/zlib_utils.h 47 42 common/unaligned.h 48 43 … … lib_src +=\ 67 62 common/serialise-double.cc\ 68 63 common/socket_utils.cc\ 69 64 common/str.cc\ 70 common/stringutils.cc 65 common/stringutils.cc\ 71 66 common/zlib_utils.cc 72 67 73 68 -
xapian-core/common/zlib_utils.cc
diff --git a/xapian-core/common/zlib_utils.cc b/xapian-core/common/zlib_utils.cc index 4f6af90..8761f48 100644
a b 6 6 7 7 CompressionStream::CompressionStream(int compress_strategy_) 8 8 : compress_strategy(compress_strategy_), 9 zerr(0), 10 out_len(0), 11 out(NULL), 9 12 deflate_zstream(NULL), 10 13 inflate_zstream(NULL) 11 14 { … … CompressionStream::~CompressionStream() { 26 29 (void) inflateEnd(inflate_zstream); 27 30 delete inflate_zstream; 28 31 } 32 33 if (out) { 34 delete [] out; 35 } 36 } 37 38 39 void 40 CompressionStream::compress(string & buf) { 41 out_len = buf.size() - 1; 42 out = new unsigned char[out_len]; 43 deflate_zstream->avail_in = (uInt)buf.size(); 44 deflate_zstream->next_in = (Bytef *)const_cast<char *>(buf.data()); 45 deflate_zstream->next_out = out; 46 deflate_zstream->avail_out = (uInt)out_len; 47 zerr = deflate(deflate_zstream, Z_FINISH); 29 48 } 30 49 50 51 void 52 CompressionStream::compress(byte * buf, int size) { 53 out_len = size - 1; 54 out = new unsigned char[out_len]; 55 deflate_zstream->avail_in = (uInt)size; 56 deflate_zstream->next_in = (Bytef *)(buf); 57 deflate_zstream->next_out = out; 58 deflate_zstream->avail_out = (uInt)out_len; 59 zerr = deflate(deflate_zstream, Z_FINISH); 60 } 61 62 // void 63 // CompressionStream::decompress(string & buf) { 64 // inflate_zstream->next_in = (Bytef*)const_cast<char *>(tag->data()); 65 // inflate_zstream->avail_in = (uInt)tag->size(); 66 // int err = Z_OK; 67 // while (err != Z_STREAM_END) { 68 // comp_stream.inflate_zstream->next_out = buf; 69 // comp_stream.inflate_zstream->avail_out = (uInt)sizeof(buf); 70 // err = inflate(comp_stream.inflate_zstream, Z_SYNC_FLUSH); 71 // if (err == Z_BUF_ERROR && comp_stream.inflate_zstream->avail_in == 0) { 72 // LOGLINE(DB, "Z_BUF_ERROR - faking checksum of " << comp_stream.inflate_zstream->adler); 73 // Bytef header2[4]; 74 // setint4(header2, 0, comp_stream.inflate_zstream->adler); 75 // comp_stream.inflate_zstream->next_in = header2; 76 // comp_stream.inflate_zstream->avail_in = 4; 77 // err = inflate(comp_stream.inflate_zstream, Z_SYNC_FLUSH); 78 // if (err == Z_STREAM_END) break; 79 // } 80 81 // if (err != Z_OK && err != Z_STREAM_END) { 82 // if (err == Z_MEM_ERROR) throw std::bad_alloc(); 83 // string msg = "inflate failed"; 84 // if (comp_stream.inflate_zstream->msg) { 85 // msg += " ("; 86 // msg += comp_stream.inflate_zstream->msg; 87 // msg += ')'; 88 // } 89 // throw Xapian::DatabaseError(msg); 90 // } 91 92 // utag.append(reinterpret_cast<const char *>(buf), 93 // comp_stream.inflate_zstream->next_out - buf); 94 // } 95 // if (utag.size() != comp_stream.inflate_zstream->total_out) { 96 // string msg = "compressed tag didn't expand to the expected size: "; 97 // msg += str(utag.size()); 98 // msg += " != "; 99 // // OpenBSD's zlib.h uses off_t instead of uLong for total_out. 100 // msg += str((size_t)comp_stream.inflate_zstream->total_out); 101 // throw Xapian::DatabaseCorruptError(msg); 102 // } 103 // } 104 105 31 106 void 32 107 CompressionStream::lazy_alloc_deflate_zstream() const { 33 108 if (usual(deflate_zstream)) { … … CompressionStream::lazy_alloc_inflate_zstream() const { 102 177 throw Xapian::DatabaseError(msg); 103 178 } 104 179 } 105 106 // void107 // BrassTable::lazy_alloc_deflate_zstream() const {108 // if (usual(deflate_zstream)) {109 // if (usual(deflateReset(deflate_zstream) == Z_OK)) return;110 // // Try to recover by deleting the stream and starting from scratch.111 // delete deflate_zstream;112 // }113 114 // deflate_zstream = new z_stream;115 116 // deflate_zstream->zalloc = reinterpret_cast<alloc_func>(0);117 // deflate_zstream->zfree = reinterpret_cast<free_func>(0);118 // deflate_zstream->opaque = (voidpf)0;119 120 // // -15 means raw deflate with 32K LZ77 window (largest)121 // // memLevel 9 is the highest (8 is default)122 // int err;123 // err = deflateInit2(deflate_zstream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,124 // -15, 9, compress_strategy);125 // if (rare(err != Z_OK)) {126 // if (err == Z_MEM_ERROR) {127 // delete deflate_zstream;128 // deflate_zstream = 0;129 // throw std::bad_alloc();130 // }131 // string msg = "deflateInit2 failed (";132 // if (deflate_zstream->msg) {133 // msg += deflate_zstream->msg;134 // } else {135 // msg += str(err);136 // }137 // msg += ')';138 // delete deflate_zstream;139 // deflate_zstream = 0;140 // throw Xapian::DatabaseError(msg);141 // }142 // }143 144 // void145 // BrassTable::lazy_alloc_inflate_zstream() const {146 // if (usual(inflate_zstream)) {147 // if (usual(inflateReset(inflate_zstream) == Z_OK)) return;148 // // Try to recover by deleting the stream and starting from scratch.149 // delete inflate_zstream;150 // }151 152 // inflate_zstream = new z_stream;153 154 // inflate_zstream->zalloc = reinterpret_cast<alloc_func>(0);155 // inflate_zstream->zfree = reinterpret_cast<free_func>(0);156 157 // inflate_zstream->next_in = Z_NULL;158 // inflate_zstream->avail_in = 0;159 160 // int err = inflateInit2(inflate_zstream, -15);161 // if (rare(err != Z_OK)) {162 // if (err == Z_MEM_ERROR) {163 // delete inflate_zstream;164 // inflate_zstream = 0;165 // throw std::bad_alloc();166 // }167 // string msg = "inflateInit2 failed (";168 // if (inflate_zstream->msg) {169 // msg += inflate_zstream->msg;170 // } else {171 // msg += str(err);172 // }173 // msg += ')';174 // delete inflate_zstream;175 // inflate_zstream = 0;176 // throw Xapian::DatabaseError(msg);177 // }178 // } -
xapian-core/common/zlib_utils.h
diff --git a/xapian-core/common/zlib_utils.h b/xapian-core/common/zlib_utils.h index 5ce4329..6fbcee8 100644
a b 1 #ifndef XAPIAN_INCLUDED_ZLIB_UTILS_H 2 #define XAPIAN_INCLUDED_ZLIB_UTILS_H 1 3 2 #include <zlib.h> 4 #include "debuglog.h" 5 6 #include "internaltypes.h" 3 7 4 8 #include "xapian/error.h" 5 9 6 #include "debuglog.h" 7 #include "utils.h" 10 #include <zlib.h> 8 11 9 12 using namespace std; 10 13 … … class CompressionStream { 19 22 20 23 int compress_strategy; 21 24 25 int zerr; 26 27 unsigned long out_len; 28 29 unsigned char * out; 30 22 31 /// Zlib state object for deflating 23 32 mutable z_stream *deflate_zstream; 24 33 … … class CompressionStream { 30 39 31 40 /// Allocate the zstream for inflating, if not already allocated. 32 41 void lazy_alloc_inflate_zstream() const; 42 43 void compress(string &); 44 void compress(byte *, int); 33 45 }; 46 47 #endif // XAPIAN_INCLUDED_ZLIB_UTILS_H -
xapian-core/tests/api_query.cc
diff --git a/xapian-core/tests/api_query.cc b/xapian-core/tests/api_query.cc index 0d8a8e2..64bd6c9 100644
a b DEFINE_TESTCASE(overload1, !backend) { 66 66 Xapian::Query q; 67 67 q = Xapian::Query("foo") & Xapian::Query("bar"); 68 68 TEST_STRINGS_EQUAL(q.get_description(), "Query((foo AND bar))"); 69 q = Xapian::Query("foo") & ~ Xapian::Query("bar");69 q = Xapian::Query("foo") & (~Xapian::Query("bar")); 70 70 TEST_STRINGS_EQUAL(q.get_description(), "Query((foo AND_NOT bar))"); 71 71 q = ~Xapian::Query("bar"); 72 72 TEST_STRINGS_EQUAL(q.get_description(), "Query((<alldocuments> AND_NOT bar))");