HBASE-17048 Calculate suitable ByteBuf size when allocating send buffer in

FanOutOneBlockAsyncDFSOutput (Ram)
This commit is contained in:
Ramkrishna 2016-11-29 09:12:47 +05:30
parent cc03f7ad53
commit 51d9bac42b
2 changed files with 45 additions and 2 deletions

View File

@ -71,6 +71,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
/**
* An asynchronous HDFS output stream implementation which fans out data to datanode and only
* supports writing file with only one block.
@ -164,6 +166,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private long nextPacketSeqno = 0L;
private ByteBuf buf;
// buf's initial capacity - 4KB
private int capacity = 4 * 1024;
// LIMIT is 128MB
private final int LIMIT = 128 * 1024 * 1024;
private enum State {
STREAMING, CLOSING, BROKEN, CLOSED
@ -307,7 +314,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
this.summer = summer;
this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
this.alloc = alloc;
this.buf = alloc.directBuffer();
this.buf = alloc.directBuffer(capacity);
this.state = State.STREAMING;
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
}
@ -472,7 +479,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
}
});
int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum();
ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen);
ByteBuf newBuf = alloc.directBuffer(guess(dataLen)).ensureWritable(trailingPartialChunkLen);
if (trailingPartialChunkLen != 0) {
buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen);
}
@ -543,4 +550,25 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId);
}
/**
 * Adapts the send buffer capacity to the observed write size, always keeping it
 * a power of two.
 * <p>
 * Growth: when {@code bytesWritten} exceeds the current capacity, the capacity is
 * doubled once (never beyond {@code LIMIT}, 128MB). Shrinking: when half of the
 * current capacity would still hold {@code bytesWritten}, the capacity is halved
 * once, so idle buffers give memory back gradually rather than all at once.
 *
 * @param bytesWritten number of bytes written in the last flush
 * @return the updated capacity to use for the next buffer allocation
 */
@VisibleForTesting
int guess(int bytesWritten) {
  if (bytesWritten > capacity) {
    // Grow by one power-of-two step, but never cross the 128MB ceiling.
    int doubled = capacity << 1;
    if (doubled <= LIMIT) {
      capacity = doubled;
    }
  } else if ((capacity >> 1) >= bytesWritten) {
    // Half the capacity still fits the last write, so step down once.
    capacity >>= 1;
  }
  return capacity;
}
}

View File

@ -127,6 +127,21 @@ public class TestFanOutOneBlockAsyncDFSOutput {
writeAndVerify(eventLoop, FS, f, out);
}
@Test
public void testMaxByteBufAllocated() throws Exception {
  Path f = new Path("/" + name.getMethodName());
  EventLoop eventLoop = EVENT_LOOP_GROUP.next();
  final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
    true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
  try {
    // First call: 5KB exceeds the initial 4KB capacity, doubling it to 8KB.
    out.guess(5 * 1024);
    // 5KB still needs 8KB and half (4KB) would not fit, so capacity is stable.
    assertEquals(8 * 1024, out.guess(5 * 1024));
    // 10KB exceeds 8KB, so the capacity doubles again to 16KB.
    assertEquals(16 * 1024, out.guess(10 * 1024));
    // Shrinking is gradual: it won't reduce directly from 16KB to 4KB.
    assertEquals(8 * 1024, out.guess(4 * 1024));
    // This time it will reduce to 4KB.
    assertEquals(4 * 1024, out.guess(4 * 1024));
  } finally {
    // Close the output so the test does not leak the datanode channels/buffers.
    out.close();
  }
}
@Test
public void testRecover() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());