HBASE-15407 Add SASL support for fan out OutputStream

zhangduo 2016-03-27 19:01:05 +08:00
parent e450d94a2c
commit 6ea4994569
5 changed files with 1385 additions and 120 deletions
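The behavioral core of the patch: before this change the client sent the OP_WRITE_BLOCK request to a datanode as soon as the connection was established; now a SASL negotiation step (FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate) has to finish first, so both the response handler setup and the request are deferred behind a promise. The Java sketch below shows only that ordering; it is an illustration, not code from the commit, and saslNegotiate, setupResponseHandler and sendWriteBlockRequest are hypothetical stand-ins for trySaslNegotiate, processWriteBlockResponse and requestWriteBlock.

// Illustration of the promise-based ordering introduced by the patch (sketch only,
// not the patch code). The helper method names below are hypothetical stand-ins.
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

final class SaslThenWriteBlockSketch {

  static void onDataNodeConnected(final Channel ch) {
    // 1. Kick off SASL negotiation; it completes the promise when finished.
    final Promise<Void> saslPromise = ch.eventLoop().newPromise();
    saslNegotiate(ch, saslPromise);

    // 2. Only after negotiation succeeds: install the response handler, then send
    //    the OP_WRITE_BLOCK request, so the request passes through any SASL wrap
    //    handler that negotiation added to the pipeline.
    saslPromise.addListener(new FutureListener<Void>() {
      @Override
      public void operationComplete(Future<Void> future) {
        if (future.isSuccess()) {
          setupResponseHandler(ch);
          sendWriteBlockRequest(ch);
        } else {
          // The patch propagates future.cause() to the per-datanode promise instead.
          ch.close();
        }
      }
    });
  }

  private static void saslNegotiate(Channel ch, Promise<Void> promise) {
    // Placeholder: the patch delegates to FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate.
    promise.trySuccess(null);
  }

  private static void setupResponseHandler(Channel ch) {
    // Placeholder: install the protobuf decoders and the ack handler.
  }

  private static void sendWriteBlockRequest(Channel ch) {
    // Placeholder: serialize and flush the OpWriteBlockProto request.
  }

  private SaslThenWriteBlockSketch() {
  }
}

Sending the request only after the promise succeeds means it goes through whatever wrap or unwrap handlers negotiation left at the head of the pipeline, which is also why the pipeline cleanup in the helper now stops at the IdleStateHandler instead of removing every handler.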

FanOutOneBlockAsyncDFSOutput.java

@@ -17,11 +17,26 @@
*/
package org.apache.hadoop.hbase.util;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
import static io.netty.handler.timeout.IdleState.WRITER_IDLE;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.io.Closeable;
import java.io.IOException;
@@ -36,6 +51,8 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -52,23 +69,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Supplier;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/**
* An asynchronous HDFS output stream implementation which fans out data to the datanodes and
* only supports writing a file with a single block.
@@ -278,7 +278,7 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
if (e.state() == READER_IDLE) {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
@@ -286,7 +286,7 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable {
return new IOException("Timeout(" + timeoutMs + "ms) waiting for response");
}
});
} else if (e.state() == IdleState.WRITER_IDLE) {
} else if (e.state() == WRITER_IDLE) {
PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
int len = heartbeat.getSerializedSize();
ByteBuf buf = alloc.buffer(len);

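The hunk above only switches IdleState.READER_IDLE and WRITER_IDLE to static imports; the behavior is unchanged: a read-idle event fails the stream with a timeout, a write-idle event sends a heartbeat packet so the datanode keeps the connection open. For readers unfamiliar with the Netty idiom, here is a self-contained sketch of that pattern (illustration only, not code from the commit; sendHeartbeat stands in for serializing the PacketHeader with HEART_BEAT_SEQNO):

// Standalone illustration of the idle-state handling above (not patch code):
// read-idle fails the stream, write-idle triggers a keep-alive heartbeat.
import java.io.IOException;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

class IdleHandlingSketch extends ChannelDuplexHandler {

  private final int timeoutMs;

  IdleHandlingSketch(int timeoutMs) {
    this.timeoutMs = timeoutMs;
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent e = (IdleStateEvent) evt;
      if (e.state() == IdleState.READER_IDLE) {
        // No ack from the datanode within timeoutMs: surface a timeout failure.
        ctx.fireExceptionCaught(
          new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
      } else if (e.state() == IdleState.WRITER_IDLE) {
        // Nothing written for a while: send a heartbeat so the datanode keeps the
        // connection open (the patch writes a PacketHeader with HEART_BEAT_SEQNO here).
        sendHeartbeat(ctx);
      }
    } else {
      super.userEventTriggered(ctx, evt);
    }
  }

  private void sendHeartbeat(ChannelHandlerContext ctx) {
    // Placeholder: serialize and flush the heartbeat packet.
  }
}

An IdleStateHandler placed earlier in the pipeline is what fires these events; judging from the imports in this file, the patch derives its timeout from DFS_CLIENT_SOCKET_TIMEOUT_KEY.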
FanOutOneBlockAsyncDFSOutputHelper.java

@@ -18,12 +18,36 @@
package org.apache.hadoop.hbase.util;
import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -33,6 +57,10 @@ import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.CodedOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -68,46 +96,21 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.CodedOutputStream;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/**
* Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
*/
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutputHelper {
public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputHelper.class);
@@ -167,6 +170,7 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
// This is used to terminate a recoverFileLease call when FileSystem is already closed.
// isClientRunning is not public so we need to use reflection.
private interface DFSClientAdaptor {
boolean isClientRunning(DFSClient client);
}
@@ -174,14 +178,14 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
private static DFSClientAdaptor createDFSClientAdaptor() {
try {
final Method method = DFSClient.class.getDeclaredMethod("isClientRunning");
method.setAccessible(true);
final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
isClientRunningMethod.setAccessible(true);
return new DFSClientAdaptor() {
@Override
public boolean isClientRunning(DFSClient client) {
try {
return (Boolean) method.invoke(client);
return (Boolean) isClientRunningMethod.invoke(client);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
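The adaptor above is representative of how the helper deals with DFSClient internals that are either non-public or changed between Hadoop releases: look the Method up once by reflection, then hide it behind a small interface. Because the hunk interleaves the old and renamed local variables, here is the pattern consolidated into a standalone sketch (illustration only; ClientRunningChecker is a hypothetical stand-in for the patch's DFSClientAdaptor, and the NoSuchMethodException handling is an assumption since that catch block falls outside the hunk):

// Consolidated sketch of the reflection-adaptor pattern above (not patch code).
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.hadoop.hdfs.DFSClient;

final class ReflectionAdaptorSketch {

  /** Hypothetical stand-in for the patch's DFSClientAdaptor interface. */
  interface ClientRunningChecker {
    boolean isClientRunning(DFSClient client);
  }

  static ClientRunningChecker createChecker() {
    try {
      // isClientRunning() is not public, so resolve it once and keep the Method around.
      final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
      isClientRunningMethod.setAccessible(true);
      return new ClientRunningChecker() {
        @Override
        public boolean isClientRunning(DFSClient client) {
          try {
            return (Boolean) isClientRunningMethod.invoke(client);
          } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
          }
        }
      };
    } catch (NoSuchMethodException e) {
      // Assumption for this sketch: treat a missing method as fatal.
      throw new Error(e);
    }
  }

  private ReflectionAdaptorSketch() {
  }
}

createLeaseManager below applies the same idea twice: it first tries the inodeId-based beginFileLease/endFileLease signatures and, if they are missing, falls back to the String-based ones used by hadoop 2.4 and earlier.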
@@ -194,11 +198,11 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
private static LeaseManager createLeaseManager() {
try {
final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
long.class, DFSOutputStream.class);
final Method beginFileLeaseMethod =
DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
beginFileLeaseMethod.setAccessible(true);
final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
long.class);
final Method endFileLeaseMethod =
DFSClient.class.getDeclaredMethod("endFileLease", long.class);
endFileLeaseMethod.setAccessible(true);
return new LeaseManager() {
@@ -224,11 +228,11 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
LOG.warn("No inodeId related lease methods found, should be hadoop 2.4-", e);
}
try {
final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
String.class, DFSOutputStream.class);
final Method beginFileLeaseMethod =
DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
beginFileLeaseMethod.setAccessible(true);
final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
String.class);
final Method endFileLeaseMethod =
DFSClient.class.getDeclaredMethod("endFileLease", String.class);
endFileLeaseMethod.setAccessible(true);
return new LeaseManager() {
@@ -261,18 +265,19 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
@SuppressWarnings("rawtypes")
Class<? extends Enum> ecnClass;
try {
ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
.asSubclass(Enum.class);
ecnClass =
Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
.asSubclass(Enum.class);
} catch (ClassNotFoundException e) {
throw new Error(e);
}
@SuppressWarnings("unchecked")
final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass,
Status.class);
final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader",
int.class);
final Method combineHeaderMethod =
PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
final Method getStatusFromHeaderMethod =
PipelineAck.class.getMethod("getStatusFromHeader", int.class);
return new PipelineAckStatusGetter() {
@Override
@@ -317,8 +322,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
private static StorageTypeSetter createStorageTypeSetter() {
final Method setStorageTypeMethod;
try {
setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType",
StorageTypeProto.class);
setStorageTypeMethod =
OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
} catch (NoSuchMethodException e) {
LOG.warn("no setStorageType method found, should be hadoop 2.5-", e);
return new StorageTypeSetter() {
@@ -362,8 +367,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws IOException {
try {
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
createParent, replication, blockSize);
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
flag, createParent, replication, blockSize);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
@@ -374,16 +379,16 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
};
} else {
try {
Class<?> cryptoProtocolVersionClass = Class
.forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
Class<?> cryptoProtocolVersionClass =
Class.forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
final Object supported = supportedMethod.invoke(null);
return new FileCreater() {
@Override
public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws IOException {
public HdfsFileStatus create(ClientProtocol namenode, String src,
FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize) throws IOException {
try {
return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
flag, createParent, replication, blockSize, supported);
@@ -481,8 +486,12 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
}
// success
ChannelPipeline p = ctx.pipeline();
while (p.first() != null) {
p.removeFirst();
for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
// do not remove all handlers because we may have wrap or unwrap handlers at the head
// of the pipeline.
if (handler instanceof IdleStateHandler) {
break;
}
}
// Disable auto read here. Enable it after we set up the streaming pipeline in
// FanOutOneBlockAsyncDFSOutput.
@@ -497,8 +506,7 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent
&& ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
promise
.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
} else {
@@ -515,39 +523,64 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
private static void requestWriteBlock(Channel channel, Enum<?> storageType,
OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
// TODO: SASL negotiation. should be done using a netty Handler.
OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
int protoLen = proto.getSerializedSize();
ByteBuf buffer = channel.alloc()
.buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
ByteBuf buffer =
channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
buffer.writeByte(Op.WRITE_BLOCK.code);
proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
channel.writeAndFlush(buffer);
}
private static List<Future<Channel>> connectToDataNodes(Configuration conf, String clientName,
LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, BlockConstructionStage stage,
DataChecksum summer, EventLoop eventLoop) {
private static void initialize(Configuration conf, final Channel channel,
final DatanodeInfo dnInfo, final Enum<?> storageType,
final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs,
DFSClient client, Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
Promise<Void> saslPromise = channel.eventLoop().newPromise();
trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
saslPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
// set up the response processing pipeline first, then send the request.
processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
} else {
promise.tryFailure(future.cause());
}
}
});
}
private static List<Future<Channel>> connectToDataNodes(final Configuration conf,
final DFSClient client, String clientName, final LocatedBlock locatedBlock,
long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer,
EventLoop eventLoop) {
Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
boolean connectToDnViaHostname =
conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
final int timeoutMs =
conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT);
ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
blockCopy.setNumBytes(locatedBlock.getBlockSize());
ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
.setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy))
.setToken(PBHelper.convert(locatedBlock.getBlockToken())))
.setClientName(clientName).build();
ClientOperationHeaderProto header =
ClientOperationHeaderProto
.newBuilder()
.setBaseHeader(
BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy))
.setToken(PBHelper.convert(locatedBlock.getBlockToken())))
.setClientName(clientName).build();
ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
.setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
.setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
.setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
.setRequestedChecksum(checksumProto)
.setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
final OpWriteBlockProto.Builder writeBlockProtoBuilder =
OpWriteBlockProto.newBuilder().setHeader(header)
.setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
.setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
.setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
.setRequestedChecksum(checksumProto)
.setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
for (int i = 0; i < datanodeInfos.length; i++) {
final DatanodeInfo dnInfo = datanodeInfos[i];
@@ -562,14 +595,17 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
@Override
protected void initChannel(Channel ch) throws Exception {
processWriteBlockResponse(ch, dnInfo, promise, timeoutMs);
// we need the remote address of the channel, so we can only move on after the
// channel is connected. Leave an empty implementation here because netty does not
// allow a null handler.
}
}).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
requestWriteBlock(future.channel(), storageType, writeBlockProtoBuilder);
initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
timeoutMs, client, locatedBlock.getBlockToken(), promise);
} else {
promise.tryFailure(future.cause());
}
@@ -601,11 +637,14 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
ClientProtocol namenode = client.getNamenode();
HdfsFileStatus stat;
try {
stat = FILE_CREATER.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
new EnumSetWritable<CreateFlag>(
overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
createParent, replication, blockSize);
stat =
FILE_CREATER.create(
namenode,
src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)),
clientName,
new EnumSetWritable<CreateFlag>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet
.of(CREATE)), createParent, replication, blockSize);
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
@@ -619,19 +658,20 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
locatedBlock = namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(),
null);
locatedBlock =
namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null);
List<Channel> datanodeList = new ArrayList<>();
futureList = connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE,
summer, eventLoop);
futureList =
connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE,
summer, eventLoop);
for (Future<Channel> future : futureList) {
// fail the creation if there are connection failures since we are fail-fast. The upper
// layer should retry itself if needed.
datanodeList.add(future.syncUninterruptibly().getNow());
}
succ = true;
return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC);
return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName,
src, stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC);
} finally {
if (!succ) {
if (futureList != null) {
@@ -664,8 +704,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
@Override
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException,
UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
blockSize, eventLoop);
}
@@ -684,8 +724,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
return e.getClassName().endsWith("RetryStartFileException");
}
static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
ExtendedBlock block, long fileId) {
static void completeFile(DFSClient client, ClientProtocol namenode, String src,
String clientName, ExtendedBlock block, long fileId) {
for (int retry = 0;; retry++) {
try {
if (namenode.complete(src, clientName, block, fileId)) {

TestFanOutOneBlockAsyncDFSOutput.java

@@ -102,8 +102,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
}
}
private void writeAndVerify(EventLoop eventLoop, Path f, final FanOutOneBlockAsyncDFSOutput out)
throws IOException, InterruptedException, ExecutionException {
static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
final FanOutOneBlockAsyncDFSOutput out)
throws IOException, InterruptedException, ExecutionException {
final byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
@@ -117,9 +118,9 @@
});
assertEquals(b.length, handler.get());
out.close();
assertEquals(b.length, FS.getFileStatus(f).getLen());
assertEquals(b.length, dfs.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = FS.open(f)) {
try (FSDataInputStream in = dfs.open(f)) {
in.readFully(actual);
}
assertArrayEquals(b, actual);
@@ -131,7 +132,7 @@
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
writeAndVerify(eventLoop, f, out);
writeAndVerify(eventLoop, FS, f, out);
}
@Test
@@ -191,7 +192,7 @@
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
Thread.sleep(READ_TIMEOUT_MS * 2);
// the connection to the datanode should still be alive.
writeAndVerify(eventLoop, f, out);
writeAndVerify(eventLoop, FS, f, out);
}
/**

TestSaslFanOutOneBlockAsyncDFSOutput.java

@@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@RunWith(Parameterized.class)
@Category({ MiscTests.class, MediumTests.class })
public class TestSaslFanOutOneBlockAsyncDFSOutput {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static DistributedFileSystem FS;
private static EventLoopGroup EVENT_LOOP_GROUP;
private static int READ_TIMEOUT_MS = 200000;
private static final File KEYTAB_FILE = new File(
TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC;
private static String HOST = "localhost";
private static String USERNAME;
private static String PRINCIPAL;
private static String HTTP_PRINCIPAL;
@Rule
public TestName name = new TestName();
@Parameter(0)
public String protection;
@Parameter(1)
public String encryptionAlgorithm;
@Parameters(name = "{index}: protection={0}, encryption={1}")
public static Iterable<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
params.add(new Object[] { protection, encryptionAlgorithm });
}
}
return params;
}
private static void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
// change XXX_USER_NAME_KEY to XXX_KERBEROS_PRINCIPAL_KEY after we drop support for hadoop-2.4.1
conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm());
conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm());
conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
HTTP_PRINCIPAL + "@" + KDC.getRealm());
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
File keystoresDir = new File(TEST_UTIL.getDataTestDir("keystore").toUri().getPath());
keystoresDir.mkdirs();
String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestGenerateDelegationToken.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false);
conf.setBoolean("ignore.secure.ports.for.testing", true);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Logger.getLogger("org.apache.hadoop.hdfs.StateChange").setLevel(Level.DEBUG);
Logger.getLogger("BlockStateChange").setLevel(Level.DEBUG);
EVENT_LOOP_GROUP = new NioEventLoopGroup();
TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
Properties conf = MiniKdc.createConf();
conf.put(MiniKdc.DEBUG, true);
KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
KDC.start();
USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
PRINCIPAL = USERNAME + "/" + HOST;
HTTP_PRINCIPAL = "HTTP/" + HOST;
KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration());
HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration());
UserGroupInformation.setConfiguration(TEST_UTIL.getConfiguration());
}
@AfterClass
public static void tearDownAfterClass() throws IOException, InterruptedException {
if (EVENT_LOOP_GROUP != null) {
EVENT_LOOP_GROUP.shutdownGracefully().sync();
}
if (KDC != null) {
KDC.stop();
}
}
@Before
public void setUp() throws Exception {
TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
if (StringUtils.isBlank(encryptionAlgorithm)) {
TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
TEST_UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
} else {
TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
TEST_UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
}
TEST_UTIL.startMiniDFSCluster(3);
FS = TEST_UTIL.getDFSCluster().getFileSystem();
}
@After
public void tearDown() throws IOException {
TEST_UTIL.shutdownMiniDFSCluster();
}
private Path getTestFile() {
return new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
}
@Test
public void test() throws IOException, InterruptedException, ExecutionException {
Path f = getTestFile();
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 1, FS.getDefaultBlockSize(), eventLoop);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out);
}
}