// This file contains the implementation of the classes
// LzipInputStream and LzipOutputStream, which are used to
// compress and decompress Google's Protocol Buffer streams
// using the Lempel-Ziv-Markov chain algorithm (LZMA).
//
// Derived from http://protobuf.googlecode.com/svn/tags/2.2.0/src/google/protobuf/io/gzip_stream.cc
// Copyright 2009 by Jacob Rief
// Evaluation copy - don't use in production code
#include
#include
namespace google {
namespace protobuf {
namespace io {
// Size, in bytes, of the internal buffer each stream allocates with
// operator new: LzipInputStream uses it for its decompressed-output buffer and
// LzipOutputStream for its uncompressed-input staging buffer.
static const int kDefaultBufferSize = 8192;
// === LzipInputStream ===
// Creates a decompressing stream that pulls compressed bytes from sub_stream.
//
// sub_stream: source of the compressed data. It is only stored here; the
//             destructor in this file does not delete it.
//
// Allocates an output buffer of kDefaultBufferSize bytes and opens an lzlib
// decoder. Both steps are verified with GOOGLE_CHECK.
//
// NOTE(review): output_buffer_ is sized from output_buffer_length_, which is
// only valid if output_buffer_length_ is declared before output_buffer_ in the
// class (members are initialized in declaration order) - confirm in the header.
LzipInputStream::LzipInputStream(ZeroCopyInputStream* sub_stream) :
sub_stream_(sub_stream),
finished_(false),
output_buffer_length_(kDefaultBufferSize),
output_buffer_(operator new(output_buffer_length_)),
// No decompressed data exists yet: both cursors start out NULL and Next()
// points them at the buffer on its first refill.
output_position_(NULL),
next_out_(NULL),
avail_out_(0),
errno_(LZ_ok)
{
GOOGLE_CHECK(output_buffer_ != NULL);
decoder_ = LZ_decompress_open();
// Record the decoder's error state right after opening it and require LZ_ok.
errno_ = LZ_decompress_errno(decoder_);
GOOGLE_CHECK(errno_ == LZ_ok);
}
// Tears the stream down: closes the decoder if it is still open, then frees
// the output buffer obtained with operator new in the constructor.
LzipInputStream::~LzipInputStream() {
  const bool decoder_still_open = (decoder_ != NULL);
  if (decoder_still_open) {
    Close();
  }
  if (NULL == output_buffer_) {
    return;
  }
  operator delete(output_buffer_);
}
// Shuts the decoder down.
//
// The decoder's last error code is captured into errno_ before the handle is
// released; afterwards decoder_ is reset to NULL.
// Returns true when LZ_decompress_close reported LZ_ok.
bool LzipInputStream::Close() {
  errno_ = LZ_decompress_errno(decoder_);
  const int close_status = LZ_decompress_close(decoder_);
  decoder_ = NULL;
  return close_status == LZ_ok;
}
// --- implements ZeroCopyInputStream ---
// Hands the caller the next chunk of decompressed data.
//
// When everything produced so far has already been handed out, the output
// buffer is rewound and refilled by one Decompress() pass. Otherwise the bytes
// previously returned through BackUp() are handed out again.
//
// data: receives a pointer into the internal output buffer.
// size: receives the number of valid bytes at *data. This can be 0 when a
//       Decompress() pass produced no output; the caller just calls Next()
//       again.
// Returns false once the sub-stream has been exhausted and the decoder reports
// that it is finished; *data and *size are left untouched in that case.
bool LzipInputStream::Next(const void** data, int* size) {
  GOOGLE_CHECK_GE(next_out_, output_position_);
  if (next_out_ == output_position_) {
    if (finished_ && LZ_decompress_finished(decoder_))
      return false;
    // Rewind both cursors to the start of the buffer before refilling it.
    // The buffer is stored as void*, so it needs an explicit byte-pointer cast.
    output_position_ = next_out_ = static_cast<uint8_t*>(output_buffer_);
    avail_out_ = output_buffer_length_;
    Decompress();
  }
  *data = output_position_;
  *size = next_out_ - output_position_;
  // Everything up to next_out_ now counts as consumed (until a BackUp()).
  output_position_ = next_out_;
  return true;
}
// Returns the last `count` bytes handed out by Next() to the stream so that
// the following Next() yields them again.
//
// count: number of bytes to give back; must be non-negative and must not move
//        the read cursor before the start of the output buffer (both checked).
void LzipInputStream::BackUp(int count) {
  // A negative count would silently advance the cursor instead of rewinding.
  GOOGLE_CHECK_GE(count, 0);
  GOOGLE_CHECK_GE(output_position_ - static_cast<uint8_t*>(output_buffer_), count);
  output_position_ -= count;
}
// Discards `count` bytes of decompressed data.
//
// Repeatedly calls Next() until at least `count` bytes have been consumed,
// then gives any surplus from the last chunk back with BackUp().
//
// Returns true if `count` bytes were skipped, false if the stream ended
// first. On failure nothing is backed up: every chunk obtained so far was
// legitimately skipped, and `size` no longer describes a chunk that could be
// returned.
bool LzipInputStream::Skip(int count) {
  const void* data = NULL;
  // Initialized so that a failing first Next(), which leaves its out
  // parameters untouched, cannot cause a read of an indeterminate value.
  int size = 0;
  bool ok = Next(&data, &size);
  while (ok && (size < count)) {
    count -= size;
    ok = Next(&data, &size);
  }
  // Only a successful Next() produced a chunk whose tail may be given back.
  if (ok && (size > count)) {
    BackUp(size - count);
  }
  return ok;
}
// Returns the number of decompressed bytes consumed through this stream.
//
// The decoder's running total is reduced by the bytes that sit in the output
// buffer but are not currently handed out (i.e. bytes returned via BackUp()),
// so the value goes down after a BackUp() and up again after the next Next().
//
// NOTE(review): the lzlib manual describes LZ_decompress_total_out_size as
// bytes "already produced, but perhaps not yet read", so this may still run
// slightly ahead of what Next() has delivered - confirm against lzlib.
int64 LzipInputStream::ByteCount() const {
  if (decoder_ == NULL) {
    // After Close() there is no decoder left to query; never report a
    // negative count.
    return 0;
  }
  const int64 produced = static_cast<int64>(LZ_decompress_total_out_size(decoder_));
  const int64 not_handed_out = next_out_ - output_position_;
  return produced - not_handed_out;
}
// --- private ---
// Performs one decompression pass: feeds the decoder with the next chunk of
// compressed input (if any is left) and then reads as much decompressed data
// as fits into the free part of the output buffer.
//
// Preconditions: avail_out_ > 0 and next_out_ points at the free space.
// Effects: advances next_out_ / decreases avail_out_ by the bytes produced,
// sets finished_ once the sub-stream has no more input, and stores the latest
// decoder error code in errno_. Every decoder error is enforced with
// GOOGLE_CHECK.
void LzipInputStream::Decompress() {
  GOOGLE_CHECK_GT(avail_out_, 0);
  if (!finished_) {
    int avail_in;
    const void* next_in;
    if (sub_stream_->Next(&next_in, &avail_in)) {
      // The sub-stream hands out untyped memory; lzlib wants a byte pointer.
      int bytes_written = LZ_decompress_write(decoder_, static_cast<const uint8_t*>(next_in), avail_in);
      errno_ = LZ_decompress_errno(decoder_);
      GOOGLE_CHECK(errno_ == LZ_ok);
      GOOGLE_CHECK_GE(bytes_written, 0);
      // Give back whatever the decoder did not accept so it is offered again
      // on the next pass.
      sub_stream_->BackUp(avail_in - bytes_written);
    } else {
      // No more compressed input: tell the decoder so it can drain.
      GOOGLE_CHECK(LZ_decompress_finish(decoder_) == LZ_ok);
      finished_ = true;
    }
  }
  int bytes_read = LZ_decompress_read(decoder_, next_out_, avail_out_);
  errno_ = LZ_decompress_errno(decoder_);
  GOOGLE_CHECK(errno_ == LZ_ok);
  GOOGLE_CHECK_GE(bytes_read, 0);
  next_out_ += bytes_read;
  avail_out_ -= bytes_read;
}
// === LzipOutputStream ===
// Creates a compressing stream that writes its compressed output to
// sub_stream.
//
// sub_stream:        destination for the compressed data. It is only stored
//                    here; the destructor in this file does not delete it.
// compression_level: 1-based index into the `options` table (checked to be
//                    greater than 0 and within the table).
// member_size:       passed to LZ_compress_open and kept in member_size_ for
//                    later member restarts.
//
// Allocates a staging buffer of kDefaultBufferSize bytes and opens an lzlib
// encoder. Both steps are verified with GOOGLE_CHECK.
//
// NOTE(review): the buffer-related initializers depend on each other, so the
// members must be declared in the order they are listed here - confirm in the
// header.
LzipOutputStream::LzipOutputStream(ZeroCopyOutputStream* sub_stream, size_t compression_level, int64_t member_size) :
  input_buffer_length_(kDefaultBufferSize),
  input_buffer_(operator new(input_buffer_length_)),
  // The staging buffer is stored as void*; the cursors are byte pointers.
  input_position_(static_cast<uint8_t*>(input_buffer_)),
  input_buffer_end_(input_position_ + input_buffer_length_),
  sub_stream_(sub_stream),
  finished_(false),
  member_size_(member_size)
{
  GOOGLE_CHECK(input_buffer_ != NULL);
  GOOGLE_CHECK_GT(compression_level, 0);
  // Levels are 1-based for callers, the table is 0-based.
  compression_level--;
  GOOGLE_CHECK_LT(compression_level, sizeof(options)/sizeof(Options));
  encoder_ = LZ_compress_open(options[compression_level].dictionary_size, options[compression_level].match_len_limit, member_size);
  errno_ = LZ_compress_errno(encoder_);
  GOOGLE_CHECK(errno_ == LZ_ok);
}
// Tears the stream down: finishes and closes the encoder if it is still open,
// then frees the staging buffer obtained with operator new in the constructor.
LzipOutputStream::~LzipOutputStream() {
  if (encoder_ != NULL) {
    Close();
  }
  if (encoder_ != NULL) {
    // Defensive: if Close() bailed out without releasing the encoder (for
    // example because the sub-stream could not take more output), release it
    // here so the handle is not leaked. Pending data is lost in that case.
    LZ_compress_close(encoder_);
    encoder_ = NULL;
  }
  if (input_buffer_ != NULL) {
    operator delete(input_buffer_);
  }
}
bool LzipOutputStream::Flush() {
Compress(true);
input_position_ = static_cast(input_buffer_);
return true;
}
// Finishes the compressed stream and releases the encoder.
//
// Compresses the staged data, tells the encoder that no more input follows and
// copies the remaining compressed output to the sub-stream. The encoder handle
// is released on every path, including failures, so it cannot leak, and the
// stream is marked finished afterwards.
//
// Returns true only if all data reached the sub-stream and the encoder closed
// cleanly. Returns false if the stream had already failed, was already closed,
// or the sub-stream refused output.
bool LzipOutputStream::Close() {
  if (encoder_ == NULL) {
    // Already closed: there is nothing left to finish or release.
    return false;
  }
  bool ok = !finished_;
  if (ok) {
    Compress();
    // Compress() sets finished_ when the sub-stream refuses more output.
    ok = !finished_;
  }
  if (ok) {
    GOOGLE_CHECK(LZ_compress_finish(encoder_) == LZ_ok);
    do {
      int avail_out;
      void* next_out;
      if (!sub_stream_->Next(&next_out, &avail_out)) {
        // disk full?
        ok = false;
        break;
      }
      int bytes_read = LZ_compress_read(encoder_, static_cast<uint8_t*>(next_out), avail_out);
      errno_ = LZ_compress_errno(encoder_);
      GOOGLE_CHECK(errno_ == LZ_ok);
      GOOGLE_CHECK_GE(bytes_read, 0);
      // Return the unused tail of the chunk to the sub-stream.
      sub_stream_->BackUp(avail_out - bytes_read);
    } while (!LZ_compress_finished(encoder_));
  }
  if (LZ_compress_close(encoder_) != LZ_ok) {
    ok = false;
  }
  encoder_ = NULL;
  // No further data can be accepted once the encoder is gone.
  finished_ = true;
  return ok;
}
// --- implements ZeroCopyOutputStream ---
// Hands the caller writable space in the staging buffer.
//
// If the staging buffer is completely used, its contents are compressed first
// and the whole buffer is handed out again; otherwise the still unused tail
// (e.g. after a BackUp()) is handed out.
//
// data: receives a pointer to the writable region.
// size: receives the size of that region in bytes.
// Returns false, leaving *data and *size untouched, once the stream has
// failed (finished_ set because the sub-stream refused output). This includes
// a failure that happens while compressing the previous buffer contents, so
// the caller is not given space whose contents could never be written.
bool LzipOutputStream::Next(void** data, int* size) {
  GOOGLE_CHECK_LE(input_position_, input_buffer_end_);
  if (finished_)
    return false;
  if (input_position_ == input_buffer_end_) {
    Compress();
    if (finished_)
      return false;
    *data = input_buffer_;
    *size = input_buffer_length_;
  } else {
    *data = input_position_;
    *size = input_buffer_end_ - input_position_;
  }
  // The caller is assumed to fill the whole region unless it calls BackUp().
  input_position_ = input_buffer_end_;
  return true;
}
void LzipOutputStream::BackUp(int count) {
GOOGLE_CHECK_LE(input_buffer_length_ - count, input_position_ - static_cast(input_buffer_));
input_position_ -= count;
}
// Returns the number of uncompressed bytes written through this stream: the
// encoder's running input total plus the bytes handed out by Next() that are
// still waiting in the staging buffer (and were not given back via BackUp()).
//
// NOTE(review): the lzlib manual describes LZ_compress_total_in_size as input
// bytes "already compressed", which may lag behind the bytes accepted by
// LZ_compress_write, so the value can still be slightly low between flushes -
// confirm against lzlib.
int64 LzipOutputStream::ByteCount() const {
  if (encoder_ == NULL) {
    // No encoder left to query once the stream has been closed.
    return 0;
  }
  const int64 taken_by_encoder = static_cast<int64>(LZ_compress_total_in_size(encoder_));
  const int64 staged = input_position_ - static_cast<const uint8_t*>(input_buffer_);
  return taken_by_encoder + staged;
}
// --- private ---
// Feeds the staged bytes [input_buffer_, input_position_) to the encoder and
// copies whatever compressed output becomes available to the sub-stream.
//
// flush: when true, a sync flush is requested from the encoder once all staged
//        input has been accepted, so the flush covers every staged byte rather
//        than only the part taken by the first write.
//
// The loop runs until neither a write nor a read makes progress. A member
// restart also counts as progress, because the encoder accepts no input while
// a finished member is pending; stopping right after the restart could leave
// staged input unwritten.
//
// If the sub-stream refuses to provide output space, finished_ is set and the
// function returns immediately. It no longer consults a read count that was
// never assigned in that iteration. Encoder errors are enforced with
// GOOGLE_CHECK and the latest error code is kept in errno_.
void LzipOutputStream::Compress(bool flush) {
  uint8_t* next_in = static_cast<uint8_t*>(input_buffer_);
  int avail_in = input_position_ - next_in;
  int bytes_written = 0;
  int bytes_read = 0;
  bool member_restarted = false;
  do {
    member_restarted = false;
    bytes_written = LZ_compress_write(encoder_, next_in, avail_in);
    errno_ = LZ_compress_errno(encoder_);
    GOOGLE_CHECK(errno_ == LZ_ok);
    GOOGLE_CHECK_GE(bytes_written, 0);
    next_in += bytes_written;
    avail_in -= bytes_written;
    if (flush && avail_in == 0) {
      GOOGLE_CHECK(LZ_compress_sync_flush(encoder_) == LZ_ok);
      flush = false;
    }
    int avail_out;
    void* next_out;
    if (!sub_stream_->Next(&next_out, &avail_out)) {
      // disk full?
      finished_ = true;
      return;
    }
    bytes_read = LZ_compress_read(encoder_, static_cast<uint8_t*>(next_out), avail_out);
    errno_ = LZ_compress_errno(encoder_);
    GOOGLE_CHECK(errno_ == LZ_ok);
    GOOGLE_CHECK_GE(bytes_read, 0);
    if (LZ_compress_member_finished(encoder_) == 1) {
      // The current member is complete and fully read: start the next one.
      LZ_compress_restart_member(encoder_, member_size_);
      member_restarted = true;
    }
    // Return the unused tail of the chunk to the sub-stream.
    sub_stream_->BackUp(avail_out - bytes_read);
  } while (bytes_written > 0 || bytes_read > 0 || member_restarted);
}
// Encoder parameters for the nine supported compression levels. The
// constructor maps compression_level N (1..9) to entry N-1 and passes that
// entry's dictionary_size and match_len_limit to LZ_compress_open. The
// trailing "-N" comment on each row names the level it belongs to.
// NOTE(review): each row is presumably { dictionary_size, match_len_limit };
// the field order is declared in the header - confirm there.
const LzipOutputStream::Options LzipOutputStream::options[9] = {
{ 1 << 20, 10 }, // -1
{ 1 << 20, 12 }, // -2
{ 1 << 20, 17 }, // -3
{ 1 << 21, 26 }, // -4
{ 1 << 22, 44 }, // -5
{ 1 << 23, 80 }, // -6
{ 1 << 24, 108 }, // -7
{ 1 << 24, 163 }, // -8
{ 1 << 25, 273 } // -9
};
} // namespace io
} // namespace protobuf
} // namespace google