HDFS-4049. Fix hflush performance regression due to nagling delays. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1398590 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-10-16 00:54:47 +00:00
parent 2b73c49d81
commit 7a9f21aeaf
4 changed files with 233 additions and 73 deletions

View File

@ -106,6 +106,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4044. Duplicate ChecksumType definition in HDFS .proto files.
(Binglin Chang via suresh)
HDFS-4049. Fix hflush performance regression due to nagling delays
(todd)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES

View File

@ -53,14 +53,8 @@ public class PacketReceiver implements Closeable {
private final boolean useDirectBuffers;
/**
* Internal buffer for reading the length prefixes at the start of
* the packet.
*/
private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
PacketHeader.PKT_LENGTHS_LEN);
/**
* The entirety of the most recently read packet, excepting the
* The entirety of the most recently read packet.
* The first PKT_LENGTHS_LEN bytes of this buffer are the
* length prefixes.
*/
private ByteBuffer curPacketBuf = null;
@ -82,6 +76,7 @@ public class PacketReceiver implements Closeable {
public PacketReceiver(boolean useDirectBuffers) {
this.useDirectBuffers = useDirectBuffers;
reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
}
public PacketHeader getHeader() {
@ -133,11 +128,12 @@ private void doRead(ReadableByteChannel ch, InputStream in)
// checksums were not requested
// DATA the actual block data
Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
lengthPrefixBuf.clear();
doReadFully(ch, in, lengthPrefixBuf);
lengthPrefixBuf.flip();
int payloadLen = lengthPrefixBuf.getInt();
curPacketBuf.clear();
curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
doReadFully(ch, in, curPacketBuf);
curPacketBuf.flip();
int payloadLen = curPacketBuf.getInt();
if (payloadLen < Ints.BYTES) {
// The "payload length" includes its own length. Therefore it
@ -146,7 +142,7 @@ private void doRead(ReadableByteChannel ch, InputStream in)
payloadLen);
}
int dataPlusChecksumLen = payloadLen - Ints.BYTES;
int headerLen = lengthPrefixBuf.getShort();
int headerLen = curPacketBuf.getShort();
if (headerLen < 0) {
throw new IOException("Invalid header length " + headerLen);
}
@ -166,13 +162,17 @@ private void doRead(ReadableByteChannel ch, InputStream in)
// Make sure we have space for the whole packet, and
// read it.
reallocPacketBuf(dataPlusChecksumLen + headerLen);
reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
dataPlusChecksumLen + headerLen);
curPacketBuf.clear();
curPacketBuf.limit(dataPlusChecksumLen + headerLen);
curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
dataPlusChecksumLen + headerLen);
doReadFully(ch, in, curPacketBuf);
curPacketBuf.flip();
curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
// Extract the header from the front of the buffer.
// Extract the header from the front of the buffer (after the length prefixes)
byte[] headerBuf = new byte[headerLen];
curPacketBuf.get(headerBuf);
if (curHeader == null) {
@ -197,10 +197,6 @@ private void doRead(ReadableByteChannel ch, InputStream in)
public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
Preconditions.checkState(!useDirectBuffers,
"Currently only supported for non-direct buffers");
assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;
mirrorOut.write(lengthPrefixBuf.array(),
lengthPrefixBuf.arrayOffset(),
lengthPrefixBuf.capacity());
mirrorOut.write(curPacketBuf.array(),
curPacketBuf.arrayOffset(),
curPacketBuf.remaining());
@ -223,23 +219,36 @@ private static void doReadFully(ReadableByteChannel ch, InputStream in,
private void reslicePacket(
int headerLen, int checksumsLen, int dataLen) {
// Packet structure (refer to doRead() for details):
// PLEN HLEN HEADER CHECKSUMS DATA
// 32-bit 16-bit <protobuf> <variable length>
// |--- lenThroughHeader ----|
// |----------- lenThroughChecksums ----|
// |------------------- lenThroughData ------|
int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
int lenThroughChecksums = lenThroughHeader + checksumsLen;
int lenThroughData = lenThroughChecksums + dataLen;
assert dataLen >= 0 : "invalid datalen: " + dataLen;
assert curPacketBuf.position() == headerLen;
assert checksumsLen + dataLen == curPacketBuf.remaining() :
assert curPacketBuf.position() == lenThroughHeader;
assert curPacketBuf.limit() == lenThroughData :
"headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
" rem=" + curPacketBuf.remaining();
curPacketBuf.position(headerLen);
curPacketBuf.limit(headerLen + checksumsLen);
// Slice the checksums.
curPacketBuf.position(lenThroughHeader);
curPacketBuf.limit(lenThroughChecksums);
curChecksumSlice = curPacketBuf.slice();
curPacketBuf.position(headerLen + checksumsLen);
curPacketBuf.limit(headerLen + checksumsLen + dataLen);
// Slice the data.
curPacketBuf.position(lenThroughChecksums);
curPacketBuf.limit(lenThroughData);
curDataSlice = curPacketBuf.slice();
// Reset buffer to point to the entirety of the packet (including
// length prefixes)
curPacketBuf.position(0);
curPacketBuf.limit(headerLen + checksumsLen + dataLen);
curPacketBuf.limit(lenThroughData);
}
@ -258,12 +267,21 @@ private void reallocPacketBuf(int atLeastCapacity) {
// one.
if (curPacketBuf == null ||
curPacketBuf.capacity() < atLeastCapacity) {
returnPacketBufToPool();
ByteBuffer newBuf;
if (useDirectBuffers) {
curPacketBuf = bufferPool.getBuffer(atLeastCapacity);
newBuf = bufferPool.getBuffer(atLeastCapacity);
} else {
curPacketBuf = ByteBuffer.allocate(atLeastCapacity);
newBuf = ByteBuffer.allocate(atLeastCapacity);
}
// If reallocing an existing buffer, copy the old packet length
// prefixes over
if (curPacketBuf != null) {
curPacketBuf.flip();
newBuf.put(curPacketBuf);
}
returnPacketBufToPool();
curPacketBuf = newBuf;
}
}

View File

@ -20,45 +20,40 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.log4j.Level;
import org.apache.hadoop.metrics2.util.Quantile;
import org.apache.hadoop.metrics2.util.SampleQuantiles;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import com.google.common.base.Stopwatch;
/**
* This class tests hflushing concurrently from many threads.
*/
public class TestMultiThreadedHflush {
static final int blockSize = 1024*1024;
static final int numBlocks = 10;
static final int fileSize = numBlocks * blockSize + 1;
private static final int NUM_THREADS = 10;
private static final int WRITE_SIZE = 517;
private static final int NUM_WRITES_PER_THREAD = 1000;
private byte[] toWrite = null;
{
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
}
private final SampleQuantiles quantiles = new SampleQuantiles(
new Quantile[] {
new Quantile(0.50, 0.050),
new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) });
/*
* creates a file but does not close it
@ -104,8 +99,11 @@ public void run() {
}
private void doAWrite() throws IOException {
Stopwatch sw = new Stopwatch().start();
stm.write(toWrite);
stm.hflush();
long micros = sw.elapsedTime(TimeUnit.MICROSECONDS);
quantiles.insert(micros);
}
}
@ -115,14 +113,28 @@ private void doAWrite() throws IOException {
* They all finish before the file is closed.
*/
@Test
public void testMultipleHflushers() throws Exception {
public void testMultipleHflushersRepl1() throws Exception {
doTestMultipleHflushers(1);
}
@Test
public void testMultipleHflushersRepl3() throws Exception {
doTestMultipleHflushers(3);
}
private void doTestMultipleHflushers(int repl) throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(repl)
.build();
FileSystem fs = cluster.getFileSystem();
Path p = new Path("/multiple-hflushers.dat");
try {
doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE,
NUM_WRITES_PER_THREAD, repl);
System.out.println("Latency quantiles (in microseconds):\n" +
quantiles);
} finally {
fs.close();
cluster.shutdown();
@ -200,13 +212,13 @@ public void run() {
}
public void doMultithreadedWrites(
Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
throws Exception {
Configuration conf, Path p, int numThreads, int bufferSize, int numWrites,
int replication) throws Exception {
initBuffer(bufferSize);
// create a new file.
FileSystem fs = p.getFileSystem(conf);
FSDataOutputStream stm = createFile(fs, p, 1);
FSDataOutputStream stm = createFile(fs, p, replication);
System.out.println("Created file simpleFlush.dat");
// There have been a couple issues with flushing empty buffers, so do
@ -240,20 +252,41 @@ public void doMultithreadedWrites(
}
public static void main(String args[]) throws Exception {
if (args.length != 1) {
System.err.println(
"usage: " + TestMultiThreadedHflush.class.getSimpleName() +
" <path to test file> ");
System.exit(1);
}
TestMultiThreadedHflush test = new TestMultiThreadedHflush();
Configuration conf = new Configuration();
Path p = new Path(args[0]);
long st = System.nanoTime();
test.doMultithreadedWrites(conf, p, 10, 511, 50000);
long et = System.nanoTime();
System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
System.exit(ToolRunner.run(new CLIBenchmark(), args));
}
private static class CLIBenchmark extends Configured implements Tool {
public int run(String args[]) throws Exception {
if (args.length != 1) {
System.err.println(
"usage: " + TestMultiThreadedHflush.class.getSimpleName() +
" <path to test file> ");
System.err.println(
"Configurations settable by -D options:\n" +
" num.threads [default 10] - how many threads to run\n" +
" write.size [default 511] - bytes per write\n" +
" num.writes [default 50000] - how many writes to perform");
System.exit(1);
}
TestMultiThreadedHflush test = new TestMultiThreadedHflush();
Configuration conf = getConf();
Path p = new Path(args[0]);
int numThreads = conf.getInt("num.threads", 10);
int writeSize = conf.getInt("write.size", 511);
int numWrites = conf.getInt("num.writes", 50000);
int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
Stopwatch sw = new Stopwatch().start();
test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
replication);
sw.stop();
System.out.println("Finished in " + sw.elapsedMillis() + "ms");
System.out.println("Latency quantiles (in microseconds):\n" +
test.quantiles);
return 0;
}
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.*;
public class TestPacketReceiver {
private static long OFFSET_IN_BLOCK = 12345L;
private static int SEQNO = 54321;
private byte[] prepareFakePacket(byte[] data, byte[] sums) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
int packetLen = data.length + sums.length + 4;
PacketHeader header = new PacketHeader(
packetLen, OFFSET_IN_BLOCK, SEQNO, false, data.length, false);
header.write(dos);
dos.write(sums);
dos.write(data);
dos.flush();
return baos.toByteArray();
}
private static byte[] remainingAsArray(ByteBuffer buf) {
byte[] b = new byte[buf.remaining()];
buf.get(b);
return b;
}
@Test
public void testReceiveAndMirror() throws IOException {
PacketReceiver pr = new PacketReceiver(false);
// Test three different lengths, to force reallocing
// the buffer as it grows.
doTestReceiveAndMirror(pr, 100, 10);
doTestReceiveAndMirror(pr, 50, 10);
doTestReceiveAndMirror(pr, 150, 10);
pr.close();
}
private void doTestReceiveAndMirror(PacketReceiver pr,
int dataLen, int checksumsLen) throws IOException {
final byte[] DATA = AppendTestUtil.initBuffer(dataLen);
final byte[] CHECKSUMS = AppendTestUtil.initBuffer(checksumsLen);
byte[] packet = prepareFakePacket(DATA, CHECKSUMS);
ByteArrayInputStream in = new ByteArrayInputStream(packet);
pr.receiveNextPacket(in);
ByteBuffer parsedData = pr.getDataSlice();
assertArrayEquals(DATA, remainingAsArray(parsedData));
ByteBuffer parsedChecksums = pr.getChecksumSlice();
assertArrayEquals(CHECKSUMS, remainingAsArray(parsedChecksums));
PacketHeader header = pr.getHeader();
assertEquals(SEQNO, header.getSeqno());
assertEquals(OFFSET_IN_BLOCK, header.getOffsetInBlock());
// Mirror the packet to an output stream and make sure it matches
// the packet we sent.
ByteArrayOutputStream mirrored = new ByteArrayOutputStream();
mirrored = Mockito.spy(mirrored);
pr.mirrorPacketTo(new DataOutputStream(mirrored));
// The write should be done in a single call. Otherwise we may hit
// nasty interactions with nagling (eg HDFS-4049).
Mockito.verify(mirrored, Mockito.times(1))
.write(Mockito.<byte[]>any(), Mockito.anyInt(),
Mockito.eq(packet.length));
Mockito.verifyNoMoreInteractions(mirrored);
assertArrayEquals(packet, mirrored.toByteArray());
}
}