HBASE-18177 FanOutOneBlockAsyncDFSOutputHelper fails to compile against Hadoop 3

Because ClientProtocol::create has an incompatible API change between Hadoop 2 and Hadoop 3

Signed-off-by: zhangduo <zhangduo@apache.org>
Mike Drob 2017-06-26 11:29:34 -05:00 committed by zhangduo
parent 45cabfb406
commit 2cde0be393
1 changed file with 62 additions and 1 deletion
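
In short: Hadoop 3 appended one extra String parameter (the erasure coding policy name) to ClientProtocol::create, so the existing direct call no longer compiles against Hadoop 3. The stand-alone sketch below, assuming only the Hadoop client jars on the classpath, probes for the newer overload the same way createFileCreator() in this patch does; the CreateSignatureProbe class is illustrative and not part of the patch.

// Hypothetical stand-alone probe, not part of this patch; assumes the Hadoop
// client jars are on the classpath.
import java.lang.reflect.Method;

import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.io.EnumSetWritable;

public final class CreateSignatureProbe {
  public static void main(String[] args) {
    try {
      // Hadoop 3: create(..., CryptoProtocolVersion[], String ecPolicyName)
      Method create3 = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
        String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
        CryptoProtocolVersion[].class, String.class);
      System.out.println("Found Hadoop 3 style create: " + create3);
    } catch (NoSuchMethodException e) {
      // Hadoop 2: the same parameter list without the trailing String
      System.out.println("No Hadoop 3 style create; assuming the Hadoop 2 signature");
    }
  }
}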

@@ -67,6 +67,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
@@ -195,6 +196,32 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
  private static final ChecksumCreater CHECKSUM_CREATER;

  // helper class for creating files.
  private interface FileCreator {
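    // The default method keeps the Hadoop 2 style parameter list used at the call site below;
    // it delegates to createObject() and rethrows the real cause of a reflective invocation
    // failure.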
    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
        String clientName, EnumSetWritable<CreateFlag> flag,
        boolean createParent, short replication, long blockSize,
        CryptoProtocolVersion[] supportedVersions) throws Exception {
      try {
        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
          replication, blockSize, supportedVersions);
      } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Exception) {
          throw (Exception) e.getCause();
        } else {
          throw new RuntimeException(e.getCause());
        }
      }
    };
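
    // Implemented by the lambdas returned from createFileCreator2()/createFileCreator3(), which
    // bind to the version-specific ClientProtocol::create via reflection.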
    Object createObject(ClientProtocol instance, String src, FsPermission masked,
        String clientName, EnumSetWritable<CreateFlag> flag,
        boolean createParent, short replication, long blockSize,
        CryptoProtocolVersion[] supportedVersions) throws Exception;
  }

  private static final FileCreator FILE_CREATOR;

  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
    isClientRunningMethod.setAccessible(true);
@@ -460,6 +487,39 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
    return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
  }
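
  // Hadoop 3: ClientProtocol::create gained a trailing String argument (the erasure coding
  // policy name); passing null keeps the default behaviour.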
  private static FileCreator createFileCreator3() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class,
      String.class);
    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance,
        src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions,
        null);
    };
  }
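
  // Hadoop 2: the same create() call without the trailing String argument.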
  private static FileCreator createFileCreator2() throws NoSuchMethodException {
    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
      String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class);
    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
        supportedVersions) -> {
      return (HdfsFileStatus) createMethod.invoke(instance,
        src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions);
    };
  }
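
  // Probe for the Hadoop 3 signature first and fall back to the Hadoop 2 one if it is absent.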
  private static FileCreator createFileCreator() throws NoSuchMethodException {
    try {
      return createFileCreator3();
    } catch (NoSuchMethodException e) {
      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
    }
    return createFileCreator2();
  }

  // cancel the processing if DFSClient is already closed.
  static final class CancelOnClose implements CancelableProgressable {
@@ -484,6 +544,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
      PB_HELPER = createPBHelper();
      CHECKSUM_CREATER = createChecksumCreater();
      FILE_CREATOR = createFileCreator();
    } catch (Exception e) {
      String msg = "Couldn't properly initialize access to HDFS internals. Please "
          + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
@@ -679,7 +740,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
    ClientProtocol namenode = client.getNamenode();
    HdfsFileStatus stat;
    try {
-     stat = namenode.create(src,
+     stat = FILE_CREATOR.create(namenode, src,
        FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
        new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
        createParent, replication, blockSize, CryptoProtocolVersion.supported());