HBASE-12095 SecureWALCellCodec should handle the case where encryption is disabled

This commit is contained in:
Ted Yu 2014-10-01 14:22:18 +00:00
parent 4fac4c1ba6
commit 1587068a2c
2 changed files with 23 additions and 6 deletions

View File

@ -46,6 +46,10 @@ public class SecureWALCellCodec extends WALCellCodec {
private Encryptor encryptor; private Encryptor encryptor;
private Decryptor decryptor; private Decryptor decryptor;
public SecureWALCellCodec(Configuration conf, CompressionContext compression) {
super(conf, compression);
}
public SecureWALCellCodec(Configuration conf, Encryptor encryptor) { public SecureWALCellCodec(Configuration conf, Encryptor encryptor) {
super(conf, null); super(conf, null);
this.encryptor = encryptor; this.encryptor = encryptor;
@ -68,11 +72,16 @@ public class SecureWALCellCodec extends WALCellCodec {
public EncryptedKvDecoder(InputStream in, Decryptor decryptor) { public EncryptedKvDecoder(InputStream in, Decryptor decryptor) {
super(in); super(in);
this.decryptor = decryptor; this.decryptor = decryptor;
if (decryptor != null) {
this.iv = new byte[decryptor.getIvLength()]; this.iv = new byte[decryptor.getIvLength()];
} }
}
@Override @Override
protected Cell parseCell() throws IOException { protected Cell parseCell() throws IOException {
if (this.decryptor == null) {
return super.parseCell();
}
int ivLength = 0; int ivLength = 0;
try { try {
ivLength = StreamUtils.readRawVarint32(in); ivLength = StreamUtils.readRawVarint32(in);
@ -171,6 +180,10 @@ public class SecureWALCellCodec extends WALCellCodec {
@Override @Override
public void write(Cell cell) throws IOException { public void write(Cell cell) throws IOException {
if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL"); if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
if (encryptor == null) {
super.write(cell);
return;
}
KeyValue kv = (KeyValue)cell; KeyValue kv = (KeyValue)cell;
byte[] kvBuffer = kv.getBuffer(); byte[] kvBuffer = kv.getBuffer();

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.wal.TestCustomWALCellCodec.CustomWALCellCodec;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
@ -74,11 +73,16 @@ public class TestHLogReaderOnSecureHLog {
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
} }
private Path writeWAL(String tblName) throws IOException { private Path writeWAL(String tblName, boolean encrypt) throws IOException {
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, CustomWALCellCodec.class, conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
WALCellCodec.class); WALCellCodec.class);
if (encrypt) {
conf.set("hbase.regionserver.wal.encryption", "true");
} else {
conf.set("hbase.regionserver.wal.encryption", "false");
}
TableName tableName = TableName.valueOf(tblName); TableName tableName = TableName.valueOf(tblName);
HTableDescriptor htd = new HTableDescriptor(tableName); HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(tableName.getName())); htd.addFamily(new HColumnDescriptor(tableName.getName()));
@ -116,7 +120,7 @@ public class TestHLogReaderOnSecureHLog {
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class, conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
HLog.Writer.class); HLog.Writer.class);
FileSystem fs = TEST_UTIL.getTestFileSystem(); FileSystem fs = TEST_UTIL.getTestFileSystem();
Path walPath = writeWAL("testHLogReaderOnSecureHLog"); Path walPath = writeWAL("testHLogReaderOnSecureHLog", true);
// Insure edits are not plaintext // Insure edits are not plaintext
long length = fs.getFileStatus(walPath).getLen(); long length = fs.getFileStatus(walPath).getLen();
@ -160,7 +164,7 @@ public class TestHLogReaderOnSecureHLog {
conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
HLog.Writer.class); HLog.Writer.class);
FileSystem fs = TEST_UTIL.getTestFileSystem(); FileSystem fs = TEST_UTIL.getTestFileSystem();
Path walPath = writeWAL("testSecureHLogReaderOnHLog"); Path walPath = writeWAL("testSecureHLogReaderOnHLog", false);
// Ensure edits are plaintext // Ensure edits are plaintext
long length = fs.getFileStatus(walPath).getLen(); long length = fs.getFileStatus(walPath).getLen();