HBASE-16110 AsyncFS WAL doesn't work with Hadoop 2.8+
Signed-off-by: Sean Busbey <busbey@apache.org>
This commit is contained in:
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.asyncfs;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
import static io.netty.handler.timeout.IdleState.WRITER_IDLE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
@ -71,7 +72,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.util.DataChecksum;
@ -339,7 +339,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
this.alloc = alloc;
this.buf = alloc.directBuffer();
this.state = State.STREAMING;
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT));
@ -99,15 +99,15 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@ -128,8 +128,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
// copied from DFSPacket since it is package private.
public static final long HEART_BEAT_SEQNO = -1L;
// helper class for creating DataChecksum object.
private static final Method CREATE_CHECKSUM;
// Timeouts for communicating with DataNode for streaming writes/reads
public static final int READ_TIMEOUT = 60 * 1000;
public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
// helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
// getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
@ -161,6 +163,17 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final FileCreater FILE_CREATER;
// helper class for calling add block method on namenode. There is a addBlockFlags parameter for
// hadoop 2.8 or later. See createBlockAdder for more details.
private interface BlockAdder {
LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
throws IOException;
private static final BlockAdder BLOCK_ADDER;
// helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and
// hadoop 2.5 or after use inodeId. See createLeaseManager for more details.
private interface LeaseManager {
@ -181,156 +194,182 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
private static DFSClientAdaptor createDFSClientAdaptor() {
try {
final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
return new DFSClientAdaptor() {
// helper class for convert protos.
private interface PBHelper {
public boolean isClientRunning(DFSClient client) {
try {
return (Boolean) isClientRunningMethod.invoke(client);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
throw new Error(e);
ExtendedBlockProto convert(final ExtendedBlock b);
TokenProto convert(Token<?> tok);
private static LeaseManager createLeaseManager() {
try {
final Method beginFileLeaseMethod =
DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
final Method endFileLeaseMethod =
DFSClient.class.getDeclaredMethod("endFileLease", long.class);
return new LeaseManager() {
private static final PBHelper PB_HELPER;
public void begin(DFSClient client, String src, long inodeId) {
try {
beginFileLeaseMethod.invoke(client, inodeId, null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
public void end(DFSClient client, String src, long inodeId) {
try {
endFileLeaseMethod.invoke(client, inodeId);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
LOG.warn("No inodeId related lease methods found, should be hadoop 2.4-", e);
try {
final Method beginFileLeaseMethod =
DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
final Method endFileLeaseMethod =
DFSClient.class.getDeclaredMethod("endFileLease", String.class);
return new LeaseManager() {
public void begin(DFSClient client, String src, long inodeId) {
try {
beginFileLeaseMethod.invoke(client, src, null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
public void end(DFSClient client, String src, long inodeId) {
try {
endFileLeaseMethod.invoke(client, src);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
throw new Error(e);
// helper class for creating data checksum.
private interface ChecksumCreater {
DataChecksum createChecksum(Object conf);
private static PipelineAckStatusGetter createPipelineAckStatusGetter() {
try {
final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
Class<? extends Enum> ecnClass;
try {
ecnClass =
} catch (ClassNotFoundException e) {
throw new Error(e);
private static final ChecksumCreater CHECKSUM_CREATER;
private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
return new DFSClientAdaptor() {
public boolean isClientRunning(DFSClient client) {
try {
return (Boolean) isClientRunningMethod.invoke(client);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
final Method combineHeaderMethod =
PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
final Method getStatusFromHeaderMethod =
PipelineAck.class.getMethod("getStatusFromHeader", int.class);
return new PipelineAckStatusGetter() {
public Status get(PipelineAckProto ack) {
try {
List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
Integer headerFlag;
if (flagList.isEmpty()) {
Status reply = (Status) getReplyMethod.invoke(ack, 0);
headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
} else {
headerFlag = flagList.get(0);
return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
private static LeaseManager createLeaseManager25() throws NoSuchMethodException {
final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
long.class, DFSOutputStream.class);
final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
return new LeaseManager() {
public void begin(DFSClient client, String src, long inodeId) {
try {
beginFileLeaseMethod.invoke(client, inodeId, null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
LOG.warn("Can not get expected methods, should be hadoop 2.6-", e);
public void end(DFSClient client, String src, long inodeId) {
try {
endFileLeaseMethod.invoke(client, inodeId);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
private static LeaseManager createLeaseManager24() throws NoSuchMethodException {
final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
String.class, DFSOutputStream.class);
final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
return new LeaseManager() {
public void begin(DFSClient client, String src, long inodeId) {
try {
beginFileLeaseMethod.invoke(client, src, null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
public void end(DFSClient client, String src, long inodeId) {
try {
endFileLeaseMethod.invoke(client, src);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
private static LeaseManager createLeaseManager() throws NoSuchMethodException {
try {
final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
return new PipelineAckStatusGetter() {
public Status get(PipelineAckProto ack) {
try {
return (Status) getStatusMethod.invoke(ack, 0);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
return createLeaseManager25();
} catch (NoSuchMethodException e) {
throw new Error(e);
LOG.debug("No inodeId related lease methods found, should be hadoop 2.4-", e);
return createLeaseManager24();
private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
throws NoSuchMethodException {
final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
Class<? extends Enum> ecnClass;
try {
ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
} catch (ClassNotFoundException e) {
final String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
throw new Error(msg, e);
final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass,
final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader",
return new PipelineAckStatusGetter() {
public Status get(PipelineAckProto ack) {
try {
List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
Integer headerFlag;
if (flagList.isEmpty()) {
Status reply = (Status) getReplyMethod.invoke(ack, 0);
headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
} else {
headerFlag = flagList.get(0);
return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
throws NoSuchMethodException {
final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
return new PipelineAckStatusGetter() {
public Status get(PipelineAckProto ack) {
try {
return (Status) getStatusMethod.invoke(ack, 0);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
private static PipelineAckStatusGetter createPipelineAckStatusGetter()
throws NoSuchMethodException {
try {
return createPipelineAckStatusGetter27();
} catch (NoSuchMethodException e) {
LOG.debug("Can not get expected methods, should be hadoop 2.6-", e);
return createPipelineAckStatusGetter26();
private static StorageTypeSetter createStorageTypeSetter() {
final Method setStorageTypeMethod;
try {
setStorageTypeMethod =
OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType",
} catch (NoSuchMethodException e) {
LOG.warn("noSetStorageType method found, should be hadoop 2.5-", e);
LOG.debug("noSetStorageType method found, should be hadoop 2.5-", e);
return new StorageTypeSetter() {
@ -359,7 +398,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static FileCreater createFileCreater() {
private static FileCreater createFileCreater() throws ClassNotFoundException,
NoSuchMethodException, IllegalAccessException, InvocationTargetException {
for (Method method : ClientProtocol.class.getMethods()) {
if (method.getName().equals("create")) {
final Method createMethod = method;
@ -372,8 +412,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws IOException {
try {
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
flag, createParent, replication, blockSize);
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
createParent, replication, blockSize);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
@ -383,36 +423,159 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
} else {
try {
Class<?> cryptoProtocolVersionClass =
Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
final Object supported = supportedMethod.invoke(null);
return new FileCreater() {
Class<?> cryptoProtocolVersionClass = Class
Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
final Object supported = supportedMethod.invoke(null);
return new FileCreater() {
public HdfsFileStatus create(ClientProtocol namenode, String src,
FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize) throws IOException {
try {
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
flag, createParent, replication, blockSize, supported);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e);
public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws IOException {
try {
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
createParent, replication, blockSize, supported);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
throw new Error(e);
throw new Error("No create method found for " + ClientProtocol.class.getName());
throw new NoSuchMethodException("Can not find create method in ClientProtocol");
private static BlockAdder createBlockAdder() throws NoSuchMethodException {
for (Method method : ClientProtocol.class.getMethods()) {
if (method.getName().equals("addBlock")) {
final Method addBlockMethod = method;
Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
if (paramTypes[paramTypes.length - 1] == String[].class) {
return new BlockAdder() {
public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
String[] favoredNodes) throws IOException {
try {
return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
excludeNodes, fileId, favoredNodes);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e);
} else {
return new BlockAdder() {
public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
String[] favoredNodes) throws IOException {
try {
return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
excludeNodes, fileId, favoredNodes, null);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e);
throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
private static PBHelper createPBHelper() throws NoSuchMethodException {
Class<?> helperClass;
try {
helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
} catch (ClassNotFoundException e) {
LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
final Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
final Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
return new PBHelper() {
public ExtendedBlockProto convert(ExtendedBlock b) {
try {
return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
public TokenProto convert(Token<?> tok) {
try {
return (TokenProto) convertTokenMethod.invoke(null, tok);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
private static ChecksumCreater createChecksumCreater28(Class<?> confClass)
throws NoSuchMethodException {
for (Method method : confClass.getMethods()) {
if (method.getName().equals("createChecksum")) {
final Method createChecksumMethod = method;
return new ChecksumCreater() {
public DataChecksum createChecksum(Object conf) {
try {
return (DataChecksum) createChecksumMethod.invoke(conf, (Object) null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
private static ChecksumCreater createChecksumCreater27(Class<?> confClass)
throws NoSuchMethodException {
final Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
return new ChecksumCreater() {
public DataChecksum createChecksum(Object conf) {
try {
return (DataChecksum) createChecksumMethod.invoke(conf);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
private static ChecksumCreater createChecksumCreater()
throws NoSuchMethodException, ClassNotFoundException {
try {
return createChecksumCreater28(
} catch (ClassNotFoundException e) {
LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
// cancel the processing if DFSClient is already closed.
@ -432,17 +595,21 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
static {
try {
CREATE_CHECKSUM = DFSClient.Conf.class.getDeclaredMethod("createChecksum");
} catch (NoSuchMethodException e) {
throw new Error(e);
PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
STORAGE_TYPE_SETTER = createStorageTypeSetter();
FILE_CREATER = createFileCreater();
BLOCK_ADDER = createBlockAdder();
LEASE_MANAGER = createLeaseManager();
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
PB_HELPER = createPBHelper();
CHECKSUM_CREATER = createChecksumCreater();
} catch (Exception e) {
final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
throw new Error(msg, e);
PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
STORAGE_TYPE_SETTER = createStorageTypeSetter();
FILE_CREATER = createFileCreater();
LEASE_MANAGER = createLeaseManager();
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
static void beginFileLease(DFSClient client, String src, long inodeId) {
@ -454,11 +621,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
static DataChecksum createChecksum(DFSClient client) {
try {
return (DataChecksum) CREATE_CHECKSUM.invoke(client.getConf());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
return CHECKSUM_CREATER.createChecksum(client.getConf());
static Status getStatus(PipelineAckProto ack) {
@ -530,8 +693,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
int protoLen = proto.getSerializedSize();
ByteBuf buffer =
channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
ByteBuf buffer = channel.alloc()
.buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
@ -540,8 +703,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static void initialize(Configuration conf, final Channel channel,
final DatanodeInfo dnInfo, final Enum<?> storageType,
final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs,
DFSClient client, Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, DFSClient client,
Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
Promise<Void> saslPromise = channel.eventLoop().newPromise();
trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
saslPromise.addListener(new FutureListener<Void>() {
@ -560,32 +723,26 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static List<Future<Channel>> connectToDataNodes(final Configuration conf,
final DFSClient client, String clientName, final LocatedBlock locatedBlock,
long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer,
EventLoop eventLoop) {
final DFSClient client, String clientName, final LocatedBlock locatedBlock, long maxBytesRcvd,
long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
boolean connectToDnViaHostname =
final int timeoutMs =
boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
ClientOperationHeaderProto header =
ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
final OpWriteBlockProto.Builder writeBlockProtoBuilder =
final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
for (int i = 0; i < datanodeInfos.length; i++) {
final DatanodeInfo dnInfo = datanodeInfos[i];
@ -642,14 +799,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
ClientProtocol namenode = client.getNamenode();
HdfsFileStatus stat;
try {
stat =
new EnumSetWritable<CreateFlag>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet
.of(CREATE)), createParent, replication, blockSize);
stat = FILE_CREATER.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
new EnumSetWritable<CreateFlag>(
overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
createParent, replication, blockSize);
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
@ -663,12 +817,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
locatedBlock =
namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null);
locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null, null,
stat.getFileId(), null);
List<Channel> datanodeList = new ArrayList<>();
futureList =
connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE,
summer, eventLoop);
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoop);
for (Future<Channel> future : futureList) {
// fail the creation if there are connection failures since we are fail-fast. The upper
// layer should retry itself if needed.
@ -712,8 +865,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException,
UnresolvedLinkException {
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
blockSize, eventLoop);
@ -86,7 +86,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslPropertiesResolver;
@ -112,8 +111,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static final String NAME_DELIMITER = " ";
static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites";
static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
@ -185,7 +183,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
try {
cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec");
} catch (ClassNotFoundException e) {
LOG.warn("No CryptoCodec class found, should be hadoop 2.5-", e);
LOG.debug("No CryptoCodec class found, should be hadoop 2.5-", e);
if (cryptoCodecClass != null) {
Method getInstanceMethod = null;
@ -195,8 +193,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
CREATE_CODEC = getInstanceMethod;
try {
if (getInstanceMethod == null) {
throw new NoSuchMethodException(
"Can not find suitable getInstance method in CryptoCodec");
CREATE_CODEC = getInstanceMethod;
CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor");
CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor");
@ -207,11 +209,14 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor");
INIT_DECRYPTOR = decryptorClass.getMethod("init", byte[].class, byte[].class);
DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class);
} catch (NoSuchMethodException | ClassNotFoundException e) {
throw new Error(e);
} catch (Exception e) {
final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
throw new Error(msg, e);
} else {
LOG.warn("Can not initialize CryptoCodec, should be hadoop 2.5-");
@ -329,62 +334,53 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static SaslAdaptor createSaslAdaptor25() {
try {
final Field trustedChannelResolverField = DFSClient.class
final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
return new SaslAdaptor() {
private static SaslAdaptor createSaslAdaptor25()
throws NoSuchFieldException, NoSuchMethodException {
final Field trustedChannelResolverField = DFSClient.class
final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
return new SaslAdaptor() {
public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
try {
return (TrustedChannelResolver) trustedChannelResolverField.get(client);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
try {
return (TrustedChannelResolver) trustedChannelResolverField.get(client);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
return null;
public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
return null;
public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
return null;
public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
try {
return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
return null;
public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
try {
return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
} catch (NoSuchFieldException | NoSuchMethodException e) {
throw new Error(e);
private static SaslAdaptor createSaslAdaptor() {
Class<?> saslDataTransferClientClass = null;
private static SaslAdaptor createSaslAdaptor()
throws NoSuchFieldException, NoSuchMethodException {
try {
saslDataTransferClientClass = Class
return createSaslAdaptor27(
} catch (ClassNotFoundException e) {
LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-");
try {
return saslDataTransferClientClass != null ? createSaslAdaptor27(saslDataTransferClientClass)
: createSaslAdaptor25();
} catch (NoSuchFieldException | NoSuchMethodException e) {
throw new Error(e);
LOG.debug("No SaslDataTransferClient class found, should be hadoop 2.5-", e);
return createSaslAdaptor25();
private static CipherOptionHelper createCipherHelper25() {
@ -451,9 +447,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
final Method convertCipherOptionsMethod = PBHelper.class.getMethod("convertCipherOptions",
Class<?> pbHelperClass;
try {
pbHelperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
} catch (ClassNotFoundException e) {
LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
pbHelperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
final Method convertCipherOptionsMethod = pbHelperClass.getMethod("convertCipherOptions",
final Method convertCipherOptionProtosMethod = PBHelper.class
final Method convertCipherOptionProtosMethod = pbHelperClass
.getMethod("convertCipherOptionProtos", List.class);
final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class
.getMethod("addAllCipherOption", Iterable.class);
@ -577,19 +580,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static CipherOptionHelper createCipherHelper() {
private static CipherOptionHelper createCipherHelper()
throws ClassNotFoundException, NoSuchMethodException {
Class<?> cipherOptionClass;
try {
cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption");
} catch (ClassNotFoundException e) {
LOG.warn("No CipherOption class found, should be hadoop 2.5-");
LOG.debug("No CipherOption class found, should be hadoop 2.5-", e);
return createCipherHelper25();
try {
return createCipherHelper27(cipherOptionClass);
} catch (NoSuchMethodException | ClassNotFoundException e) {
throw new Error(e);
return createCipherHelper27(cipherOptionClass);
private static TransparentCryptoHelper createTransparentCryptoHelper25() {
@ -646,25 +646,30 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static TransparentCryptoHelper createTransparentCryptoHelper() {
private static TransparentCryptoHelper createTransparentCryptoHelper()
throws NoSuchMethodException, ClassNotFoundException {
Class<?> feInfoClass;
try {
feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo");
} catch (ClassNotFoundException e) {
LOG.warn("No FileEncryptionInfo class found, should be hadoop 2.5-");
LOG.debug("No FileEncryptionInfo class found, should be hadoop 2.5-", e);
return createTransparentCryptoHelper25();
try {
return createTransparentCryptoHelper27(feInfoClass);
} catch (NoSuchMethodException | ClassNotFoundException e) {
throw new Error(e);
return createTransparentCryptoHelper27(feInfoClass);
static {
SASL_ADAPTOR = createSaslAdaptor();
CIPHER_OPTION_HELPER = createCipherHelper();
TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
try {
SASL_ADAPTOR = createSaslAdaptor();
CIPHER_OPTION_HELPER = createCipherHelper();
TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
} catch (Exception e) {
final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
throw new Error(msg, e);
@ -828,40 +833,40 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
byte[] challenge = proto.getPayload().toByteArray();
byte[] response = saslClient.evaluateChallenge(challenge);
switch (step) {
case 1: {
List<Object> cipherOptions = null;
if (requestedQopContainsPrivacy()) {
cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
sendSaslMessage(ctx, response, cipherOptions);
case 1: {
List<Object> cipherOptions = null;
if (requestedQopContainsPrivacy()) {
cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
case 2: {
assert response == null;
Object cipherOption =
CIPHER_OPTION_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
ChannelPipeline p = ctx.pipeline();
while (p.first() != null) {
if (cipherOption != null) {
CryptoCodec codec = new CryptoCodec(conf, cipherOption);
p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
} else {
if (useWrap()) {
p.addLast(new SaslWrapHandler(saslClient),
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
new SaslUnwrapHandler(saslClient));
sendSaslMessage(ctx, response, cipherOptions);
case 2: {
assert response == null;
Object cipherOption = CIPHER_OPTION_HELPER.getCipherOption(proto,
isNegotiatedQopPrivacy(), saslClient);
ChannelPipeline p = ctx.pipeline();
while (p.first() != null) {
throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
if (cipherOption != null) {
CryptoCodec codec = new CryptoCodec(conf, cipherOption);
p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
} else {
if (useWrap()) {
p.addLast(new SaslWrapHandler(saslClient),
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
new SaslUnwrapHandler(saslClient));
throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
} else {
Reference in New Issue
Block a user