diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
index 2e480e6cb7d..8924098bec2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -821,6 +821,133 @@ public abstract class CommonFSUtils {
     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
   }
 
+  private static class DfsBuilderUtility {
+    static Class<?> dfsClass = null;
+    static Method createMethod;
+    static Method overwriteMethod;
+    static Method bufferSizeMethod;
+    static Method blockSizeMethod;
+    static Method recursiveMethod;
+    static Method replicateMethod;
+    static Method replicationMethod;
+    static Method buildMethod;
+    static boolean allMethodsPresent = false;
+
+    static {
+      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
+      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
+      Class<?> builderClass = null;
+
+      try {
+        dfsClass = Class.forName(dfsName);
+      } catch (ClassNotFoundException e) {
+        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
+      }
+      try {
+        builderClass = Class.forName(builderName);
+      } catch (ClassNotFoundException e) {
+        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
+      }
+
+      if (dfsClass != null && builderClass != null) {
+        try {
+          createMethod = dfsClass.getMethod("createFile", Path.class);
+          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
+          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
+          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
+          recursiveMethod = builderClass.getMethod("recursive");
+          replicateMethod = builderClass.getMethod("replicate");
+          replicationMethod = builderClass.getMethod("replication", short.class);
+          buildMethod = builderClass.getMethod("build");
+
+          allMethodsPresent = true;
+          LOG.debug("Using builder API via reflection for DFS file creation.");
+        } catch (NoSuchMethodException e) {
+          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
+              e.getMessage());
+        }
+      }
+    }
+
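+    // All builder methods are resolved once, in the static initializer above, and
+    // allMethodsPresent gates their use: on Hadoop versions that predate
+    // HdfsDataOutputStreamBuilder the helpers below fall back silently to the classic
+    // create()/createNonRecursive() calls.
+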
+    /**
+     * Attempt to use builder API via reflection to create a file with the given parameters and
+     * replication enabled.
+     */
+    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
+        int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+      if (allMethodsPresent && dfsClass.isInstance(fs)) {
+        try {
+          Object builder;
+
+          builder = createMethod.invoke(fs, path);
+          builder = overwriteMethod.invoke(builder, overwritable);
+          builder = bufferSizeMethod.invoke(builder, bufferSize);
+          builder = blockSizeMethod.invoke(builder, blockSize);
+          if (isRecursive) {
+            builder = recursiveMethod.invoke(builder);
+          }
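+          // replicate() pins the new file to classic replication even when the parent
+          // directory carries an erasure-coding policy; the WAL depends on this because
+          // EC output streams cannot hflush/hsync (see the canary check in TestHBaseWalOnEC).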
+          builder = replicateMethod.invoke(builder);
+          builder = replicationMethod.invoke(builder, replication);
+          return (FSDataOutputStream) buildMethod.invoke(builder);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          // Should have caught this failure during initialization, so log full trace here
+          LOG.warn("Couldn't use reflection with builder API", e);
+        }
+      }
+
+      if (isRecursive) {
+        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
+      }
+      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
+    }
+
+    /**
+     * Attempt to use builder API via reflection to create a file with the given parameters and
+     * replication enabled.
+     */
+    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
+        throws IOException {
+      if (allMethodsPresent && dfsClass.isInstance(fs)) {
+        try {
+          Object builder;
+
+          builder = createMethod.invoke(fs, path);
+          builder = overwriteMethod.invoke(builder, overwritable);
+          builder = replicateMethod.invoke(builder);
+          return (FSDataOutputStream) buildMethod.invoke(builder);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          // Should have caught this failure during initialization, so log full trace here
+          LOG.warn("Couldn't use reflection with builder API", e);
+        }
+      }
+
+      return fs.create(path, overwritable);
+    }
+  }
+
+  /**
+   * Attempt to use builder API via reflection to create a file with the given parameters and
+   * replication enabled.
+   * <p/>
+   * Will not attempt to enable replication when passed an HFileSystem.
+   */
+  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
+      throws IOException {
+    return DfsBuilderUtility.createHelper(fs, path, overwritable);
+  }
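+
+  // Illustrative usage (the WAL path here is hypothetical, not prescribed by this patch):
+  //   FSDataOutputStream out = CommonFSUtils.createForWal(fs, new Path("/hbase/WALs/wal.1"), false);
+  // On a Hadoop release without the builder API this degrades to plain fs.create(path, false).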
+
+  /**
+   * Attempt to use builder API via reflection to create a file with the given parameters and
+   * replication enabled.
+   * <p/>
+   * Will not attempt to enable replication when passed an HFileSystem.
+   */
+  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
+      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
+      blockSize, isRecursive);
+  }
+
   // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
   // not until we attempt to reference it.
   private static class StreamCapabilities {
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 99001f7ea92..cc3eea342d5 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -1028,7 +1028,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     long startPos = -1;
     newLogFile = getLogFilePath(logId);
     try {
-      newStream = fs.create(newLogFile, false);
+      newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
     } catch (FileAlreadyExistsException e) {
       LOG.error("Log file with id=" + logId + " already exists", e);
       return false;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index f9d04b91d65..d1645f8462c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -54,6 +54,7 @@ public final class AsyncFSOutputHelper {
     final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
       CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+    // This is not a DistributedFileSystem, so it won't be erasure coded; no builder API needed
     if (createParent) {
       out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
     } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index b4e2cbf196d..5c8e0d21f87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -60,7 +60,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   public void close() throws IOException {
     if (this.output != null) {
       try {
-        if (!trailerWritten) writeWALTrailer();
+        if (!trailerWritten) {
+          writeWALTrailer();
+        }
         this.output.close();
       } catch (NullPointerException npe) {
         // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
@@ -73,7 +75,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   public void sync(boolean forceSync) throws IOException {
     FSDataOutputStream fsdos = this.output;
-    if (fsdos == null) return; // Presume closed
+    if (fsdos == null) {
+      return; // Presume closed
+    }
     fsdos.flush();
     if (forceSync) {
       fsdos.hsync();
@@ -90,8 +94,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
       short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
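+    // Go through CommonFSUtils so the WAL file is created with classic replication (and thus
+    // keeps hflush/hsync support) even when the HDFS directory has an erasure-coding policy.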
-    this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
-      null);
+    this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
+      blockSize, false);
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
       if (!CommonFSUtils.hasCapability(output, "hflush")) {
         throw new StreamLacksCapabilityException("hflush");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
new file mode 100644
index 00000000000..a7f1624974c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestHBaseWalOnEC {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);
+
+  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+
+  private static final String HFLUSH = "hflush";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    try {
+      MiniDFSCluster cluster = util.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
+      DistributedFileSystem fs = cluster.getFileSystem();
+
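+      // Hadoop's EC admin methods are looked up reflectively, mirroring CommonFSUtils, so
+      // this test compiles against older Hadoop; there the NoSuchMethodException below
+      // converts the test into an assumption-skip rather than a failure.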
+      Method enableAllECPolicies = DFSTestUtil.class.getMethod("enableAllECPolicies",
+          DistributedFileSystem.class);
+      enableAllECPolicies.invoke(null, fs);
+
+      DFSClient client = fs.getClient();
+      Method setErasureCodingPolicy = DFSClient.class.getMethod("setErasureCodingPolicy",
+          String.class, String.class);
+      setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy
+
+      try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
+        // If this comes back as having hflush then some test setup assumption is wrong.
+        // Fail the test so that a developer has to look and triage
+        assertFalse("Did not enable EC!", CommonFSUtils.hasCapability(out, HFLUSH));
+      }
+    } catch (NoSuchMethodException e) {
+      // We're not testing anything interesting if EC is not available, so skip the rest of the test
+      Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
+    }
+
+    util.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
+    util.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testStreamCreate() throws IOException {
+    try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
+        new Path("/testStreamCreate"), true)) {
+      assertTrue(CommonFSUtils.hasCapability(out, HFLUSH));
+    }
+  }
+
+  @Test
+  public void testFlush() throws IOException {
+    byte[] row = Bytes.toBytes("row");
+    byte[] cf = Bytes.toBytes("cf");
+    byte[] cq = Bytes.toBytes("cq");
+    byte[] value = Bytes.toBytes("value");
+
+    TableName name = TableName.valueOf(getClass().getSimpleName());
+
+    Table t = util.createTable(name, cf);
+    t.put(new Put(row).addColumn(cf, cq, value));
+
+    util.getAdmin().flush(name);
+
+    assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq));
+  }
+}