+ * The implementation is not required to be thread safe.
*/
@InterfaceAudience.Private
public interface AsyncFSOutput extends Closeable {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 1f5462f921f..6a7e4fa0b32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -17,12 +17,6 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
@@ -35,12 +29,17 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
/**
* Helper class for creating AsyncFSOutput.
@@ -56,12 +55,12 @@ public final class AsyncFSOutputHelper {
* implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}.
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
- boolean createParent, short replication, long blockSize, EventLoop eventLoop,
+ boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
Class extends Channel> channelClass)
- throws IOException, CommonFSUtils.StreamLacksCapabilityException {
+ throws IOException, CommonFSUtils.StreamLacksCapabilityException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
- overwrite, createParent, replication, blockSize, eventLoop, channelClass);
+ overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
}
final FSDataOutputStream fsOut;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
@@ -75,23 +74,19 @@ public final class AsyncFSOutputHelper {
// ensure that we can provide the level of data safety we're configured
// to provide.
if (!(CommonFSUtils.hasCapability(fsOut, "hflush") &&
- CommonFSUtils.hasCapability(fsOut, "hsync"))) {
+ CommonFSUtils.hasCapability(fsOut, "hsync"))) {
throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
}
final ExecutorService flushExecutor =
- Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
+ Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
return new AsyncFSOutput() {
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
@Override
- public void write(final byte[] b, final int off, final int len) {
- if (eventLoop.inEventLoop()) {
- out.write(b, off, len);
- } else {
- eventLoop.submit(() -> out.write(b, off, len)).syncUninterruptibly();
- }
+ public void write(byte[] b, int off, int len) {
+ out.write(b, off, len);
}
@Override
@@ -99,6 +94,16 @@ public final class AsyncFSOutputHelper {
write(b, 0, b.length);
}
+ @Override
+ public void writeInt(int i) {
+ out.writeInt(i);
+ }
+
+ @Override
+ public void write(ByteBuffer bb) {
+ out.write(bb, bb.position(), bb.remaining());
+ }
+
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
fsOut.close();
@@ -116,7 +121,7 @@ public final class AsyncFSOutputHelper {
out.reset();
}
} catch (IOException e) {
- eventLoop.execute(() -> future.completeExceptionally(e));
+ eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
return;
}
try {
@@ -126,9 +131,9 @@ public final class AsyncFSOutputHelper {
fsOut.hflush();
}
long pos = fsOut.getPos();
- eventLoop.execute(() -> future.complete(pos));
+ eventLoopGroup.next().execute(() -> future.complete(pos));
} catch (IOException e) {
- eventLoop.execute(() -> future.completeExceptionally(e));
+ eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
}
}
@@ -164,16 +169,6 @@ public final class AsyncFSOutputHelper {
public int buffered() {
return out.size();
}
-
- @Override
- public void writeInt(int i) {
- out.writeInt(i);
- }
-
- @Override
- public void write(ByteBuffer bb) {
- out.write(bb, bb.position(), bb.remaining());
- }
};
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 3daf15b2990..91086d77196 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -17,48 +17,33 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
-import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
-import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.WRITER_IDLE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
+import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
+import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.WRITER_IDLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
-
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler.Sharable;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Promise;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.PromiseCombiner;
-
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
-import java.util.Deque;
-import java.util.IdentityHashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -66,14 +51,28 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler.Sharable;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelId;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
/**
* An asynchronous HDFS output stream implementation which fans out data to datanode and only
@@ -81,21 +80,16 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
*
* Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create. The mainly
* usage of this class is implementing WAL, so we only expose a little HDFS configurations in the
- * method. And we place it here under util package because we want to make it independent of WAL
+ * method. And we place it here under io package because we want to make it independent of WAL
* implementation thus easier to move it to HDFS project finally.
*
- * Note that, all connections to datanode will run in the same {@link EventLoop} which means we only
- * need one thread here. But be careful, we do some blocking operations in {@link #close()} and
- * {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside
- * {@link EventLoop}. And for {@link #write(byte[])} {@link #write(byte[], int, int)},
- * {@link #buffered()} and {@link #flush(boolean)}, if you call them outside {@link EventLoop},
- * there will be an extra context-switch.
+ * Note that, although we support pipelined flush, i.e, write new data and then flush before the
+ * previous flush succeeds, the implementation is not thread safe, so you should not call its
+ * methods concurrently.
*
* Advantages compare to DFSOutputStream:
*
*
The fan out mechanism. This will reduce the latency.
- *
The asynchronous WAL could also run in the same EventLoop, we could just call write and flush
- * inside the EventLoop thread, so generally we only have one thread to do all the things.
*
Fail-fast when connection to datanode error. The WAL implementation could open new writer
* ASAP.
*
We could benefit from netty's ByteBuf management mechanism.
@@ -124,12 +118,12 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private final long fileId;
- private final LocatedBlock locatedBlock;
+ private final ExtendedBlock block;
+
+ private final DatanodeInfo[] locations;
private final Encryptor encryptor;
- private final EventLoop eventLoop;
-
private final List datanodeList;
private final DataChecksum summer;
@@ -140,65 +134,81 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private static final class Callback {
- private final Promise promise;
+ private final CompletableFuture future;
private final long ackedLength;
- private final Set unfinishedReplicas;
+ // should be backed by a thread safe collection
+ private final Set unfinishedReplicas;
- public Callback(Promise promise, long ackedLength, Collection replicas) {
- this.promise = promise;
+ public Callback(CompletableFuture future, long ackedLength,
+ Collection replicas) {
+ this.future = future;
this.ackedLength = ackedLength;
if (replicas.isEmpty()) {
this.unfinishedReplicas = Collections.emptySet();
} else {
this.unfinishedReplicas =
- Collections.newSetFromMap(new IdentityHashMap(replicas.size()));
- this.unfinishedReplicas.addAll(replicas);
+ Collections.newSetFromMap(new ConcurrentHashMap(replicas.size()));
+ replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
}
}
}
- private final Deque waitingAckQueue = new ArrayDeque<>();
+ private final ConcurrentLinkedDeque waitingAckQueue = new ConcurrentLinkedDeque<>();
+
+ private volatile long ackedBlockLength = 0L;
// this could be different from acked block length because a packet can not start at the middle of
// a chunk.
private long nextPacketOffsetInBlock = 0L;
+ // the length of the trailing partial chunk, this is because the packet start offset must be
+ // aligned with the length of checksum chunk so we need to resend the same data.
+ private int trailingPartialChunkLength = 0;
+
private long nextPacketSeqno = 0L;
private ByteBuf buf;
- // buf's initial capacity - 4KB
- private int capacity = 4 * 1024;
- // LIMIT is 128MB
- private static final int LIMIT = 128 * 1024 * 1024;
+ private final SendBufSizePredictor sendBufSizePRedictor = new SendBufSizePredictor();
+ // State for connections to DN
private enum State {
STREAMING, CLOSING, BROKEN, CLOSED
}
- private State state;
+ private volatile State state;
+ // all lock-free to make it run faster
private void completed(Channel channel) {
- if (waitingAckQueue.isEmpty()) {
- return;
- }
- for (Callback c : waitingAckQueue) {
- if (c.unfinishedReplicas.remove(channel)) {
+ for (Iterator iter = waitingAckQueue.iterator(); iter.hasNext();) {
+ Callback c = iter.next();
+ // if the current unfinished replicas does not contain us then it means that we have already
+ // acked this one, let's iterate to find the one we have not acked yet.
+ if (c.unfinishedReplicas.remove(channel.id())) {
if (c.unfinishedReplicas.isEmpty()) {
- c.promise.trySuccess(null);
- // since we will remove the Callback entry from waitingAckQueue if its unfinishedReplicas
- // is empty, so this could only happen at the head of waitingAckQueue, so we just call
- // removeFirst here.
- waitingAckQueue.removeFirst();
- // also wake up flush requests which have the same length.
- for (Callback cb; (cb = waitingAckQueue.peekFirst()) != null;) {
- if (cb.ackedLength == c.ackedLength) {
- cb.promise.trySuccess(null);
- waitingAckQueue.removeFirst();
- } else {
- break;
+ // we need to remove first before complete the future. It is possible that after we
+ // complete the future the upper layer will call close immediately before we remove the
+ // entry from waitingAckQueue and lead to an IllegalStateException. And also set the
+ // ackedBlockLength first otherwise we may use a wrong length to commit the block. This
+ // may lead to multiple remove and assign but is OK. The semantic of iter.remove is
+ // removing the entry returned by calling previous next, so if the entry has already been
+ // removed then it is a no-op, and for the assign, the values are the same so no problem.
+ iter.remove();
+ ackedBlockLength = c.ackedLength;
+ // the future.complete check is to confirm that we are the only one who grabbed the work,
+ // otherwise just give up and return.
+ if (c.future.complete(c.ackedLength)) {
+ // also wake up flush requests which have the same length.
+ while (iter.hasNext()) {
+ Callback maybeDummyCb = iter.next();
+ if (maybeDummyCb.ackedLength == c.ackedLength) {
+ iter.remove();
+ maybeDummyCb.future.complete(c.ackedLength);
+ } else {
+ break;
+ }
}
}
}
@@ -207,13 +217,16 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
}
}
- private void failed(Channel channel, Supplier errorSupplier) {
+ // this usually does not happen which means it is not on the critical path so make it synchronized
+ // so that the implementation will not burn up our brain as there are multiple state changes and
+ // checks.
+ private synchronized void failed(Channel channel, Supplier errorSupplier) {
if (state == State.BROKEN || state == State.CLOSED) {
return;
}
if (state == State.CLOSING) {
Callback c = waitingAckQueue.peekFirst();
- if (c == null || !c.unfinishedReplicas.contains(channel)) {
+ if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
// nothing, the endBlock request has already finished.
return;
}
@@ -221,8 +234,21 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
// disable further write, and fail all pending ack.
state = State.BROKEN;
Throwable error = errorSupplier.get();
- waitingAckQueue.stream().forEach(c -> c.promise.tryFailure(error));
- waitingAckQueue.clear();
+ for (Iterator iter = waitingAckQueue.iterator(); iter.hasNext();) {
+ Callback c = iter.next();
+ // find the first sync request which we have not acked yet and fail all the request after it.
+ if (!c.unfinishedReplicas.contains(channel.id())) {
+ continue;
+ }
+ for (;;) {
+ c.future.completeExceptionally(error);
+ if (!iter.hasNext()) {
+ break;
+ }
+ c = iter.next();
+ }
+ break;
+ }
datanodeList.forEach(ch -> ch.close());
}
@@ -239,13 +265,13 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
Status reply = getStatus(ack);
if (reply != Status.SUCCESS) {
- failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block "
- + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
+ failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " +
+ block + " from datanode " + ctx.channel().remoteAddress()));
return;
}
if (PipelineAck.isRestartOOBStatus(reply)) {
- failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
- + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
+ failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " +
+ block + " from datanode " + ctx.channel().remoteAddress()));
return;
}
if (ack.getSeqno() == HEART_BEAT_SEQNO) {
@@ -256,6 +282,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ if (state == State.CLOSED) {
+ return;
+ }
failed(ctx.channel(),
() -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
}
@@ -299,8 +328,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
- LocatedBlock locatedBlock, Encryptor encryptor, EventLoop eventLoop,
- List datanodeList, DataChecksum summer, ByteBufAllocator alloc) {
+ LocatedBlock locatedBlock, Encryptor encryptor, List datanodeList,
+ DataChecksum summer, ByteBufAllocator alloc) {
this.conf = conf;
this.fsUtils = fsUtils;
this.dfs = dfs;
@@ -309,81 +338,53 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
this.fileId = fileId;
this.clientName = clientName;
this.src = src;
- this.locatedBlock = locatedBlock;
+ this.block = locatedBlock.getBlock();
+ this.locations = locatedBlock.getLocations();
this.encryptor = encryptor;
- this.eventLoop = eventLoop;
this.datanodeList = datanodeList;
this.summer = summer;
this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
this.alloc = alloc;
- this.buf = alloc.directBuffer(capacity);
+ this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
this.state = State.STREAMING;
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
}
- private void writeInt0(int i) {
+ @Override
+ public void writeInt(int i) {
buf.ensureWritable(4);
buf.writeInt(i);
}
@Override
- public void writeInt(int i) {
- if (eventLoop.inEventLoop()) {
- writeInt0(i);
- } else {
- eventLoop.submit(() -> writeInt0(i));
- }
- }
-
- private void write0(ByteBuffer bb) {
+ public void write(ByteBuffer bb) {
buf.ensureWritable(bb.remaining());
buf.writeBytes(bb);
}
- @Override
- public void write(ByteBuffer bb) {
- if (eventLoop.inEventLoop()) {
- write0(bb);
- } else {
- eventLoop.submit(() -> write0(bb));
- }
- }
-
@Override
public void write(byte[] b) {
write(b, 0, b.length);
}
- private void write0(byte[] b, int off, int len) {
+ @Override
+ public void write(byte[] b, int off, int len) {
buf.ensureWritable(len);
buf.writeBytes(b, off, len);
}
- @Override
- public void write(byte[] b, int off, int len) {
- if (eventLoop.inEventLoop()) {
- write0(b, off, len);
- } else {
- eventLoop.submit(() -> write0(b, off, len)).syncUninterruptibly();
- }
- }
-
@Override
public int buffered() {
- if (eventLoop.inEventLoop()) {
- return buf.readableBytes();
- } else {
- return eventLoop.submit(() -> buf.readableBytes()).syncUninterruptibly().getNow().intValue();
- }
+ return buf.readableBytes();
}
@Override
public DatanodeInfo[] getPipeline() {
- return locatedBlock.getLocations();
+ return locations;
}
- private Promise flushBuffer(ByteBuf dataBuf, long nextPacketOffsetInBlock,
- boolean syncBlock) {
+ private void flushBuffer(CompletableFuture future, ByteBuf dataBuf,
+ long nextPacketOffsetInBlock, boolean syncBlock) {
int dataLen = dataBuf.readableBytes();
int chunkLen = summer.getBytesPerChecksum();
int trailingPartialChunkLen = dataLen % chunkLen;
@@ -398,24 +399,24 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
ByteBuf headerBuf = alloc.buffer(headerLen);
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
-
- long ackedLength = nextPacketOffsetInBlock + dataLen;
- Promise promise = eventLoop. newPromise().addListener(future -> {
- if (future.isSuccess()) {
- locatedBlock.getBlock().setNumBytes(ackedLength);
- }
- });
- waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList));
- for (Channel ch : datanodeList) {
- ch.write(headerBuf.duplicate().retain());
- ch.write(checksumBuf.duplicate().retain());
- ch.writeAndFlush(dataBuf.duplicate().retain());
+ Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
+ waitingAckQueue.addLast(c);
+ // recheck again after we pushed the callback to queue
+ if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
+ future.completeExceptionally(new IOException("stream already broken"));
+ // it's the one we have just pushed or just a no-op
+ waitingAckQueue.removeFirst();
+ return;
}
+ datanodeList.forEach(ch -> {
+ ch.write(headerBuf.retainedDuplicate());
+ ch.write(checksumBuf.retainedDuplicate());
+ ch.writeAndFlush(dataBuf.retainedDuplicate());
+ });
checksumBuf.release();
headerBuf.release();
dataBuf.release();
nextPacketSeqno++;
- return promise;
}
private void flush0(CompletableFuture future, boolean syncBlock) {
@@ -424,11 +425,43 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
return;
}
int dataLen = buf.readableBytes();
+ if (dataLen == trailingPartialChunkLength) {
+ // no new data
+ long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
+ Callback lastFlush = waitingAckQueue.peekLast();
+ if (lastFlush != null) {
+ Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
+ waitingAckQueue.addLast(c);
+ // recheck here if we have already removed the previous callback from the queue
+ if (waitingAckQueue.peekFirst() == c) {
+ // all previous callbacks have been removed
+ // notice that this does mean we will always win here because the background thread may
+ // have already started to mark the future here as completed in the completed or failed
+ // methods but haven't removed it from the queue yet. That's also why the removeFirst
+ // call below may be a no-op.
+ if (state != State.STREAMING) {
+ future.completeExceptionally(new IOException("stream already broken"));
+ } else {
+ future.complete(lengthAfterFlush);
+ }
+ // it's the one we have just pushed or just a no-op
+ waitingAckQueue.removeFirst();
+ }
+ } else {
+ // we must have acked all the data so the ackedBlockLength must be same with
+ // lengthAfterFlush
+ future.complete(lengthAfterFlush);
+ }
+ return;
+ }
+
if (encryptor != null) {
ByteBuf encryptBuf = alloc.directBuffer(dataLen);
+ buf.readBytes(encryptBuf, trailingPartialChunkLength);
+ int toEncryptLength = dataLen - trailingPartialChunkLength;
try {
- encryptor.encrypt(buf.nioBuffer(buf.readerIndex(), dataLen),
- encryptBuf.nioBuffer(0, dataLen));
+ encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
+ encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
} catch (IOException e) {
encryptBuf.release();
future.completeExceptionally(e);
@@ -438,56 +471,35 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
buf.release();
buf = encryptBuf;
}
- long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
- if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) {
- // no new data, just return
- future.complete(locatedBlock.getBlock().getNumBytes());
- return;
- }
- Callback c = waitingAckQueue.peekLast();
- if (c != null && lengthAfterFlush == c.ackedLength) {
- // just append it to the tail of waiting ack queue,, do not issue new hflush request.
- waitingAckQueue.addLast(new Callback(eventLoop. newPromise().addListener(f -> {
- if (f.isSuccess()) {
- future.complete(lengthAfterFlush);
- } else {
- future.completeExceptionally(f.cause());
- }
- }), lengthAfterFlush, Collections. emptyList()));
- return;
- }
- Promise promise;
+
if (dataLen > maxDataLen) {
// We need to write out the data by multiple packets as the max packet allowed is 16M.
- PromiseCombiner combiner = new PromiseCombiner();
long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
- for (int remaining = dataLen; remaining > 0;) {
- int toWriteDataLen = Math.min(remaining, maxDataLen);
- combiner.add((Future) flushBuffer(buf.readRetainedSlice(toWriteDataLen),
- nextSubPacketOffsetInBlock, syncBlock));
- nextSubPacketOffsetInBlock += toWriteDataLen;
- remaining -= toWriteDataLen;
+ for (int remaining = dataLen;;) {
+ if (remaining < maxDataLen) {
+ flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
+ syncBlock);
+ break;
+ } else {
+ flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
+ nextSubPacketOffsetInBlock, syncBlock);
+ remaining -= maxDataLen;
+ nextSubPacketOffsetInBlock += maxDataLen;
+ }
}
- promise = eventLoop.newPromise();
- combiner.finish(promise);
} else {
- promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock);
+ flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
}
- promise.addListener(f -> {
- if (f.isSuccess()) {
- future.complete(lengthAfterFlush);
- } else {
- future.completeExceptionally(f.cause());
- }
- });
- int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum();
- ByteBuf newBuf = alloc.directBuffer(guess(dataLen)).ensureWritable(trailingPartialChunkLen);
- if (trailingPartialChunkLen != 0) {
- buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen);
+ trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
+ ByteBuf newBuf = alloc.directBuffer(sendBufSizePRedictor.guess(dataLen))
+ .ensureWritable(trailingPartialChunkLength);
+ if (trailingPartialChunkLength != 0) {
+ buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
+ trailingPartialChunkLength);
}
buf.release();
this.buf = newBuf;
- nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen;
+ nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
}
/**
@@ -497,34 +509,38 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
*/
public CompletableFuture flush(boolean syncBlock) {
CompletableFuture future = new CompletableFuture<>();
- if (eventLoop.inEventLoop()) {
- flush0(future, syncBlock);
- } else {
- eventLoop.execute(() -> flush0(future, syncBlock));
- }
+ flush0(future, syncBlock);
return future;
}
- private void endBlock(Promise promise, long size) {
+ private void endBlock() throws IOException {
+ Preconditions.checkState(waitingAckQueue.isEmpty(),
+ "should call flush first before calling close");
if (state != State.STREAMING) {
- promise.tryFailure(new IOException("stream already broken"));
- return;
- }
- if (!waitingAckQueue.isEmpty()) {
- promise.tryFailure(new IllegalStateException("should call flush first before calling close"));
- return;
+ throw new IOException("stream already broken");
}
state = State.CLOSING;
- PacketHeader header = new PacketHeader(4, size, nextPacketSeqno, true, 0, false);
+ long finalizedLength = ackedBlockLength;
+ PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
buf.release();
buf = null;
int headerLen = header.getSerializedSize();
ByteBuf headerBuf = alloc.directBuffer(headerLen);
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
- waitingAckQueue.add(new Callback(promise, size, datanodeList));
- datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.duplicate().retain()));
+ CompletableFuture future = new CompletableFuture<>();
+ waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
+ datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
headerBuf.release();
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException().initCause(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new IOException(cause);
+ }
}
/**
@@ -532,7 +548,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
*/
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
- assert !eventLoop.inEventLoop();
+ datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
endFileLease(client, fileId);
fsUtils.recoverFileLease(dfs, new Path(src), conf,
@@ -545,32 +561,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
*/
@Override
public void close() throws IOException {
- assert !eventLoop.inEventLoop();
- Promise promise = eventLoop.newPromise();
- eventLoop.execute(() -> endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes()));
- promise.addListener(f -> datanodeList.forEach(ch -> ch.close())).syncUninterruptibly();
+ endBlock();
+ state = State.CLOSED;
+ datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
- completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId);
- }
-
- @VisibleForTesting
- int guess(int bytesWritten) {
- // if the bytesWritten is greater than the current capacity
- // always increase the capacity in powers of 2.
- if (bytesWritten > this.capacity) {
- // Ensure we don't cross the LIMIT
- if ((this.capacity << 1) <= LIMIT) {
- // increase the capacity in the range of power of 2
- this.capacity = this.capacity << 1;
- }
- } else {
- // if we see that the bytesWritten is lesser we could again decrease
- // the capacity by dividing it by 2 if the bytesWritten is satisfied by
- // that reduction
- if ((this.capacity >> 1) >= bytesWritten) {
- this.capacity = this.capacity >> 1;
- }
- }
- return this.capacity;
+ block.setNumBytes(ackedBlockLength);
+ completeFile(client, namenode, src, clientName, block, fileId);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index d3dc957ec26..61aa97cd087 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -17,43 +17,19 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
-import static org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
-import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
import com.google.protobuf.CodedOutputStream;
-import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufOutputStream;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.PooledByteBufAllocator;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFuture;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFutureListener;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInitializer;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelPipeline;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.FutureListener;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Promise;
-
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -73,7 +49,6 @@ import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -109,6 +84,32 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFuture;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFutureListener;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInitializer;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelPipeline;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
+import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.FutureListener;
+import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Promise;
/**
* Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
@@ -664,10 +665,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
});
}
- private static List> connectToDataNodes(Configuration conf, DFSClient client,
- String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
- BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop,
- Class extends Channel> channelClass) {
+ private static List> connectToDataNodes(Configuration conf,
+ DFSClient client, String clientName, LocatedBlock locatedBlock, long maxBytesRcvd,
+ long latestGS, BlockConstructionStage stage, DataChecksum summer,
+ EventLoopGroup eventLoopGroup, Class extends Channel> channelClass) {
Enum>[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
boolean connectToDnViaHostname =
@@ -690,10 +691,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
for (int i = 0; i < datanodeInfos.length; i++) {
DatanodeInfo dnInfo = datanodeInfos[i];
Enum> storageType = storageTypes[i];
- Promise promise = eventLoop.newPromise();
+ Promise promise = eventLoopGroup.next().newPromise();
futureList.add(promise);
String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
- new Bootstrap().group(eventLoop).channel(channelClass)
+ new Bootstrap().group(eventLoopGroup).channel(channelClass)
.option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer() {
@Override
@@ -732,7 +733,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
- EventLoop eventLoop, Class extends Channel> channelClass) throws IOException {
+ EventLoopGroup eventLoopGroup, Class extends Channel> channelClass) throws IOException {
Configuration conf = dfs.getConf();
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
DFSClient client = dfs.getClient();
@@ -761,7 +762,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
stat.getFileId(), null);
List datanodeList = new ArrayList<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
- PIPELINE_SETUP_CREATE, summer, eventLoop, channelClass);
+ PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
for (Future future : futureList) {
// fail the creation if there are connection failures since we are fail-fast. The upper
// layer should retry itself if needed.
@@ -770,7 +771,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
- stat.getFileId(), locatedBlock, encryptor, eventLoop, datanodeList, summer, ALLOC);
+ stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
succ = true;
return output;
} finally {
@@ -796,19 +797,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
/**
* Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it
- * inside {@link EventLoop}.
- * @param eventLoop all connections to datanode will use the same event loop.
+ * inside an {@link EventLoop}.
*/
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
boolean overwrite, boolean createParent, short replication, long blockSize,
- EventLoop eventLoop, Class extends Channel> channelClass) throws IOException {
+ EventLoopGroup eventLoopGroup, Class extends Channel> channelClass) throws IOException {
return new FileSystemLinkResolver() {
@Override
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
- blockSize, eventLoop, channelClass);
+ blockSize, eventLoopGroup, channelClass);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java
new file mode 100644
index 00000000000..2f652440e38
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used to predict the next send buffer size.
+ */
+@InterfaceAudience.Private
+class SendBufSizePredictor {
+
+ // LIMIT is 128MB
+ private static final int LIMIT = 128 * 1024 * 1024;
+
+ // buf's initial capacity - 4KB
+ private int capacity = 4 * 1024;
+
+ int initialSize() {
+ return capacity;
+ }
+
+ int guess(int bytesWritten) {
+ // if the bytesWritten is greater than the current capacity
+ // always increase the capacity in powers of 2.
+ if (bytesWritten > this.capacity) {
+ // Ensure we don't cross the LIMIT
+ if ((this.capacity << 1) <= LIMIT) {
+ // increase the capacity in the range of power of 2
+ this.capacity = this.capacity << 1;
+ }
+ } else {
+ // if we see that the bytesWritten is lesser we could again decrease
+ // the capacity by dividing it by 2 if the bytesWritten is satisfied by
+ // that reduction
+ if ((this.capacity >> 1) >= bytesWritten) {
+ this.capacity = this.capacity >> 1;
+ }
+ }
+ return this.capacity;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 430413748a5..bf3b2adc320 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -17,15 +17,6 @@
*/
package org.apache.hadoop.hbase.wal;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.DefaultThreadFactory;
-
import java.io.IOException;
import org.apache.commons.logging.Log;
@@ -33,13 +24,21 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.DefaultThreadFactory;
/**
* A WAL provider that use {@link AsyncFSWAL}.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 4377196ccef..48c1cbf683b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -24,17 +24,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
@@ -56,6 +54,12 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
+
@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput {
@@ -97,9 +101,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
// will fail.
for (;;) {
try {
- FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
- new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(),
- EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
+ FanOutOneBlockAsyncDFSOutput out =
+ FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
+ true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
out.close();
break;
} catch (IOException e) {
@@ -111,17 +115,32 @@ public class TestFanOutOneBlockAsyncDFSOutput {
static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
final FanOutOneBlockAsyncDFSOutput out)
throws IOException, InterruptedException, ExecutionException {
- final byte[] b = new byte[10];
- ThreadLocalRandom.current().nextBytes(b);
- out.write(b, 0, b.length);
- assertEquals(b.length, out.flush(false).get().longValue());
- out.close();
- assertEquals(b.length, dfs.getFileStatus(f).getLen());
- byte[] actual = new byte[b.length];
- try (FSDataInputStream in = dfs.open(f)) {
- in.readFully(actual);
+ List> futures = new ArrayList<>();
+ byte[] b = new byte[10];
+ Random rand = new Random(12345);
+ // test pipelined flush
+ for (int i = 0; i < 10; i++) {
+ rand.nextBytes(b);
+ out.write(b);
+ futures.add(out.flush(false));
+ futures.add(out.flush(false));
+ }
+ for (int i = 0; i < 10; i++) {
+ assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
+ assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
+ }
+ out.close();
+ assertEquals(b.length * 10, dfs.getFileStatus(f).getLen());
+ byte[] actual = new byte[b.length];
+ rand.setSeed(12345);
+ try (FSDataInputStream in = dfs.open(f)) {
+ for (int i = 0; i < 10; i++) {
+ in.readFully(actual);
+ rand.nextBytes(b);
+ assertArrayEquals(b, actual);
+ }
+ assertEquals(-1, in.read());
}
- assertArrayEquals(b, actual);
}
@Test
@@ -133,21 +152,6 @@ public class TestFanOutOneBlockAsyncDFSOutput {
writeAndVerify(eventLoop, FS, f, out);
}
- @Test
- public void testMaxByteBufAllocated() throws Exception {
- Path f = new Path("/" + name.getMethodName());
- EventLoop eventLoop = EVENT_LOOP_GROUP.next();
- FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
- false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
- out.guess(5 * 1024);
- assertEquals(8 * 1024, out.guess(5 * 1024));
- assertEquals(16 * 1024, out.guess(10 * 1024));
- // it wont reduce directly to 4KB
- assertEquals(8 * 1024, out.guess(4 * 1024));
- // This time it will reduece
- assertEquals(4 * 1024, out.guess(4 * 1024));
- }
-
@Test
public void testRecover() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
@@ -216,7 +220,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
xceiverServerDaemonField.setAccessible(true);
Class> xceiverServerClass =
- Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+ Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
numPeersMethod.setAccessible(true);
// make one datanode broken
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index b0d689c8832..0453f1c88ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -61,7 +61,7 @@ public class TestLocalAsyncOutput {
Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
- fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next(), CHANNEL_CLASS);
+ fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
out.write(b);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index f1ecd3a9535..0e2b6d4df73 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -31,12 +31,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
-
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
@@ -75,6 +69,12 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
+
@RunWith(Parameterized.class)
@Category({ MiscTests.class, LargeTests.class })
public class TestSaslFanOutOneBlockAsyncDFSOutput {
@@ -90,7 +90,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
private static int READ_TIMEOUT_MS = 200000;
private static final File KEYTAB_FILE =
- new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
+ new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
private static MiniKdc KDC;
@@ -104,8 +104,6 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
private static String TEST_KEY_NAME = "test_key";
- private static boolean TEST_TRANSPARENT_ENCRYPTION = true;
-
@Rule
public TestName name = new TestName();
@@ -118,20 +116,13 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
@Parameter(2)
public String cipherSuite;
- @Parameter(3)
- public boolean useTransparentEncryption;
-
- @Parameters(
- name = "{index}: protection={0}, encryption={1}, cipherSuite={2}, transparent_enc={3}")
+ @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
public static Iterable