HBASE-15709 Handle large edits for asynchronous WAL

zhangduo 2016-10-25 11:25:10 +08:00
parent 9875c699af
commit caa1a8494a
3 changed files with 112 additions and 48 deletions

FanOutOneBlockAsyncDFSOutput.java

@@ -42,6 +42,7 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -103,6 +104,10 @@ import org.apache.hadoop.util.DataChecksum;
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
// The MAX_PACKET_SIZE is 16MB but it includes the header size and checksum size. So here we set a
// smaller limit for data size.
private static final int MAX_DATA_LEN = 12 * 1024 * 1024;
private final Configuration conf;
private final FSUtils fsUtils;
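
A rough sanity check on the 12 MB cap, assuming the common HDFS defaults of 512-byte checksum chunks and 4-byte CRC checksums (neither value is stated in this diff):

  12 MiB of data / 512 B per chunk = 24,576 chunks
  24,576 chunks x 4 B per checksum = 96 KiB of checksums
  12 MiB data + 96 KiB checksums + packet header, comfortably below the 16 MiB MAX_PACKET_SIZE

So even a maximally sized data payload leaves ample headroom for the header and checksum overhead that MAX_PACKET_SIZE also has to cover.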
@@ -129,6 +134,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private final DataChecksum summer;
private final int maxDataLen;
private final ByteBufAllocator alloc;
private static final class Callback {
@@ -271,8 +278,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause)
throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
@@ -336,6 +342,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
this.eventLoop = eventLoop;
this.datanodeList = datanodeList;
this.summer = summer;
this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
this.alloc = alloc;
this.buf = alloc.directBuffer();
this.state = State.STREAMING;
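
The modulo above rounds the cap down to a whole number of checksum chunks, so every full-sized packet ends exactly on a chunk boundary. A minimal sketch of the same computation, with an assumed bytesPerChecksum of 512 (a common HDFS default, not read from this code):

  int maxPacketData = 12 * 1024 * 1024;                                 // MAX_DATA_LEN
  int bytesPerChecksum = 512;                                           // assumed default
  int maxDataLen = maxPacketData - (maxPacketData % bytesPerChecksum);
  // 12 MiB is already a multiple of 512, so maxDataLen stays at 12 MiB here;
  // a different chunk size would simply round it down to the nearest multiple.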
@@ -393,45 +400,15 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
return locatedBlock.getLocations();
}
private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler,
private Promise<Void> flushBuffer(ByteBuf dataBuf, long nextPacketOffsetInBlock,
boolean syncBlock) {
if (state != State.STREAMING) {
handler.failed(new IOException("stream already broken"), attachment);
return;
}
int dataLen = buf.readableBytes();
final long ackedLength = nextPacketOffsetInBlock + dataLen;
if (ackedLength == locatedBlock.getBlock().getNumBytes()) {
// no new data, just return
handler.completed(locatedBlock.getBlock().getNumBytes(), attachment);
return;
}
Promise<Void> promise = eventLoop.newPromise();
promise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
locatedBlock.getBlock().setNumBytes(ackedLength);
handler.completed(ackedLength, attachment);
} else {
handler.failed(future.cause(), attachment);
}
}
});
Callback c = waitingAckQueue.peekLast();
if (c != null && ackedLength == c.ackedLength) {
// just append it to the tail of the waiting ack queue, do not issue a new hflush request.
waitingAckQueue
.addLast(new Callback(promise, ackedLength, Collections.<Channel> emptyList()));
return;
}
int dataLen = dataBuf.readableBytes();
int chunkLen = summer.getBytesPerChecksum();
int trailingPartialChunkLen = dataLen % chunkLen;
int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
int checksumLen = numChecks * summer.getChecksumSize();
ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
summer.calculateChunkedSums(buf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
checksumBuf.writerIndex(checksumLen);
PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
nextPacketSeqno, false, dataLen, syncBlock);
@@ -440,14 +417,75 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
long ackedLength = nextPacketOffsetInBlock + dataLen;
Promise<Void> promise = eventLoop.<Void> newPromise().addListener(future -> {
if (future.isSuccess()) {
locatedBlock.getBlock().setNumBytes(ackedLength);
}
});
waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList));
for (Channel ch : datanodeList) {
ch.write(headerBuf.duplicate().retain());
ch.write(checksumBuf.duplicate().retain());
ch.writeAndFlush(buf.duplicate().retain());
ch.writeAndFlush(dataBuf.duplicate().retain());
}
checksumBuf.release();
headerBuf.release();
dataBuf.release();
nextPacketSeqno++;
return promise;
}
private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler,
boolean syncBlock) {
if (state != State.STREAMING) {
handler.failed(new IOException("stream already broken"), attachment);
return;
}
int dataLen = buf.readableBytes();
final long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) {
// no new data, just return
handler.completed(locatedBlock.getBlock().getNumBytes(), attachment);
return;
}
Callback c = waitingAckQueue.peekLast();
if (c != null && lengthAfterFlush == c.ackedLength) {
// just append it to the tail of the waiting ack queue, do not issue a new hflush request.
waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(future -> {
if (future.isSuccess()) {
handler.completed(lengthAfterFlush, attachment);
} else {
handler.failed(future.cause(), attachment);
}
}), lengthAfterFlush, Collections.<Channel> emptyList()));
return;
}
Promise<Void> promise;
if (dataLen > maxDataLen) {
// We need to write out the data in multiple packets as the max packet size allowed is 16MB.
PromiseCombiner combiner = new PromiseCombiner();
long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
for (int remaining = dataLen; remaining > 0;) {
int toWriteDataLen = Math.min(remaining, maxDataLen);
combiner.add(flushBuffer(buf.readRetainedSlice(toWriteDataLen), nextSubPacketOffsetInBlock,
syncBlock));
nextSubPacketOffsetInBlock += toWriteDataLen;
remaining -= toWriteDataLen;
}
promise = eventLoop.newPromise();
combiner.finish(promise);
} else {
promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock);
}
promise.addListener(future -> {
if (future.isSuccess()) {
handler.completed(lengthAfterFlush, attachment);
} else {
handler.failed(future.cause(), attachment);
}
});
int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum();
ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen);
if (trailingPartialChunkLen != 0) {
buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen);
@@ -455,7 +493,6 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
buf.release();
this.buf = newBuf;
nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen;
nextPacketSeqno++;
}
/**
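
The core of the fix is the new branch in flush0 above: when the pending data exceeds maxDataLen, it is carved into maxDataLen-sized slices, each flushed as its own packet, and the per-packet promises are joined with Netty's PromiseCombiner so the caller still observes a single completion. A minimal standalone sketch of that pattern follows; the class name, the writeOnePacket stub, and the tiny sizes are illustrative only, not HBase or HDFS API:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;

public class SplitFlushSketch {

  // Stand-in for "send one packet"; always succeeds immediately in this sketch.
  static Promise<Void> writeOnePacket(ByteBuf packetData, long offsetInBlock) {
    Promise<Void> p = ImmediateEventExecutor.INSTANCE.newPromise();
    packetData.release();
    return p.setSuccess(null);
  }

  public static void main(String[] args) {
    int maxDataLen = 4; // tiny cap so the 10-byte buffer below splits into 3 packets
    ByteBuf buf = Unpooled.wrappedBuffer(new byte[10]);
    long offsetInBlock = 0;

    PromiseCombiner combiner = new PromiseCombiner();
    for (int remaining = buf.readableBytes(); remaining > 0;) {
      int toWrite = Math.min(remaining, maxDataLen);
      combiner.add(writeOnePacket(buf.readRetainedSlice(toWrite), offsetInBlock));
      offsetInBlock += toWrite;
      remaining -= toWrite;
    }
    Promise<Void> all = ImmediateEventExecutor.INSTANCE.newPromise();
    combiner.finish(all);
    all.addListener(f -> System.out.println("all packets acked: " + f.isSuccess()));
    buf.release();
  }
}

Note that finish() must be called only after every sub-promise has been added; the combined promise then succeeds only when all sub-writes succeed and fails if any of them fails.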

TestFanOutOneBlockAsyncDFSOutput.java

@@ -91,9 +91,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
// will fail.
for (;;) {
try {
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(),
EVENT_LOOP_GROUP.next());
FanOutOneBlockAsyncDFSOutput out =
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP.next());
out.close();
break;
} catch (IOException e) {
@@ -104,10 +104,11 @@ public class TestFanOutOneBlockAsyncDFSOutput {
static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
final FanOutOneBlockAsyncDFSOutput out)
throws IOException, InterruptedException, ExecutionException {
throws IOException, InterruptedException, ExecutionException {
final byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
new FanOutOneBlockAsyncDFSOutputFlushHandler();
eventLoop.execute(new Runnable() {
@Override
@@ -143,7 +144,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
final byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
new FanOutOneBlockAsyncDFSOutputFlushHandler();
eventLoop.execute(new Runnable() {
@Override
@@ -218,8 +220,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
InvocationTargetException, InterruptedException, NoSuchFieldException {
Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
xceiverServerDaemonField.setAccessible(true);
Class<?> xceiverServerClass = Class
.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
Class<?> xceiverServerClass =
Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
numPeersMethod.setAccessible(true);
// make one datanode broken
@@ -243,4 +245,32 @@ public class TestFanOutOneBlockAsyncDFSOutput {
ensureAllDatanodeAlive();
}
}
@Test
public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 3, 1024 * 1024 * 1024, eventLoop);
byte[] b = new byte[50 * 1024 * 1024];
ThreadLocalRandom.current().nextBytes(b);
FanOutOneBlockAsyncDFSOutputFlushHandler handler =
new FanOutOneBlockAsyncDFSOutputFlushHandler();
eventLoop.execute(new Runnable() {
@Override
public void run() {
out.write(b);
out.flush(null, handler, false);
}
});
assertEquals(b.length, handler.get());
out.close();
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = FS.open(f)) {
in.readFully(actual);
}
assertArrayEquals(b, actual);
}
}
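
For scale, assuming maxDataLen stays at the full 12 MiB, the 50 MiB flush in testWriteLargeChunk splits as:

  50 MiB / 12 MiB per packet = 4 full packets + 2 MiB remainder = 5 packets

so the test covers both the full-sized and the trailing partial sub-packet, and the byte-for-byte read-back confirms they were reassembled in order.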

TestMobStoreScanner.java

@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -79,8 +78,6 @@ public class TestMobStoreScanner {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100 * 1024 * 1024);
// TODO: AsyncFSWAL can not handle large edits right now, remove this after we fix the issue.
TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem");
TEST_UTIL.startMiniCluster(1);
}