HBASE-27702 Remove 'hbase.regionserver.hlog.writer.impl' config (#5096)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
e6977a9597
commit
997d132601
|
@ -26,10 +26,8 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||||
import org.apache.hadoop.hbase.util.EncryptionTest;
|
import org.apache.hadoop.hbase.util.EncryptionTest;
|
||||||
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -52,8 +50,6 @@ public class IntegrationTestIngestWithEncryption extends IntegrationTestIngest {
|
||||||
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
|
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
|
||||||
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
||||||
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||||
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
|
|
||||||
Writer.class);
|
|
||||||
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
||||||
}
|
}
|
||||||
// Check if the cluster configuration can support this test
|
// Check if the cluster configuration can support this test
|
||||||
|
|
|
@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.codec.Codec;
|
import org.apache.hadoop.hbase.codec.Codec;
|
||||||
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
|
@ -55,13 +54,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
|
||||||
/**
|
/**
|
||||||
* Base class for Protobuf log writer.
|
* Base class for Protobuf log writer.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
@InterfaceAudience.Private
|
||||||
public abstract class AbstractProtobufLogWriter {
|
public abstract class AbstractProtobufLogWriter {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufLogWriter.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufLogWriter.class);
|
||||||
|
|
||||||
protected CompressionContext compressionContext;
|
protected CompressionContext compressionContext;
|
||||||
protected Configuration conf;
|
protected Configuration conf;
|
||||||
|
protected Encryptor encryptor;
|
||||||
protected Codec.Encoder cellEncoder;
|
protected Codec.Encoder cellEncoder;
|
||||||
protected WALCellCodec.ByteStringCompressor compressor;
|
protected WALCellCodec.ByteStringCompressor compressor;
|
||||||
protected boolean trailerWritten;
|
protected boolean trailerWritten;
|
||||||
|
@ -77,64 +77,53 @@ public abstract class AbstractProtobufLogWriter {
|
||||||
return WALCellCodec.create(conf, null, compressionContext);
|
return WALCellCodec.create(conf, null, compressionContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
private WALHeader buildWALHeader0(Configuration conf, WALHeader.Builder builder) {
|
private WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
|
||||||
if (!builder.hasWriterClsName()) {
|
throws IOException {
|
||||||
builder.setWriterClsName(getWriterClassName());
|
builder.setWriterClsName(getWriterClassName());
|
||||||
}
|
builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf).getName());
|
||||||
if (!builder.hasCellCodecClsName()) {
|
|
||||||
builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf).getName());
|
|
||||||
}
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
|
private WALHeader buildSecureWALHeader(Configuration conf, WALHeader.Builder builder)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return buildWALHeader0(conf, builder);
|
EncryptionTest.testKeyProvider(conf);
|
||||||
}
|
EncryptionTest.testCipherProvider(conf);
|
||||||
|
|
||||||
// should be called in sub classes's buildWALHeader method to build WALHeader for secure
|
// Get an instance of our cipher
|
||||||
// environment. Do not forget to override the setEncryptor method as it will be called in this
|
final String cipherName =
|
||||||
// method to init your encryptor.
|
conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
|
||||||
protected final WALHeader buildSecureWALHeader(Configuration conf, WALHeader.Builder builder)
|
Cipher cipher = Encryption.getCipher(conf, cipherName);
|
||||||
throws IOException {
|
if (cipher == null) {
|
||||||
builder.setWriterClsName(getWriterClassName());
|
throw new RuntimeException("Cipher '" + cipherName + "' is not available");
|
||||||
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
|
|
||||||
EncryptionTest.testKeyProvider(conf);
|
|
||||||
EncryptionTest.testCipherProvider(conf);
|
|
||||||
|
|
||||||
// Get an instance of our cipher
|
|
||||||
final String cipherName =
|
|
||||||
conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
|
|
||||||
Cipher cipher = Encryption.getCipher(conf, cipherName);
|
|
||||||
if (cipher == null) {
|
|
||||||
throw new RuntimeException("Cipher '" + cipherName + "' is not available");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate a random encryption key for this WAL
|
|
||||||
Key key = cipher.getRandomKey();
|
|
||||||
builder.setEncryptionKey(UnsafeByteOperations.unsafeWrap(EncryptionUtil.wrapKey(conf,
|
|
||||||
conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY,
|
|
||||||
conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName())),
|
|
||||||
key)));
|
|
||||||
|
|
||||||
// Set up the encryptor
|
|
||||||
Encryptor encryptor = cipher.getEncryptor();
|
|
||||||
encryptor.setKey(key);
|
|
||||||
setEncryptor(encryptor);
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Generate a random encryption key for this WAL
|
||||||
|
Key key = cipher.getRandomKey();
|
||||||
|
builder.setEncryptionKey(UnsafeByteOperations.unsafeWrap(EncryptionUtil.wrapKey(conf,
|
||||||
|
conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY,
|
||||||
|
conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName())),
|
||||||
|
key)));
|
||||||
|
|
||||||
|
// Set up the encryptor
|
||||||
|
Encryptor encryptor = cipher.getEncryptor();
|
||||||
|
encryptor.setKey(key);
|
||||||
|
this.encryptor = encryptor;
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Initialized secure protobuf WAL: cipher={}", cipher.getName());
|
||||||
|
}
|
||||||
|
builder.setWriterClsName(getWriterClassName());
|
||||||
builder.setCellCodecClsName(SecureWALCellCodec.class.getName());
|
builder.setCellCodecClsName(SecureWALCellCodec.class.getName());
|
||||||
return buildWALHeader0(conf, builder);
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
// override this if you need a encryptor
|
private String getWriterClassName() {
|
||||||
protected void setEncryptor(Encryptor encryptor) {
|
// class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error:
|
||||||
}
|
// IOException: Got unknown writer class: AsyncProtobufLogWriter
|
||||||
|
if (encryptor == null) {
|
||||||
protected String getWriterClassName() {
|
return "ProtobufLogWriter";
|
||||||
return getClass().getSimpleName();
|
} else {
|
||||||
|
return "SecureProtobufLogWriter";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
|
private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
|
||||||
|
@ -187,9 +176,13 @@ public abstract class AbstractProtobufLogWriter {
|
||||||
headerBuilder.setValueCompressionAlgorithm(
|
headerBuilder.setValueCompressionAlgorithm(
|
||||||
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
|
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
|
||||||
}
|
}
|
||||||
length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildWALHeader(conf, headerBuilder)));
|
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
|
||||||
|
length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildSecureWALHeader(conf, headerBuilder)));
|
||||||
initAfterHeader(doCompress);
|
secureInitAfterHeader(doCompress, encryptor);
|
||||||
|
} else {
|
||||||
|
length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildWALHeader(conf, headerBuilder)));
|
||||||
|
initAfterHeader(doCompress);
|
||||||
|
}
|
||||||
|
|
||||||
// instantiate trailer to default value.
|
// instantiate trailer to default value.
|
||||||
trailer = WALTrailer.newBuilder().build();
|
trailer = WALTrailer.newBuilder().build();
|
||||||
|
@ -205,7 +198,7 @@ public abstract class AbstractProtobufLogWriter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initAfterHeader0(boolean doCompress) throws IOException {
|
private void initAfterHeader(boolean doCompress) throws IOException {
|
||||||
WALCellCodec codec = getCodec(conf, this.compressionContext);
|
WALCellCodec codec = getCodec(conf, this.compressionContext);
|
||||||
this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
|
this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
|
||||||
if (doCompress) {
|
if (doCompress) {
|
||||||
|
@ -215,21 +208,15 @@ public abstract class AbstractProtobufLogWriter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void initAfterHeader(boolean doCompress) throws IOException {
|
private void secureInitAfterHeader(boolean doCompress, Encryptor encryptor) throws IOException {
|
||||||
initAfterHeader0(doCompress);
|
if (encryptor != null) {
|
||||||
}
|
|
||||||
|
|
||||||
// should be called in sub classes's initAfterHeader method to init SecureWALCellCodec.
|
|
||||||
protected final void secureInitAfterHeader(boolean doCompress, Encryptor encryptor)
|
|
||||||
throws IOException {
|
|
||||||
if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor != null) {
|
|
||||||
WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor);
|
WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor);
|
||||||
this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
|
this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
|
||||||
// We do not support compression
|
// We do not support compression
|
||||||
this.compressionContext = null;
|
this.compressionContext = null;
|
||||||
this.compressor = WALCellCodec.getNoneCompressor();
|
this.compressor = WALCellCodec.getNoneCompressor();
|
||||||
} else {
|
} else {
|
||||||
initAfterHeader0(doCompress);
|
initAfterHeader(doCompress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,7 +232,7 @@ public abstract class AbstractProtobufLogWriter {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void writeWALTrailer() {
|
protected final void writeWALTrailer() {
|
||||||
try {
|
try {
|
||||||
int trailerSize = 0;
|
int trailerSize = 0;
|
||||||
if (this.trailer == null) {
|
if (this.trailer == null) {
|
||||||
|
|
|
@ -82,10 +82,9 @@ public abstract class AbstractProtobufWALReader
|
||||||
static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
|
static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
|
||||||
static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
|
static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
|
||||||
|
|
||||||
private static final List<String> WRITER_CLS_NAMES =
|
private static final List<String> WRITER_CLS_NAMES = ImmutableList.of(
|
||||||
ImmutableList.of(ProtobufLogWriter.class.getSimpleName(),
|
ProtobufLogWriter.class.getSimpleName(), AsyncProtobufLogWriter.class.getSimpleName(),
|
||||||
AsyncProtobufLogWriter.class.getSimpleName(), SecureProtobufLogWriter.class.getSimpleName(),
|
"SecureProtobufLogWriter", "SecureAsyncProtobufLogWriter");
|
||||||
SecureAsyncProtobufLogWriter.class.getSimpleName());
|
|
||||||
|
|
||||||
protected Configuration conf;
|
protected Configuration conf;
|
||||||
|
|
||||||
|
|
|
@ -127,15 +127,6 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
|
AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* @return class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error:
|
|
||||||
* IOException: Got unknown writer class: AsyncProtobufLogWriter
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
protected String getWriterClassName() {
|
|
||||||
return "ProtobufLogWriter";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void append(Entry entry) {
|
public void append(Entry entry) {
|
||||||
int buffered = output.buffered();
|
int buffered = output.buffered();
|
||||||
|
|
|
@ -1,65 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.Encryptor;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
|
|
||||||
|
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
|
||||||
public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter {
|
|
||||||
|
|
||||||
private Encryptor encryptor = null;
|
|
||||||
|
|
||||||
public SecureAsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
|
|
||||||
Class<? extends Channel> channelClass) {
|
|
||||||
super(eventLoopGroup, channelClass);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* @return class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error:
|
|
||||||
* IOException: Got unknown writer class: SecureAsyncProtobufLogWriter
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
protected String getWriterClassName() {
|
|
||||||
return "SecureProtobufLogWriter";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
|
|
||||||
throws IOException {
|
|
||||||
return super.buildSecureWALHeader(conf, builder);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void setEncryptor(Encryptor encryptor) {
|
|
||||||
this.encryptor = encryptor;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void initAfterHeader(boolean doCompress) throws IOException {
|
|
||||||
super.secureInitAfterHeader(doCompress, encryptor);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,48 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.Encryptor;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
|
|
||||||
|
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
|
||||||
public class SecureProtobufLogWriter extends ProtobufLogWriter {
|
|
||||||
|
|
||||||
private Encryptor encryptor = null;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
|
|
||||||
throws IOException {
|
|
||||||
return super.buildSecureWALHeader(conf, builder);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void setEncryptor(Encryptor encryptor) {
|
|
||||||
this.encryptor = encryptor;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void initAfterHeader(boolean doCompress) throws IOException {
|
|
||||||
super.secureInitAfterHeader(doCompress, encryptor);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -49,7 +49,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class);
|
||||||
|
|
||||||
public static final String WRITER_IMPL = "hbase.regionserver.hlog.async.writer.impl";
|
public static final String WRITER_IMPL = "hbase.regionserver.wal.async.writer.impl";
|
||||||
|
|
||||||
// Only public so classes back in regionserver.wal can access
|
// Only public so classes back in regionserver.wal can access
|
||||||
public interface AsyncWriter extends WALProvider.AsyncWriter {
|
public interface AsyncWriter extends WALProvider.AsyncWriter {
|
||||||
|
|
|
@ -41,6 +41,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(FSHLogProvider.class);
|
private static final Logger LOG = LoggerFactory.getLogger(FSHLogProvider.class);
|
||||||
|
|
||||||
|
public static final String WRITER_IMPL = "hbase.regionserver.wal.writer.impl";
|
||||||
|
|
||||||
// Only public so classes back in regionserver.wal can access
|
// Only public so classes back in regionserver.wal can access
|
||||||
public interface Writer extends WALProvider.Writer {
|
public interface Writer extends WALProvider.Writer {
|
||||||
/**
|
/**
|
||||||
|
@ -71,7 +73,7 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
|
||||||
final boolean overwritable, long blocksize) throws IOException {
|
final boolean overwritable, long blocksize) throws IOException {
|
||||||
// Configuration already does caching for the Class lookup.
|
// Configuration already does caching for the Class lookup.
|
||||||
Class<? extends Writer> logWriterClass =
|
Class<? extends Writer> logWriterClass =
|
||||||
conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, Writer.class);
|
conf.getClass(WRITER_IMPL, ProtobufLogWriter.class, Writer.class);
|
||||||
Writer writer = null;
|
Writer writer = null;
|
||||||
try {
|
try {
|
||||||
writer = logWriterClass.getDeclaredConstructor().newInstance();
|
writer = logWriterClass.getDeclaredConstructor().newInstance();
|
||||||
|
|
|
@ -27,11 +27,6 @@ public class InstrumentedLogWriter extends ProtobufLogWriter {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getWriterClassName() {
|
|
||||||
return ProtobufLogWriter.class.getSimpleName();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static boolean activateFailure = false;
|
public static boolean activateFailure = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.hbase.wal.FSHLogProvider;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
|
@ -94,7 +95,7 @@ public class TestLogRollingNoCluster {
|
||||||
FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration());
|
FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration());
|
||||||
FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration());
|
FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration());
|
||||||
TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME);
|
TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME);
|
||||||
conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
|
conf.set(FSHLogProvider.WRITER_IMPL, HighLatencySyncWriter.class.getName());
|
||||||
final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName());
|
final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName());
|
||||||
final WAL wal = wals.getWAL(null);
|
final WAL wal = wals.getWAL(null);
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -40,8 +39,6 @@ public class TestSecureAsyncWALReplay extends TestAsyncWALReplay {
|
||||||
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
|
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
|
||||||
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
||||||
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||||
conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
|
|
||||||
AsyncWriter.class);
|
|
||||||
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
||||||
TestAsyncWALReplay.setUpBeforeClass();
|
TestAsyncWALReplay.setUpBeforeClass();
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -40,8 +39,6 @@ public class TestSecureWALReplay extends TestWALReplay {
|
||||||
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
|
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
|
||||||
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
||||||
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||||
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
|
|
||||||
Writer.class);
|
|
||||||
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
||||||
AbstractTestWALReplay.setUpBeforeClass();
|
AbstractTestWALReplay.setUpBeforeClass();
|
||||||
}
|
}
|
||||||
|
|
|
@ -250,11 +250,6 @@ public class IOTestProvider implements WALProvider {
|
||||||
super.init(fs, path, conf, overwritable, blocksize, monitor);
|
super.init(fs, path, conf, overwritable, blocksize, monitor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getWriterClassName() {
|
|
||||||
return ProtobufLogWriter.class.getSimpleName();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void append(Entry entry) throws IOException {
|
public void append(Entry entry) throws IOException {
|
||||||
if (doAppends) {
|
if (doAppends) {
|
||||||
|
|
|
@ -40,8 +40,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -86,10 +84,6 @@ public class TestSecureWAL {
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
||||||
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||||
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
|
|
||||||
WALProvider.Writer.class);
|
|
||||||
conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
|
|
||||||
WALProvider.AsyncWriter.class);
|
|
||||||
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
||||||
CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
|
CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
|
||||||
TEST_UTIL.startMiniDFSCluster(3);
|
TEST_UTIL.startMiniDFSCluster(3);
|
||||||
|
|
|
@ -155,7 +155,8 @@ public class TestWALSplit {
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
conf = TEST_UTIL.getConfiguration();
|
conf = TEST_UTIL.getConfiguration();
|
||||||
conf.setClass("hbase.regionserver.hlog.writer.impl", InstrumentedLogWriter.class, Writer.class);
|
conf.set(WALFactory.WAL_PROVIDER, "filesystem");
|
||||||
|
conf.setClass(FSHLogProvider.WRITER_IMPL, InstrumentedLogWriter.class, Writer.class);
|
||||||
// This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
|
// This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
|
||||||
System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
|
System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
|
||||||
// Create fake maping user to group and set it to the conf.
|
// Create fake maping user to group and set it to the conf.
|
||||||
|
|
|
@ -57,14 +57,12 @@ import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.LogRoller;
|
import org.apache.hadoop.hbase.regionserver.LogRoller;
|
||||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -257,8 +255,6 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
||||||
Configuration conf = getConf();
|
Configuration conf = getConf();
|
||||||
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
||||||
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||||
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
|
|
||||||
Writer.class);
|
|
||||||
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
|
||||||
conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
|
conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue