diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 9520e16c85c..48b762d7555 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.io.asyncfs; -import static org.apache.hadoop.fs.CreateFlag.CREATE; -import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; @@ -177,6 +175,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static final FileCreator FILE_CREATOR; + // CreateFlag.SHOULD_REPLICATE is to make OutputStream on a EC directory support hflush/hsync, but + // EC is introduced in hadoop 3.x so we do not have this enum on 2.x, that's why we need to + // indirectly reference it through reflection. + private static final CreateFlag SHOULD_REPLICATE_FLAG; + private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException { Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); isClientRunningMethod.setAccessible(true); @@ -272,6 +275,15 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { return createFileCreator2(); } + private static CreateFlag loadShouldReplicateFlag() { + try { + return CreateFlag.valueOf("SHOULD_REPLICATE"); + } catch (IllegalArgumentException e) { + LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e); + return null; + } + } + // cancel the processing if DFSClient is already closed. static final class CancelOnClose implements CancelableProgressable { @@ -292,6 +304,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { LEASE_MANAGER = createLeaseManager(); DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); FILE_CREATOR = createFileCreator(); + SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag(); } catch (Exception e) { String msg = "Couldn't properly initialize access to HDFS internals. Please " + "update your WAL Provider to not make use of the 'asyncfs' provider. See " + @@ -486,6 +499,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { } } + private static EnumSetWritable getCreateFlags(boolean overwrite) { + List flags = new ArrayList<>(); + flags.add(CreateFlag.CREATE); + if (overwrite) { + flags.add(CreateFlag.OVERWRITE); + } + if (SHOULD_REPLICATE_FLAG != null) { + flags.add(SHOULD_REPLICATE_FLAG); + } + return new EnumSetWritable<>(EnumSet.copyOf(flags)); + } + private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { @@ -502,8 +527,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { try { stat = FILE_CREATOR.create(namenode, src, FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, - new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)), - createParent, replication, blockSize, CryptoProtocolVersion.supported()); + getCreateFlags(overwrite), createParent, replication, blockSize, + CryptoProtocolVersion.supported()); } catch (Exception e) { if (e instanceof RemoteException) { throw (RemoteException) e; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 6be44e9158c..d23e8161342 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.io.asyncfs; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import java.io.FileNotFoundException; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java index 96c57294234..5a96b4bbb97 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java @@ -20,10 +20,11 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import java.io.IOException; import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StreamCapabilities; @@ -34,40 +35,49 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assume; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; -@Category(LargeTests.class) +@RunWith(Parameterized.class) +@Category({ RegionServerTests.class, LargeTests.class }) public class TestHBaseWalOnEC { + @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestHBaseWalOnEC.class); + HBaseClassTestRule.forClass(TestHBaseWalOnEC.class); - private static final HBaseTestingUtility util = new HBaseTestingUtility(); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); @BeforeClass - public static void setup() throws Exception { + public static void setUpBeforeClass() throws Exception { try { - MiniDFSCluster cluster = util.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy + MiniDFSCluster cluster = UTIL.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy DistributedFileSystem fs = cluster.getFileSystem(); - Method enableAllECPolicies = DFSTestUtil.class.getMethod("enableAllECPolicies", - DistributedFileSystem.class); + Method enableAllECPolicies = + DFSTestUtil.class.getMethod("enableAllECPolicies", DistributedFileSystem.class); enableAllECPolicies.invoke(null, fs); DFSClient client = fs.getClient(); - Method setErasureCodingPolicy = DFSClient.class.getMethod("setErasureCodingPolicy", - String.class, String.class); + Method setErasureCodingPolicy = + DFSClient.class.getMethod("setErasureCodingPolicy", String.class, String.class); setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy try (FSDataOutputStream out = fs.create(new Path("/canary"))) { @@ -80,25 +90,31 @@ public class TestHBaseWalOnEC { Assume.assumeNoException("Using an older version of hadoop; EC not available.", e); } - util.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true); - util.startMiniCluster(); + UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true); + } - @AfterClass - public static void tearDown() throws Exception { - util.shutdownMiniCluster(); + @Parameter + public String walProvider; + + @Parameters + public static List params() { + return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" }); + } + + @Before + public void setUp() throws Exception { + UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider); + UTIL.startMiniCluster(3); + } + + @After + public void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); } @Test - public void testStreamCreate() throws IOException { - try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(), - new Path("/testStreamCreate"), true)) { - assertTrue(out.hasCapability(StreamCapabilities.HFLUSH)); - } - } - - @Test - public void testFlush() throws IOException { + public void testReadWrite() throws IOException { byte[] row = Bytes.toBytes("row"); byte[] cf = Bytes.toBytes("cf"); byte[] cq = Bytes.toBytes("cq"); @@ -106,12 +122,11 @@ public class TestHBaseWalOnEC { TableName name = TableName.valueOf(getClass().getSimpleName()); - Table t = util.createTable(name, cf); + Table t = UTIL.createTable(name, cf); t.put(new Put(row).addColumn(cf, cq, value)); - util.getAdmin().flush(name); + UTIL.getAdmin().flush(name); assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq)); } } -