HDFS-6110 adding more slow action log in critical write path (Liang Xie via stack)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1597634 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2014-05-26 19:44:02 +00:00
parent 2fe6e4ccfe
commit 4206ef2611
7 changed files with 97 additions and 8 deletions
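Every hunk below applies the same idea: take a monotonic timestamp before a potentially slow step in the write path (pipeline ack read, flushOrSync, mirror write, disk write, upstream ack), compute the elapsed milliseconds afterwards, and log a WARN naming the step and the configured threshold when the step exceeded it. The stand-alone sketch below only illustrates that pattern outside of HDFS; the class SlowWriteExample and its constructor are invented for illustration, while Time.monotonicNow() and the commons-logging Log are the same utilities the patch itself uses.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.Time;

// Illustrative sketch only: SlowWriteExample is not part of the patch.
// It shows the measure-then-warn pattern the patch repeats around each
// slow-prone step in DFSOutputStream and BlockReceiver.
public class SlowWriteExample {
  private static final Log LOG = LogFactory.getLog(SlowWriteExample.class);

  // In the patch this value comes from dfs.client.slow.io.warning.threshold.ms
  // or dfs.datanode.slow.io.warning.threshold.ms.
  private final long slowLogThresholdMs;

  public SlowWriteExample(long slowLogThresholdMs) {
    this.slowLogThresholdMs = slowLogThresholdMs;
  }

  void write(OutputStream out, byte[] data) throws IOException {
    long begin = Time.monotonicNow();      // monotonic clock, immune to wall-clock jumps
    out.write(data);
    out.flush();
    long duration = Time.monotonicNow() - begin;
    if (duration > slowLogThresholdMs) {   // only log when the operation was actually slow
      LOG.warn("Slow write took " + duration + "ms (threshold="
          + slowLogThresholdMs + "ms)");
    }
  }
}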


@@ -126,6 +126,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6435. Add support for specifying a static uid/gid mapping for the NFS
gateway. (atm via wang)
HDFS-6110 adding more slow action log in critical write path
(Liang Xie via stack)
OPTIMIZATIONS
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)


@@ -276,6 +276,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
final int retryTimesForGetLastBlockLength;
final int retryIntervalForGetLastBlockLength;
final long datanodeRestartTimeout;
final long dfsclientSlowIoWarningThresholdMs;
final boolean useLegacyBlockReader;
final boolean useLegacyBlockReaderLocal;
@@ -430,6 +431,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
datanodeRestartTimeout = conf.getLong(
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
dfsclientSlowIoWarningThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
}
public boolean isUseLegacyBlockReaderLocal() {


@@ -642,5 +642,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned.
public static final String DFS_NFS_ALLOW_INSECURE_PORTS_KEY = "dfs.nfs.allow.insecure.ports";
public static final boolean DFS_NFS_ALLOW_INSECURE_PORTS_DEFAULT = true;
// Slow io warning log threshold settings for dfsclient and datanode.
public static final String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.client.slow.io.warning.threshold.ms";
public static final long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.datanode.slow.io.warning.threshold.ms";
public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
}


@@ -121,6 +121,7 @@ import com.google.common.cache.RemovalNotification;
public class DFSOutputStream extends FSOutputSummer
implements Syncable, CanSetDropBehind {
private final DFSClient dfsClient;
private final long dfsclientSlowLogThresholdMs;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
private Socket s;
// closed is accessed by different threads under different locks.
@@ -793,11 +794,19 @@ public class DFSOutputStream extends FSOutputSummer
// process responses from datanodes.
try {
// read an ack from the pipeline
long begin = Time.monotonicNow();
ack.readFields(blockReplyStream);
long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs
&& ack.getSeqno() != Packet.HEART_BEAT_SEQNO) {
DFSClient.LOG
.warn("Slow ReadProcessor read fields took " + duration
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
+ ack + ", targets: " + Arrays.asList(targets));
} else if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient " + ack);
}
long seqno = ack.getSeqno();
// processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
@@ -1574,6 +1583,8 @@ public class DFSOutputStream extends FSOutputSummer
}
this.checksum = checksum;
this.dfsclientSlowLogThresholdMs =
dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
}
/** Construct a new output stream for creating a file. */
@@ -2010,6 +2021,7 @@ public class DFSOutputStream extends FSOutputSummer
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Waiting for ack for: " + seqno);
}
long begin = Time.monotonicNow();
try {
synchronized (dataQueue) {
while (!closed) {
@@ -2029,6 +2041,11 @@ public class DFSOutputStream extends FSOutputSummer
checkClosed();
} catch (ClosedChannelException e) {
}
long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs) {
DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
}
}
private synchronized void start() {


@@ -69,7 +69,7 @@ class BlockReceiver implements Closeable {
@VisibleForTesting
static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
private final long datanodeSlowLogThresholdMs;
private DataInputStream in = null; // from where data are read
private DataChecksum clientChecksum; // checksum used by client
private DataChecksum diskChecksum; // checksum we write to disk
@@ -140,7 +140,7 @@ class BlockReceiver implements Closeable {
this.isDatanode = clientname.length() == 0;
this.isClient = !this.isDatanode;
this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
//for datanode, we have
//1: clientName.length() == 0, and
//2: stage == null or PIPELINE_SETUP_CREATE
@@ -335,6 +335,7 @@ class BlockReceiver implements Closeable {
*/
void flushOrSync(boolean isSync) throws IOException {
long flushTotalNanos = 0;
long begin = Time.monotonicNow();
if (checksumOut != null) {
long flushStartNanos = System.nanoTime();
checksumOut.flush();
@@ -363,6 +364,12 @@ class BlockReceiver implements Closeable {
datanode.metrics.incrFsyncCount();
}
}
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow flushOrSync took " + duration + "ms (threshold="
+ datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos="
+ flushTotalNanos + "ns");
}
}
/**
@@ -488,8 +495,14 @@ class BlockReceiver implements Closeable {
//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
long begin = Time.monotonicNow();
packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush();
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}
} catch (IOException e) {
handleMirrorOutError(e);
}
@@ -572,7 +585,13 @@ class BlockReceiver implements Closeable {
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk.
long begin = Time.monotonicNow();
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
@@ -638,6 +657,7 @@ class BlockReceiver implements Closeable {
try {
if (outFd != null &&
offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
long begin = Time.monotonicNow();
//
// For SYNC_FILE_RANGE_WRITE, we want to sync from
// lastCacheManagementOffset to a position "two windows ago"
@@ -670,6 +690,11 @@ class BlockReceiver implements Closeable {
NativeIO.POSIX.POSIX_FADV_DONTNEED);
}
lastCacheManagementOffset = offsetInBlock;
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow manageWriterOsCache took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}
}
} catch (Throwable t) {
LOG.warn("Error managing cache for writer of block " + block, t);
@@ -1299,9 +1324,15 @@ class BlockReceiver implements Closeable {
replicaInfo.setBytesAcked(offsetInBlock);
}
// send my ack back to upstream datanode
long begin = Time.monotonicNow();
replyAck.write(upstreamOut);
upstreamOut.flush();
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow PacketResponder send ack to upstream took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
+ ", replyAck=" + replyAck);
} else if (LOG.isDebugEnabled()) {
LOG.debug(myString + ", replyAck=" + replyAck);
}


@@ -79,6 +79,8 @@ public class DNConf {
final long deleteReportInterval;
final long initialBlockReportDelay;
final long cacheReportInterval;
final long dfsclientSlowIoWarningThresholdMs;
final long datanodeSlowIoWarningThresholdMs;
final int writePacketSize;
final String minimumNameNodeVersion;
@@ -129,7 +131,14 @@ public class DNConf {
DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
this.dfsclientSlowIoWarningThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
this.datanodeSlowIoWarningThresholdMs = conf.getLong(
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
long initBRDelay = conf.getLong(
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L;
@@ -168,7 +177,7 @@ public class DNConf {
DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
}
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
String getMinimumNameNodeVersion() {
return this.minimumNameNodeVersion;


@@ -1906,4 +1906,22 @@
</description>
</property>
<property>
<name>dfs.client.slow.io.warning.threshold.ms</name>
<value>30000</value>
<description>The threshold in milliseconds at which we will log a slow
io warning in a dfsclient. By default, this parameter is set to 30000
milliseconds (30 seconds).
</description>
</property>
<property>
<name>dfs.datanode.slow.io.warning.threshold.ms</name>
<value>300</value>
<description>The threshold in milliseconds at which we will log a slow
io warning in a datanode. By default, this parameter is set to 300
milliseconds.
</description>
</property>
</configuration>
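
As a usage note (not part of the patch), the two new keys can be overridden like any other HDFS setting. The sketch below lowers both thresholds programmatically through DFSConfigKeys; the chosen values are arbitrary examples, and the datanode threshold would normally be set in the datanode's own hdfs-site.xml rather than in client code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Illustrative only: tighten both slow-IO warning thresholds, e.g. in a test
// or a latency-sensitive application, before creating a FileSystem/DFSClient.
public class SlowIoThresholdSetup {
  public static Configuration buildConf() {
    Configuration conf = new Configuration();
    // Client side: warn if an ack or flush in the write pipeline takes over
    // 5 seconds (default is 30000 ms). The 5000 here is an arbitrary example.
    conf.setLong(DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, 5000);
    // Datanode side: warn if a disk/mirror/ack step takes over 100 ms
    // (default is 300 ms). Shown only to name the key; a real deployment
    // sets this in the datanode's hdfs-site.xml.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, 100);
    return conf;
  }
}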