HBASE-19369 Switch to Builder Pattern In WAL

This patch switches WAL file creation to the builder pattern by adding a
helper method. It also checks that the builder API is available (i.e. that
HBase is running on a Hadoop version that supports it), and falls back to
the old create API when it is not.
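
For reference, on a Hadoop release that ships HdfsDataOutputStreamBuilder,
the reflective helper added here corresponds to a direct builder call along
these lines (a sketch only; dfs is an assumed DistributedFileSystem handle
and path an assumed Path, neither taken from this patch):

    // Sketch of the Hadoop builder API that this patch wraps via reflection.
    // replicate() requests a plain replicated file (not erasure coded), which
    // keeps hflush/hsync available for the WAL.
    FSDataOutputStream out = dfs.createFile(path)
        .overwrite(true)
        .replicate()
        .build();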

Amending-Author: Mike Drob <mdrob@apache.org>
Signed-off-by: tedyu <yuzhihong@gmail.com>
Signed-off-by: zhangduo <zhangduo@apache.org>
commit 31cbd7ab8f (parent 8bfdb19e85)
Alex Leblang, 2018-07-26 15:47:13 -05:00; committed by Mike Drob
5 changed files with 256 additions and 5 deletions


@@ -821,6 +821,133 @@ public abstract class CommonFSUtils {
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  private static class DfsBuilderUtility {
    static Class<?> dfsClass = null;
    static Method createMethod;
    static Method overwriteMethod;
    static Method bufferSizeMethod;
    static Method blockSizeMethod;
    static Method recursiveMethod;
    static Method replicateMethod;
    static Method replicationMethod;
    static Method buildMethod;
    static boolean allMethodsPresent = false;

    static {
      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
      Class<?> builderClass = null;

      try {
        dfsClass = Class.forName(dfsName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
      }
      try {
        builderClass = Class.forName(builderName);
      } catch (ClassNotFoundException e) {
        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
      }

      if (dfsClass != null && builderClass != null) {
        try {
          createMethod = dfsClass.getMethod("createFile", Path.class);
          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
          recursiveMethod = builderClass.getMethod("recursive");
          replicateMethod = builderClass.getMethod("replicate");
          replicationMethod = builderClass.getMethod("replication", short.class);
          buildMethod = builderClass.getMethod("build");

          allMethodsPresent = true;
          LOG.debug("Using builder API via reflection for DFS file creation.");
        } catch (NoSuchMethodException e) {
          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
              e.getMessage());
        }
      }
    }

    /**
     * Attempt to use builder API via reflection to create a file with the given parameters and
     * replication enabled.
     */
    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
        int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
      if (allMethodsPresent && dfsClass.isInstance(fs)) {
        try {
          Object builder;

          builder = createMethod.invoke(fs, path);
          builder = overwriteMethod.invoke(builder, overwritable);
          builder = bufferSizeMethod.invoke(builder, bufferSize);
          builder = blockSizeMethod.invoke(builder, blockSize);
          if (isRecursive) {
            builder = recursiveMethod.invoke(builder);
          }
          builder = replicateMethod.invoke(builder);
          builder = replicationMethod.invoke(builder, replication);
          return (FSDataOutputStream) buildMethod.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }

      if (isRecursive) {
        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
      }
      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
    }

    /**
     * Attempt to use builder API via reflection to create a file with the given parameters and
     * replication enabled.
     */
    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
        throws IOException {
      if (allMethodsPresent && dfsClass.isInstance(fs)) {
        try {
          Object builder;

          builder = createMethod.invoke(fs, path);
          builder = overwriteMethod.invoke(builder, overwritable);
          builder = replicateMethod.invoke(builder);
          return (FSDataOutputStream) buildMethod.invoke(builder);
        } catch (IllegalAccessException | InvocationTargetException e) {
          // Should have caught this failure during initialization, so log full trace here
          LOG.warn("Couldn't use reflection with builder API", e);
        }
      }

      return fs.create(path, overwritable);
    }
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
      throws IOException {
    return DfsBuilderUtility.createHelper(fs, path, overwritable);
  }

  /**
   * Attempt to use builder API via reflection to create a file with the given parameters and
   * replication enabled.
   * <p>
   * Will not attempt to enable replication when passed an HFileSystem.
   */
  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
        blockSize, isRecursive);
  }

  // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
  // not until we attempt to reference it.
  private static class StreamCapabilities {
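
A minimal usage sketch (not part of the patch): the point of routing WAL file
creation through createForWal and replicate() is that erasure-coded output
streams do not support hflush/hsync, which the WAL requires. Assuming a
FileSystem fs and a Path walPath in scope, a caller can verify the result the
same way ProtobufLogWriter does below:

    // Sketch: create a WAL file and confirm the stream supports hflush.
    // The enclosing method would need to declare these exceptions.
    try (FSDataOutputStream out = CommonFSUtils.createForWal(fs, walPath, true)) {
      if (!CommonFSUtils.hasCapability(out, "hflush")) {
        throw new StreamLacksCapabilityException("hflush");
      }
    }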


@@ -1028,7 +1028,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
    long startPos = -1;
    newLogFile = getLogFilePath(logId);
    try {
-      newStream = fs.create(newLogFile, false);
+      newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
    } catch (FileAlreadyExistsException e) {
      LOG.error("Log file with id=" + logId + " already exists", e);
      return false;


@@ -54,6 +54,7 @@ public final class AsyncFSOutputHelper {
    final FSDataOutputStream out;
    int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
    // This is not a Distributed File System, so it won't be erasure coded; no builder API needed
    if (createParent) {
      out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
    } else {


@@ -60,7 +60,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
  public void close() throws IOException {
    if (this.output != null) {
      try {
-        if (!trailerWritten) writeWALTrailer();
+        if (!trailerWritten) {
+          writeWALTrailer();
+        }
        this.output.close();
      } catch (NullPointerException npe) {
        // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
@@ -73,7 +75,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
  @Override
  public void sync(boolean forceSync) throws IOException {
    FSDataOutputStream fsdos = this.output;
-    if (fsdos == null) return; // Presume closed
+    if (fsdos == null) {
+      return; // Presume closed
+    }
    fsdos.flush();
    if (forceSync) {
      fsdos.hsync();
@@ -90,8 +94,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
  @Override
  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
-    this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
-        null);
+    this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
+        blockSize, false);
    if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
      if (!CommonFSUtils.hasCapability(output, "hflush")) {
        throw new StreamLacksCapabilityException("hflush");


@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Method;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(LargeTests.class)
public class TestHBaseWalOnEC {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);

  private static final HBaseTestingUtility util = new HBaseTestingUtility();

  private static final String HFLUSH = "hflush";

  @BeforeClass
  public static void setup() throws Exception {
    try {
      MiniDFSCluster cluster = util.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
      DistributedFileSystem fs = cluster.getFileSystem();

      Method enableAllECPolicies = DFSTestUtil.class.getMethod("enableAllECPolicies",
          DistributedFileSystem.class);
      enableAllECPolicies.invoke(null, fs);

      DFSClient client = fs.getClient();
      Method setErasureCodingPolicy = DFSClient.class.getMethod("setErasureCodingPolicy",
          String.class, String.class);
      setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy

      try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
        // If this comes back as having hflush then some test setup assumption is wrong.
        // Fail the test so that a developer has to look and triage
        assertFalse("Did not enable EC!", CommonFSUtils.hasCapability(out, HFLUSH));
      }
    } catch (NoSuchMethodException e) {
      // We're not testing anything interesting if EC is not available, so skip the rest of the test
      Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
    }

    util.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
    util.startMiniCluster();
  }

  @AfterClass
  public static void tearDown() throws Exception {
    util.shutdownMiniCluster();
  }

  @Test
  public void testStreamCreate() throws IOException {
    try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
        new Path("/testStreamCreate"), true)) {
      assertTrue(CommonFSUtils.hasCapability(out, HFLUSH));
    }
  }

  @Test
  public void testFlush() throws IOException {
    byte[] row = Bytes.toBytes("row");
    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");
    byte[] value = Bytes.toBytes("value");

    TableName name = TableName.valueOf(getClass().getSimpleName());

    Table t = util.createTable(name, cf);
    t.put(new Put(row).addColumn(cf, cq, value));

    util.getAdmin().flush(name);

    assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq));
  }
}
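
For reference, the reflective setup above corresponds to these direct calls on
a Hadoop 3 line where the methods exist (a sketch; fs is the MiniDFSCluster's
DistributedFileSystem obtained in setup()):

    // Sketch: the non-reflective equivalents of the calls made above.
    DFSTestUtil.enableAllECPolicies(fs);                        // expose the built-in EC policies
    fs.getClient().setErasureCodingPolicy("/", "RS-3-2-1024k"); // erasure-code the root dir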