HBASE-24055 Make AsyncFSWAL can run on EC cluster (#1437)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: stack <stack@apache.org>
Author: Duo Zhang
Date: 2020-04-07 23:41:35 +08:00 (committed by GitHub)
parent 62718cdb28
commit f60f0bdbf0
3 changed files with 73 additions and 33 deletions
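
What the change does, in brief: on Hadoop 3, a file created under an erasure-coded directory does not support hflush()/hsync(), which AsyncFSWAL needs for durable WAL appends. The patch passes CreateFlag.SHOULD_REPLICATE at file-creation time (looked up by name so the code still compiles and runs against Hadoop 2, where that enum value does not exist), so WAL files are written as ordinarily replicated files even when the WAL directory carries an EC policy. Below is a minimal sketch of the hflush-capability probe idea the tests rely on; the class name and probe path are illustrative, not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StreamCapabilities;

    public class HflushProbe {
      // Returns true if a stream created under 'dir' advertises hflush support.
      // Without SHOULD_REPLICATE, a stream under an EC directory reports false here,
      // and the WAL cannot guarantee durability of appends.
      public static boolean supportsHflush(Configuration conf, Path dir) throws IOException {
        FileSystem fs = dir.getFileSystem(conf);
        try (FSDataOutputStream out = fs.create(new Path(dir, "hflush-probe"), true)) {
          return out.hasCapability(StreamCapabilities.HFLUSH);
        }
      }
    }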

FanOutOneBlockAsyncDFSOutputHelper.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import static org.apache.hadoop.fs.CreateFlag.CREATE;
-import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -177,6 +175,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   private static final FileCreator FILE_CREATOR;
 
+  // CreateFlag.SHOULD_REPLICATE is to make OutputStream on a EC directory support hflush/hsync, but
+  // EC is introduced in hadoop 3.x so we do not have this enum on 2.x, that's why we need to
+  // indirectly reference it through reflection.
+  private static final CreateFlag SHOULD_REPLICATE_FLAG;
+
   private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
     Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
     isClientRunningMethod.setAccessible(true);
@@ -272,6 +275,15 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     return createFileCreator2();
   }
 
+  private static CreateFlag loadShouldReplicateFlag() {
+    try {
+      return CreateFlag.valueOf("SHOULD_REPLICATE");
+    } catch (IllegalArgumentException e) {
+      LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e);
+      return null;
+    }
+  }
+
   // cancel the processing if DFSClient is already closed.
   static final class CancelOnClose implements CancelableProgressable {
@@ -292,6 +304,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       LEASE_MANAGER = createLeaseManager();
       DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
       FILE_CREATOR = createFileCreator();
+      SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
     } catch (Exception e) {
       String msg = "Couldn't properly initialize access to HDFS internals. Please " +
         "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
@@ -486,6 +499,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     }
   }
 
+  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
+    List<CreateFlag> flags = new ArrayList<>();
+    flags.add(CreateFlag.CREATE);
+    if (overwrite) {
+      flags.add(CreateFlag.OVERWRITE);
+    }
+    if (SHOULD_REPLICATE_FLAG != null) {
+      flags.add(SHOULD_REPLICATE_FLAG);
+    }
+    return new EnumSetWritable<>(EnumSet.copyOf(flags));
+  }
+
   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
       boolean overwrite, boolean createParent, short replication, long blockSize,
       EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
@@ -502,8 +527,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     try {
       stat = FILE_CREATOR.create(namenode, src,
         FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-        new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
-        createParent, replication, blockSize, CryptoProtocolVersion.supported());
+        getCreateFlags(overwrite), createParent, replication, blockSize,
+        CryptoProtocolVersion.supported());
     } catch (Exception e) {
       if (e instanceof RemoteException) {
         throw (RemoteException) e;
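
The helper above talks to the NameNode's ClientProtocol directly, which is why the flags are wrapped in an EnumSetWritable. For orientation only, roughly the same effect through the public FileSystem API would look like the sketch below; this is an illustration, not code from the patch, the buffer size, replication and block size are arbitrary values, and on a Hadoop 2 client the SHOULD_REPLICATE lookup throws IllegalArgumentException:

    import java.io.IOException;
    import java.util.EnumSet;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class ReplicatedCreateSketch {
      // Creates 'path' as a plainly replicated file even when its parent directory has an
      // erasure coding policy, so the resulting stream keeps hflush/hsync support.
      public static FSDataOutputStream createReplicated(FileSystem fs, Path path) throws IOException {
        EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE,
          CreateFlag.valueOf("SHOULD_REPLICATE")); // same name-based lookup trick as the patch
        return fs.create(path, FsPermission.getFileDefault(), flags, 4096, (short) 3,
          64 * 1024 * 1024, null);
      }
    }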


@@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 import java.io.FileNotFoundException;
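
The import swap above tracks the deprecation of org.junit.Assert.assertThat in JUnit 4.13 in favor of org.hamcrest.MatcherAssert.assertThat; call sites stay the same. A small illustration (not from this patch):

    import static org.hamcrest.CoreMatchers.instanceOf;
    import static org.hamcrest.MatcherAssert.assertThat;

    import java.io.FileNotFoundException;

    public class AssertThatExample {
      // Identical call shape as before the change; only the origin of assertThat differs.
      static void expectFileNotFound(Throwable cause) {
        assertThat(cause, instanceOf(FileNotFoundException.class));
      }
    }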

TestHBaseWalOnEC.java

@@ -20,10 +20,11 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
@@ -34,40 +35,49 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assume;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
-@Category(LargeTests.class)
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, LargeTests.class })
 public class TestHBaseWalOnEC {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);
 
-  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
   @BeforeClass
-  public static void setup() throws Exception {
+  public static void setUpBeforeClass() throws Exception {
     try {
-      MiniDFSCluster cluster = util.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
+      MiniDFSCluster cluster = UTIL.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
       DistributedFileSystem fs = cluster.getFileSystem();
 
-      Method enableAllECPolicies = DFSTestUtil.class.getMethod("enableAllECPolicies",
-        DistributedFileSystem.class);
+      Method enableAllECPolicies =
+        DFSTestUtil.class.getMethod("enableAllECPolicies", DistributedFileSystem.class);
       enableAllECPolicies.invoke(null, fs);
 
       DFSClient client = fs.getClient();
-      Method setErasureCodingPolicy = DFSClient.class.getMethod("setErasureCodingPolicy",
-        String.class, String.class);
+      Method setErasureCodingPolicy =
+        DFSClient.class.getMethod("setErasureCodingPolicy", String.class, String.class);
       setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy
 
       try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
@@ -80,25 +90,31 @@ public class TestHBaseWalOnEC {
       Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
     }
 
-    util.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
-    util.startMiniCluster();
+    UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
   }
 
-  @AfterClass
-  public static void tearDown() throws Exception {
-    util.shutdownMiniCluster();
+  @Parameter
+  public String walProvider;
+
+  @Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
+    UTIL.startMiniCluster(3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
   }
 
   @Test
-  public void testStreamCreate() throws IOException {
-    try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
-      new Path("/testStreamCreate"), true)) {
-      assertTrue(out.hasCapability(StreamCapabilities.HFLUSH));
-    }
-  }
-
-  @Test
-  public void testFlush() throws IOException {
+  public void testReadWrite() throws IOException {
     byte[] row = Bytes.toBytes("row");
     byte[] cf = Bytes.toBytes("cf");
     byte[] cq = Bytes.toBytes("cq");
@@ -106,12 +122,11 @@ public class TestHBaseWalOnEC {
     TableName name = TableName.valueOf(getClass().getSimpleName());
 
-    Table t = util.createTable(name, cf);
+    Table t = UTIL.createTable(name, cf);
     t.put(new Put(row).addColumn(cf, cq, value));
-    util.getAdmin().flush(name);
+    UTIL.getAdmin().flush(name);
 
     assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq));
   }
 }
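
The test is now parameterized so every case runs against both the "asyncfs" and "filesystem" WAL providers on an erasure-coded root directory. For readers decoding the reflective setup in setUpBeforeClass(), it amounts to roughly the following when compiled directly against a Hadoop 3 client; this is a sketch only, and on Hadoop 2 these methods are absent, which is why the test uses reflection plus Assume.assumeNoException:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class EcSetupSketch {
      // Hadoop 3 equivalent of the reflective calls in the test setup: enable a built-in
      // policy and apply it to the root directory (RS-3-2 needs at least 3 DataNodes).
      static void applyEcPolicy(DistributedFileSystem dfs) throws IOException {
        dfs.enableErasureCodingPolicy("RS-3-2-1024k");
        dfs.setErasureCodingPolicy(new Path("/"), "RS-3-2-1024k");
      }
    }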