HDFS-10303. DataStreamer#ResponseProcessor calculates packet ack latency incorrectly. Contributed by Surendra Singh Lilhore.

This commit is contained in:
Kihwal Lee 2016-05-17 08:53:31 -05:00
parent 08ea07f1b8
commit 4a5819dae2
2 changed files with 105 additions and 8 deletions

View File

@ -32,10 +32,12 @@ import java.net.Socket;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -395,6 +397,7 @@ class DataStreamer extends Daemon {
private volatile boolean appendChunk = false; private volatile boolean appendChunk = false;
// both dataQueue and ackQueue are protected by dataQueue lock // both dataQueue and ackQueue are protected by dataQueue lock
protected final LinkedList<DFSPacket> dataQueue = new LinkedList<>(); protected final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
private final Map<Long, Long> packetSendTime = new HashMap<>();
private final LinkedList<DFSPacket> ackQueue = new LinkedList<>(); private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
private final AtomicReference<CachingStrategy> cachingStrategy; private final AtomicReference<CachingStrategy> cachingStrategy;
private final ByteArrayManager byteArrayManager; private final ByteArrayManager byteArrayManager;
@ -644,6 +647,7 @@ class DataStreamer extends Daemon {
scope = null; scope = null;
dataQueue.removeFirst(); dataQueue.removeFirst();
ackQueue.addLast(one); ackQueue.addLast(one);
packetSendTime.put(one.getSeqno(), Time.monotonicNow());
dataQueue.notifyAll(); dataQueue.notifyAll();
} }
} }
@ -957,15 +961,21 @@ class DataStreamer extends Daemon {
// process responses from datanodes. // process responses from datanodes.
try { try {
// read an ack from the pipeline // read an ack from the pipeline
long begin = Time.monotonicNow();
ack.readFields(blockReplyStream); ack.readFields(blockReplyStream);
if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
Long begin = packetSendTime.get(ack.getSeqno());
if (begin != null) {
long duration = Time.monotonicNow() - begin; long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs if (duration > dfsclientSlowLogThresholdMs) {
&& ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) { LOG.info("Slow ReadProcessor read fields for block " + block
LOG.warn("Slow ReadProcessor read fields took " + duration + " took " + duration + "ms (threshold="
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
+ ack + ", targets: " + Arrays.asList(targets)); + ", targets: " + Arrays.asList(targets));
} else { }
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("DFSClient {}", ack); LOG.debug("DFSClient {}", ack);
} }
@ -1047,6 +1057,7 @@ class DataStreamer extends Daemon {
lastAckedSeqno = seqno; lastAckedSeqno = seqno;
pipelineRecoveryCount = 0; pipelineRecoveryCount = 0;
ackQueue.removeFirst(); ackQueue.removeFirst();
packetSendTime.remove(seqno);
dataQueue.notifyAll(); dataQueue.notifyAll();
one.releaseBuffer(byteArrayManager); one.releaseBuffer(byteArrayManager);
@ -1105,6 +1116,7 @@ class DataStreamer extends Daemon {
synchronized (dataQueue) { synchronized (dataQueue) {
dataQueue.addAll(0, ackQueue); dataQueue.addAll(0, ackQueue);
ackQueue.clear(); ackQueue.clear();
packetSendTime.clear();
} }
// If we had to recover the pipeline five times in a row for the // If we had to recover the pipeline five times in a row for the

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestDataStream {
static MiniDFSCluster cluster;
static int PACKET_SIZE = 1024;
@BeforeClass
public static void setup() throws IOException {
Configuration conf = new Configuration();
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
PACKET_SIZE);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
10000);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000);
cluster = new MiniDFSCluster.Builder(conf).build();
}
@Test(timeout = 60000)
public void testDfsClient() throws IOException, InterruptedException {
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory
.getLog(DataStreamer.class));
byte[] toWrite = new byte[PACKET_SIZE];
new Random(1).nextBytes(toWrite);
final Path path = new Path("/file1");
final DistributedFileSystem dfs = cluster.getFileSystem();
FSDataOutputStream out = null;
out = dfs.create(path, false);
out.write(toWrite);
out.write(toWrite);
out.hflush();
//Wait to cross slow IO warning threshold
Thread.sleep(15 * 1000);
out.write(toWrite);
out.write(toWrite);
out.hflush();
//Wait for capturing logs in busy cluster
Thread.sleep(5 * 1000);
out.close();
logs.stopCapturing();
GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
"Slow ReadProcessor read fields for block");
}
@AfterClass
public static void tearDown() {
cluster.shutdown();
}
}