HBASE-11762 Record the class name of Codec in WAL header
This commit is contained in:
parent
ac6b747370
commit
12478cded7
|
@ -137,6 +137,21 @@ public final class WALProtos {
|
|||
*/
|
||||
com.google.protobuf.ByteString
|
||||
getWriterClsNameBytes();
|
||||
|
||||
// optional string cell_codec_cls_name = 5;
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
boolean hasCellCodecClsName();
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
java.lang.String getCellCodecClsName();
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
com.google.protobuf.ByteString
|
||||
getCellCodecClsNameBytes();
|
||||
}
|
||||
/**
|
||||
* Protobuf type {@code WALHeader}
|
||||
|
@ -209,6 +224,11 @@ public final class WALProtos {
|
|||
writerClsName_ = input.readBytes();
|
||||
break;
|
||||
}
|
||||
case 42: {
|
||||
bitField0_ |= 0x00000010;
|
||||
cellCodecClsName_ = input.readBytes();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -340,11 +360,55 @@ public final class WALProtos {
|
|||
}
|
||||
}
|
||||
|
||||
// optional string cell_codec_cls_name = 5;
|
||||
public static final int CELL_CODEC_CLS_NAME_FIELD_NUMBER = 5;
|
||||
private java.lang.Object cellCodecClsName_;
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
public boolean hasCellCodecClsName() {
|
||||
return ((bitField0_ & 0x00000010) == 0x00000010);
|
||||
}
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
public java.lang.String getCellCodecClsName() {
|
||||
java.lang.Object ref = cellCodecClsName_;
|
||||
if (ref instanceof java.lang.String) {
|
||||
return (java.lang.String) ref;
|
||||
} else {
|
||||
com.google.protobuf.ByteString bs =
|
||||
(com.google.protobuf.ByteString) ref;
|
||||
java.lang.String s = bs.toStringUtf8();
|
||||
if (bs.isValidUtf8()) {
|
||||
cellCodecClsName_ = s;
|
||||
}
|
||||
return s;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
public com.google.protobuf.ByteString
|
||||
getCellCodecClsNameBytes() {
|
||||
java.lang.Object ref = cellCodecClsName_;
|
||||
if (ref instanceof java.lang.String) {
|
||||
com.google.protobuf.ByteString b =
|
||||
com.google.protobuf.ByteString.copyFromUtf8(
|
||||
(java.lang.String) ref);
|
||||
cellCodecClsName_ = b;
|
||||
return b;
|
||||
} else {
|
||||
return (com.google.protobuf.ByteString) ref;
|
||||
}
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
hasCompression_ = false;
|
||||
encryptionKey_ = com.google.protobuf.ByteString.EMPTY;
|
||||
hasTagCompression_ = false;
|
||||
writerClsName_ = "";
|
||||
cellCodecClsName_ = "";
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -370,6 +434,9 @@ public final class WALProtos {
|
|||
if (((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||
output.writeBytes(4, getWriterClsNameBytes());
|
||||
}
|
||||
if (((bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
output.writeBytes(5, getCellCodecClsNameBytes());
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -395,6 +462,10 @@ public final class WALProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBytesSize(4, getWriterClsNameBytes());
|
||||
}
|
||||
if (((bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBytesSize(5, getCellCodecClsNameBytes());
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -438,6 +509,11 @@ public final class WALProtos {
|
|||
result = result && getWriterClsName()
|
||||
.equals(other.getWriterClsName());
|
||||
}
|
||||
result = result && (hasCellCodecClsName() == other.hasCellCodecClsName());
|
||||
if (hasCellCodecClsName()) {
|
||||
result = result && getCellCodecClsName()
|
||||
.equals(other.getCellCodecClsName());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -467,6 +543,10 @@ public final class WALProtos {
|
|||
hash = (37 * hash) + WRITER_CLS_NAME_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getWriterClsName().hashCode();
|
||||
}
|
||||
if (hasCellCodecClsName()) {
|
||||
hash = (37 * hash) + CELL_CODEC_CLS_NAME_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getCellCodecClsName().hashCode();
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -584,6 +664,8 @@ public final class WALProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
writerClsName_ = "";
|
||||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
cellCodecClsName_ = "";
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -628,6 +710,10 @@ public final class WALProtos {
|
|||
to_bitField0_ |= 0x00000008;
|
||||
}
|
||||
result.writerClsName_ = writerClsName_;
|
||||
if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
to_bitField0_ |= 0x00000010;
|
||||
}
|
||||
result.cellCodecClsName_ = cellCodecClsName_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -658,6 +744,11 @@ public final class WALProtos {
|
|||
writerClsName_ = other.writerClsName_;
|
||||
onChanged();
|
||||
}
|
||||
if (other.hasCellCodecClsName()) {
|
||||
bitField0_ |= 0x00000010;
|
||||
cellCodecClsName_ = other.cellCodecClsName_;
|
||||
onChanged();
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -861,6 +952,80 @@ public final class WALProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional string cell_codec_cls_name = 5;
|
||||
private java.lang.Object cellCodecClsName_ = "";
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
public boolean hasCellCodecClsName() {
|
||||
return ((bitField0_ & 0x00000010) == 0x00000010);
|
||||
}
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
public java.lang.String getCellCodecClsName() {
|
||||
java.lang.Object ref = cellCodecClsName_;
|
||||
if (!(ref instanceof java.lang.String)) {
|
||||
java.lang.String s = ((com.google.protobuf.ByteString) ref)
|
||||
.toStringUtf8();
|
||||
cellCodecClsName_ = s;
|
||||
return s;
|
||||
} else {
|
||||
return (java.lang.String) ref;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
public com.google.protobuf.ByteString
|
||||
getCellCodecClsNameBytes() {
|
||||
java.lang.Object ref = cellCodecClsName_;
|
||||
if (ref instanceof String) {
|
||||
com.google.protobuf.ByteString b =
|
||||
com.google.protobuf.ByteString.copyFromUtf8(
|
||||
(java.lang.String) ref);
|
||||
cellCodecClsName_ = b;
|
||||
return b;
|
||||
} else {
|
||||
return (com.google.protobuf.ByteString) ref;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
public Builder setCellCodecClsName(
|
||||
java.lang.String value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
bitField0_ |= 0x00000010;
|
||||
cellCodecClsName_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
public Builder clearCellCodecClsName() {
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
cellCodecClsName_ = getDefaultInstance().getCellCodecClsName();
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>optional string cell_codec_cls_name = 5;</code>
|
||||
*/
|
||||
public Builder setCellCodecClsNameBytes(
|
||||
com.google.protobuf.ByteString value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
bitField0_ |= 0x00000010;
|
||||
cellCodecClsName_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:WALHeader)
|
||||
}
|
||||
|
||||
|
@ -7829,37 +7994,37 @@ public final class WALProtos {
|
|||
descriptor;
|
||||
static {
|
||||
java.lang.String[] descriptorData = {
|
||||
"\n\tWAL.proto\032\013HBase.proto\"r\n\tWALHeader\022\027\n" +
|
||||
"\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
|
||||
"\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\022\027\n\017w" +
|
||||
"riter_cls_name\030\004 \001(\t\"\240\002\n\006WALKey\022\033\n\023encod" +
|
||||
"ed_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014" +
|
||||
"\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite_t" +
|
||||
"ime\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002\030\001" +
|
||||
"\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022follow" +
|
||||
"ing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132" +
|
||||
"\005.UUID\022\022\n\nnonceGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(",
|
||||
"\004\022\034\n\024orig_sequence_number\030\013 \001(\004\"=\n\013Famil" +
|
||||
"yScope\022\016\n\006family\030\001 \002(\014\022\036\n\nscope_type\030\002 \002" +
|
||||
"(\0162\n.ScopeType\"\276\001\n\024CompactionDescriptor\022" +
|
||||
"\022\n\ntable_name\030\001 \002(\014\022\033\n\023encoded_region_na" +
|
||||
"me\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020compact" +
|
||||
"ion_input\030\004 \003(\t\022\031\n\021compaction_output\030\005 \003" +
|
||||
"(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region_nam" +
|
||||
"e\030\007 \001(\014\"\353\002\n\017FlushDescriptor\022,\n\006action\030\001 " +
|
||||
"\002(\0162\034.FlushDescriptor.FlushAction\022\022\n\ntab" +
|
||||
"le_name\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002",
|
||||
"(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n\rsto" +
|
||||
"re_flushes\030\005 \003(\0132%.FlushDescriptor.Store" +
|
||||
"FlushDescriptor\032Y\n\024StoreFlushDescriptor\022" +
|
||||
"\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002" +
|
||||
" \002(\t\022\024\n\014flush_output\030\003 \003(\t\"A\n\013FlushActio" +
|
||||
"n\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013" +
|
||||
"ABORT_FLUSH\020\002\"\014\n\nWALTrailer*F\n\tScopeType" +
|
||||
"\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICA" +
|
||||
"TION_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop" +
|
||||
".hbase.protobuf.generatedB\tWALProtosH\001\210\001",
|
||||
"\000\240\001\001"
|
||||
"\n\tWAL.proto\032\013HBase.proto\"\217\001\n\tWALHeader\022\027" +
|
||||
"\n\017has_compression\030\001 \001(\010\022\026\n\016encryption_ke" +
|
||||
"y\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\022\027\n\017" +
|
||||
"writer_cls_name\030\004 \001(\t\022\033\n\023cell_codec_cls_" +
|
||||
"name\030\005 \001(\t\"\240\002\n\006WALKey\022\033\n\023encoded_region_" +
|
||||
"name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023log_se" +
|
||||
"quence_number\030\003 \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022" +
|
||||
"\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes" +
|
||||
"\030\006 \003(\0132\014.FamilyScope\022\032\n\022following_kv_cou" +
|
||||
"nt\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132\005.UUID\022\022\n\n",
|
||||
"nonceGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_" +
|
||||
"sequence_number\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006" +
|
||||
"family\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.Scope" +
|
||||
"Type\"\276\001\n\024CompactionDescriptor\022\022\n\ntable_n" +
|
||||
"ame\030\001 \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023" +
|
||||
"\n\013family_name\030\003 \002(\014\022\030\n\020compaction_input\030" +
|
||||
"\004 \003(\t\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016stor" +
|
||||
"e_home_dir\030\006 \002(\t\022\023\n\013region_name\030\007 \001(\014\"\353\002" +
|
||||
"\n\017FlushDescriptor\022,\n\006action\030\001 \002(\0162\034.Flus" +
|
||||
"hDescriptor.FlushAction\022\022\n\ntable_name\030\002 ",
|
||||
"\002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022\035\n\025flus" +
|
||||
"h_sequence_number\030\004 \001(\004\022<\n\rstore_flushes" +
|
||||
"\030\005 \003(\0132%.FlushDescriptor.StoreFlushDescr" +
|
||||
"iptor\032Y\n\024StoreFlushDescriptor\022\023\n\013family_" +
|
||||
"name\030\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t\022\024\n\014fl" +
|
||||
"ush_output\030\003 \003(\t\"A\n\013FlushAction\022\017\n\013START" +
|
||||
"_FLUSH\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUS" +
|
||||
"H\020\002\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLIC" +
|
||||
"ATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE" +
|
||||
"_GLOBAL\020\001B?\n*org.apache.hadoop.hbase.pro",
|
||||
"tobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -7871,7 +8036,7 @@ public final class WALProtos {
|
|||
internal_static_WALHeader_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_WALHeader_descriptor,
|
||||
new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", "WriterClsName", });
|
||||
new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", "WriterClsName", "CellCodecClsName", });
|
||||
internal_static_WALKey_descriptor =
|
||||
getDescriptor().getMessageTypes().get(1);
|
||||
internal_static_WALKey_fieldAccessorTable = new
|
||||
|
|
|
@ -28,6 +28,7 @@ message WALHeader {
|
|||
optional bytes encryption_key = 2;
|
||||
optional bool has_tag_compression = 3;
|
||||
optional string writer_cls_name = 4;
|
||||
optional string cell_codec_cls_name = 5;
|
||||
}
|
||||
|
||||
// Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs
|
||||
|
|
|
@ -78,6 +78,24 @@ public class ProtobufLogReader extends ReaderBase {
|
|||
SUCCESS,
|
||||
UNKNOWN_WRITER_CLS // name of writer class isn't recognized
|
||||
}
|
||||
|
||||
// context for WALHdr carrying information such as Cell Codec classname
|
||||
static class WALHdrContext {
|
||||
WALHdrResult result;
|
||||
String cellCodecClsName;
|
||||
|
||||
WALHdrContext(WALHdrResult result, String cellCodecClsName) {
|
||||
this.result = result;
|
||||
this.cellCodecClsName = cellCodecClsName;
|
||||
}
|
||||
WALHdrResult getResult() {
|
||||
return result;
|
||||
}
|
||||
String getCellCodecClsName() {
|
||||
return cellCodecClsName;
|
||||
}
|
||||
}
|
||||
|
||||
public ProtobufLogReader() {
|
||||
super();
|
||||
}
|
||||
|
@ -97,13 +115,13 @@ public class ProtobufLogReader extends ReaderBase {
|
|||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
initInternal(null, false);
|
||||
initAfterCompression(); // We need a new decoder (at least).
|
||||
String clsName = initInternal(null, false);
|
||||
initAfterCompression(clsName); // We need a new decoder (at least).
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initReader(FSDataInputStream stream) throws IOException {
|
||||
initInternal(stream, true);
|
||||
protected String initReader(FSDataInputStream stream) throws IOException {
|
||||
return initInternal(stream, true);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -113,18 +131,22 @@ public class ProtobufLogReader extends ReaderBase {
|
|||
return writerClsNames;
|
||||
}
|
||||
|
||||
protected WALHdrResult readHeader(Builder builder, FSDataInputStream stream)
|
||||
protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream)
|
||||
throws IOException {
|
||||
boolean res = builder.mergeDelimitedFrom(stream);
|
||||
if (!res) return WALHdrResult.EOF;
|
||||
if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
|
||||
if (builder.hasWriterClsName() &&
|
||||
!getWriterClsNames().contains(builder.getWriterClsName())) {
|
||||
return WALHdrResult.UNKNOWN_WRITER_CLS;
|
||||
return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
|
||||
}
|
||||
return WALHdrResult.SUCCESS;
|
||||
String clsName = null;
|
||||
if (builder.hasCellCodecClsName()) {
|
||||
clsName = builder.getCellCodecClsName();
|
||||
}
|
||||
return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
|
||||
}
|
||||
|
||||
private void initInternal(FSDataInputStream stream, boolean isFirst)
|
||||
private String initInternal(FSDataInputStream stream, boolean isFirst)
|
||||
throws IOException {
|
||||
close();
|
||||
long expectedPos = PB_WAL_MAGIC.length;
|
||||
|
@ -137,7 +159,8 @@ public class ProtobufLogReader extends ReaderBase {
|
|||
}
|
||||
// Initialize metadata or, when we reset, just skip the header.
|
||||
WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
|
||||
WALHdrResult walHdrRes = readHeader(builder, stream);
|
||||
WALHdrContext hdrCtxt = readHeader(builder, stream);
|
||||
WALHdrResult walHdrRes = hdrCtxt.getResult();
|
||||
if (walHdrRes == WALHdrResult.EOF) {
|
||||
throw new EOFException("Couldn't read WAL PB header");
|
||||
}
|
||||
|
@ -158,6 +181,7 @@ public class ProtobufLogReader extends ReaderBase {
|
|||
LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
|
||||
+ ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
|
||||
}
|
||||
return hdrCtxt.getCellCodecClsName();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -213,14 +237,14 @@ public class ProtobufLogReader extends ReaderBase {
|
|||
return false;
|
||||
}
|
||||
|
||||
protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
|
||||
throws IOException {
|
||||
return WALCellCodec.create(conf, compressionContext);
|
||||
protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
|
||||
CompressionContext compressionContext) throws IOException {
|
||||
return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initAfterCompression() throws IOException {
|
||||
WALCellCodec codec = getCodec(this.conf, this.compressionContext);
|
||||
protected void initAfterCompression(String cellCodecClsName) throws IOException {
|
||||
WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
|
||||
this.cellDecoder = codec.getDecoder(this.inputStream);
|
||||
if (this.hasCompression) {
|
||||
this.byteStringUncompressor = codec.getByteStringUncompressor();
|
||||
|
|
|
@ -55,13 +55,17 @@ public class ProtobufLogWriter extends WriterBase {
|
|||
|
||||
protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
|
||||
throws IOException {
|
||||
return WALCellCodec.create(conf, compressionContext);
|
||||
return WALCellCodec.create(conf, null, compressionContext);
|
||||
}
|
||||
|
||||
protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException {
|
||||
protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
|
||||
throws IOException {
|
||||
if (!builder.hasWriterClsName()) {
|
||||
builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName());
|
||||
}
|
||||
if (!builder.hasCellCodecClsName()) {
|
||||
builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
@ -83,7 +87,7 @@ public class ProtobufLogWriter extends WriterBase {
|
|||
output.write(ProtobufLogReader.PB_WAL_MAGIC);
|
||||
boolean doTagCompress = doCompress
|
||||
&& conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
|
||||
buildWALHeader(
|
||||
buildWALHeader(conf,
|
||||
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))
|
||||
.writeDelimitedTo(output);
|
||||
|
||||
|
|
|
@ -66,7 +66,7 @@ public abstract class ReaderBase implements HLog.Reader {
|
|||
this.fileLength = this.fs.getFileStatus(path).getLen();
|
||||
this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
|
||||
HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
|
||||
initReader(stream);
|
||||
String cellCodecClsName = initReader(stream);
|
||||
|
||||
boolean compression = hasCompression();
|
||||
if (compression) {
|
||||
|
@ -82,7 +82,7 @@ public abstract class ReaderBase implements HLog.Reader {
|
|||
throw new IOException("Failed to initialize CompressionContext", e);
|
||||
}
|
||||
}
|
||||
initAfterCompression();
|
||||
initAfterCompression(cellCodecClsName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -135,13 +135,15 @@ public abstract class ReaderBase implements HLog.Reader {
|
|||
/**
|
||||
* Initializes the log reader with a particular stream (may be null).
|
||||
* Reader assumes ownership of the stream if not null and may use it. Called once.
|
||||
* @return the class name of cell Codec, null if such information is not available
|
||||
*/
|
||||
protected abstract void initReader(FSDataInputStream stream) throws IOException;
|
||||
protected abstract String initReader(FSDataInputStream stream) throws IOException;
|
||||
|
||||
/**
|
||||
* Initializes the compression after the shared stuff has been initialized. Called once.
|
||||
* @param cellCodecClsName class name of cell Codec
|
||||
*/
|
||||
protected abstract void initAfterCompression() throws IOException;
|
||||
protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;
|
||||
/**
|
||||
* @return Whether compression is enabled for this log.
|
||||
*/
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.crypto.Cipher;
|
|||
import org.apache.hadoop.hbase.io.crypto.Decryptor;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WALHdrResult;
|
||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
||||
|
@ -52,9 +53,10 @@ public class SecureProtobufLogReader extends ProtobufLogReader {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected WALHdrResult readHeader(WALHeader.Builder builder, FSDataInputStream stream)
|
||||
protected WALHdrContext readHeader(WALHeader.Builder builder, FSDataInputStream stream)
|
||||
throws IOException {
|
||||
WALHdrResult result = super.readHeader(builder, stream);
|
||||
WALHdrContext hdrCtxt = super.readHeader(builder, stream);
|
||||
WALHdrResult result = hdrCtxt.getResult();
|
||||
// We need to unconditionally handle the case where the WAL has a key in
|
||||
// the header, meaning it is encrypted, even if ENABLE_WAL_ENCRYPTION is
|
||||
// no longer set in the site configuration.
|
||||
|
@ -121,19 +123,19 @@ public class SecureProtobufLogReader extends ProtobufLogReader {
|
|||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
return hdrCtxt;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initAfterCompression() throws IOException {
|
||||
if (decryptor != null) {
|
||||
protected void initAfterCompression(String cellCodecClsName) throws IOException {
|
||||
if (decryptor != null && cellCodecClsName.equals(SecureWALCellCodec.class.getName())) {
|
||||
WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, decryptor);
|
||||
this.cellDecoder = codec.getDecoder(this.inputStream);
|
||||
// We do not support compression with WAL encryption
|
||||
this.compressionContext = null;
|
||||
this.hasCompression = false;
|
||||
} else {
|
||||
super.initAfterCompression();
|
||||
super.initAfterCompression(cellCodecClsName);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.security.SecureRandom;
|
|||
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.util.ByteStringer;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -43,7 +44,8 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter {
|
|||
private Encryptor encryptor = null;
|
||||
|
||||
@Override
|
||||
protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException {
|
||||
protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
|
||||
throws IOException {
|
||||
builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName());
|
||||
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
|
||||
// Get an instance of our cipher
|
||||
|
@ -72,8 +74,8 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter {
|
|||
LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName());
|
||||
}
|
||||
}
|
||||
|
||||
return super.buildWALHeader(builder);
|
||||
builder.setCellCodecClsName(SecureWALCellCodec.class.getName());
|
||||
return super.buildWALHeader(conf, builder);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -175,16 +175,17 @@ public class SequenceFileLogReader extends ReaderBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void initReader(FSDataInputStream stream) throws IOException {
|
||||
protected String initReader(FSDataInputStream stream) throws IOException {
|
||||
// We don't use the stream because we have to have the magic stream above.
|
||||
if (stream != null) {
|
||||
stream.close();
|
||||
}
|
||||
reset();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initAfterCompression() throws IOException {
|
||||
protected void initAfterCompression(String cellCodecClsName) throws IOException {
|
||||
// Nothing to do here
|
||||
}
|
||||
|
||||
|
|
|
@ -77,20 +77,29 @@ public class WALCellCodec implements Codec {
|
|||
this.compression = compression;
|
||||
}
|
||||
|
||||
static String getWALCellCodecClass(Configuration conf) {
|
||||
return conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and setup a {@link WALCellCodec} from the {@link Configuration} and CompressionContext,
|
||||
* if they have been specified. Fully prepares the codec for use.
|
||||
* Create and setup a {@link WALCellCodec} from the {@link cellCodecClsName} and
|
||||
* CompressionContext, if {@link cellCodecClsName} is specified.
|
||||
* Otherwise Cell Codec classname is read from {@link Configuration}.
|
||||
* Fully prepares the codec for use.
|
||||
* @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
|
||||
* uses a {@link WALCellCodec}.
|
||||
* @param compression compression the codec should use
|
||||
* @return a {@link WALCellCodec} ready for use.
|
||||
* @throws UnsupportedOperationException if the codec cannot be instantiated
|
||||
*/
|
||||
public static WALCellCodec create(Configuration conf, CompressionContext compression)
|
||||
throws UnsupportedOperationException {
|
||||
String className = conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
|
||||
return ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class,
|
||||
CompressionContext.class }, new Object[] { conf, compression });
|
||||
|
||||
public static WALCellCodec create(Configuration conf, String cellCodecClsName,
|
||||
CompressionContext compression) throws UnsupportedOperationException {
|
||||
if (cellCodecClsName == null) {
|
||||
cellCodecClsName = getWALCellCodecClass(conf);
|
||||
}
|
||||
return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
|
||||
{ Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
|
||||
}
|
||||
|
||||
public interface ByteStringCompressor {
|
||||
|
|
|
@ -53,10 +53,10 @@ public class TestCustomWALCellCodec {
|
|||
Configuration conf = new Configuration(false);
|
||||
conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, CustomWALCellCodec.class,
|
||||
WALCellCodec.class);
|
||||
CustomWALCellCodec codec = (CustomWALCellCodec) WALCellCodec.create(conf, null);
|
||||
CustomWALCellCodec codec = (CustomWALCellCodec) WALCellCodec.create(conf, null, null);
|
||||
assertEquals("Custom codec didn't get initialized with the right configuration!", conf,
|
||||
codec.conf);
|
||||
assertEquals("Custom codec didn't get initialized with the right compression context!", null,
|
||||
codec.context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.MediumTests;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.TestCustomWALCellCodec.CustomWALCellCodec;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||
|
@ -74,6 +75,9 @@ public class TestHLogReaderOnSecureHLog {
|
|||
|
||||
private Path writeWAL(String tblName) throws IOException {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
|
||||
conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, CustomWALCellCodec.class,
|
||||
WALCellCodec.class);
|
||||
TableName tableName = TableName.valueOf(tblName);
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor(tableName.getName()));
|
||||
|
@ -95,6 +99,8 @@ public class TestHLogReaderOnSecureHLog {
|
|||
}
|
||||
final Path walPath = ((FSHLog) wal).computeFilename();
|
||||
wal.close();
|
||||
// restore the cell codec class
|
||||
conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
|
||||
|
||||
return walPath;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue