HBASE-18177 FanOutOneBlockAsyncDFSOutputHelper fails to compile against Hadoop 3
Because ClientProtocol::create has API changes between Hadoop 2/3 Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
22dce22e06
commit
cb5299ae9b
|
@ -67,6 +67,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||||
import org.apache.hadoop.crypto.Encryptor;
|
import org.apache.hadoop.crypto.Encryptor;
|
||||||
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileSystemLinkResolver;
|
import org.apache.hadoop.fs.FileSystemLinkResolver;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -195,6 +196,32 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
|
|
||||||
private static final ChecksumCreater CHECKSUM_CREATER;
|
private static final ChecksumCreater CHECKSUM_CREATER;
|
||||||
|
|
||||||
|
// helper class for creating files.
|
||||||
|
private interface FileCreator {
|
||||||
|
default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
|
||||||
|
String clientName, EnumSetWritable<CreateFlag> flag,
|
||||||
|
boolean createParent, short replication, long blockSize,
|
||||||
|
CryptoProtocolVersion[] supportedVersions) throws Exception {
|
||||||
|
try {
|
||||||
|
return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
|
||||||
|
replication, blockSize, supportedVersions);
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
if (e.getCause() instanceof Exception) {
|
||||||
|
throw (Exception) e.getCause();
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException(e.getCause());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Object createObject(ClientProtocol instance, String src, FsPermission masked,
|
||||||
|
String clientName, EnumSetWritable<CreateFlag> flag,
|
||||||
|
boolean createParent, short replication, long blockSize,
|
||||||
|
CryptoProtocolVersion[] supportedVersions) throws Exception;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final FileCreator FILE_CREATOR;
|
||||||
|
|
||||||
private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
|
private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
|
||||||
Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
|
Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
|
||||||
isClientRunningMethod.setAccessible(true);
|
isClientRunningMethod.setAccessible(true);
|
||||||
|
@ -460,6 +487,39 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
|
return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static FileCreator createFileCreator3() throws NoSuchMethodException {
|
||||||
|
Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
|
||||||
|
String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class,
|
||||||
|
String.class);
|
||||||
|
|
||||||
|
return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
|
||||||
|
supportedVersions) -> {
|
||||||
|
return (HdfsFileStatus) createMethod.invoke(instance,
|
||||||
|
src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions,
|
||||||
|
null);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FileCreator createFileCreator2() throws NoSuchMethodException {
|
||||||
|
Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
|
||||||
|
String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class);
|
||||||
|
|
||||||
|
return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
|
||||||
|
supportedVersions) -> {
|
||||||
|
return (HdfsFileStatus) createMethod.invoke(instance,
|
||||||
|
src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FileCreator createFileCreator() throws NoSuchMethodException {
|
||||||
|
try {
|
||||||
|
return createFileCreator3();
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
|
||||||
|
}
|
||||||
|
return createFileCreator2();
|
||||||
|
}
|
||||||
|
|
||||||
// cancel the processing if DFSClient is already closed.
|
// cancel the processing if DFSClient is already closed.
|
||||||
static final class CancelOnClose implements CancelableProgressable {
|
static final class CancelOnClose implements CancelableProgressable {
|
||||||
|
|
||||||
|
@ -484,6 +544,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
|
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
|
||||||
PB_HELPER = createPBHelper();
|
PB_HELPER = createPBHelper();
|
||||||
CHECKSUM_CREATER = createChecksumCreater();
|
CHECKSUM_CREATER = createChecksumCreater();
|
||||||
|
FILE_CREATOR = createFileCreator();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String msg = "Couldn't properly initialize access to HDFS internals. Please "
|
String msg = "Couldn't properly initialize access to HDFS internals. Please "
|
||||||
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
|
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
|
||||||
|
@ -679,7 +740,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
ClientProtocol namenode = client.getNamenode();
|
ClientProtocol namenode = client.getNamenode();
|
||||||
HdfsFileStatus stat;
|
HdfsFileStatus stat;
|
||||||
try {
|
try {
|
||||||
stat = namenode.create(src,
|
stat = FILE_CREATOR.create(namenode, src,
|
||||||
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
|
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
|
||||||
new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
|
new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
|
||||||
createParent, replication, blockSize, CryptoProtocolVersion.supported());
|
createParent, replication, blockSize, CryptoProtocolVersion.supported());
|
||||||
|
|
Loading…
Reference in New Issue