HBASE-15538 Implement secure async protobuf wal writer

This commit is contained in:
zhangduo 2016-03-29 23:02:41 +08:00
parent 9d56105eec
commit d6fd859451
10 changed files with 266 additions and 76 deletions

View File

@ -22,8 +22,12 @@ import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRA
import java.io.IOException;
import java.io.OutputStream;
import java.security.Key;
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -33,9 +37,16 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.FSUtils;
/**
@ -63,10 +74,9 @@ public abstract class AbstractProtobufLogWriter {
return WALCellCodec.create(conf, null, compressionContext);
}
protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
throws IOException {
private WALHeader buildWALHeader0(Configuration conf, WALHeader.Builder builder) {
if (!builder.hasWriterClsName()) {
builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName());
builder.setWriterClsName(getWriterClassName());
}
if (!builder.hasCellCodecClsName()) {
builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf));
@ -74,6 +84,60 @@ public abstract class AbstractProtobufLogWriter {
return builder.build();
}
protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
throws IOException {
return buildWALHeader0(conf, builder);
}
// should be called in sub classes's buildWALHeader method to build WALHeader for secure
// environment. Do not forget to override the setEncryptor method as it will be called in this
// method to init your encryptor.
protected final WALHeader buildSecureWALHeader(Configuration conf, WALHeader.Builder builder)
throws IOException {
builder.setWriterClsName(getWriterClassName());
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
EncryptionTest.testKeyProvider(conf);
EncryptionTest.testCipherProvider(conf);
// Get an instance of our cipher
final String cipherName =
conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
Cipher cipher = Encryption.getCipher(conf, cipherName);
if (cipher == null) {
throw new RuntimeException("Cipher '" + cipherName + "' is not available");
}
// Generate an encryption key for this WAL
SecureRandom rng = new SecureRandom();
byte[] keyBytes = new byte[cipher.getKeyLength()];
rng.nextBytes(keyBytes);
Key key = new SecretKeySpec(keyBytes, cipher.getName());
builder.setEncryptionKey(ByteStringer.wrap(EncryptionUtil.wrapKey(conf,
conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY,
conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
User.getCurrent().getShortName())),
key)));
// Set up the encryptor
Encryptor encryptor = cipher.getEncryptor();
encryptor.setKey(key);
setEncryptor(encryptor);
if (LOG.isTraceEnabled()) {
LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName());
}
}
builder.setCellCodecClsName(SecureWALCellCodec.class.getName());
return buildWALHeader0(conf, builder);
}
// override this if you need a encryptor
protected void setEncryptor(Encryptor encryptor) {
}
protected String getWriterClassName() {
return getClass().getSimpleName();
}
private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
if (doCompress) {
@ -115,7 +179,7 @@ public abstract class AbstractProtobufLogWriter {
}
}
protected void initAfterHeader(boolean doCompress) throws IOException {
private void initAfterHeader0(boolean doCompress) throws IOException {
WALCellCodec codec = getCodec(conf, this.compressionContext);
this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
if (doCompress) {
@ -123,6 +187,23 @@ public abstract class AbstractProtobufLogWriter {
}
}
protected void initAfterHeader(boolean doCompress) throws IOException {
initAfterHeader0(doCompress);
}
// should be called in sub classes's initAfterHeader method to init SecureWALCellCodec.
protected final void secureInitAfterHeader(boolean doCompress, Encryptor encryptor)
throws IOException {
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor != null) {
WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor);
this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
// We do not support compression
this.compressionContext = null;
} else {
initAfterHeader0(doCompress);
}
}
void setWALTrailer(WALTrailer walTrailer) {
this.trailer = walTrailer;
}

View File

@ -91,6 +91,7 @@ public class ProtobufLogReader extends ReaderBase {
private static List<String> writerClsNames = new ArrayList<String>();
static {
writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
}
// cell codec classname

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
import io.netty.channel.EventLoop;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter {
private Encryptor encryptor = null;
public SecureAsyncProtobufLogWriter(EventLoop eventLoop) {
super(eventLoop);
}
@Override
protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
throws IOException {
return super.buildSecureWALHeader(conf, builder);
}
@Override
protected void setEncryptor(Encryptor encryptor) {
this.encryptor = encryptor;
}
@Override
protected void initAfterHeader(boolean doCompress) throws IOException {
super.secureInitAfterHeader(doCompress, encryptor);
}
}

View File

@ -48,6 +48,8 @@ public class SecureProtobufLogReader extends ProtobufLogReader {
static {
writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
writerClsNames.add(SecureProtobufLogWriter.class.getSimpleName());
writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
writerClsNames.add(SecureAsyncProtobufLogWriter.class.getSimpleName());
}
@Override

View File

@ -19,81 +19,31 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.security.Key;
import java.security.SecureRandom;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SecureProtobufLogWriter extends ProtobufLogWriter {
private static final Log LOG = LogFactory.getLog(SecureProtobufLogWriter.class);
private Encryptor encryptor = null;
@Override
protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
throws IOException {
builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName());
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
EncryptionTest.testKeyProvider(conf);
EncryptionTest.testCipherProvider(conf);
return super.buildSecureWALHeader(conf, builder);
}
// Get an instance of our cipher
final String cipherName =
conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
Cipher cipher = Encryption.getCipher(conf, cipherName);
if (cipher == null) {
throw new RuntimeException("Cipher '" + cipherName + "' is not available");
}
// Generate an encryption key for this WAL
SecureRandom rng = new SecureRandom();
byte[] keyBytes = new byte[cipher.getKeyLength()];
rng.nextBytes(keyBytes);
Key key = new SecretKeySpec(keyBytes, cipher.getName());
builder.setEncryptionKey(ByteStringer.wrap(EncryptionUtil.wrapKey(conf,
conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY,
conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
User.getCurrent().getShortName())),
key)));
// Set up the encryptor
encryptor = cipher.getEncryptor();
encryptor.setKey(key);
if (LOG.isTraceEnabled()) {
LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName());
}
}
builder.setCellCodecClsName(SecureWALCellCodec.class.getName());
return super.buildWALHeader(conf, builder);
@Override
protected void setEncryptor(Encryptor encryptor) {
this.encryptor = encryptor;
}
@Override
protected void initAfterHeader(boolean doCompress) throws IOException {
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor != null) {
WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor);
this.cellEncoder = codec.getEncoder(this.output);
// We do not support compression
this.compressionContext = null;
} else {
super.initAfterHeader(doCompress);
}
super.secureInitAfterHeader(doCompress, encryptor);
}
}

View File

@ -17,8 +17,14 @@
*/
package org.apache.hadoop.hbase.wal;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -30,10 +36,6 @@ import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
/**
* A WAL provider that use {@link AsyncFSWAL}.
*/
@ -41,6 +43,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
@InterfaceStability.Evolving
public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
private static final Log LOG = LogFactory.getLog(AsyncFSWALProvider.class);
// Only public so classes back in regionserver.wal can access
public interface AsyncWriter extends WALProvider.AsyncWriter {
void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
@ -66,8 +70,17 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
*/
public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
boolean overwritable, EventLoop eventLoop) throws IOException {
AsyncWriter writer = new AsyncProtobufLogWriter(eventLoop);
writer.init(fs, path, conf, overwritable);
return writer;
// Configuration already does caching for the Class lookup.
Class<? extends AsyncWriter> logWriterClass =
conf.getClass("hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class,
AsyncWriter.class);
try {
AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class).newInstance(eventLoop);
writer.init(fs, path, conf, overwritable);
return writer;
} catch (Exception e) {
LOG.debug("Error instantiating log writer.", e);
throw new IOException("cannot get log writer", e);
}
}
}

View File

@ -29,6 +29,11 @@ public class InstrumentedLogWriter extends ProtobufLogWriter {
super();
}
@Override
protected String getWriterClassName() {
return ProtobufLogWriter.class.getSimpleName();
}
public static boolean activateFailure = false;
@Override
public void append(Entry entry) throws IOException {

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSecureAsyncWALReplay extends TestAsyncWALReplay {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
Reader.class);
conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
AsyncWriter.class);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
TestAsyncWALReplay.setUpBeforeClass();
}
}

View File

@ -216,6 +216,11 @@ public class IOTestProvider implements WALProvider {
super.init(fs, path, conf, overwritable);
}
@Override
protected String getWriterClassName() {
return ProtobufLogWriter.class.getSimpleName();
}
@Override
public void append(Entry entry) throws IOException {
if (doAppends) {

View File

@ -21,12 +21,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -50,19 +51,39 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@Category({RegionServerTests.class, MediumTests.class})
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSecureWAL {
private static final Log LOG = LogFactory.getLog(TestSecureWAL.class);
static {
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal"))
.getLogger().setLevel(Level.ALL);
};
static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Rule
public TestName name = new TestName();
@Parameter
public String walProvider;
@Parameters(name = "{index}: provider={0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
@ -72,13 +93,26 @@ public class TestSecureWAL {
WAL.Reader.class);
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
WALProvider.Writer.class);
conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
WALProvider.AsyncWriter.class);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
TEST_UTIL.startMiniDFSCluster(3);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setUp() {
TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
}
@Test
public void testSecureWAL() throws Exception {
TableName tableName = TableName.valueOf("TestSecureWAL");
TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(tableName.getName()));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
@ -92,8 +126,9 @@ public class TestSecureWAL {
final byte[] row = Bytes.toBytes("row");
final byte[] family = Bytes.toBytes("family");
final byte[] value = Bytes.toBytes("Test value");
FileSystem fs = TEST_UTIL.getTestFileSystem();
final WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "TestSecureWAL");
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
final WALFactory wals =
new WALFactory(TEST_UTIL.getConfiguration(), null, tableName.getNameAsString());
// Write the WAL
final WAL wal =
@ -137,5 +172,4 @@ public class TestSecureWAL {
assertEquals("Should have read back as many KVs as written", total, count);
reader.close();
}
}