HBASE-10434 Store Tag compression information for a WAL in its header (Anoop Sam John)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1562567 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2014-01-29 19:52:32 +00:00
parent b75d3a54c1
commit 7756ace9a2
10 changed files with 143 additions and 29 deletions

View File

@ -112,6 +112,16 @@ public final class WALProtos {
* <code>optional bytes encryption_key = 2;</code>
*/
com.google.protobuf.ByteString getEncryptionKey();
// optional bool has_tag_compression = 3;
/**
* <code>optional bool has_tag_compression = 3;</code>
*/
boolean hasHasTagCompression();
/**
* <code>optional bool has_tag_compression = 3;</code>
*/
boolean getHasTagCompression();
}
/**
* Protobuf type {@code WALHeader}
@ -174,6 +184,11 @@ public final class WALProtos {
encryptionKey_ = input.readBytes();
break;
}
case 24: {
bitField0_ |= 0x00000004;
hasTagCompression_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -246,9 +261,26 @@ public final class WALProtos {
return encryptionKey_;
}
// optional bool has_tag_compression = 3;
public static final int HAS_TAG_COMPRESSION_FIELD_NUMBER = 3;
private boolean hasTagCompression_;
/**
* <code>optional bool has_tag_compression = 3;</code>
*/
public boolean hasHasTagCompression() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional bool has_tag_compression = 3;</code>
*/
public boolean getHasTagCompression() {
return hasTagCompression_;
}
private void initFields() {
hasCompression_ = false;
encryptionKey_ = com.google.protobuf.ByteString.EMPTY;
hasTagCompression_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -268,6 +300,9 @@ public final class WALProtos {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, encryptionKey_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBool(3, hasTagCompression_);
}
getUnknownFields().writeTo(output);
}
@ -285,6 +320,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, encryptionKey_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, hasTagCompression_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -318,6 +357,11 @@ public final class WALProtos {
result = result && getEncryptionKey()
.equals(other.getEncryptionKey());
}
result = result && (hasHasTagCompression() == other.hasHasTagCompression());
if (hasHasTagCompression()) {
result = result && (getHasTagCompression()
== other.getHasTagCompression());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -339,6 +383,10 @@ public final class WALProtos {
hash = (37 * hash) + ENCRYPTION_KEY_FIELD_NUMBER;
hash = (53 * hash) + getEncryptionKey().hashCode();
}
if (hasHasTagCompression()) {
hash = (37 * hash) + HAS_TAG_COMPRESSION_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getHasTagCompression());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -452,6 +500,8 @@ public final class WALProtos {
bitField0_ = (bitField0_ & ~0x00000001);
encryptionKey_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000002);
hasTagCompression_ = false;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@ -488,6 +538,10 @@ public final class WALProtos {
to_bitField0_ |= 0x00000002;
}
result.encryptionKey_ = encryptionKey_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.hasTagCompression_ = hasTagCompression_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -510,6 +564,9 @@ public final class WALProtos {
if (other.hasEncryptionKey()) {
setEncryptionKey(other.getEncryptionKey());
}
if (other.hasHasTagCompression()) {
setHasTagCompression(other.getHasTagCompression());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -606,6 +663,39 @@ public final class WALProtos {
return this;
}
// optional bool has_tag_compression = 3;
private boolean hasTagCompression_ ;
/**
* <code>optional bool has_tag_compression = 3;</code>
*/
public boolean hasHasTagCompression() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional bool has_tag_compression = 3;</code>
*/
public boolean getHasTagCompression() {
return hasTagCompression_;
}
/**
* <code>optional bool has_tag_compression = 3;</code>
*/
public Builder setHasTagCompression(boolean value) {
bitField0_ |= 0x00000004;
hasTagCompression_ = value;
onChanged();
return this;
}
/**
* <code>optional bool has_tag_compression = 3;</code>
*/
public Builder clearHasTagCompression() {
bitField0_ = (bitField0_ & ~0x00000004);
hasTagCompression_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:WALHeader)
}
@ -5084,25 +5174,26 @@ public final class WALProtos {
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\tWAL.proto\032\013HBase.proto\"<\n\tWALHeader\022\027\n" +
"\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" +
"\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
"\030\002 \001(\014\"\202\002\n\006WALKey\022\033\n\023encoded_region_name" +
"\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023log_sequen" +
"ce_number\030\003 \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\nc" +
"luster_id\030\005 \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003" +
"(\0132\014.FamilyScope\022\032\n\022following_kv_count\030\007" +
" \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonc" +
"eGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilySco" +
"pe\022\016\n\006family\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n",
".ScopeType\"\251\001\n\024CompactionDescriptor\022\022\n\nt" +
"able_name\030\001 \002(\014\022\033\n\023encoded_region_name\030\002" +
" \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020compaction_" +
"input\030\004 \003(\t\022\031\n\021compaction_output\030\005 \003(\t\022\026" +
"\n\016store_home_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\t" +
"ScopeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034" +
"\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apac" +
"he.hadoop.hbase.protobuf.generatedB\tWALP" +
"rotosH\001\210\001\000\240\001\001"
"\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\202\002\n\006" +
"WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" +
"able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" +
" \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" +
" \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" +
"Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" +
"ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
"\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil",
"y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" +
"\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
" \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" +
"ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" +
"\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" +
"e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" +
"\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" +
"ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" +
"base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" +
"\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -5114,7 +5205,7 @@ public final class WALProtos {
internal_static_WALHeader_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_WALHeader_descriptor,
new java.lang.String[] { "HasCompression", "EncryptionKey", });
new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", });
internal_static_WALKey_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_WALKey_fieldAccessorTable = new

View File

@ -26,6 +26,7 @@ import "HBase.proto";
message WALHeader {
optional bool has_compression = 1;
optional bytes encryption_key = 2;
optional bool has_tag_compression = 3;
}
// Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs

View File

@ -22,7 +22,6 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.Dictionary;
@ -44,8 +43,8 @@ class CompressionContext {
TagCompressionContext tagCompressionContext = null;
public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
Configuration conf) throws SecurityException, NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException {
boolean hasTagCompression) throws SecurityException, NoSuchMethodException,
InstantiationException, IllegalAccessException, InvocationTargetException {
Constructor<? extends Dictionary> dictConstructor =
dictType.getConstructor();
regionDict = dictConstructor.newInstance();
@ -64,7 +63,7 @@ class CompressionContext {
rowDict.init(Short.MAX_VALUE);
familyDict.init(Byte.MAX_VALUE);
qualifierDict.init(Byte.MAX_VALUE);
if (conf != null && conf.getBoolean(ENABLE_WAL_TAGS_COMPRESSION, true)) {
if (hasTagCompression) {
tagCompressionContext = new TagCompressionContext(dictType);
}
}

View File

@ -61,6 +61,7 @@ public class ProtobufLogReader extends ReaderBase {
protected Codec.Decoder cellDecoder;
protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
protected boolean hasCompression = false;
protected boolean hasTagCompression = false;
// walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry
// in the hlog, the inputstream's position is equal to walEditsStopOffset.
private long walEditsStopOffset;
@ -117,6 +118,7 @@ public class ProtobufLogReader extends ReaderBase {
if (isFirst) {
WALProtos.WALHeader header = builder.build();
this.hasCompression = header.hasHasCompression() && header.getHasCompression();
this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
}
this.inputStream = stream;
this.walEditsStopOffset = this.fileLength;
@ -201,6 +203,11 @@ public class ProtobufLogReader extends ReaderBase {
return this.hasCompression;
}
@Override
protected boolean hasTagCompression() {
return this.hasTagCompression;
}
@Override
protected boolean readNext(HLog.Entry entry) throws IOException {
while (true) {

View File

@ -78,7 +78,11 @@ public class ProtobufLogWriter extends WriterBase {
FSUtils.getDefaultBlockSize(fs, path));
output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
output.write(ProtobufLogReader.PB_WAL_MAGIC);
buildWALHeader(WALHeader.newBuilder().setHasCompression(doCompress)).writeDelimitedTo(output);
boolean doTagCompress = doCompress
&& conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
buildWALHeader(
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))
.writeDelimitedTo(output);
initAfterHeader(doCompress);

View File

@ -74,7 +74,7 @@ public abstract class ReaderBase implements HLog.Reader {
try {
if (compressionContext == null) {
compressionContext = new CompressionContext(LRUDictionary.class,
FSUtils.isRecoveredEdits(path), conf);
FSUtils.isRecoveredEdits(path), hasTagCompression());
} else {
compressionContext.clear();
}
@ -147,6 +147,11 @@ public abstract class ReaderBase implements HLog.Reader {
*/
protected abstract boolean hasCompression();
/**
* @return Whether tag compression is enabled for this log.
*/
protected abstract boolean hasTagCompression();
/**
* Read next entry.
* @param e The entry to read into.

View File

@ -193,6 +193,12 @@ public class SequenceFileLogReader extends ReaderBase {
return isWALCompressionEnabled(reader.getMetadata());
}
@Override
protected boolean hasTagCompression() {
// Tag compression not supported with old SequenceFileLog Reader/Writer
return false;
}
/**
* Call this method after init() has been executed
* @return whether WAL compression is enabled

View File

@ -48,7 +48,8 @@ public abstract class WriterBase implements HLog.Writer {
if (doCompress) {
try {
this.compressionContext = new CompressionContext(LRUDictionary.class,
FSUtils.isRecoveredEdits(path), conf);
FSUtils.isRecoveredEdits(path), conf.getBoolean(
CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
} catch (Exception e) {
throw new IOException("Failed to initiate CompressionContext", e);
}

View File

@ -68,7 +68,7 @@ public class TestKeyValueCompression {
}
private void runTestCycle(List<KeyValue> kvs) throws Exception {
CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, false);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
for (KeyValue kv : kvs) {
KeyValueCompression.writeKV(buf, kv, ctx);
@ -85,7 +85,7 @@ public class TestKeyValueCompression {
@Test
public void testKVWithTags() throws Exception {
CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, false);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
KeyValueCompression.writeKV(buf, createKV(1), ctx);
KeyValueCompression.writeKV(buf, createKV(0), ctx);

View File

@ -55,7 +55,7 @@ public class TestWALCellCodecWithCompression {
Configuration conf = new Configuration(false);
conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
conf));
compressTags));
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
Encoder encoder = codec.getEncoder(bos);
encoder.write(createKV(1));