HBASE-19346 Use EventLoopGroup to create AsyncFSOutput

zhangduo 2017-11-28 17:56:13 +08:00
parent 0e6f1a0240
commit e2e08866f6
10 changed files with 459 additions and 365 deletions
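Note: the change replaces the single EventLoop parameter with an EventLoopGroup (plus a channel class) throughout the createOutput call chain. A minimal caller-side sketch under the new signature; the class name, path and replication values below are only illustrative:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;

public class CreateOutputExample {
  public static AsyncFSOutput open(FileSystem fs, Path path) throws Exception {
    // The caller now hands over the whole event loop group plus a channel class;
    // the helper (and the DFS implementation) picks individual event loops itself.
    EventLoopGroup group = new NioEventLoopGroup();
    return AsyncFSOutputHelper.createOutput(fs, path, true /* overwrite */,
        false /* createParent */, (short) 3, fs.getDefaultBlockSize(),
        group, NioSocketChannel.class);
  }
}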

@@ -28,6 +28,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 /**
  * Interface for asynchronous filesystem output stream.
+ * <p>
+ * The implementation is not required to be thread safe.
  */
 @InterfaceAudience.Private
 public interface AsyncFSOutput extends Closeable {

@@ -17,12 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
@@ -35,12 +29,17 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
 /**
  * Helper class for creating AsyncFSOutput.
@@ -56,12 +55,12 @@ public final class AsyncFSOutputHelper {
    * implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}.
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
-      boolean createParent, short replication, long blockSize, EventLoop eventLoop,
+      boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass)
       throws IOException, CommonFSUtils.StreamLacksCapabilityException {
     if (fs instanceof DistributedFileSystem) {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
-        overwrite, createParent, replication, blockSize, eventLoop, channelClass);
+        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
     }
     final FSDataOutputStream fsOut;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
@@ -75,23 +74,19 @@ public final class AsyncFSOutputHelper {
     // ensure that we can provide the level of data safety we're configured
     // to provide.
     if (!(CommonFSUtils.hasCapability(fsOut, "hflush") &&
         CommonFSUtils.hasCapability(fsOut, "hsync"))) {
       throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
     }
     final ExecutorService flushExecutor =
         Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
     return new AsyncFSOutput() {
       private final ByteArrayOutputStream out = new ByteArrayOutputStream();

       @Override
-      public void write(final byte[] b, final int off, final int len) {
-        if (eventLoop.inEventLoop()) {
-          out.write(b, off, len);
-        } else {
-          eventLoop.submit(() -> out.write(b, off, len)).syncUninterruptibly();
-        }
+      public void write(byte[] b, int off, int len) {
+        out.write(b, off, len);
       }

       @Override
@@ -99,6 +94,16 @@ public final class AsyncFSOutputHelper {
         write(b, 0, b.length);
       }

+      @Override
+      public void writeInt(int i) {
+        out.writeInt(i);
+      }
+
+      @Override
+      public void write(ByteBuffer bb) {
+        out.write(bb, bb.position(), bb.remaining());
+      }
       @Override
       public void recoverAndClose(CancelableProgressable reporter) throws IOException {
         fsOut.close();
@@ -116,7 +121,7 @@
             out.reset();
           }
         } catch (IOException e) {
-          eventLoop.execute(() -> future.completeExceptionally(e));
+          eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
           return;
         }
         try {
@@ -126,9 +131,9 @@
             fsOut.hflush();
           }
           long pos = fsOut.getPos();
-          eventLoop.execute(() -> future.complete(pos));
+          eventLoopGroup.next().execute(() -> future.complete(pos));
         } catch (IOException e) {
-          eventLoop.execute(() -> future.completeExceptionally(e));
+          eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
         }
       }
@@ -164,16 +169,6 @@ public final class AsyncFSOutputHelper {
       public int buffered() {
         return out.size();
       }
-      @Override
-      public void writeInt(int i) {
-        out.writeInt(i);
-      }
-      @Override
-      public void write(ByteBuffer bb) {
-        out.write(bb, bb.position(), bb.remaining());
-      }
     };
   }
 }
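Note: for non-HDFS filesystems the helper keeps wrapping an FSDataOutputStream; flushes run on a dedicated single-thread executor and the returned CompletableFuture is completed on one of the group's event loops. A simplified sketch of that hand-off, not the actual HBase class:

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;

class FlushSketch {
  private final FSDataOutputStream fsOut;
  private final EventLoopGroup eventLoopGroup;
  private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();

  FlushSketch(FSDataOutputStream fsOut, EventLoopGroup eventLoopGroup) {
    this.fsOut = fsOut;
    this.eventLoopGroup = eventLoopGroup;
  }

  CompletableFuture<Long> flush(boolean sync) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flushExecutor.execute(() -> {
      try {
        if (sync) {
          fsOut.hsync();   // durable flush
        } else {
          fsOut.hflush();  // flush to the readers only
        }
        long pos = fsOut.getPos();
        // Complete the future on an event loop thread, mirroring the
        // eventLoopGroup.next().execute(...) calls in the real helper.
        eventLoopGroup.next().execute(() -> future.complete(pos));
      } catch (IOException e) {
        eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
      }
    });
    return future;
  }
}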

@ -17,48 +17,33 @@
*/ */
package org.apache.hadoop.hbase.io.asyncfs; package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.WRITER_IDLE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.WRITER_IDLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Promise;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.PromiseCombiner;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Deque; import java.util.Iterator;
import java.util.IdentityHashMap;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
@ -66,14 +51,28 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler.Sharable;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelId;
import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
 /**
  * An asynchronous HDFS output stream implementation which fans out data to datanode and only
@@ -81,21 +80,16 @@
  * <p>
  * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create. The mainly
  * usage of this class is implementing WAL, so we only expose a little HDFS configurations in the
- * method. And we place it here under util package because we want to make it independent of WAL
+ * method. And we place it here under io package because we want to make it independent of WAL
  * implementation thus easier to move it to HDFS project finally.
  * <p>
- * Note that, all connections to datanode will run in the same {@link EventLoop} which means we only
- * need one thread here. But be careful, we do some blocking operations in {@link #close()} and
- * {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside
- * {@link EventLoop}. And for {@link #write(byte[])} {@link #write(byte[], int, int)},
- * {@link #buffered()} and {@link #flush(boolean)}, if you call them outside {@link EventLoop},
- * there will be an extra context-switch.
+ * Note that, although we support pipelined flush, i.e, write new data and then flush before the
+ * previous flush succeeds, the implementation is not thread safe, so you should not call its
+ * methods concurrently.
  * <p>
  * Advantages compare to DFSOutputStream:
  * <ol>
  * <li>The fan out mechanism. This will reduce the latency.</li>
- * <li>The asynchronous WAL could also run in the same EventLoop, we could just call write and flush
- * inside the EventLoop thread, so generally we only have one thread to do all the things.</li>
 * <li>Fail-fast when connection to datanode error. The WAL implementation could open new writer
  * ASAP.</li>
 * <li>We could benefit from netty's ByteBuf management mechanism.</li>
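Note: per the updated javadoc the stream supports pipelined flush but is not thread safe. A hedged usage sketch driven from a single thread; the class name, path, replication and payload values are made up:

import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hadoop.hdfs.DistributedFileSystem;

class PipelinedFlushSketch {
  static void demo(DistributedFileSystem dfs, EventLoopGroup group, byte[] a, byte[] b)
      throws Exception {
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(
        dfs, new Path("/demo/pipelined"), true, false, (short) 3,
        dfs.getDefaultBlockSize(), group, NioSocketChannel.class);
    out.write(a);
    CompletableFuture<Long> first = out.flush(false);
    // Pipelined flush: more data may be written before the previous flush completes,
    // but all calls must come from the same thread since the stream is not thread safe.
    out.write(b);
    CompletableFuture<Long> second = out.flush(false);
    second.get();   // acked length of everything written so far
    out.close();    // blocking, so never call it from an event loop thread
  }
}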
@@ -124,12 +118,12 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   private final long fileId;
-  private final LocatedBlock locatedBlock;
+  private final ExtendedBlock block;
+  private final DatanodeInfo[] locations;
   private final Encryptor encryptor;
-  private final EventLoop eventLoop;
   private final List<Channel> datanodeList;
   private final DataChecksum summer;
@@ -140,65 +134,81 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   private static final class Callback {
-    private final Promise<Void> promise;
+    private final CompletableFuture<Long> future;
     private final long ackedLength;
-    private final Set<Channel> unfinishedReplicas;
+    // should be backed by a thread safe collection
+    private final Set<ChannelId> unfinishedReplicas;

-    public Callback(Promise<Void> promise, long ackedLength, Collection<Channel> replicas) {
-      this.promise = promise;
+    public Callback(CompletableFuture<Long> future, long ackedLength,
+        Collection<Channel> replicas) {
+      this.future = future;
       this.ackedLength = ackedLength;
       if (replicas.isEmpty()) {
         this.unfinishedReplicas = Collections.emptySet();
       } else {
         this.unfinishedReplicas =
-          Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
-        this.unfinishedReplicas.addAll(replicas);
+          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
+        replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
       }
     }
   }

-  private final Deque<Callback> waitingAckQueue = new ArrayDeque<>();
+  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();
+
+  private volatile long ackedBlockLength = 0L;

   // this could be different from acked block length because a packet can not start at the middle of
   // a chunk.
   private long nextPacketOffsetInBlock = 0L;
+
+  // the length of the trailing partial chunk, this is because the packet start offset must be
+  // aligned with the length of checksum chunk so we need to resend the same data.
+  private int trailingPartialChunkLength = 0;
+
   private long nextPacketSeqno = 0L;

   private ByteBuf buf;
-  // buf's initial capacity - 4KB
-  private int capacity = 4 * 1024;
-  // LIMIT is 128MB
-  private static final int LIMIT = 128 * 1024 * 1024;
+  private final SendBufSizePredictor sendBufSizePRedictor = new SendBufSizePredictor();

+  // State for connections to DN
   private enum State {
     STREAMING, CLOSING, BROKEN, CLOSED
   }
-  private State state;
+  private volatile State state;

+  // all lock-free to make it run faster
   private void completed(Channel channel) {
-    if (waitingAckQueue.isEmpty()) {
-      return;
-    }
-    for (Callback c : waitingAckQueue) {
-      if (c.unfinishedReplicas.remove(channel)) {
+    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
+      Callback c = iter.next();
+      // if the current unfinished replicas does not contain us then it means that we have already
+      // acked this one, let's iterate to find the one we have not acked yet.
+      if (c.unfinishedReplicas.remove(channel.id())) {
         if (c.unfinishedReplicas.isEmpty()) {
-          c.promise.trySuccess(null);
-          // since we will remove the Callback entry from waitingAckQueue if its unfinishedReplicas
-          // is empty, so this could only happen at the head of waitingAckQueue, so we just call
-          // removeFirst here.
-          waitingAckQueue.removeFirst();
-          // also wake up flush requests which have the same length.
-          for (Callback cb; (cb = waitingAckQueue.peekFirst()) != null;) {
-            if (cb.ackedLength == c.ackedLength) {
-              cb.promise.trySuccess(null);
-              waitingAckQueue.removeFirst();
-            } else {
-              break;
+          // we need to remove first before complete the future. It is possible that after we
+          // complete the future the upper layer will call close immediately before we remove the
+          // entry from waitingAckQueue and lead to an IllegalStateException. And also set the
+          // ackedBlockLength first otherwise we may use a wrong length to commit the block. This
+          // may lead to multiple remove and assign but is OK. The semantic of iter.remove is
+          // removing the entry returned by calling previous next, so if the entry has already been
+          // removed then it is a no-op, and for the assign, the values are the same so no problem.
+          iter.remove();
+          ackedBlockLength = c.ackedLength;
+          // the future.complete check is to confirm that we are the only one who grabbed the work,
+          // otherwise just give up and return.
+          if (c.future.complete(c.ackedLength)) {
+            // also wake up flush requests which have the same length.
+            while (iter.hasNext()) {
+              Callback maybeDummyCb = iter.next();
+              if (maybeDummyCb.ackedLength == c.ackedLength) {
+                iter.remove();
+                maybeDummyCb.future.complete(c.ackedLength);
+              } else {
+                break;
+              }
             }
           }
         }
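Note: the comments above describe the lock-free completion path: remove the queue entry and record ackedBlockLength before completing the future. A stripped-down sketch of that pattern, omitting the same-length wake-up loop; the surrounding class is illustrative while the field names mirror the diff:

import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;

import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelId;

class AckQueueSketch {
  static final class Callback {
    final CompletableFuture<Long> future;
    final long ackedLength;
    final Set<ChannelId> unfinishedReplicas;
    Callback(CompletableFuture<Long> future, long ackedLength, Set<ChannelId> replicas) {
      this.future = future;
      this.ackedLength = ackedLength;
      this.unfinishedReplicas = replicas;
    }
  }

  final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();
  volatile long ackedBlockLength = 0L;

  void completed(Channel channel) {
    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
      Callback c = iter.next();
      if (!c.unfinishedReplicas.remove(channel.id())) {
        continue; // this replica already acked the entry, look at later entries
      }
      if (c.unfinishedReplicas.isEmpty()) {
        // Remove before completing so a close() triggered by the completion cannot see a
        // non-empty queue; record the acked length first for the same reason.
        iter.remove();
        ackedBlockLength = c.ackedLength;
        c.future.complete(c.ackedLength);
      }
      return;
    }
  }
}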
@@ -207,13 +217,16 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     }
   }

-  private void failed(Channel channel, Supplier<Throwable> errorSupplier) {
+  // this usually does not happen which means it is not on the critical path so make it synchronized
+  // so that the implementation will not burn up our brain as there are multiple state changes and
+  // checks.
+  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
     if (state == State.BROKEN || state == State.CLOSED) {
       return;
     }
     if (state == State.CLOSING) {
       Callback c = waitingAckQueue.peekFirst();
-      if (c == null || !c.unfinishedReplicas.contains(channel)) {
+      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
         // nothing, the endBlock request has already finished.
         return;
       }
@@ -221,8 +234,21 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     // disable further write, and fail all pending ack.
     state = State.BROKEN;
     Throwable error = errorSupplier.get();
-    waitingAckQueue.stream().forEach(c -> c.promise.tryFailure(error));
-    waitingAckQueue.clear();
+    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
+      Callback c = iter.next();
+      // find the first sync request which we have not acked yet and fail all the request after it.
+      if (!c.unfinishedReplicas.contains(channel.id())) {
+        continue;
+      }
+      for (;;) {
+        c.future.completeExceptionally(error);
+        if (!iter.hasNext()) {
+          break;
+        }
+        c = iter.next();
+      }
+      break;
+    }
     datanodeList.forEach(ch -> ch.close());
   }
@@ -239,13 +265,13 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
       Status reply = getStatus(ack);
       if (reply != Status.SUCCESS) {
-        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block "
-            + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
+        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " +
+            block + " from datanode " + ctx.channel().remoteAddress()));
         return;
       }
       if (PipelineAck.isRestartOOBStatus(reply)) {
-        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
-            + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
+        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " +
+            block + " from datanode " + ctx.channel().remoteAddress()));
         return;
       }
       if (ack.getSeqno() == HEART_BEAT_SEQNO) {
@@ -256,6 +282,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      if (state == State.CLOSED) {
+        return;
+      }
       failed(ctx.channel(),
           () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
     }
@@ -299,8 +328,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
       DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
-      LocatedBlock locatedBlock, Encryptor encryptor, EventLoop eventLoop,
-      List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) {
+      LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
+      DataChecksum summer, ByteBufAllocator alloc) {
     this.conf = conf;
     this.fsUtils = fsUtils;
     this.dfs = dfs;
@@ -309,81 +338,53 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     this.fileId = fileId;
     this.clientName = clientName;
     this.src = src;
-    this.locatedBlock = locatedBlock;
+    this.block = locatedBlock.getBlock();
+    this.locations = locatedBlock.getLocations();
     this.encryptor = encryptor;
-    this.eventLoop = eventLoop;
     this.datanodeList = datanodeList;
     this.summer = summer;
     this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
     this.alloc = alloc;
-    this.buf = alloc.directBuffer(capacity);
+    this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
     this.state = State.STREAMING;
     setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
   }
-  private void writeInt0(int i) {
+  @Override
+  public void writeInt(int i) {
     buf.ensureWritable(4);
     buf.writeInt(i);
   }

   @Override
-  public void writeInt(int i) {
-    if (eventLoop.inEventLoop()) {
-      writeInt0(i);
-    } else {
-      eventLoop.submit(() -> writeInt0(i));
-    }
-  }
-
-  private void write0(ByteBuffer bb) {
+  public void write(ByteBuffer bb) {
     buf.ensureWritable(bb.remaining());
     buf.writeBytes(bb);
   }

-  @Override
-  public void write(ByteBuffer bb) {
-    if (eventLoop.inEventLoop()) {
-      write0(bb);
-    } else {
-      eventLoop.submit(() -> write0(bb));
-    }
-  }
-
   @Override
   public void write(byte[] b) {
     write(b, 0, b.length);
   }

-  private void write0(byte[] b, int off, int len) {
+  @Override
+  public void write(byte[] b, int off, int len) {
     buf.ensureWritable(len);
     buf.writeBytes(b, off, len);
   }

-  @Override
-  public void write(byte[] b, int off, int len) {
-    if (eventLoop.inEventLoop()) {
-      write0(b, off, len);
-    } else {
-      eventLoop.submit(() -> write0(b, off, len)).syncUninterruptibly();
-    }
-  }
-
   @Override
   public int buffered() {
-    if (eventLoop.inEventLoop()) {
-      return buf.readableBytes();
-    } else {
-      return eventLoop.submit(() -> buf.readableBytes()).syncUninterruptibly().getNow().intValue();
-    }
+    return buf.readableBytes();
   }

   @Override
   public DatanodeInfo[] getPipeline() {
-    return locatedBlock.getLocations();
+    return locations;
   }
-  private Promise<Void> flushBuffer(ByteBuf dataBuf, long nextPacketOffsetInBlock,
-      boolean syncBlock) {
+  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
+      long nextPacketOffsetInBlock, boolean syncBlock) {
     int dataLen = dataBuf.readableBytes();
     int chunkLen = summer.getBytesPerChecksum();
     int trailingPartialChunkLen = dataLen % chunkLen;
@@ -398,24 +399,24 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     ByteBuf headerBuf = alloc.buffer(headerLen);
     header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
     headerBuf.writerIndex(headerLen);
-    long ackedLength = nextPacketOffsetInBlock + dataLen;
-    Promise<Void> promise = eventLoop.<Void> newPromise().addListener(future -> {
-      if (future.isSuccess()) {
-        locatedBlock.getBlock().setNumBytes(ackedLength);
-      }
-    });
-    waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList));
-    for (Channel ch : datanodeList) {
-      ch.write(headerBuf.duplicate().retain());
-      ch.write(checksumBuf.duplicate().retain());
-      ch.writeAndFlush(dataBuf.duplicate().retain());
-    }
+    Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
+    waitingAckQueue.addLast(c);
+    // recheck again after we pushed the callback to queue
+    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
+      future.completeExceptionally(new IOException("stream already broken"));
+      // it's the one we have just pushed or just a no-op
+      waitingAckQueue.removeFirst();
+      return;
+    }
+    datanodeList.forEach(ch -> {
+      ch.write(headerBuf.retainedDuplicate());
+      ch.write(checksumBuf.retainedDuplicate());
+      ch.writeAndFlush(dataBuf.retainedDuplicate());
+    });
     checksumBuf.release();
     headerBuf.release();
     dataBuf.release();
     nextPacketSeqno++;
-    return promise;
   }
   private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
@@ -424,11 +425,43 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       return;
     }
     int dataLen = buf.readableBytes();
+    if (dataLen == trailingPartialChunkLength) {
+      // no new data
+      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
+      Callback lastFlush = waitingAckQueue.peekLast();
+      if (lastFlush != null) {
+        Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
+        waitingAckQueue.addLast(c);
+        // recheck here if we have already removed the previous callback from the queue
+        if (waitingAckQueue.peekFirst() == c) {
+          // all previous callbacks have been removed
+          // notice that this does mean we will always win here because the background thread may
+          // have already started to mark the future here as completed in the completed or failed
+          // methods but haven't removed it from the queue yet. That's also why the removeFirst
+          // call below may be a no-op.
+          if (state != State.STREAMING) {
+            future.completeExceptionally(new IOException("stream already broken"));
+          } else {
+            future.complete(lengthAfterFlush);
+          }
+          // it's the one we have just pushed or just a no-op
+          waitingAckQueue.removeFirst();
+        }
+      } else {
+        // we must have acked all the data so the ackedBlockLength must be same with
+        // lengthAfterFlush
+        future.complete(lengthAfterFlush);
+      }
+      return;
+    }
     if (encryptor != null) {
       ByteBuf encryptBuf = alloc.directBuffer(dataLen);
+      buf.readBytes(encryptBuf, trailingPartialChunkLength);
+      int toEncryptLength = dataLen - trailingPartialChunkLength;
       try {
-        encryptor.encrypt(buf.nioBuffer(buf.readerIndex(), dataLen),
-          encryptBuf.nioBuffer(0, dataLen));
+        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
+          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
       } catch (IOException e) {
         encryptBuf.release();
         future.completeExceptionally(e);
@@ -438,56 +471,35 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       buf.release();
       buf = encryptBuf;
     }
-    long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
-    if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) {
-      // no new data, just return
-      future.complete(locatedBlock.getBlock().getNumBytes());
-      return;
-    }
-    Callback c = waitingAckQueue.peekLast();
-    if (c != null && lengthAfterFlush == c.ackedLength) {
-      // just append it to the tail of waiting ack queue,, do not issue new hflush request.
-      waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(f -> {
-        if (f.isSuccess()) {
-          future.complete(lengthAfterFlush);
-        } else {
-          future.completeExceptionally(f.cause());
-        }
-      }), lengthAfterFlush, Collections.<Channel> emptyList()));
-      return;
-    }
-    Promise<Void> promise;
     if (dataLen > maxDataLen) {
       // We need to write out the data by multiple packets as the max packet allowed is 16M.
-      PromiseCombiner combiner = new PromiseCombiner();
       long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
-      for (int remaining = dataLen; remaining > 0;) {
-        int toWriteDataLen = Math.min(remaining, maxDataLen);
-        combiner.add((Future<Void>) flushBuffer(buf.readRetainedSlice(toWriteDataLen),
-          nextSubPacketOffsetInBlock, syncBlock));
-        nextSubPacketOffsetInBlock += toWriteDataLen;
-        remaining -= toWriteDataLen;
+      for (int remaining = dataLen;;) {
+        if (remaining < maxDataLen) {
+          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
+            syncBlock);
+          break;
+        } else {
+          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
+            nextSubPacketOffsetInBlock, syncBlock);
+          remaining -= maxDataLen;
+          nextSubPacketOffsetInBlock += maxDataLen;
+        }
       }
-      promise = eventLoop.newPromise();
-      combiner.finish(promise);
     } else {
-      promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock);
+      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
     }
-    promise.addListener(f -> {
-      if (f.isSuccess()) {
-        future.complete(lengthAfterFlush);
-      } else {
-        future.completeExceptionally(f.cause());
-      }
-    });
-    int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum();
-    ByteBuf newBuf = alloc.directBuffer(guess(dataLen)).ensureWritable(trailingPartialChunkLen);
-    if (trailingPartialChunkLen != 0) {
-      buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen);
-    }
+    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
+    ByteBuf newBuf = alloc.directBuffer(sendBufSizePRedictor.guess(dataLen))
+        .ensureWritable(trailingPartialChunkLength);
+    if (trailingPartialChunkLength != 0) {
+      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
+        trailingPartialChunkLength);
+    }
     buf.release();
     this.buf = newBuf;
-    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen;
+    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
   }
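Note: packet start offsets must stay aligned to the checksum chunk size, which is why the trailing partial chunk is copied into the new send buffer and resent. A tiny worked example of that arithmetic; the class, chunk size and data length are arbitrary illustration values:

class ChunkAlignmentSketch {
  static void demo() {
    int bytesPerChecksum = 512;
    int dataLen = 1300;   // bytes buffered since the last packet boundary
    int trailingPartialChunkLength = dataLen % bytesPerChecksum;   // 276
    int fullChunkBytes = dataLen - trailingPartialChunkLength;     // 1024
    // The next packet must start on a chunk boundary, so only the full chunks advance
    // nextPacketOffsetInBlock; the 276 trailing bytes are copied into the new send
    // buffer and resent as part of the next packet.
    System.out.println("advance offset by " + fullChunkBytes
        + ", carry over " + trailingPartialChunkLength + " bytes");
  }
}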
 /**
@@ -497,34 +509,38 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    */
   public CompletableFuture<Long> flush(boolean syncBlock) {
     CompletableFuture<Long> future = new CompletableFuture<>();
-    if (eventLoop.inEventLoop()) {
-      flush0(future, syncBlock);
-    } else {
-      eventLoop.execute(() -> flush0(future, syncBlock));
-    }
+    flush0(future, syncBlock);
     return future;
   }

-  private void endBlock(Promise<Void> promise, long size) {
+  private void endBlock() throws IOException {
+    Preconditions.checkState(waitingAckQueue.isEmpty(),
+      "should call flush first before calling close");
     if (state != State.STREAMING) {
-      promise.tryFailure(new IOException("stream already broken"));
-      return;
-    }
-    if (!waitingAckQueue.isEmpty()) {
-      promise.tryFailure(new IllegalStateException("should call flush first before calling close"));
-      return;
+      throw new IOException("stream already broken");
     }
     state = State.CLOSING;
-    PacketHeader header = new PacketHeader(4, size, nextPacketSeqno, true, 0, false);
+    long finalizedLength = ackedBlockLength;
+    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
     buf.release();
     buf = null;
     int headerLen = header.getSerializedSize();
     ByteBuf headerBuf = alloc.directBuffer(headerLen);
     header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
     headerBuf.writerIndex(headerLen);
-    waitingAckQueue.add(new Callback(promise, size, datanodeList));
-    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.duplicate().retain()));
+    CompletableFuture<Long> future = new CompletableFuture<>();
+    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
+    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
     headerBuf.release();
+    try {
+      future.get();
+    } catch (InterruptedException e) {
+      throw (IOException) new InterruptedIOException().initCause(e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      throw new IOException(cause);
+    }
   }
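Note: endBlock() now blocks on a CompletableFuture and rethrows failures as checked IOExceptions. The same unwrap pattern, shown here in isolation with an illustrative wrapper class:

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;

final class FutureIoSketch {
  /** Wait for an async ack and convert failures back into checked IOExceptions. */
  static long waitForAck(CompletableFuture<Long> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      Throwables.propagateIfPossible(cause, IOException.class);
      throw new IOException(cause);
    }
  }
}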
 /**
@@ -532,7 +548,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    */
   @Override
   public void recoverAndClose(CancelableProgressable reporter) throws IOException {
-    assert !eventLoop.inEventLoop();
+    datanodeList.forEach(ch -> ch.close());
     datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
     endFileLease(client, fileId);
     fsUtils.recoverFileLease(dfs, new Path(src), conf,
@@ -545,32 +561,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    */
   @Override
   public void close() throws IOException {
-    assert !eventLoop.inEventLoop();
-    Promise<Void> promise = eventLoop.newPromise();
-    eventLoop.execute(() -> endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes()));
-    promise.addListener(f -> datanodeList.forEach(ch -> ch.close())).syncUninterruptibly();
+    endBlock();
+    state = State.CLOSED;
+    datanodeList.forEach(ch -> ch.close());
     datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
-    completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId);
-  }
-
-  @VisibleForTesting
-  int guess(int bytesWritten) {
-    // if the bytesWritten is greater than the current capacity
-    // always increase the capacity in powers of 2.
-    if (bytesWritten > this.capacity) {
-      // Ensure we don't cross the LIMIT
-      if ((this.capacity << 1) <= LIMIT) {
-        // increase the capacity in the range of power of 2
-        this.capacity = this.capacity << 1;
-      }
-    } else {
-      // if we see that the bytesWritten is lesser we could again decrease
-      // the capacity by dividing it by 2 if the bytesWritten is satisfied by
-      // that reduction
-      if ((this.capacity >> 1) >= bytesWritten) {
-        this.capacity = this.capacity >> 1;
-      }
-    }
-    return this.capacity;
+    block.setNumBytes(ackedBlockLength);
+    completeFile(client, namenode, src, clientName, block, fileId);
   }
 }

@ -17,43 +17,19 @@
*/ */
package org.apache.hadoop.hbase.io.asyncfs; package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE; import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
import com.google.protobuf.CodedOutputStream; import com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufOutputStream;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFuture;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFutureListener;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInitializer;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelPipeline;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.FutureListener;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Promise;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
@ -73,7 +49,6 @@ import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
@ -109,6 +84,32 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufOutputStream;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFuture;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFutureListener;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInitializer;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelPipeline;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.FutureListener;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Promise;
 /**
  * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
@@ -664,10 +665,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     });
   }

-  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
-      String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
-      BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop,
-      Class<? extends Channel> channelClass) {
+  private static List<Future<Channel>> connectToDataNodes(Configuration conf,
+      DFSClient client, String clientName, LocatedBlock locatedBlock, long maxBytesRcvd,
+      long latestGS, BlockConstructionStage stage, DataChecksum summer,
+      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
     Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
     DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
     boolean connectToDnViaHostname =
@@ -690,10 +691,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     for (int i = 0; i < datanodeInfos.length; i++) {
       DatanodeInfo dnInfo = datanodeInfos[i];
       Enum<?> storageType = storageTypes[i];
-      Promise<Channel> promise = eventLoop.newPromise();
+      Promise<Channel> promise = eventLoopGroup.next().newPromise();
       futureList.add(promise);
       String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
-      new Bootstrap().group(eventLoop).channel(channelClass)
+      new Bootstrap().group(eventLoopGroup).channel(channelClass)
           .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
             @Override
@@ -732,7 +733,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
       boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
+      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
     Configuration conf = dfs.getConf();
     FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
     DFSClient client = dfs.getClient();
@@ -761,7 +762,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         stat.getFileId(), null);
       List<Channel> datanodeList = new ArrayList<>();
       futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
-        PIPELINE_SETUP_CREATE, summer, eventLoop, channelClass);
+        PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
       for (Future<Channel> future : futureList) {
         // fail the creation if there are connection failures since we are fail-fast. The upper
         // layer should retry itself if needed.
@@ -770,7 +771,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       Encryptor encryptor = createEncryptor(conf, stat, client);
       FanOutOneBlockAsyncDFSOutput output =
         new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
-          stat.getFileId(), locatedBlock, encryptor, eventLoop, datanodeList, summer, ALLOC);
+          stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
       succ = true;
       return output;
     } finally {
@@ -796,19 +797,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   /**
    * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it
-   * inside {@link EventLoop}.
-   * @param eventLoop all connections to datanode will use the same event loop.
+   * inside an {@link EventLoop}.
    */
   public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
       boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
+      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

       @Override
       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
           throws IOException, UnresolvedLinkException {
         return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
-          blockSize, eventLoop, channelClass);
+          blockSize, eventLoopGroup, channelClass);
       }

       @Override

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used to predict the next send buffer size.
*/
@InterfaceAudience.Private
class SendBufSizePredictor {
// LIMIT is 128MB
private static final int LIMIT = 128 * 1024 * 1024;
// buf's initial capacity - 4KB
private int capacity = 4 * 1024;
int initialSize() {
return capacity;
}
int guess(int bytesWritten) {
// if the bytesWritten is greater than the current capacity
// always increase the capacity in powers of 2.
if (bytesWritten > this.capacity) {
// Ensure we don't cross the LIMIT
if ((this.capacity << 1) <= LIMIT) {
// increase the capacity in the range of power of 2
this.capacity = this.capacity << 1;
}
} else {
// if we see that the bytesWritten is lesser we could again decrease
// the capacity by dividing it by 2 if the bytesWritten is satisfied by
// that reduction
if ((this.capacity >> 1) >= bytesWritten) {
this.capacity = this.capacity >> 1;
}
}
return this.capacity;
}
}
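Note: a short usage sketch of the new predictor's doubling/halving behavior; the demo class is hypothetical and assumed to live in the same package, since the methods are package-private:

class SendBufSizePredictorDemo {
  static void demo() {
    SendBufSizePredictor predictor = new SendBufSizePredictor();
    int initial = predictor.initialSize();    // 4096: send buffers start at 4 KB
    int grown = predictor.guess(6 * 1024);    // 6 KB written -> capacity doubles to 8192
    int shrunk = predictor.guess(1024);       // 1 KB written -> capacity halves back to 4096
    System.out.println(initial + " -> " + grown + " -> " + shrunk);
  }
}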

@ -17,15 +17,6 @@
*/ */
package org.apache.hadoop.hbase.wal; package org.apache.hadoop.hbase.wal;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -33,13 +24,21 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.DefaultThreadFactory;
/** /**
* A WAL provider that use {@link AsyncFSWAL}. * A WAL provider that use {@link AsyncFSWAL}.

View File

@ -24,17 +24,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -56,6 +54,12 @@ import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
@Category({ MiscTests.class, MediumTests.class }) @Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput { public class TestFanOutOneBlockAsyncDFSOutput {
@ -97,9 +101,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
// will fail. // will fail.
for (;;) { for (;;) {
try { try {
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, FanOutOneBlockAsyncDFSOutput out =
new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(), FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
out.close(); out.close();
break; break;
} catch (IOException e) { } catch (IOException e) {
@ -111,17 +115,32 @@ public class TestFanOutOneBlockAsyncDFSOutput {
static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f, static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
final FanOutOneBlockAsyncDFSOutput out) final FanOutOneBlockAsyncDFSOutput out)
throws IOException, InterruptedException, ExecutionException { throws IOException, InterruptedException, ExecutionException {
final byte[] b = new byte[10]; List<CompletableFuture<Long>> futures = new ArrayList<>();
ThreadLocalRandom.current().nextBytes(b); byte[] b = new byte[10];
out.write(b, 0, b.length); Random rand = new Random(12345);
assertEquals(b.length, out.flush(false).get().longValue()); // test pipelined flush
out.close(); for (int i = 0; i < 10; i++) {
assertEquals(b.length, dfs.getFileStatus(f).getLen()); rand.nextBytes(b);
byte[] actual = new byte[b.length]; out.write(b);
try (FSDataInputStream in = dfs.open(f)) { futures.add(out.flush(false));
in.readFully(actual); futures.add(out.flush(false));
}
for (int i = 0; i < 10; i++) {
assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
}
out.close();
assertEquals(b.length * 10, dfs.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
rand.setSeed(12345);
try (FSDataInputStream in = dfs.open(f)) {
for (int i = 0; i < 10; i++) {
in.readFully(actual);
rand.nextBytes(b);
assertArrayEquals(b, actual);
}
assertEquals(-1, in.read());
} }
assertArrayEquals(b, actual);
} }
@Test @Test
@ -133,21 +152,6 @@ public class TestFanOutOneBlockAsyncDFSOutput {
writeAndVerify(eventLoop, FS, f, out); writeAndVerify(eventLoop, FS, f, out);
} }
@Test
public void testMaxByteBufAllocated() throws Exception {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
out.guess(5 * 1024);
assertEquals(8 * 1024, out.guess(5 * 1024));
assertEquals(16 * 1024, out.guess(10 * 1024));
// it wont reduce directly to 4KB
assertEquals(8 * 1024, out.guess(4 * 1024));
// This time it will reduece
assertEquals(4 * 1024, out.guess(4 * 1024));
}
@Test @Test
public void testRecover() throws IOException, InterruptedException, ExecutionException { public void testRecover() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName()); Path f = new Path("/" + name.getMethodName());
@ -216,7 +220,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer"); Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
xceiverServerDaemonField.setAccessible(true); xceiverServerDaemonField.setAccessible(true);
Class<?> xceiverServerClass = Class<?> xceiverServerClass =
Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer"); Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers"); Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
numPeersMethod.setAccessible(true); numPeersMethod.setAccessible(true);
// make one datanode broken // make one datanode broken
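The rewritten writeAndVerify above exercises pipelined flushes: several write/flush pairs are issued before any returned future is waited on, and each future completes with the acked file length. A minimal sketch of that calling pattern against the AsyncFSOutput API (not part of the commit; fs and f are assumed to be set up elsewhere, and the chunk size, channel class and iteration count are illustrative only):

// Assumed imports: java.util.ArrayList, java.util.List, java.util.concurrent.CompletableFuture,
// plus the shaded netty classes used elsewhere in this change (EventLoopGroup,
// NioEventLoopGroup, NioSocketChannel).
static void pipelinedWrite(FileSystem fs, Path f) throws Exception {
  EventLoopGroup group = new NioEventLoopGroup();
  try {
    AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, true, false,
        fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), group, NioSocketChannel.class);
    List<CompletableFuture<Long>> acks = new ArrayList<>();
    byte[] chunk = new byte[10];
    for (int i = 0; i < 10; i++) {
      out.write(chunk);             // only buffers the data locally
      acks.add(out.flush(false));   // issue the flush, but do not block yet
    }
    for (CompletableFuture<Long> ack : acks) {
      ack.join();                   // completes with the acked length so far
    }
    out.close();
  } finally {
    group.shutdownGracefully();
  }
}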

View File

@ -61,7 +61,7 @@ public class TestLocalAsyncOutput {
Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next(), CHANNEL_CLASS); fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
byte[] b = new byte[10]; byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b); ThreadLocalRandom.current().nextBytes(b);
out.write(b); out.write(b);

View File

@ -31,12 +31,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
@ -75,6 +69,12 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.Parameters;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@Category({ MiscTests.class, LargeTests.class }) @Category({ MiscTests.class, LargeTests.class })
public class TestSaslFanOutOneBlockAsyncDFSOutput { public class TestSaslFanOutOneBlockAsyncDFSOutput {
@ -90,7 +90,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
private static int READ_TIMEOUT_MS = 200000; private static int READ_TIMEOUT_MS = 200000;
private static final File KEYTAB_FILE = private static final File KEYTAB_FILE =
new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC; private static MiniKdc KDC;
@ -104,8 +104,6 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
private static String TEST_KEY_NAME = "test_key"; private static String TEST_KEY_NAME = "test_key";
private static boolean TEST_TRANSPARENT_ENCRYPTION = true;
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
@ -118,20 +116,13 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
@Parameter(2) @Parameter(2)
public String cipherSuite; public String cipherSuite;
@Parameter(3) @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
public boolean useTransparentEncryption;
@Parameters(
name = "{index}: protection={0}, encryption={1}, cipherSuite={2}, transparent_enc={3}")
public static Iterable<Object[]> data() { public static Iterable<Object[]> data() {
List<Object[]> params = new ArrayList<>(); List<Object[]> params = new ArrayList<>();
for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) { for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
for (boolean useTransparentEncryption : Arrays.asList(false, true)) { params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite,
useTransparentEncryption });
}
} }
} }
} }
@ -159,7 +150,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
private static void setUpKeyProvider(Configuration conf) throws Exception { private static void setUpKeyProvider(Configuration conf) throws Exception {
URI keyProviderUri = URI keyProviderUri =
new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString()); new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString()); conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf); KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf)); keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
@ -197,13 +188,12 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
private Path testDirOnTestFs; private Path testDirOnTestFs;
private Path entryptionTestDirOnTestFs;
private void createEncryptionZone() throws Exception { private void createEncryptionZone() throws Exception {
if (!TEST_TRANSPARENT_ENCRYPTION) {
return;
}
Method method = Method method =
DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class); DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
method.invoke(FS, testDirOnTestFs, TEST_KEY_NAME); method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME);
} }
@Before @Before
@ -225,13 +215,13 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite); TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
} }
TEST_UTIL.startMiniDFSCluster(1); TEST_UTIL.startMiniDFSCluster(3);
FS = TEST_UTIL.getDFSCluster().getFileSystem(); FS = TEST_UTIL.getDFSCluster().getFileSystem();
testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
FS.mkdirs(testDirOnTestFs); FS.mkdirs(testDirOnTestFs);
if (useTransparentEncryption) { entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
createEncryptionZone(); FS.mkdirs(entryptionTestDirOnTestFs);
} createEncryptionZone();
} }
@After @After
@ -243,12 +233,20 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
return new Path(testDirOnTestFs, "test"); return new Path(testDirOnTestFs, "test");
} }
private Path getEncryptionTestFile() {
return new Path(entryptionTestDirOnTestFs, "test");
}
private void test(Path file) throws IOException, InterruptedException, ExecutionException {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, file, out);
}
@Test @Test
public void test() throws IOException, InterruptedException, ExecutionException { public void test() throws IOException, InterruptedException, ExecutionException {
Path f = getTestFile(); test(getTestFile());
EventLoop eventLoop = EVENT_LOOP_GROUP.next(); test(getEncryptionTestFile());
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 1, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out);
} }
} }

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test for {@link SendBufSizePredictor}.
*/
@Category({ MiscTests.class, SmallTests.class })
public class TestSendBufSizePredictor {
@Test
public void test() {
SendBufSizePredictor p = new SendBufSizePredictor();
assertEquals(8 * 1024, p.guess(5 * 1024));
assertEquals(8 * 1024, p.guess(5 * 1024));
assertEquals(16 * 1024, p.guess(10 * 1024));
// it won't reduce directly to 4KB
assertEquals(8 * 1024, p.guess(4 * 1024));
// This time it will reduce
assertEquals(4 * 1024, p.guess(4 * 1024));
}
}