HBASE-11620 Record the class name of Writer in WAL header so that only proper Reader can open the WAL file (Ted Yu)
This commit is contained in:
parent
85e317b3ae
commit
e142961099
|
@ -122,6 +122,21 @@ public final class WALProtos {
|
||||||
* <code>optional bool has_tag_compression = 3;</code>
|
* <code>optional bool has_tag_compression = 3;</code>
|
||||||
*/
|
*/
|
||||||
boolean getHasTagCompression();
|
boolean getHasTagCompression();
|
||||||
|
|
||||||
|
// optional string writer_cls_name = 4;
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
boolean hasWriterClsName();
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
java.lang.String getWriterClsName();
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
com.google.protobuf.ByteString
|
||||||
|
getWriterClsNameBytes();
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Protobuf type {@code WALHeader}
|
* Protobuf type {@code WALHeader}
|
||||||
|
@ -189,6 +204,11 @@ public final class WALProtos {
|
||||||
hasTagCompression_ = input.readBool();
|
hasTagCompression_ = input.readBool();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case 34: {
|
||||||
|
bitField0_ |= 0x00000008;
|
||||||
|
writerClsName_ = input.readBytes();
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||||
|
@ -277,10 +297,54 @@ public final class WALProtos {
|
||||||
return hasTagCompression_;
|
return hasTagCompression_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// optional string writer_cls_name = 4;
|
||||||
|
public static final int WRITER_CLS_NAME_FIELD_NUMBER = 4;
|
||||||
|
private java.lang.Object writerClsName_;
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
public boolean hasWriterClsName() {
|
||||||
|
return ((bitField0_ & 0x00000008) == 0x00000008);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
public java.lang.String getWriterClsName() {
|
||||||
|
java.lang.Object ref = writerClsName_;
|
||||||
|
if (ref instanceof java.lang.String) {
|
||||||
|
return (java.lang.String) ref;
|
||||||
|
} else {
|
||||||
|
com.google.protobuf.ByteString bs =
|
||||||
|
(com.google.protobuf.ByteString) ref;
|
||||||
|
java.lang.String s = bs.toStringUtf8();
|
||||||
|
if (bs.isValidUtf8()) {
|
||||||
|
writerClsName_ = s;
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
public com.google.protobuf.ByteString
|
||||||
|
getWriterClsNameBytes() {
|
||||||
|
java.lang.Object ref = writerClsName_;
|
||||||
|
if (ref instanceof java.lang.String) {
|
||||||
|
com.google.protobuf.ByteString b =
|
||||||
|
com.google.protobuf.ByteString.copyFromUtf8(
|
||||||
|
(java.lang.String) ref);
|
||||||
|
writerClsName_ = b;
|
||||||
|
return b;
|
||||||
|
} else {
|
||||||
|
return (com.google.protobuf.ByteString) ref;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void initFields() {
|
private void initFields() {
|
||||||
hasCompression_ = false;
|
hasCompression_ = false;
|
||||||
encryptionKey_ = com.google.protobuf.ByteString.EMPTY;
|
encryptionKey_ = com.google.protobuf.ByteString.EMPTY;
|
||||||
hasTagCompression_ = false;
|
hasTagCompression_ = false;
|
||||||
|
writerClsName_ = "";
|
||||||
}
|
}
|
||||||
private byte memoizedIsInitialized = -1;
|
private byte memoizedIsInitialized = -1;
|
||||||
public final boolean isInitialized() {
|
public final boolean isInitialized() {
|
||||||
|
@ -303,6 +367,9 @@ public final class WALProtos {
|
||||||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||||
output.writeBool(3, hasTagCompression_);
|
output.writeBool(3, hasTagCompression_);
|
||||||
}
|
}
|
||||||
|
if (((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||||
|
output.writeBytes(4, getWriterClsNameBytes());
|
||||||
|
}
|
||||||
getUnknownFields().writeTo(output);
|
getUnknownFields().writeTo(output);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,6 +391,10 @@ public final class WALProtos {
|
||||||
size += com.google.protobuf.CodedOutputStream
|
size += com.google.protobuf.CodedOutputStream
|
||||||
.computeBoolSize(3, hasTagCompression_);
|
.computeBoolSize(3, hasTagCompression_);
|
||||||
}
|
}
|
||||||
|
if (((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||||
|
size += com.google.protobuf.CodedOutputStream
|
||||||
|
.computeBytesSize(4, getWriterClsNameBytes());
|
||||||
|
}
|
||||||
size += getUnknownFields().getSerializedSize();
|
size += getUnknownFields().getSerializedSize();
|
||||||
memoizedSerializedSize = size;
|
memoizedSerializedSize = size;
|
||||||
return size;
|
return size;
|
||||||
|
@ -362,6 +433,11 @@ public final class WALProtos {
|
||||||
result = result && (getHasTagCompression()
|
result = result && (getHasTagCompression()
|
||||||
== other.getHasTagCompression());
|
== other.getHasTagCompression());
|
||||||
}
|
}
|
||||||
|
result = result && (hasWriterClsName() == other.hasWriterClsName());
|
||||||
|
if (hasWriterClsName()) {
|
||||||
|
result = result && getWriterClsName()
|
||||||
|
.equals(other.getWriterClsName());
|
||||||
|
}
|
||||||
result = result &&
|
result = result &&
|
||||||
getUnknownFields().equals(other.getUnknownFields());
|
getUnknownFields().equals(other.getUnknownFields());
|
||||||
return result;
|
return result;
|
||||||
|
@ -387,6 +463,10 @@ public final class WALProtos {
|
||||||
hash = (37 * hash) + HAS_TAG_COMPRESSION_FIELD_NUMBER;
|
hash = (37 * hash) + HAS_TAG_COMPRESSION_FIELD_NUMBER;
|
||||||
hash = (53 * hash) + hashBoolean(getHasTagCompression());
|
hash = (53 * hash) + hashBoolean(getHasTagCompression());
|
||||||
}
|
}
|
||||||
|
if (hasWriterClsName()) {
|
||||||
|
hash = (37 * hash) + WRITER_CLS_NAME_FIELD_NUMBER;
|
||||||
|
hash = (53 * hash) + getWriterClsName().hashCode();
|
||||||
|
}
|
||||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||||
memoizedHashCode = hash;
|
memoizedHashCode = hash;
|
||||||
return hash;
|
return hash;
|
||||||
|
@ -502,6 +582,8 @@ public final class WALProtos {
|
||||||
bitField0_ = (bitField0_ & ~0x00000002);
|
bitField0_ = (bitField0_ & ~0x00000002);
|
||||||
hasTagCompression_ = false;
|
hasTagCompression_ = false;
|
||||||
bitField0_ = (bitField0_ & ~0x00000004);
|
bitField0_ = (bitField0_ & ~0x00000004);
|
||||||
|
writerClsName_ = "";
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000008);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -542,6 +624,10 @@ public final class WALProtos {
|
||||||
to_bitField0_ |= 0x00000004;
|
to_bitField0_ |= 0x00000004;
|
||||||
}
|
}
|
||||||
result.hasTagCompression_ = hasTagCompression_;
|
result.hasTagCompression_ = hasTagCompression_;
|
||||||
|
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
|
||||||
|
to_bitField0_ |= 0x00000008;
|
||||||
|
}
|
||||||
|
result.writerClsName_ = writerClsName_;
|
||||||
result.bitField0_ = to_bitField0_;
|
result.bitField0_ = to_bitField0_;
|
||||||
onBuilt();
|
onBuilt();
|
||||||
return result;
|
return result;
|
||||||
|
@ -567,6 +653,11 @@ public final class WALProtos {
|
||||||
if (other.hasHasTagCompression()) {
|
if (other.hasHasTagCompression()) {
|
||||||
setHasTagCompression(other.getHasTagCompression());
|
setHasTagCompression(other.getHasTagCompression());
|
||||||
}
|
}
|
||||||
|
if (other.hasWriterClsName()) {
|
||||||
|
bitField0_ |= 0x00000008;
|
||||||
|
writerClsName_ = other.writerClsName_;
|
||||||
|
onChanged();
|
||||||
|
}
|
||||||
this.mergeUnknownFields(other.getUnknownFields());
|
this.mergeUnknownFields(other.getUnknownFields());
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -696,6 +787,80 @@ public final class WALProtos {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// optional string writer_cls_name = 4;
|
||||||
|
private java.lang.Object writerClsName_ = "";
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
public boolean hasWriterClsName() {
|
||||||
|
return ((bitField0_ & 0x00000008) == 0x00000008);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
public java.lang.String getWriterClsName() {
|
||||||
|
java.lang.Object ref = writerClsName_;
|
||||||
|
if (!(ref instanceof java.lang.String)) {
|
||||||
|
java.lang.String s = ((com.google.protobuf.ByteString) ref)
|
||||||
|
.toStringUtf8();
|
||||||
|
writerClsName_ = s;
|
||||||
|
return s;
|
||||||
|
} else {
|
||||||
|
return (java.lang.String) ref;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
public com.google.protobuf.ByteString
|
||||||
|
getWriterClsNameBytes() {
|
||||||
|
java.lang.Object ref = writerClsName_;
|
||||||
|
if (ref instanceof String) {
|
||||||
|
com.google.protobuf.ByteString b =
|
||||||
|
com.google.protobuf.ByteString.copyFromUtf8(
|
||||||
|
(java.lang.String) ref);
|
||||||
|
writerClsName_ = b;
|
||||||
|
return b;
|
||||||
|
} else {
|
||||||
|
return (com.google.protobuf.ByteString) ref;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
public Builder setWriterClsName(
|
||||||
|
java.lang.String value) {
|
||||||
|
if (value == null) {
|
||||||
|
throw new NullPointerException();
|
||||||
|
}
|
||||||
|
bitField0_ |= 0x00000008;
|
||||||
|
writerClsName_ = value;
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
public Builder clearWriterClsName() {
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000008);
|
||||||
|
writerClsName_ = getDefaultInstance().getWriterClsName();
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <code>optional string writer_cls_name = 4;</code>
|
||||||
|
*/
|
||||||
|
public Builder setWriterClsNameBytes(
|
||||||
|
com.google.protobuf.ByteString value) {
|
||||||
|
if (value == null) {
|
||||||
|
throw new NullPointerException();
|
||||||
|
}
|
||||||
|
bitField0_ |= 0x00000008;
|
||||||
|
writerClsName_ = value;
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
// @@protoc_insertion_point(builder_scope:WALHeader)
|
// @@protoc_insertion_point(builder_scope:WALHeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7664,36 +7829,37 @@ public final class WALProtos {
|
||||||
descriptor;
|
descriptor;
|
||||||
static {
|
static {
|
||||||
java.lang.String[] descriptorData = {
|
java.lang.String[] descriptorData = {
|
||||||
"\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" +
|
"\n\tWAL.proto\032\013HBase.proto\"r\n\tWALHeader\022\027\n" +
|
||||||
"\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
|
"\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
|
||||||
"\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\240\002\n\006" +
|
"\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\022\027\n\017w" +
|
||||||
"WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" +
|
"riter_cls_name\030\004 \001(\t\"\240\002\n\006WALKey\022\033\n\023encod" +
|
||||||
"able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" +
|
"ed_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014" +
|
||||||
" \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" +
|
"\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite_t" +
|
||||||
" \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" +
|
"ime\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002\030\001" +
|
||||||
"Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" +
|
"\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022follow" +
|
||||||
"ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
|
"ing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132" +
|
||||||
"\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
|
"\005.UUID\022\022\n\nnonceGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(",
|
||||||
"\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
|
"\004\022\034\n\024orig_sequence_number\030\013 \001(\004\"=\n\013Famil" +
|
||||||
"\nscope_type\030\002 \002(\0162\n.ScopeType\"\276\001\n\024Compac" +
|
"yScope\022\016\n\006family\030\001 \002(\014\022\036\n\nscope_type\030\002 \002" +
|
||||||
"tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
|
"(\0162\n.ScopeType\"\276\001\n\024CompactionDescriptor\022" +
|
||||||
"coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
|
"\022\n\ntable_name\030\001 \002(\014\022\033\n\023encoded_region_na" +
|
||||||
" \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
|
"me\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020compact" +
|
||||||
"tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
|
"ion_input\030\004 \003(\t\022\031\n\021compaction_output\030\005 \003" +
|
||||||
"\t\022\023\n\013region_name\030\007 \001(\014\"\353\002\n\017FlushDescript" +
|
"(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region_nam" +
|
||||||
"or\022,\n\006action\030\001 \002(\0162\034.FlushDescriptor.Flu" +
|
"e\030\007 \001(\014\"\353\002\n\017FlushDescriptor\022,\n\006action\030\001 " +
|
||||||
"shAction\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_" +
|
"\002(\0162\034.FlushDescriptor.FlushAction\022\022\n\ntab" +
|
||||||
"region_name\030\003 \002(\014\022\035\n\025flush_sequence_numb",
|
"le_name\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002",
|
||||||
"er\030\004 \001(\004\022<\n\rstore_flushes\030\005 \003(\0132%.FlushD" +
|
"(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n\rsto" +
|
||||||
"escriptor.StoreFlushDescriptor\032Y\n\024StoreF" +
|
"re_flushes\030\005 \003(\0132%.FlushDescriptor.Store" +
|
||||||
"lushDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026\n\016s" +
|
"FlushDescriptor\032Y\n\024StoreFlushDescriptor\022" +
|
||||||
"tore_home_dir\030\002 \002(\t\022\024\n\014flush_output\030\003 \003(" +
|
"\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002" +
|
||||||
"\t\"A\n\013FlushAction\022\017\n\013START_FLUSH\020\000\022\020\n\014COM" +
|
" \002(\t\022\024\n\014flush_output\030\003 \003(\t\"A\n\013FlushActio" +
|
||||||
"MIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"\014\n\nWALTrail" +
|
"n\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013" +
|
||||||
"er*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE_LOC" +
|
"ABORT_FLUSH\020\002\"\014\n\nWALTrailer*F\n\tScopeType" +
|
||||||
"AL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*or" +
|
"\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICA" +
|
||||||
"g.apache.hadoop.hbase.protobuf.generated" +
|
"TION_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop" +
|
||||||
"B\tWALProtosH\001\210\001\000\240\001\001"
|
".hbase.protobuf.generatedB\tWALProtosH\001\210\001",
|
||||||
|
"\000\240\001\001"
|
||||||
};
|
};
|
||||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
@ -7705,7 +7871,7 @@ public final class WALProtos {
|
||||||
internal_static_WALHeader_fieldAccessorTable = new
|
internal_static_WALHeader_fieldAccessorTable = new
|
||||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||||
internal_static_WALHeader_descriptor,
|
internal_static_WALHeader_descriptor,
|
||||||
new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", });
|
new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", "WriterClsName", });
|
||||||
internal_static_WALKey_descriptor =
|
internal_static_WALKey_descriptor =
|
||||||
getDescriptor().getMessageTypes().get(1);
|
getDescriptor().getMessageTypes().get(1);
|
||||||
internal_static_WALKey_fieldAccessorTable = new
|
internal_static_WALKey_fieldAccessorTable = new
|
||||||
|
|
|
@ -27,6 +27,7 @@ message WALHeader {
|
||||||
optional bool has_compression = 1;
|
optional bool has_compression = 1;
|
||||||
optional bytes encryption_key = 2;
|
optional bytes encryption_key = 2;
|
||||||
optional bool has_tag_compression = 3;
|
optional bool has_tag_compression = 3;
|
||||||
|
optional string writer_cls_name = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs
|
// Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs
|
||||||
|
|
|
@ -168,6 +168,10 @@ public class HLogFactory {
|
||||||
*/
|
*/
|
||||||
private static Class<? extends Writer> logWriterClass;
|
private static Class<? extends Writer> logWriterClass;
|
||||||
|
|
||||||
|
static void resetLogWriterClass() {
|
||||||
|
logWriterClass = null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a writer for the WAL.
|
* Create a writer for the WAL.
|
||||||
* @return A WAL writer. Close when done with it.
|
* @return A WAL writer. Close when done with it.
|
||||||
|
|
|
@ -23,7 +23,9 @@ import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -66,7 +68,16 @@ public class ProtobufLogReader extends ReaderBase {
|
||||||
// in the hlog, the inputstream's position is equal to walEditsStopOffset.
|
// in the hlog, the inputstream's position is equal to walEditsStopOffset.
|
||||||
private long walEditsStopOffset;
|
private long walEditsStopOffset;
|
||||||
private boolean trailerPresent;
|
private boolean trailerPresent;
|
||||||
|
private static List<String> writerClsNames = new ArrayList<String>();
|
||||||
|
static {
|
||||||
|
writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
|
||||||
|
}
|
||||||
|
|
||||||
|
enum WALHdrResult {
|
||||||
|
EOF, // stream is at EOF when method starts
|
||||||
|
SUCCESS,
|
||||||
|
UNKNOWN_WRITER_CLS // name of writer class isn't recognized
|
||||||
|
}
|
||||||
public ProtobufLogReader() {
|
public ProtobufLogReader() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
@ -95,11 +106,26 @@ public class ProtobufLogReader extends ReaderBase {
|
||||||
initInternal(stream, true);
|
initInternal(stream, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean readHeader(Builder builder, FSDataInputStream stream) throws IOException {
|
/*
|
||||||
return builder.mergeDelimitedFrom(stream);
|
* Returns names of the accepted writer classes
|
||||||
|
*/
|
||||||
|
protected List<String> getWriterClsNames() {
|
||||||
|
return writerClsNames;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initInternal(FSDataInputStream stream, boolean isFirst) throws IOException {
|
protected WALHdrResult readHeader(Builder builder, FSDataInputStream stream)
|
||||||
|
throws IOException {
|
||||||
|
boolean res = builder.mergeDelimitedFrom(stream);
|
||||||
|
if (!res) return WALHdrResult.EOF;
|
||||||
|
if (builder.hasWriterClsName() &&
|
||||||
|
!getWriterClsNames().contains(builder.getWriterClsName())) {
|
||||||
|
return WALHdrResult.UNKNOWN_WRITER_CLS;
|
||||||
|
}
|
||||||
|
return WALHdrResult.SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initInternal(FSDataInputStream stream, boolean isFirst)
|
||||||
|
throws IOException {
|
||||||
close();
|
close();
|
||||||
long expectedPos = PB_WAL_MAGIC.length;
|
long expectedPos = PB_WAL_MAGIC.length;
|
||||||
if (stream == null) {
|
if (stream == null) {
|
||||||
|
@ -111,10 +137,13 @@ public class ProtobufLogReader extends ReaderBase {
|
||||||
}
|
}
|
||||||
// Initialize metadata or, when we reset, just skip the header.
|
// Initialize metadata or, when we reset, just skip the header.
|
||||||
WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
|
WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
|
||||||
boolean hasHeader = readHeader(builder, stream);
|
WALHdrResult walHdrRes = readHeader(builder, stream);
|
||||||
if (!hasHeader) {
|
if (walHdrRes == WALHdrResult.EOF) {
|
||||||
throw new EOFException("Couldn't read WAL PB header");
|
throw new EOFException("Couldn't read WAL PB header");
|
||||||
}
|
}
|
||||||
|
if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
|
||||||
|
throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
|
||||||
|
}
|
||||||
if (isFirst) {
|
if (isFirst) {
|
||||||
WALProtos.WALHeader header = builder.build();
|
WALProtos.WALHeader header = builder.build();
|
||||||
this.hasCompression = header.hasHasCompression() && header.getHasCompression();
|
this.hasCompression = header.hasHasCompression() && header.getHasCompression();
|
||||||
|
|
|
@ -59,6 +59,9 @@ public class ProtobufLogWriter extends WriterBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException {
|
protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException {
|
||||||
|
if (!builder.hasWriterClsName()) {
|
||||||
|
builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName());
|
||||||
|
}
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.Key;
|
import java.security.Key;
|
||||||
import java.security.KeyException;
|
import java.security.KeyException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -38,15 +40,25 @@ public class SecureProtobufLogReader extends ProtobufLogReader {
|
||||||
private static final Log LOG = LogFactory.getLog(SecureProtobufLogReader.class);
|
private static final Log LOG = LogFactory.getLog(SecureProtobufLogReader.class);
|
||||||
|
|
||||||
private Decryptor decryptor = null;
|
private Decryptor decryptor = null;
|
||||||
|
private static List<String> writerClsNames = new ArrayList<String>();
|
||||||
|
static {
|
||||||
|
writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
|
||||||
|
writerClsNames.add(SecureProtobufLogWriter.class.getSimpleName());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean readHeader(WALHeader.Builder builder, FSDataInputStream stream)
|
protected List<String> getWriterClsNames() {
|
||||||
|
return writerClsNames;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected WALHdrResult readHeader(WALHeader.Builder builder, FSDataInputStream stream)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
boolean result = super.readHeader(builder, stream);
|
WALHdrResult result = super.readHeader(builder, stream);
|
||||||
// We need to unconditionally handle the case where the WAL has a key in
|
// We need to unconditionally handle the case where the WAL has a key in
|
||||||
// the header, meaning it is encrypted, even if ENABLE_WAL_ENCRYPTION is
|
// the header, meaning it is encrypted, even if ENABLE_WAL_ENCRYPTION is
|
||||||
// no longer set in the site configuration.
|
// no longer set in the site configuration.
|
||||||
if (result && builder.hasEncryptionKey()) {
|
if (result == WALHdrResult.SUCCESS && builder.hasEncryptionKey()) {
|
||||||
// Serialized header data has been merged into the builder from the
|
// Serialized header data has been merged into the builder from the
|
||||||
// stream.
|
// stream.
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,7 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException {
|
protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException {
|
||||||
|
builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName());
|
||||||
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
|
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
|
||||||
// Get an instance of our cipher
|
// Get an instance of our cipher
|
||||||
Cipher cipher = Encryption.getCipher(conf,
|
Cipher cipher = Encryption.getCipher(conf,
|
||||||
|
|
|
@ -0,0 +1,187 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test that verifies WAL written by SecureProtobufLogWriter is not readable by ProtobufLogReader
|
||||||
|
*/
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestHLogReaderOnSecureHLog {
|
||||||
|
static final Log LOG = LogFactory.getLog(TestHLogReaderOnSecureHLog.class);
|
||||||
|
static {
|
||||||
|
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal"))
|
||||||
|
.getLogger().setLevel(Level.ALL);
|
||||||
|
};
|
||||||
|
static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
final byte[] value = Bytes.toBytes("Test value");
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
||||||
|
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||||
|
conf.setBoolean("hbase.hlog.split.skip.errors", true);
|
||||||
|
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path writeWAL(String tblName) throws IOException {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
TableName tableName = TableName.valueOf(tblName);
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||||
|
htd.addFamily(new HColumnDescriptor(tableName.getName()));
|
||||||
|
HRegionInfo regioninfo = new HRegionInfo(tableName,
|
||||||
|
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
|
||||||
|
final int total = 10;
|
||||||
|
final byte[] row = Bytes.toBytes("row");
|
||||||
|
final byte[] family = Bytes.toBytes("family");
|
||||||
|
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||||
|
Path logDir = TEST_UTIL.getDataTestDir(tblName);
|
||||||
|
final AtomicLong sequenceId = new AtomicLong(1);
|
||||||
|
|
||||||
|
// Write the WAL
|
||||||
|
FSHLog wal = new FSHLog(fs, TEST_UTIL.getDataTestDir(), logDir.toString(), conf);
|
||||||
|
for (int i = 0; i < total; i++) {
|
||||||
|
WALEdit kvs = new WALEdit();
|
||||||
|
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
|
||||||
|
wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
|
||||||
|
}
|
||||||
|
final Path walPath = ((FSHLog) wal).computeFilename();
|
||||||
|
wal.close();
|
||||||
|
|
||||||
|
return walPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test()
|
||||||
|
public void testHLogReaderOnSecureHLog() throws Exception {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
HLogFactory.resetLogReaderClass();
|
||||||
|
HLogFactory.resetLogWriterClass();
|
||||||
|
conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
|
||||||
|
HLog.Reader.class);
|
||||||
|
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
|
||||||
|
HLog.Writer.class);
|
||||||
|
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||||
|
Path walPath = writeWAL("testHLogReaderOnSecureHLog");
|
||||||
|
|
||||||
|
// Insure edits are not plaintext
|
||||||
|
long length = fs.getFileStatus(walPath).getLen();
|
||||||
|
FSDataInputStream in = fs.open(walPath);
|
||||||
|
byte[] fileData = new byte[(int)length];
|
||||||
|
IOUtils.readFully(in, fileData);
|
||||||
|
in.close();
|
||||||
|
assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
|
||||||
|
|
||||||
|
// Confirm the WAL cannot be read back by ProtobufLogReader
|
||||||
|
try {
|
||||||
|
HLog.Reader reader = HLogFactory.createReader(TEST_UTIL.getTestFileSystem(), walPath, conf);
|
||||||
|
assertFalse(true);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// expected IOE
|
||||||
|
}
|
||||||
|
|
||||||
|
FileStatus[] listStatus = fs.listStatus(walPath.getParent());
|
||||||
|
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
|
||||||
|
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
|
||||||
|
Path rootdir = FSUtils.getRootDir(conf);
|
||||||
|
try {
|
||||||
|
HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode);
|
||||||
|
s.splitLogFile(listStatus[0], null);
|
||||||
|
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
|
||||||
|
"corrupt");
|
||||||
|
assertTrue(fs.exists(file));
|
||||||
|
// assertFalse("log splitting should have failed", true);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
assertTrue("WAL should have been sidelined", false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test()
|
||||||
|
public void testSecureHLogReaderOnHLog() throws Exception {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
HLogFactory.resetLogReaderClass();
|
||||||
|
HLogFactory.resetLogWriterClass();
|
||||||
|
conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
|
||||||
|
HLog.Reader.class);
|
||||||
|
conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
|
||||||
|
HLog.Writer.class);
|
||||||
|
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||||
|
Path walPath = writeWAL("testSecureHLogReaderOnHLog");
|
||||||
|
|
||||||
|
// Ensure edits are plaintext
|
||||||
|
long length = fs.getFileStatus(walPath).getLen();
|
||||||
|
FSDataInputStream in = fs.open(walPath);
|
||||||
|
byte[] fileData = new byte[(int)length];
|
||||||
|
IOUtils.readFully(in, fileData);
|
||||||
|
in.close();
|
||||||
|
assertTrue("Cells should be plaintext", Bytes.contains(fileData, value));
|
||||||
|
|
||||||
|
// Confirm the WAL can be read back by SecureProtobufLogReader
|
||||||
|
try {
|
||||||
|
HLog.Reader reader = HLogFactory.createReader(TEST_UTIL.getTestFileSystem(), walPath, conf);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
assertFalse(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
FileStatus[] listStatus = fs.listStatus(walPath.getParent());
|
||||||
|
RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
|
||||||
|
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
|
||||||
|
Path rootdir = FSUtils.getRootDir(conf);
|
||||||
|
try {
|
||||||
|
HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode);
|
||||||
|
s.splitLogFile(listStatus[0], null);
|
||||||
|
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
|
||||||
|
"corrupt");
|
||||||
|
assertTrue(!fs.exists(file));
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
assertTrue("WAL should have been processed", false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue