HDFS-10303. DataStreamer#ResponseProcessor calculates packet ack latency incorrectly. Contributed by Surendra Singh Lilhore.
(cherry picked from commit 4a5819dae2
)
This commit is contained in:
parent
dd0a8201bc
commit
48be0ff67a
|
@ -33,10 +33,12 @@ import java.net.Socket;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
@ -375,6 +377,7 @@ class DataStreamer extends Daemon {
|
||||||
private volatile boolean appendChunk = false;
|
private volatile boolean appendChunk = false;
|
||||||
// both dataQueue and ackQueue are protected by dataQueue lock
|
// both dataQueue and ackQueue are protected by dataQueue lock
|
||||||
private final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
|
private final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
|
||||||
|
private final Map<Long, Long> packetSendTime = new HashMap<>();
|
||||||
private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
|
private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
|
||||||
private final AtomicReference<CachingStrategy> cachingStrategy;
|
private final AtomicReference<CachingStrategy> cachingStrategy;
|
||||||
private final ByteArrayManager byteArrayManager;
|
private final ByteArrayManager byteArrayManager;
|
||||||
|
@ -621,6 +624,7 @@ class DataStreamer extends Daemon {
|
||||||
scope = null;
|
scope = null;
|
||||||
dataQueue.removeFirst();
|
dataQueue.removeFirst();
|
||||||
ackQueue.addLast(one);
|
ackQueue.addLast(one);
|
||||||
|
packetSendTime.put(one.getSeqno(), Time.monotonicNow());
|
||||||
dataQueue.notifyAll();
|
dataQueue.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -931,15 +935,21 @@ class DataStreamer extends Daemon {
|
||||||
// process responses from datanodes.
|
// process responses from datanodes.
|
||||||
try {
|
try {
|
||||||
// read an ack from the pipeline
|
// read an ack from the pipeline
|
||||||
long begin = Time.monotonicNow();
|
|
||||||
ack.readFields(blockReplyStream);
|
ack.readFields(blockReplyStream);
|
||||||
|
if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
|
||||||
|
Long begin = packetSendTime.get(ack.getSeqno());
|
||||||
|
if (begin != null) {
|
||||||
long duration = Time.monotonicNow() - begin;
|
long duration = Time.monotonicNow() - begin;
|
||||||
if (duration > dfsclientSlowLogThresholdMs
|
if (duration > dfsclientSlowLogThresholdMs) {
|
||||||
&& ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
|
LOG.info("Slow ReadProcessor read fields for block " + block
|
||||||
LOG.warn("Slow ReadProcessor read fields took " + duration
|
+ " took " + duration + "ms (threshold="
|
||||||
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
|
+ dfsclientSlowLogThresholdMs + "ms); ack: " + ack
|
||||||
+ ack + ", targets: " + Arrays.asList(targets));
|
+ ", targets: " + Arrays.asList(targets));
|
||||||
} else {
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("DFSClient {}", ack);
|
LOG.debug("DFSClient {}", ack);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1021,6 +1031,7 @@ class DataStreamer extends Daemon {
|
||||||
lastAckedSeqno = seqno;
|
lastAckedSeqno = seqno;
|
||||||
pipelineRecoveryCount = 0;
|
pipelineRecoveryCount = 0;
|
||||||
ackQueue.removeFirst();
|
ackQueue.removeFirst();
|
||||||
|
packetSendTime.remove(seqno);
|
||||||
dataQueue.notifyAll();
|
dataQueue.notifyAll();
|
||||||
|
|
||||||
one.releaseBuffer(byteArrayManager);
|
one.releaseBuffer(byteArrayManager);
|
||||||
|
@ -1074,6 +1085,7 @@ class DataStreamer extends Daemon {
|
||||||
synchronized (dataQueue) {
|
synchronized (dataQueue) {
|
||||||
dataQueue.addAll(0, ackQueue);
|
dataQueue.addAll(0, ackQueue);
|
||||||
ackQueue.clear();
|
ackQueue.clear();
|
||||||
|
packetSendTime.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we had to recover the pipeline five times in a row for the
|
// If we had to recover the pipeline five times in a row for the
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestDataStream {
|
||||||
|
static MiniDFSCluster cluster;
|
||||||
|
static int PACKET_SIZE = 1024;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
||||||
|
PACKET_SIZE);
|
||||||
|
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
|
||||||
|
10000);
|
||||||
|
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000);
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testDfsClient() throws IOException, InterruptedException {
|
||||||
|
LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory
|
||||||
|
.getLog(DataStreamer.class));
|
||||||
|
byte[] toWrite = new byte[PACKET_SIZE];
|
||||||
|
new Random(1).nextBytes(toWrite);
|
||||||
|
final Path path = new Path("/file1");
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
FSDataOutputStream out = null;
|
||||||
|
out = dfs.create(path, false);
|
||||||
|
|
||||||
|
out.write(toWrite);
|
||||||
|
out.write(toWrite);
|
||||||
|
out.hflush();
|
||||||
|
|
||||||
|
//Wait to cross slow IO warning threshold
|
||||||
|
Thread.sleep(15 * 1000);
|
||||||
|
|
||||||
|
out.write(toWrite);
|
||||||
|
out.write(toWrite);
|
||||||
|
out.hflush();
|
||||||
|
|
||||||
|
//Wait for capturing logs in busy cluster
|
||||||
|
Thread.sleep(5 * 1000);
|
||||||
|
|
||||||
|
out.close();
|
||||||
|
logs.stopCapturing();
|
||||||
|
GenericTestUtils.assertDoesNotMatch(logs.getOutput(),
|
||||||
|
"Slow ReadProcessor read fields for block");
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue