HBASE-19371 Running WALPerformanceEvaluation against asyncfswal throws exceptions
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
eb67ee0d0f
commit
856ee283fa
|
@ -192,7 +192,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
|
||||
// helper class for creating data checksum.
|
||||
private interface ChecksumCreater {
|
||||
DataChecksum createChecksum(Object conf);
|
||||
DataChecksum createChecksum(DFSClient client);
|
||||
}
|
||||
|
||||
private static final ChecksumCreater CHECKSUM_CREATER;
|
||||
|
@ -200,12 +200,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
// helper class for creating files.
|
||||
private interface FileCreator {
|
||||
default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
|
||||
String clientName, EnumSetWritable<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize,
|
||||
CryptoProtocolVersion[] supportedVersions) throws Exception {
|
||||
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
|
||||
short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
|
||||
throws Exception {
|
||||
try {
|
||||
return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
|
||||
replication, blockSize, supportedVersions);
|
||||
replication, blockSize, supportedVersions);
|
||||
} catch (InvocationTargetException e) {
|
||||
if (e.getCause() instanceof Exception) {
|
||||
throw (Exception) e.getCause();
|
||||
|
@ -215,9 +215,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
}
|
||||
};
|
||||
|
||||
Object createObject(ClientProtocol instance, String src, FsPermission masked,
|
||||
String clientName, EnumSetWritable<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize,
|
||||
Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
|
||||
EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
|
||||
CryptoProtocolVersion[] supportedVersions) throws Exception;
|
||||
}
|
||||
|
||||
|
@ -276,9 +275,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
|
||||
.asSubclass(Enum.class);
|
||||
} catch (ClassNotFoundException e) {
|
||||
String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
|
||||
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
|
||||
+ "HBASE-16110 for more information.";
|
||||
String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
|
||||
"update your WAL Provider to not make use of the 'asyncfs' provider. See " +
|
||||
"HBASE-16110 for more information.";
|
||||
LOG.error(msg, e);
|
||||
throw new Error(msg, e);
|
||||
}
|
||||
|
@ -332,7 +331,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
try {
|
||||
return createPipelineAckStatusGetter27();
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.debug("Can not get expected methods, should be hadoop 2.6-", e);
|
||||
LOG.debug("Can not get expected method " + e.getMessage() +
|
||||
", this usually because your Hadoop is pre 2.7.0, " +
|
||||
"try the methods in Hadoop 2.6.x instead.");
|
||||
}
|
||||
return createPipelineAckStatusGetter26();
|
||||
}
|
||||
|
@ -414,7 +415,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
helperClass = Class.forName(clazzName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
|
||||
LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
|
||||
LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
|
||||
helperClass.toString() + " instead.");
|
||||
}
|
||||
Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
|
||||
|
@ -441,7 +442,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
};
|
||||
}
|
||||
|
||||
private static ChecksumCreater createChecksumCreater28(Class<?> confClass)
|
||||
private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
|
||||
throws NoSuchMethodException {
|
||||
for (Method method : confClass.getMethods()) {
|
||||
if (method.getName().equals("createChecksum")) {
|
||||
|
@ -449,9 +450,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
return new ChecksumCreater() {
|
||||
|
||||
@Override
|
||||
public DataChecksum createChecksum(Object conf) {
|
||||
public DataChecksum createChecksum(DFSClient client) {
|
||||
try {
|
||||
return (DataChecksum) createChecksumMethod.invoke(conf, (Object) null);
|
||||
return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client),
|
||||
(Object) null);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -462,16 +464,16 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
|
||||
}
|
||||
|
||||
private static ChecksumCreater createChecksumCreater27(Class<?> confClass)
|
||||
private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
|
||||
throws NoSuchMethodException {
|
||||
Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
|
||||
createChecksumMethod.setAccessible(true);
|
||||
return new ChecksumCreater() {
|
||||
|
||||
@Override
|
||||
public DataChecksum createChecksum(Object conf) {
|
||||
public DataChecksum createChecksum(DFSClient client) {
|
||||
try {
|
||||
return (DataChecksum) createChecksumMethod.invoke(conf);
|
||||
return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client));
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -481,36 +483,38 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
|
||||
private static ChecksumCreater createChecksumCreater()
|
||||
throws NoSuchMethodException, ClassNotFoundException {
|
||||
Method getConfMethod = DFSClient.class.getMethod("getConf");
|
||||
try {
|
||||
return createChecksumCreater28(
|
||||
return createChecksumCreater28(getConfMethod,
|
||||
Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
|
||||
}
|
||||
return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
|
||||
return createChecksumCreater27(getConfMethod,
|
||||
Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
|
||||
}
|
||||
|
||||
private static FileCreator createFileCreator3() throws NoSuchMethodException {
|
||||
Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
|
||||
String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class,
|
||||
String.class);
|
||||
String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
|
||||
CryptoProtocolVersion[].class, String.class);
|
||||
|
||||
return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
|
||||
supportedVersions) -> {
|
||||
return (HdfsFileStatus) createMethod.invoke(instance,
|
||||
src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions,
|
||||
null);
|
||||
supportedVersions) -> {
|
||||
return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
|
||||
createParent, replication, blockSize, supportedVersions, null);
|
||||
};
|
||||
}
|
||||
|
||||
private static FileCreator createFileCreator2() throws NoSuchMethodException {
|
||||
Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
|
||||
String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class);
|
||||
String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
|
||||
CryptoProtocolVersion[].class);
|
||||
|
||||
return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
|
||||
supportedVersions) -> {
|
||||
return (HdfsFileStatus) createMethod.invoke(instance,
|
||||
src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions);
|
||||
supportedVersions) -> {
|
||||
return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
|
||||
createParent, replication, blockSize, supportedVersions);
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -549,9 +553,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
CHECKSUM_CREATER = createChecksumCreater();
|
||||
FILE_CREATOR = createFileCreator();
|
||||
} catch (Exception e) {
|
||||
String msg = "Couldn't properly initialize access to HDFS internals. Please "
|
||||
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
|
||||
+ "HBASE-16110 for more information.";
|
||||
String msg = "Couldn't properly initialize access to HDFS internals. Please " +
|
||||
"update your WAL Provider to not make use of the 'asyncfs' provider. See " +
|
||||
"HBASE-16110 for more information.";
|
||||
LOG.error(msg, e);
|
||||
throw new Error(msg, e);
|
||||
}
|
||||
|
@ -566,7 +570,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
}
|
||||
|
||||
static DataChecksum createChecksum(DFSClient client) {
|
||||
return CHECKSUM_CREATER.createChecksum(client.getConf());
|
||||
return CHECKSUM_CREATER.createChecksum(client);
|
||||
}
|
||||
|
||||
static Status getStatus(PipelineAckProto ack) {
|
||||
|
@ -590,11 +594,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
|
||||
if (resp.getStatus() != Status.SUCCESS) {
|
||||
if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
|
||||
throw new InvalidBlockTokenException("Got access token error" + ", status message "
|
||||
+ resp.getMessage() + ", " + logInfo);
|
||||
throw new InvalidBlockTokenException("Got access token error" + ", status message " +
|
||||
resp.getMessage() + ", " + logInfo);
|
||||
} else {
|
||||
throw new IOException("Got error" + ", status=" + resp.getStatus().name()
|
||||
+ ", status message " + resp.getMessage() + ", " + logInfo);
|
||||
throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
|
||||
", status message " + resp.getMessage() + ", " + logInfo);
|
||||
}
|
||||
}
|
||||
// success
|
||||
|
@ -667,10 +671,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
|||
});
|
||||
}
|
||||
|
||||
private static List<Future<Channel>> connectToDataNodes(Configuration conf,
|
||||
DFSClient client, String clientName, LocatedBlock locatedBlock, long maxBytesRcvd,
|
||||
long latestGS, BlockConstructionStage stage, DataChecksum summer,
|
||||
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
|
||||
private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
|
||||
String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
|
||||
BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
|
||||
Class<? extends Channel> channelClass) {
|
||||
Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
|
||||
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
|
||||
boolean connectToDnViaHostname =
|
||||
|
|
Loading…
Reference in New Issue