HBASE-26051 Remove reflections used to access HDFS EC APIs (#3446)
Signed-off-by: Michael Stack <stack@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
ef639ff083
commit
fab0505257
|
@ -173,11 +173,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
|
|
||||||
private static final FileCreator FILE_CREATOR;
|
private static final FileCreator FILE_CREATOR;
|
||||||
|
|
||||||
// CreateFlag.SHOULD_REPLICATE is to make OutputStream on a EC directory support hflush/hsync, but
|
|
||||||
// EC is introduced in hadoop 3.x so we do not have this enum on 2.x, that's why we need to
|
|
||||||
// indirectly reference it through reflection.
|
|
||||||
private static final CreateFlag SHOULD_REPLICATE_FLAG;
|
|
||||||
|
|
||||||
private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
|
private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
|
||||||
Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
|
Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
|
||||||
isClientRunningMethod.setAccessible(true);
|
isClientRunningMethod.setAccessible(true);
|
||||||
|
@ -273,15 +268,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
return createFileCreator2();
|
return createFileCreator2();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static CreateFlag loadShouldReplicateFlag() {
|
|
||||||
try {
|
|
||||||
return CreateFlag.valueOf("SHOULD_REPLICATE");
|
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// cancel the processing if DFSClient is already closed.
|
// cancel the processing if DFSClient is already closed.
|
||||||
static final class CancelOnClose implements CancelableProgressable {
|
static final class CancelOnClose implements CancelableProgressable {
|
||||||
|
|
||||||
|
@ -302,7 +288,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
LEASE_MANAGER = createLeaseManager();
|
LEASE_MANAGER = createLeaseManager();
|
||||||
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
|
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
|
||||||
FILE_CREATOR = createFileCreator();
|
FILE_CREATOR = createFileCreator();
|
||||||
SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String msg = "Couldn't properly initialize access to HDFS internals. Please " +
|
String msg = "Couldn't properly initialize access to HDFS internals. Please " +
|
||||||
"update your WAL Provider to not make use of the 'asyncfs' provider. See " +
|
"update your WAL Provider to not make use of the 'asyncfs' provider. See " +
|
||||||
|
@ -503,9 +488,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
if (overwrite) {
|
if (overwrite) {
|
||||||
flags.add(CreateFlag.OVERWRITE);
|
flags.add(CreateFlag.OVERWRITE);
|
||||||
}
|
}
|
||||||
if (SHOULD_REPLICATE_FLAG != null) {
|
flags.add(CreateFlag.SHOULD_REPLICATE);
|
||||||
flags.add(SHOULD_REPLICATE_FLAG);
|
|
||||||
}
|
|
||||||
return new EnumSetWritable<>(EnumSet.copyOf(flags));
|
return new EnumSetWritable<>(EnumSet.copyOf(flags));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -205,9 +205,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
private Path entryptionTestDirOnTestFs;
|
private Path entryptionTestDirOnTestFs;
|
||||||
|
|
||||||
private void createEncryptionZone() throws Exception {
|
private void createEncryptionZone() throws Exception {
|
||||||
Method method =
|
FS.createEncryptionZone(entryptionTestDirOnTestFs, TEST_KEY_NAME);
|
||||||
DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
|
|
||||||
method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -67,27 +68,18 @@ public class TestHBaseWalOnEC {
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
try {
|
MiniDFSCluster cluster = UTIL.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
|
||||||
MiniDFSCluster cluster = UTIL.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
|
||||||
|
|
||||||
Method enableAllECPolicies =
|
DFSTestUtil.enableAllECPolicies(fs);
|
||||||
DFSTestUtil.class.getMethod("enableAllECPolicies", DistributedFileSystem.class);
|
|
||||||
enableAllECPolicies.invoke(null, fs);
|
|
||||||
|
|
||||||
DFSClient client = fs.getClient();
|
HdfsAdmin hdfsAdmin = new HdfsAdmin(fs.getUri(), UTIL.getConfiguration());
|
||||||
Method setErasureCodingPolicy =
|
hdfsAdmin.setErasureCodingPolicy(new Path("/"), "RS-3-2-1024k");
|
||||||
DFSClient.class.getMethod("setErasureCodingPolicy", String.class, String.class);
|
|
||||||
setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy
|
|
||||||
|
|
||||||
try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
|
try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
|
||||||
// If this comes back as having hflush then some test setup assumption is wrong.
|
// If this comes back as having hflush then some test setup assumption is wrong.
|
||||||
// Fail the test so that a developer has to look and triage
|
// Fail the test so that a developer has to look and triage
|
||||||
assertFalse("Did not enable EC!", out.hasCapability(StreamCapabilities.HFLUSH));
|
assertFalse("Did not enable EC!", out.hasCapability(StreamCapabilities.HFLUSH));
|
||||||
}
|
|
||||||
} catch (NoSuchMethodException e) {
|
|
||||||
// We're not testing anything interesting if EC is not available, so skip the rest of the test
|
|
||||||
Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
|
UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
|
||||||
|
|
Loading…
Reference in New Issue