HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang
This commit is contained in:
parent
1192781a78
commit
011f3b24d4
|
@ -17,22 +17,33 @@
|
|||
*/
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This is a wrap class of a ReentrantLock. Extending AutoCloseable
|
||||
* interface such that the users can use a try-with-resource syntax.
|
||||
*/
|
||||
public class AutoCloseableLock implements AutoCloseable {
|
||||
|
||||
private final ReentrantLock lock;
|
||||
private final Lock lock;
|
||||
|
||||
/**
|
||||
* Creates an instance of {@code AutoCloseableLock}, initializes
|
||||
* the underlying {@code ReentrantLock} object.
|
||||
* the underlying lock instance with a new {@code ReentrantLock}.
|
||||
*/
|
||||
public AutoCloseableLock() {
|
||||
this.lock = new ReentrantLock();
|
||||
this(new ReentrantLock());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap provided Lock instance.
|
||||
* @param lock Lock instance to wrap in AutoCloseable API.
|
||||
*/
|
||||
public AutoCloseableLock(Lock lock) {
|
||||
this.lock = lock;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -86,7 +97,7 @@ public class AutoCloseableLock implements AutoCloseable {
|
|||
|
||||
/**
|
||||
* A wrapper method that makes a call to {@code tryLock()} of
|
||||
* the underlying {@code ReentrantLock} object.
|
||||
* the underlying {@code Lock} object.
|
||||
*
|
||||
* If the lock is not held by another thread, acquires the lock, set the
|
||||
* hold count to one and returns {@code true}.
|
||||
|
@ -116,7 +127,12 @@ public class AutoCloseableLock implements AutoCloseable {
|
|||
* @return {@code true} if any thread holds this lock and
|
||||
* {@code false} otherwise
|
||||
*/
|
||||
public boolean isLocked() {
|
||||
return lock.isLocked();
|
||||
@VisibleForTesting
|
||||
boolean isLocked() {
|
||||
if (lock instanceof ReentrantLock) {
|
||||
return ((ReentrantLock)lock).isLocked();
|
||||
}
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -419,6 +419,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY =
|
||||
"dfs.namenode.read-lock-reporting-threshold-ms";
|
||||
public static final long DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
|
||||
// Threshold for how long the lock warnings must be suppressed
|
||||
public static final String DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY =
|
||||
"dfs.lock.suppress.warning.interval";
|
||||
public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
|
||||
10000; //ms
|
||||
|
||||
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
|
||||
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
|
||||
|
|
|
@ -0,0 +1,185 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This is a debugging class that can be used by callers to track
|
||||
* whether a specifc lock is being held for too long and periodically
|
||||
* log a warning and stack trace, if so.
|
||||
*
|
||||
* The logged warnings are throttled so that logs are not spammed.
|
||||
*
|
||||
* A new instance of InstrumentedLock can be created for each object
|
||||
* that needs to be instrumented.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class InstrumentedLock implements Lock {
|
||||
|
||||
private final Lock lock;
|
||||
private final Log logger;
|
||||
private final String name;
|
||||
private final Timer clock;
|
||||
|
||||
/** Minimum gap between two lock warnings. */
|
||||
private final long minLoggingGap;
|
||||
/** Threshold for detecting long lock held time. */
|
||||
private final long lockWarningThreshold;
|
||||
|
||||
// Tracking counters for lock statistics.
|
||||
private volatile long lockAcquireTimestamp;
|
||||
private final AtomicLong lastLogTimestamp;
|
||||
private final AtomicLong warningsSuppressed = new AtomicLong(0);
|
||||
|
||||
/**
|
||||
* Create a instrumented lock instance which logs a warning message
|
||||
* when lock held time is above given threshold.
|
||||
*
|
||||
* @param name the identifier of the lock object
|
||||
* @param logger this class does not have its own logger, will log to the
|
||||
* given logger instead
|
||||
* @param minLoggingGapMs the minimum time gap between two log messages,
|
||||
* this is to avoid spamming to many logs
|
||||
* @param lockWarningThresholdMs the time threshold to view lock held
|
||||
* time as being "too long"
|
||||
*/
|
||||
public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
|
||||
long lockWarningThresholdMs) {
|
||||
this(name, logger, new ReentrantLock(),
|
||||
minLoggingGapMs, lockWarningThresholdMs);
|
||||
}
|
||||
|
||||
public InstrumentedLock(String name, Log logger, Lock lock,
|
||||
long minLoggingGapMs, long lockWarningThresholdMs) {
|
||||
this(name, logger, lock,
|
||||
minLoggingGapMs, lockWarningThresholdMs, new Timer());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
InstrumentedLock(String name, Log logger, Lock lock,
|
||||
long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
|
||||
this.name = name;
|
||||
this.lock = lock;
|
||||
this.clock = clock;
|
||||
this.logger = logger;
|
||||
minLoggingGap = minLoggingGapMs;
|
||||
lockWarningThreshold = lockWarningThresholdMs;
|
||||
lastLogTimestamp = new AtomicLong(
|
||||
clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lock() {
|
||||
lock.lock();
|
||||
lockAcquireTimestamp = clock.monotonicNow();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lockInterruptibly() throws InterruptedException {
|
||||
lock.lockInterruptibly();
|
||||
lockAcquireTimestamp = clock.monotonicNow();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryLock() {
|
||||
if (lock.tryLock()) {
|
||||
lockAcquireTimestamp = clock.monotonicNow();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
|
||||
if (lock.tryLock(time, unit)) {
|
||||
lockAcquireTimestamp = clock.monotonicNow();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlock() {
|
||||
long localLockReleaseTime = clock.monotonicNow();
|
||||
long localLockAcquireTime = lockAcquireTimestamp;
|
||||
lock.unlock();
|
||||
check(localLockAcquireTime, localLockReleaseTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Condition newCondition() {
|
||||
return lock.newCondition();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void logWarning(long lockHeldTime, long suppressed) {
|
||||
logger.warn(String.format("Lock held time above threshold: " +
|
||||
"lock identifier: %s " +
|
||||
"lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
|
||||
"The stack trace is: %s" ,
|
||||
name, lockHeldTime, suppressed,
|
||||
StringUtils.getStackTrace(Thread.currentThread())));
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a warning if the lock was held for too long.
|
||||
*
|
||||
* Should be invoked by the caller immediately AFTER releasing the lock.
|
||||
*
|
||||
* @param acquireTime - timestamp just after acquiring the lock.
|
||||
* @param releaseTime - timestamp just before releasing the lock.
|
||||
*/
|
||||
private void check(long acquireTime, long releaseTime) {
|
||||
if (!logger.isWarnEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final long lockHeldTime = releaseTime - acquireTime;
|
||||
if (lockWarningThreshold - lockHeldTime < 0) {
|
||||
long now;
|
||||
long localLastLogTs;
|
||||
do {
|
||||
now = clock.monotonicNow();
|
||||
localLastLogTs = lastLogTimestamp.get();
|
||||
long deltaSinceLastLog = now - localLastLogTs;
|
||||
// check should print log or not
|
||||
if (deltaSinceLastLog - minLoggingGap < 0) {
|
||||
warningsSuppressed.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
} while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
|
||||
long suppressed = warningsSuppressed.getAndSet(0);
|
||||
logWarning(lockHeldTime, suppressed);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -40,6 +40,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
import javax.management.ObjectName;
|
||||
|
@ -60,6 +61,7 @@ import org.apache.hadoop.fs.StorageType;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.InstrumentedLock;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
|
@ -278,7 +280,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
this.dataStorage = storage;
|
||||
this.conf = conf;
|
||||
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
|
||||
this.datasetLock = new AutoCloseableLock();
|
||||
this.datasetLock = new AutoCloseableLock(
|
||||
new InstrumentedLock(getClass().getName(), LOG,
|
||||
conf.getTimeDuration(
|
||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS),
|
||||
300));
|
||||
// The number of volumes required for operation is the total number
|
||||
// of volumes minus the number of failed volumes we can tolerate.
|
||||
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
|
||||
|
|
|
@ -4273,4 +4273,12 @@
|
|||
a plan.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.lock.suppress.warning.interval</name>
|
||||
<value>10s</value>
|
||||
<description>Instrumentation reporting long critical sections will suppress
|
||||
consecutive warnings within this interval.</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* A test class for InstrumentedLock.
|
||||
*/
|
||||
public class TestInstrumentedLock {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
|
||||
|
||||
@Rule public TestName name = new TestName();
|
||||
|
||||
/**
|
||||
* Test exclusive access of the lock.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout=10000)
|
||||
public void testMultipleThread() throws Exception {
|
||||
String testname = name.getMethodName();
|
||||
InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
|
||||
lock.lock();
|
||||
try {
|
||||
Thread competingThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertFalse(lock.tryLock());
|
||||
}
|
||||
};
|
||||
competingThread.start();
|
||||
competingThread.join();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the correctness with try-with-resource syntax.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout=10000)
|
||||
public void testTryWithResourceSyntax() throws Exception {
|
||||
String testname = name.getMethodName();
|
||||
final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
|
||||
Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
|
||||
@Override
|
||||
public void lock() {
|
||||
super.lock();
|
||||
lockThread.set(Thread.currentThread());
|
||||
}
|
||||
@Override
|
||||
public void unlock() {
|
||||
super.unlock();
|
||||
lockThread.set(null);
|
||||
}
|
||||
};
|
||||
AutoCloseableLock acl = new AutoCloseableLock(lock);
|
||||
try (AutoCloseable localLock = acl.acquire()) {
|
||||
assertEquals(acl, localLock);
|
||||
Thread competingThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertNotEquals(Thread.currentThread(), lockThread.get());
|
||||
assertFalse(lock.tryLock());
|
||||
}
|
||||
};
|
||||
competingThread.start();
|
||||
competingThread.join();
|
||||
assertEquals(Thread.currentThread(), lockThread.get());
|
||||
}
|
||||
assertNull(lockThread.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the lock logs warning when lock held time is greater than threshold
|
||||
* and not log warning otherwise.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout=10000)
|
||||
public void testLockLongHoldingReport() throws Exception {
|
||||
String testname = name.getMethodName();
|
||||
final AtomicLong time = new AtomicLong(0);
|
||||
Timer mclock = new Timer() {
|
||||
@Override
|
||||
public long monotonicNow() {
|
||||
return time.get();
|
||||
}
|
||||
};
|
||||
Lock mlock = mock(Lock.class);
|
||||
|
||||
final AtomicLong wlogged = new AtomicLong(0);
|
||||
final AtomicLong wsuppresed = new AtomicLong(0);
|
||||
InstrumentedLock lock = new InstrumentedLock(
|
||||
testname, LOG, mlock, 2000, 300, mclock) {
|
||||
@Override
|
||||
void logWarning(long lockHeldTime, long suppressed) {
|
||||
wlogged.incrementAndGet();
|
||||
wsuppresed.set(suppressed);
|
||||
}
|
||||
};
|
||||
|
||||
// do not log warning when the lock held time is short
|
||||
lock.lock(); // t = 0
|
||||
time.set(200);
|
||||
lock.unlock(); // t = 200
|
||||
assertEquals(0, wlogged.get());
|
||||
assertEquals(0, wsuppresed.get());
|
||||
|
||||
lock.lock(); // t = 200
|
||||
time.set(700);
|
||||
lock.unlock(); // t = 700
|
||||
assertEquals(1, wlogged.get());
|
||||
assertEquals(0, wsuppresed.get());
|
||||
|
||||
// despite the lock held time is greater than threshold
|
||||
// suppress the log warning due to the logging gap
|
||||
// (not recorded in wsuppressed until next log message)
|
||||
lock.lock(); // t = 700
|
||||
time.set(1100);
|
||||
lock.unlock(); // t = 1100
|
||||
assertEquals(1, wlogged.get());
|
||||
assertEquals(0, wsuppresed.get());
|
||||
|
||||
// log a warning message when the lock held time is greater the threshold
|
||||
// and the logging time gap is satisfied. Also should display suppressed
|
||||
// previous warnings.
|
||||
time.set(2400);
|
||||
lock.lock(); // t = 2400
|
||||
time.set(2800);
|
||||
lock.unlock(); // t = 2800
|
||||
assertEquals(2, wlogged.get());
|
||||
assertEquals(1, wsuppresed.get());
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue