// This file contains the implementation of classes // LzipInputStream and LzipOutputStream used to compress // and decompress Google's Protocol Buffer Streams using // the Lempel-Ziv-Markow-Algorithm. // // Derived from http://protobuf.googlecode.com/svn/tags/2.2.0/src/google/protobuf/io/gzip_stream.cc // Copyright 2009 by Jacob Rief // Evaluation copy - don't use in production code #include #include namespace google { namespace protobuf { namespace io { static const int kDefaultBufferSize = 8192; // === LzipInputStream === LzipInputStream::LzipInputStream(ZeroCopyInputStream* sub_stream) : sub_stream_(sub_stream), finished_(false), output_buffer_length_(kDefaultBufferSize), output_buffer_(operator new(output_buffer_length_)), output_position_(NULL), next_out_(NULL), avail_out_(0), errno_(LZ_ok) { GOOGLE_CHECK(output_buffer_ != NULL); decoder_ = LZ_decompress_open(); errno_ = LZ_decompress_errno(decoder_); GOOGLE_CHECK(errno_ == LZ_ok); } LzipInputStream::~LzipInputStream() { if (decoder_ != NULL) { Close(); } if (output_buffer_ != NULL) { operator delete(output_buffer_); } } bool LzipInputStream::Close() { errno_ = LZ_decompress_errno(decoder_); bool ok = LZ_decompress_close(decoder_) == LZ_ok; decoder_ = NULL; return ok; } // --- implements ZeroCopyInputStream --- bool LzipInputStream::Next(const void** data, int* size) { GOOGLE_CHECK_GE(next_out_, output_position_); if (next_out_ == output_position_) { if (finished_ && LZ_decompress_finished(decoder_)) return false; output_position_ = next_out_ = static_cast(output_buffer_); avail_out_ = output_buffer_length_; Decompress(); } *data = output_position_; *size = next_out_ - output_position_; output_position_ = next_out_; return true; } void LzipInputStream::BackUp(int count) { GOOGLE_CHECK_GE(output_position_-static_cast(output_buffer_), count); output_position_ -= count; } bool LzipInputStream::Skip(int count) { const void* data; int size; bool ok = Next(&data, &size); while (ok && (size < count)) { count -= size; ok = Next(&data, &size); } if (size > count) { BackUp(size - count); } return ok; } int64 LzipInputStream::ByteCount() const { return LZ_decompress_total_out_size(decoder_); } // --- private --- void LzipInputStream::Decompress() { GOOGLE_CHECK_GT(avail_out_, 0); if (!finished_) { int avail_in; const void* next_in; if (sub_stream_->Next(&next_in, &avail_in)) { int bytes_written = LZ_decompress_write(decoder_, static_cast(next_in), avail_in); errno_ = LZ_decompress_errno(decoder_); GOOGLE_CHECK(errno_ == LZ_ok); GOOGLE_CHECK_GE(bytes_written, 0); sub_stream_->BackUp(avail_in - bytes_written); } else { GOOGLE_CHECK(LZ_decompress_finish(decoder_) == LZ_ok); finished_ = true; } } int bytes_read = LZ_decompress_read(decoder_, next_out_, avail_out_); errno_ = LZ_decompress_errno(decoder_); GOOGLE_CHECK(errno_ == LZ_ok); GOOGLE_CHECK_GE(bytes_read, 0); next_out_ += bytes_read; avail_out_ -= bytes_read; } // === LzipOutputStream === LzipOutputStream::LzipOutputStream(ZeroCopyOutputStream* sub_stream, size_t compression_level, int64_t member_size) : input_buffer_length_(kDefaultBufferSize), input_buffer_(operator new(input_buffer_length_)), input_position_(static_cast(input_buffer_)), input_buffer_end_(input_position_ + input_buffer_length_), sub_stream_(sub_stream), finished_(false), member_size_(member_size) { GOOGLE_CHECK(input_buffer_ != NULL); GOOGLE_CHECK_GT(compression_level, 0); compression_level--; GOOGLE_CHECK_LT(compression_level, sizeof(options)/sizeof(Options)); encoder_ = LZ_compress_open(options[compression_level].dictionary_size, options[compression_level].match_len_limit, member_size); errno_ = LZ_compress_errno(encoder_); GOOGLE_CHECK(errno_ == LZ_ok); } LzipOutputStream::~LzipOutputStream() { if (encoder_ != NULL) { Close(); } if (input_buffer_ != NULL) { operator delete(input_buffer_); } } bool LzipOutputStream::Flush() { Compress(true); input_position_ = static_cast(input_buffer_); return true; } bool LzipOutputStream::Close() { if (finished_) return false; Compress(); GOOGLE_CHECK(LZ_compress_finish(encoder_) == LZ_ok); do { int avail_out; void* next_out; if (sub_stream_->Next(&next_out, &avail_out)) { int bytes_read = LZ_compress_read(encoder_, static_cast(next_out), avail_out); errno_ = LZ_compress_errno(encoder_); GOOGLE_CHECK(errno_ == LZ_ok); GOOGLE_CHECK_GE(bytes_read, 0); sub_stream_->BackUp(avail_out - bytes_read); } else { // disk full? return false; } } while (!LZ_compress_finished(encoder_)); bool ok = LZ_compress_close(encoder_) == LZ_ok; encoder_ = NULL; return ok; } // --- implements ZeroCopyOutputStream --- bool LzipOutputStream::Next(void** data, int* size) { GOOGLE_CHECK_LE(input_position_, input_buffer_end_); if (input_position_ == input_buffer_end_) { if (finished_) return false; Compress(); *data = input_buffer_; *size = input_buffer_length_; } else { *data = input_position_; *size = input_buffer_end_ - input_position_; } input_position_ = input_buffer_end_; return true; } void LzipOutputStream::BackUp(int count) { GOOGLE_CHECK_LE(input_buffer_length_ - count, input_position_ - static_cast(input_buffer_)); input_position_ -= count; } int64 LzipOutputStream::ByteCount() const { return LZ_compress_total_in_size(encoder_); } // --- private --- void LzipOutputStream::Compress(bool flush) { uint8_t* next_in = static_cast(input_buffer_); int avail_in = input_position_ - next_in; int bytes_written, bytes_read; do { bytes_written = LZ_compress_write(encoder_, next_in, avail_in); errno_ = LZ_compress_errno(encoder_); GOOGLE_CHECK(errno_ == LZ_ok); GOOGLE_CHECK_GE(bytes_written, 0); next_in += bytes_written; avail_in -= bytes_written; if (flush) { GOOGLE_CHECK(LZ_compress_sync_flush(encoder_) == LZ_ok); flush = false; } int avail_out; void* next_out; if (sub_stream_->Next(&next_out, &avail_out)) { bytes_read = LZ_compress_read(encoder_, static_cast(next_out), avail_out); errno_ = LZ_compress_errno(encoder_); GOOGLE_CHECK(errno_ == LZ_ok); GOOGLE_CHECK_GE(bytes_read, 0); if (LZ_compress_member_finished(encoder_)==1) { LZ_compress_restart_member(encoder_, member_size_); } sub_stream_->BackUp(avail_out - bytes_read); } else { // disk full? finished_ = true; } } while (bytes_written>0 || bytes_read>0); } const LzipOutputStream::Options LzipOutputStream::options[9] = { { 1 << 20, 10 }, // -1 { 1 << 20, 12 }, // -2 { 1 << 20, 17 }, // -3 { 1 << 21, 26 }, // -4 { 1 << 22, 44 }, // -5 { 1 << 23, 80 }, // -6 { 1 << 24, 108 }, // -7 { 1 << 24, 163 }, // -8 { 1 << 25, 273 } // -9 }; } // namespace io } // namespace protobuf } // namespace google