HDFS-4621. Additional logging to help diagnose slow QJM syncs. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1461777 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d0bbff6c32
commit
81192e4e41
|
@ -371,6 +371,8 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
|
|
||||||
HDFS-4635. Move BlockManager#computeCapacity to LightWeightGSet. (suresh)
|
HDFS-4635. Move BlockManager#computeCapacity to LightWeightGSet. (suresh)
|
||||||
|
|
||||||
|
HDFS-4621. Additional logging to help diagnose slow QJM syncs. (todd)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -133,6 +133,8 @@ public class IPCLoggerChannel implements AsyncLogger {
|
||||||
private Stopwatch lastHeartbeatStopwatch = new Stopwatch();
|
private Stopwatch lastHeartbeatStopwatch = new Stopwatch();
|
||||||
|
|
||||||
private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
|
private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
|
||||||
|
|
||||||
|
private static final long WARN_JOURNAL_MILLIS_THRESHOLD = 1000;
|
||||||
|
|
||||||
static final Factory FACTORY = new AsyncLogger.Factory() {
|
static final Factory FACTORY = new AsyncLogger.Factory() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -371,6 +373,12 @@ public class IPCLoggerChannel implements AsyncLogger {
|
||||||
now - submitNanos, TimeUnit.NANOSECONDS);
|
now - submitNanos, TimeUnit.NANOSECONDS);
|
||||||
metrics.addWriteEndToEndLatency(endToEndTime);
|
metrics.addWriteEndToEndLatency(endToEndTime);
|
||||||
metrics.addWriteRpcLatency(rpcTime);
|
metrics.addWriteRpcLatency(rpcTime);
|
||||||
|
if (rpcTime / 1000 > WARN_JOURNAL_MILLIS_THRESHOLD) {
|
||||||
|
QuorumJournalManager.LOG.warn(
|
||||||
|
"Took " + (rpcTime / 1000) + "ms to send a batch of " +
|
||||||
|
numTxns + " edits (" + data.length + " bytes) to " +
|
||||||
|
"remote journal " + IPCLoggerChannel.this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
synchronized (IPCLoggerChannel.this) {
|
synchronized (IPCLoggerChannel.this) {
|
||||||
highestAckedTxId = firstTxnId + numTxns - 1;
|
highestAckedTxId = firstTxnId + numTxns - 1;
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutException;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
|
@ -120,6 +121,15 @@ class QuorumCall<KEY, RESULT> {
|
||||||
String msg = String.format(
|
String msg = String.format(
|
||||||
"Waited %s ms (timeout=%s ms) for a response for %s",
|
"Waited %s ms (timeout=%s ms) for a response for %s",
|
||||||
waited, millis, operationName);
|
waited, millis, operationName);
|
||||||
|
if (!successes.isEmpty()) {
|
||||||
|
msg += ". Succeeded so far: [" + Joiner.on(",").join(successes.keySet()) + "]";
|
||||||
|
}
|
||||||
|
if (!exceptions.isEmpty()) {
|
||||||
|
msg += ". Exceptions so far: [" + getExceptionMapString() + "]";
|
||||||
|
}
|
||||||
|
if (successes.isEmpty() && exceptions.isEmpty()) {
|
||||||
|
msg += ". No responses yet.";
|
||||||
|
}
|
||||||
if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
|
if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
|
||||||
QuorumJournalManager.LOG.warn(msg);
|
QuorumJournalManager.LOG.warn(msg);
|
||||||
} else {
|
} else {
|
||||||
|
@ -227,4 +237,22 @@ class QuorumCall<KEY, RESULT> {
|
||||||
}
|
}
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a string suitable for displaying to the user, containing
|
||||||
|
* any exceptions that have been received so far.
* Each entry of the exceptions map is rendered as "key: message", where
* the message is the throwable's localized message, and entries are
* separated by ", ".
*
* @return the formatted entries, or an empty string if the exceptions
* map is empty
|
||||||
|
*/
|
||||||
|
private String getExceptionMapString() {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
boolean first = true;
|
||||||
|
for (Map.Entry<KEY, Throwable> e : exceptions.entrySet()) {
|
||||||
|
// Emit the ", " separator before every entry except the first one.
if (!first) {
|
||||||
|
sb.append(", ");
|
||||||
|
}
|
||||||
|
first = false;
|
||||||
|
sb.append(e.getKey()).append(": ")
|
||||||
|
.append(e.getValue().getLocalizedMessage());
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,6 +128,10 @@ class Journal implements Closeable {
|
||||||
|
|
||||||
private final JournalMetrics metrics;
|
private final JournalMetrics metrics;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Time threshold, in milliseconds, for sync calls; a sync that takes longer than this is logged as a warning.
|
||||||
|
*/
|
||||||
|
private static final int WARN_SYNC_MILLIS_THRESHOLD = 1000;
|
||||||
|
|
||||||
Journal(File logDir, String journalId,
|
Journal(File logDir, String journalId,
|
||||||
StorageErrorReporter errorReporter) throws IOException {
|
StorageErrorReporter errorReporter) throws IOException {
|
||||||
|
@ -370,6 +374,10 @@ class Journal implements Closeable {
|
||||||
sw.stop();
|
sw.stop();
|
||||||
|
|
||||||
metrics.addSync(sw.elapsedTime(TimeUnit.MICROSECONDS));
|
metrics.addSync(sw.elapsedTime(TimeUnit.MICROSECONDS));
|
||||||
|
if (sw.elapsedTime(TimeUnit.MILLISECONDS) > WARN_SYNC_MILLIS_THRESHOLD) {
|
||||||
|
LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
|
||||||
|
" took " + sw.elapsedTime(TimeUnit.MILLISECONDS) + "ms");
|
||||||
|
}
|
||||||
|
|
||||||
if (isLagging) {
|
if (isLagging) {
|
||||||
// This batch of edits has already been committed on a quorum of other
|
// This batch of edits has already been committed on a quorum of other
|
||||||
|
|
Loading…
Reference in New Issue