From e142961099cda5b3f733cd2239cb22ce150f5c08 Mon Sep 17 00:00:00 2001 From: Ted Yu Date: Fri, 1 Aug 2014 17:15:32 +0000 Subject: [PATCH] HBASE-11620 Record the class name of Writer in WAL header so that only proper Reader can open the WAL file (Ted Yu) --- .../hbase/protobuf/generated/WALProtos.java | 226 +++++++++++++++--- hbase-protocol/src/main/protobuf/WAL.proto | 1 + .../hbase/regionserver/wal/HLogFactory.java | 4 + .../regionserver/wal/ProtobufLogReader.java | 39 ++- .../regionserver/wal/ProtobufLogWriter.java | 3 + .../wal/SecureProtobufLogReader.java | 18 +- .../wal/SecureProtobufLogWriter.java | 1 + .../wal/TestHLogReaderOnSecureHLog.java | 187 +++++++++++++++ 8 files changed, 441 insertions(+), 38 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index 19f5690c4cb..3d0d1d06d6b 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -122,6 +122,21 @@ public final class WALProtos { * optional bool has_tag_compression = 3; */ boolean getHasTagCompression(); + + // optional string writer_cls_name = 4; + /** + * optional string writer_cls_name = 4; + */ + boolean hasWriterClsName(); + /** + * optional string writer_cls_name = 4; + */ + java.lang.String getWriterClsName(); + /** + * optional string writer_cls_name = 4; + */ + com.google.protobuf.ByteString + getWriterClsNameBytes(); } /** * Protobuf type {@code WALHeader} @@ -189,6 +204,11 @@ public final class WALProtos { hasTagCompression_ = input.readBool(); break; } + case 34: { + bitField0_ |= 0x00000008; + writerClsName_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -277,10 +297,54 @@ public final class WALProtos { return hasTagCompression_; } + // optional string writer_cls_name = 4; + public static final int WRITER_CLS_NAME_FIELD_NUMBER = 4; + private java.lang.Object writerClsName_; + /** + * optional string writer_cls_name = 4; + */ + public boolean hasWriterClsName() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional string writer_cls_name = 4; + */ + public java.lang.String getWriterClsName() { + java.lang.Object ref = writerClsName_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + writerClsName_ = s; + } + return s; + } + } + /** + * optional string writer_cls_name = 4; + */ + public com.google.protobuf.ByteString + getWriterClsNameBytes() { + java.lang.Object ref = writerClsName_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + writerClsName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { hasCompression_ = false; encryptionKey_ = com.google.protobuf.ByteString.EMPTY; hasTagCompression_ = false; + writerClsName_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -303,6 +367,9 @@ public final class WALProtos { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBool(3, hasTagCompression_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBytes(4, getWriterClsNameBytes()); + } getUnknownFields().writeTo(output); } @@ -324,6 +391,10 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(3, hasTagCompression_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(4, getWriterClsNameBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -362,6 +433,11 @@ public final class WALProtos { result = result && (getHasTagCompression() == other.getHasTagCompression()); } + result = result && (hasWriterClsName() == other.hasWriterClsName()); + if (hasWriterClsName()) { + result = result && getWriterClsName() + .equals(other.getWriterClsName()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -387,6 +463,10 @@ public final class WALProtos { hash = (37 * hash) + HAS_TAG_COMPRESSION_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getHasTagCompression()); } + if (hasWriterClsName()) { + hash = (37 * hash) + WRITER_CLS_NAME_FIELD_NUMBER; + hash = (53 * hash) + getWriterClsName().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -502,6 +582,8 @@ public final class WALProtos { bitField0_ = (bitField0_ & ~0x00000002); hasTagCompression_ = false; bitField0_ = (bitField0_ & ~0x00000004); + writerClsName_ = ""; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -542,6 +624,10 @@ public final class WALProtos { to_bitField0_ |= 0x00000004; } result.hasTagCompression_ = hasTagCompression_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.writerClsName_ = writerClsName_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -567,6 +653,11 @@ public final class WALProtos { if (other.hasHasTagCompression()) { setHasTagCompression(other.getHasTagCompression()); } + if (other.hasWriterClsName()) { + bitField0_ |= 0x00000008; + writerClsName_ = other.writerClsName_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -696,6 +787,80 @@ public final class WALProtos { return this; } + // optional string writer_cls_name = 4; + private java.lang.Object writerClsName_ = ""; + /** + * optional string writer_cls_name = 4; + */ + public boolean hasWriterClsName() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional string writer_cls_name = 4; + */ + public java.lang.String getWriterClsName() { + java.lang.Object ref = writerClsName_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + writerClsName_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string writer_cls_name = 4; + */ + public com.google.protobuf.ByteString + getWriterClsNameBytes() { + java.lang.Object ref = writerClsName_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + writerClsName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string writer_cls_name = 4; + */ + public Builder setWriterClsName( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + writerClsName_ = value; + onChanged(); + return this; + } + /** + * optional string writer_cls_name = 4; + */ + public Builder clearWriterClsName() { + bitField0_ = (bitField0_ & ~0x00000008); + writerClsName_ = getDefaultInstance().getWriterClsName(); + onChanged(); + return this; + } + /** + * optional string writer_cls_name = 4; + */ + public Builder setWriterClsNameBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + writerClsName_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:WALHeader) } @@ -7664,36 +7829,37 @@ public final class WALProtos { descriptor; static { java.lang.String[] descriptorData = { - "\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" + + "\n\tWAL.proto\032\013HBase.proto\"r\n\tWALHeader\022\027\n" + "\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" + - "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\240\002\n\006" + - "WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" + - "able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" + - " \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" + - " \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" + - "Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" + - "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" + - "\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number", - "\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" + - "\nscope_type\030\002 \002(\0162\n.ScopeType\"\276\001\n\024Compac" + - "tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" + - "coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" + - " \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" + - "tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" + - "\t\022\023\n\013region_name\030\007 \001(\014\"\353\002\n\017FlushDescript" + - "or\022,\n\006action\030\001 \002(\0162\034.FlushDescriptor.Flu" + - "shAction\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_" + - "region_name\030\003 \002(\014\022\035\n\025flush_sequence_numb", - "er\030\004 \001(\004\022<\n\rstore_flushes\030\005 \003(\0132%.FlushD" + - "escriptor.StoreFlushDescriptor\032Y\n\024StoreF" + - "lushDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026\n\016s" + - "tore_home_dir\030\002 \002(\t\022\024\n\014flush_output\030\003 \003(" + - "\t\"A\n\013FlushAction\022\017\n\013START_FLUSH\020\000\022\020\n\014COM" + - "MIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"\014\n\nWALTrail" + - "er*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE_LOC" + - "AL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*or" + - "g.apache.hadoop.hbase.protobuf.generated" + - "B\tWALProtosH\001\210\001\000\240\001\001" + "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\022\027\n\017w" + + "riter_cls_name\030\004 \001(\t\"\240\002\n\006WALKey\022\033\n\023encod" + + "ed_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014" + + "\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite_t" + + "ime\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002\030\001" + + "\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022follow" + + "ing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132" + + "\005.UUID\022\022\n\nnonceGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(", + "\004\022\034\n\024orig_sequence_number\030\013 \001(\004\"=\n\013Famil" + + "yScope\022\016\n\006family\030\001 \002(\014\022\036\n\nscope_type\030\002 \002" + + "(\0162\n.ScopeType\"\276\001\n\024CompactionDescriptor\022" + + "\022\n\ntable_name\030\001 \002(\014\022\033\n\023encoded_region_na" + + "me\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020compact" + + "ion_input\030\004 \003(\t\022\031\n\021compaction_output\030\005 \003" + + "(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region_nam" + + "e\030\007 \001(\014\"\353\002\n\017FlushDescriptor\022,\n\006action\030\001 " + + "\002(\0162\034.FlushDescriptor.FlushAction\022\022\n\ntab" + + "le_name\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002", + "(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n\rsto" + + "re_flushes\030\005 \003(\0132%.FlushDescriptor.Store" + + "FlushDescriptor\032Y\n\024StoreFlushDescriptor\022" + + "\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002" + + " \002(\t\022\024\n\014flush_output\030\003 \003(\t\"A\n\013FlushActio" + + "n\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013" + + "ABORT_FLUSH\020\002\"\014\n\nWALTrailer*F\n\tScopeType" + + "\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICA" + + "TION_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop" + + ".hbase.protobuf.generatedB\tWALProtosH\001\210\001", + "\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7705,7 +7871,7 @@ public final class WALProtos { internal_static_WALHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALHeader_descriptor, - new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", }); + new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", "WriterClsName", }); internal_static_WALKey_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_WALKey_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index f14d5f4343f..6c05c75f75b 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -27,6 +27,7 @@ message WALHeader { optional bool has_compression = 1; optional bytes encryption_key = 2; optional bool has_tag_compression = 3; + optional string writer_cls_name = 4; } // Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java index 435a3e38454..276e16cb794 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java @@ -168,6 +168,10 @@ public class HLogFactory { */ private static Class logWriterClass; + static void resetLogWriterClass() { + logWriterClass = null; + } + /** * Create a writer for the WAL. * @return A WAL writer. Close when done with it. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 11ef770604a..8f0f1c0168d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -23,7 +23,9 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,7 +68,16 @@ public class ProtobufLogReader extends ReaderBase { // in the hlog, the inputstream's position is equal to walEditsStopOffset. private long walEditsStopOffset; private boolean trailerPresent; + private static List writerClsNames = new ArrayList(); + static { + writerClsNames.add(ProtobufLogWriter.class.getSimpleName()); + } + enum WALHdrResult { + EOF, // stream is at EOF when method starts + SUCCESS, + UNKNOWN_WRITER_CLS // name of writer class isn't recognized + } public ProtobufLogReader() { super(); } @@ -95,11 +106,26 @@ public class ProtobufLogReader extends ReaderBase { initInternal(stream, true); } - protected boolean readHeader(Builder builder, FSDataInputStream stream) throws IOException { - return builder.mergeDelimitedFrom(stream); + /* + * Returns names of the accepted writer classes + */ + protected List getWriterClsNames() { + return writerClsNames; } - private void initInternal(FSDataInputStream stream, boolean isFirst) throws IOException { + protected WALHdrResult readHeader(Builder builder, FSDataInputStream stream) + throws IOException { + boolean res = builder.mergeDelimitedFrom(stream); + if (!res) return WALHdrResult.EOF; + if (builder.hasWriterClsName() && + !getWriterClsNames().contains(builder.getWriterClsName())) { + return WALHdrResult.UNKNOWN_WRITER_CLS; + } + return WALHdrResult.SUCCESS; + } + + private void initInternal(FSDataInputStream stream, boolean isFirst) + throws IOException { close(); long expectedPos = PB_WAL_MAGIC.length; if (stream == null) { @@ -111,10 +137,13 @@ public class ProtobufLogReader extends ReaderBase { } // Initialize metadata or, when we reset, just skip the header. WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder(); - boolean hasHeader = readHeader(builder, stream); - if (!hasHeader) { + WALHdrResult walHdrRes = readHeader(builder, stream); + if (walHdrRes == WALHdrResult.EOF) { throw new EOFException("Couldn't read WAL PB header"); } + if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) { + throw new IOException("Got unknown writer class: " + builder.getWriterClsName()); + } if (isFirst) { WALProtos.WALHeader header = builder.build(); this.hasCompression = header.hasHasCompression() && header.getHasCompression(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 682b95487b5..74552f4e98c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -59,6 +59,9 @@ public class ProtobufLogWriter extends WriterBase { } protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException { + if (!builder.hasWriterClsName()) { + builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName()); + } return builder.build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java index 0c9de4b28df..7b025f86701 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; import java.security.Key; import java.security.KeyException; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,15 +40,25 @@ public class SecureProtobufLogReader extends ProtobufLogReader { private static final Log LOG = LogFactory.getLog(SecureProtobufLogReader.class); private Decryptor decryptor = null; + private static List writerClsNames = new ArrayList(); + static { + writerClsNames.add(ProtobufLogWriter.class.getSimpleName()); + writerClsNames.add(SecureProtobufLogWriter.class.getSimpleName()); + } @Override - protected boolean readHeader(WALHeader.Builder builder, FSDataInputStream stream) + protected List getWriterClsNames() { + return writerClsNames; + } + + @Override + protected WALHdrResult readHeader(WALHeader.Builder builder, FSDataInputStream stream) throws IOException { - boolean result = super.readHeader(builder, stream); + WALHdrResult result = super.readHeader(builder, stream); // We need to unconditionally handle the case where the WAL has a key in // the header, meaning it is encrypted, even if ENABLE_WAL_ENCRYPTION is // no longer set in the site configuration. - if (result && builder.hasEncryptionKey()) { + if (result == WALHdrResult.SUCCESS && builder.hasEncryptionKey()) { // Serialized header data has been merged into the builder from the // stream. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java index ee100fd9fa8..fa9538849bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java @@ -44,6 +44,7 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter { @Override protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException { + builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName()); if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) { // Get an instance of our cipher Cipher cipher = Encryption.getCipher(conf, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java new file mode 100644 index 00000000000..02d59f9e7f6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.log4j.Level; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/* + * Test that verifies WAL written by SecureProtobufLogWriter is not readable by ProtobufLogReader + */ +@Category(MediumTests.class) +public class TestHLogReaderOnSecureHLog { + static final Log LOG = LogFactory.getLog(TestHLogReaderOnSecureHLog.class); + static { + ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal")) + .getLogger().setLevel(Level.ALL); + }; + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + final byte[] value = Bytes.toBytes("Test value"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); + conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); + conf.setBoolean("hbase.hlog.split.skip.errors", true); + conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); + } + + private Path writeWAL(String tblName) throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + TableName tableName = TableName.valueOf(tblName); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(tableName.getName())); + HRegionInfo regioninfo = new HRegionInfo(tableName, + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false); + final int total = 10; + final byte[] row = Bytes.toBytes("row"); + final byte[] family = Bytes.toBytes("family"); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path logDir = TEST_UTIL.getDataTestDir(tblName); + final AtomicLong sequenceId = new AtomicLong(1); + + // Write the WAL + FSHLog wal = new FSHLog(fs, TEST_UTIL.getDataTestDir(), logDir.toString(), conf); + for (int i = 0; i < total; i++) { + WALEdit kvs = new WALEdit(); + kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); + wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId); + } + final Path walPath = ((FSHLog) wal).computeFilename(); + wal.close(); + + return walPath; + } + + @Test() + public void testHLogReaderOnSecureHLog() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + HLogFactory.resetLogReaderClass(); + HLogFactory.resetLogWriterClass(); + conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, + HLog.Reader.class); + conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class, + HLog.Writer.class); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path walPath = writeWAL("testHLogReaderOnSecureHLog"); + + // Insure edits are not plaintext + long length = fs.getFileStatus(walPath).getLen(); + FSDataInputStream in = fs.open(walPath); + byte[] fileData = new byte[(int)length]; + IOUtils.readFully(in, fileData); + in.close(); + assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value)); + + // Confirm the WAL cannot be read back by ProtobufLogReader + try { + HLog.Reader reader = HLogFactory.createReader(TEST_UTIL.getTestFileSystem(), walPath, conf); + assertFalse(true); + } catch (IOException ioe) { + // expected IOE + } + + FileStatus[] listStatus = fs.listStatus(walPath.getParent()); + RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); + Path rootdir = FSUtils.getRootDir(conf); + try { + HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode); + s.splitLogFile(listStatus[0], null); + Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), + "corrupt"); + assertTrue(fs.exists(file)); + // assertFalse("log splitting should have failed", true); + } catch (IOException ioe) { + assertTrue("WAL should have been sidelined", false); + } + } + + @Test() + public void testSecureHLogReaderOnHLog() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + HLogFactory.resetLogReaderClass(); + HLogFactory.resetLogWriterClass(); + conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class, + HLog.Reader.class); + conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, + HLog.Writer.class); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path walPath = writeWAL("testSecureHLogReaderOnHLog"); + + // Ensure edits are plaintext + long length = fs.getFileStatus(walPath).getLen(); + FSDataInputStream in = fs.open(walPath); + byte[] fileData = new byte[(int)length]; + IOUtils.readFully(in, fileData); + in.close(); + assertTrue("Cells should be plaintext", Bytes.contains(fileData, value)); + + // Confirm the WAL can be read back by SecureProtobufLogReader + try { + HLog.Reader reader = HLogFactory.createReader(TEST_UTIL.getTestFileSystem(), walPath, conf); + } catch (IOException ioe) { + assertFalse(true); + } + + FileStatus[] listStatus = fs.listStatus(walPath.getParent()); + RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); + Path rootdir = FSUtils.getRootDir(conf); + try { + HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode); + s.splitLogFile(listStatus[0], null); + Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), + "corrupt"); + assertTrue(!fs.exists(file)); + } catch (IOException ioe) { + assertTrue("WAL should have been processed", false); + } + } +}