diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 9aab9245d1b..e1303815499 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -42,6 +42,7 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.PromiseCombiner; import java.io.IOException; import java.nio.ByteBuffer; @@ -103,6 +104,10 @@ import org.apache.hadoop.util.DataChecksum; @InterfaceAudience.Private public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { + // The MAX_PACKET_SIZE is 16MB but it include the header size and checksum size. So here we set a + // smaller limit for data size. + private static final int MAX_DATA_LEN = 12 * 1024 * 1024; + private final Configuration conf; private final FSUtils fsUtils; @@ -129,6 +134,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private final DataChecksum summer; + private final int maxDataLen; + private final ByteBufAllocator alloc; private static final class Callback { @@ -271,8 +278,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } @Override - public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) - throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception { failed(ctx.channel(), new Supplier() { @Override @@ -336,6 +342,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.eventLoop = eventLoop; this.datanodeList = datanodeList; this.summer = summer; + this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum()); this.alloc = alloc; this.buf = alloc.directBuffer(); this.state = State.STREAMING; @@ -393,45 +400,15 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { return locatedBlock.getLocations(); } - private void flush0(final A attachment, final CompletionHandler handler, + private Promise flushBuffer(ByteBuf dataBuf, long nextPacketOffsetInBlock, boolean syncBlock) { - if (state != State.STREAMING) { - handler.failed(new IOException("stream already broken"), attachment); - return; - } - int dataLen = buf.readableBytes(); - final long ackedLength = nextPacketOffsetInBlock + dataLen; - if (ackedLength == locatedBlock.getBlock().getNumBytes()) { - // no new data, just return - handler.completed(locatedBlock.getBlock().getNumBytes(), attachment); - return; - } - Promise promise = eventLoop.newPromise(); - promise.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - locatedBlock.getBlock().setNumBytes(ackedLength); - handler.completed(ackedLength, attachment); - } else { - handler.failed(future.cause(), attachment); - } - } - }); - Callback c = waitingAckQueue.peekLast(); - if (c != null && ackedLength == c.ackedLength) { - // just append it to the tail of waiting ack queue,, do not issue new hflush request. - waitingAckQueue - .addLast(new Callback(promise, ackedLength, Collections. emptyList())); - return; - } + int dataLen = dataBuf.readableBytes(); int chunkLen = summer.getBytesPerChecksum(); int trailingPartialChunkLen = dataLen % chunkLen; int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0); int checksumLen = numChecks * summer.getChecksumSize(); ByteBuf checksumBuf = alloc.directBuffer(checksumLen); - summer.calculateChunkedSums(buf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen)); + summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen)); checksumBuf.writerIndex(checksumLen); PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock, nextPacketSeqno, false, dataLen, syncBlock); @@ -440,14 +417,75 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); headerBuf.writerIndex(headerLen); + long ackedLength = nextPacketOffsetInBlock + dataLen; + Promise promise = eventLoop. newPromise().addListener(future -> { + if (future.isSuccess()) { + locatedBlock.getBlock().setNumBytes(ackedLength); + } + }); waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList)); for (Channel ch : datanodeList) { ch.write(headerBuf.duplicate().retain()); ch.write(checksumBuf.duplicate().retain()); - ch.writeAndFlush(buf.duplicate().retain()); + ch.writeAndFlush(dataBuf.duplicate().retain()); } checksumBuf.release(); headerBuf.release(); + dataBuf.release(); + nextPacketSeqno++; + return promise; + } + + private void flush0(final A attachment, final CompletionHandler handler, + boolean syncBlock) { + if (state != State.STREAMING) { + handler.failed(new IOException("stream already broken"), attachment); + return; + } + int dataLen = buf.readableBytes(); + final long lengthAfterFlush = nextPacketOffsetInBlock + dataLen; + if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) { + // no new data, just return + handler.completed(locatedBlock.getBlock().getNumBytes(), attachment); + return; + } + Callback c = waitingAckQueue.peekLast(); + if (c != null && lengthAfterFlush == c.ackedLength) { + // just append it to the tail of waiting ack queue,, do not issue new hflush request. + waitingAckQueue.addLast(new Callback(eventLoop. newPromise().addListener(future -> { + if (future.isSuccess()) { + handler.completed(lengthAfterFlush, attachment); + } else { + handler.failed(future.cause(), attachment); + } + }), lengthAfterFlush, Collections. emptyList())); + return; + } + Promise promise; + if (dataLen > maxDataLen) { + // We need to write out the data by multiple packets as the max packet allowed is 16M. + PromiseCombiner combiner = new PromiseCombiner(); + long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock; + for (int remaining = dataLen; remaining > 0;) { + int toWriteDataLen = Math.min(remaining, maxDataLen); + combiner.add(flushBuffer(buf.readRetainedSlice(toWriteDataLen), nextSubPacketOffsetInBlock, + syncBlock)); + nextSubPacketOffsetInBlock += toWriteDataLen; + remaining -= toWriteDataLen; + } + promise = eventLoop.newPromise(); + combiner.finish(promise); + } else { + promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock); + } + promise.addListener(future -> { + if (future.isSuccess()) { + handler.completed(lengthAfterFlush, attachment); + } else { + handler.failed(future.cause(), attachment); + } + }); + int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum(); ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen); if (trailingPartialChunkLen != 0) { buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen); @@ -455,7 +493,6 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { buf.release(); this.buf = newBuf; nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen; - nextPacketSeqno++; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 2be3b281744..a6d317740b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -91,9 +91,9 @@ public class TestFanOutOneBlockAsyncDFSOutput { // will fail. for (;;) { try { - FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, - new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(), - EVENT_LOOP_GROUP.next()); + FanOutOneBlockAsyncDFSOutput out = + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"), + true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP.next()); out.close(); break; } catch (IOException e) { @@ -104,10 +104,11 @@ public class TestFanOutOneBlockAsyncDFSOutput { static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f, final FanOutOneBlockAsyncDFSOutput out) - throws IOException, InterruptedException, ExecutionException { + throws IOException, InterruptedException, ExecutionException { final byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); - final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); + final FanOutOneBlockAsyncDFSOutputFlushHandler handler = + new FanOutOneBlockAsyncDFSOutputFlushHandler(); eventLoop.execute(new Runnable() { @Override @@ -143,7 +144,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); final byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); - final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); + final FanOutOneBlockAsyncDFSOutputFlushHandler handler = + new FanOutOneBlockAsyncDFSOutputFlushHandler(); eventLoop.execute(new Runnable() { @Override @@ -218,8 +220,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { InvocationTargetException, InterruptedException, NoSuchFieldException { Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer"); xceiverServerDaemonField.setAccessible(true); - Class xceiverServerClass = Class - .forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer"); + Class xceiverServerClass = + Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer"); Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers"); numPeersMethod.setAccessible(true); // make one datanode broken @@ -243,4 +245,32 @@ public class TestFanOutOneBlockAsyncDFSOutput { ensureAllDatanodeAlive(); } } + + @Test + public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException { + Path f = new Path("/" + name.getMethodName()); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, 1024 * 1024 * 1024, eventLoop); + byte[] b = new byte[50 * 1024 * 1024]; + ThreadLocalRandom.current().nextBytes(b); + FanOutOneBlockAsyncDFSOutputFlushHandler handler = + new FanOutOneBlockAsyncDFSOutputFlushHandler(); + eventLoop.execute(new Runnable() { + + @Override + public void run() { + out.write(b); + out.flush(null, handler, false); + } + }); + assertEquals(b.length, handler.get()); + out.close(); + assertEquals(b.length, FS.getFileStatus(f).getLen()); + byte[] actual = new byte[b.length]; + try (FSDataInputStream in = FS.open(f)) { + in.readFully(actual); + } + assertArrayEquals(b, actual); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java index 6a4aceb6cc3..c93c9194f57 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; -import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -79,8 +78,6 @@ public class TestMobStoreScanner { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100 * 1024 * 1024); - // TODO: AsyncFSWAL can not handle large edits right now, remove this after we fix the issue. - TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem"); TEST_UTIL.startMiniCluster(1); }