HDFS-10923. Make InstrumentedLock require ReentrantLock.

This commit is contained in:
Arpit Agarwal 2016-09-30 23:00:06 -07:00
parent b1eb21e940
commit 9d4e0168d5
3 changed files with 65 additions and 44 deletions

View File

@ -33,7 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
/** /**
* This is a debugging class that can be used by callers to track * This is a debugging class that can be used by callers to track
* whether a specifc lock is being held for too long and periodically * whether a specific lock is being held for too long and periodically
* log a warning and stack trace, if so. * log a warning and stack trace, if so.
* *
* The logged warnings are throttled so that logs are not spammed. * The logged warnings are throttled so that logs are not spammed.
@ -43,9 +43,10 @@ import com.google.common.annotations.VisibleForTesting;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class InstrumentedLock implements Lock { public class InstrumentedReentrantLock implements Lock {
private final Lock lock; @VisibleForTesting
final ReentrantLock lock;
private final Log logger; private final Log logger;
private final String name; private final String name;
private final Timer clock; private final Timer clock;
@ -72,20 +73,23 @@ public class InstrumentedLock implements Lock {
* @param lockWarningThresholdMs the time threshold to view lock held * @param lockWarningThresholdMs the time threshold to view lock held
* time as being "too long" * time as being "too long"
*/ */
public InstrumentedLock(String name, Log logger, long minLoggingGapMs, public InstrumentedReentrantLock(
String name, Log logger, long minLoggingGapMs,
long lockWarningThresholdMs) { long lockWarningThresholdMs) {
this(name, logger, new ReentrantLock(), this(name, logger, new ReentrantLock(),
minLoggingGapMs, lockWarningThresholdMs); minLoggingGapMs, lockWarningThresholdMs);
} }
public InstrumentedLock(String name, Log logger, Lock lock, public InstrumentedReentrantLock(
String name, Log logger, ReentrantLock lock,
long minLoggingGapMs, long lockWarningThresholdMs) { long minLoggingGapMs, long lockWarningThresholdMs) {
this(name, logger, lock, this(name, logger, lock,
minLoggingGapMs, lockWarningThresholdMs, new Timer()); minLoggingGapMs, lockWarningThresholdMs, new Timer());
} }
@VisibleForTesting @VisibleForTesting
InstrumentedLock(String name, Log logger, Lock lock, InstrumentedReentrantLock(
String name, Log logger, ReentrantLock lock,
long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) { long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
this.name = name; this.name = name;
this.lock = lock; this.lock = lock;
@ -100,18 +104,22 @@ public class InstrumentedLock implements Lock {
@Override @Override
public void lock() { public void lock() {
lock.lock(); lock.lock();
lockAcquireTimestamp = clock.monotonicNow(); if (lock.getHoldCount() == 1) {
lockAcquireTimestamp = clock.monotonicNow();
}
} }
@Override @Override
public void lockInterruptibly() throws InterruptedException { public void lockInterruptibly() throws InterruptedException {
lock.lockInterruptibly(); lock.lockInterruptibly();
lockAcquireTimestamp = clock.monotonicNow(); if (lock.getHoldCount() == 1) {
lockAcquireTimestamp = clock.monotonicNow();
}
} }
@Override @Override
public boolean tryLock() { public boolean tryLock() {
if (lock.tryLock()) { if (lock.tryLock() && lock.getHoldCount() == 1) {
lockAcquireTimestamp = clock.monotonicNow(); lockAcquireTimestamp = clock.monotonicNow();
return true; return true;
} }
@ -120,7 +128,7 @@ public class InstrumentedLock implements Lock {
@Override @Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (lock.tryLock(time, unit)) { if (lock.tryLock(time, unit) && lock.getHoldCount() == 1) {
lockAcquireTimestamp = clock.monotonicNow(); lockAcquireTimestamp = clock.monotonicNow();
return true; return true;
} }
@ -129,10 +137,13 @@ public class InstrumentedLock implements Lock {
@Override @Override
public void unlock() { public void unlock() {
final boolean needReport = (lock.getHoldCount() == 1);
long localLockReleaseTime = clock.monotonicNow(); long localLockReleaseTime = clock.monotonicNow();
long localLockAcquireTime = lockAcquireTimestamp; long localLockAcquireTime = lockAcquireTimestamp;
lock.unlock(); lock.unlock();
check(localLockAcquireTime, localLockReleaseTime); if (needReport) {
check(localLockAcquireTime, localLockReleaseTime);
}
} }
@Override @Override
@ -181,5 +192,4 @@ public class InstrumentedLock implements Lock {
logWarning(lockHeldTime, suppressed); logWarning(lockHeldTime, suppressed);
} }
} }
} }

View File

@ -64,7 +64,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.InstrumentedLock; import org.apache.hadoop.hdfs.InstrumentedReentrantLock;
import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -286,7 +286,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.conf = conf; this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
this.datasetLock = new AutoCloseableLock( this.datasetLock = new AutoCloseableLock(
new InstrumentedLock(getClass().getName(), LOG, new InstrumentedReentrantLock(getClass().getName(), LOG,
conf.getTimeDuration( conf.getTimeDuration(
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,

View File

@ -20,9 +20,10 @@ package org.apache.hadoop.hdfs;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Timer; import org.apache.hadoop.util.FakeTimer;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -34,11 +35,11 @@ import static org.mockito.Mockito.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
/** /**
* A test class for InstrumentedLock. * A test class for {@link InstrumentedReentrantLock}.
*/ */
public class TestInstrumentedLock { public class TestInstrumentedReentrantLock {
static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class); static final Log LOG = LogFactory.getLog(TestInstrumentedReentrantLock.class);
@Rule public TestName name = new TestName(); @Rule public TestName name = new TestName();
@ -49,7 +50,8 @@ public class TestInstrumentedLock {
@Test(timeout=10000) @Test(timeout=10000)
public void testMultipleThread() throws Exception { public void testMultipleThread() throws Exception {
String testname = name.getMethodName(); String testname = name.getMethodName();
final InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300); final InstrumentedReentrantLock lock =
new InstrumentedReentrantLock(testname, LOG, 0, 300);
lock.lock(); lock.lock();
try { try {
Thread competingThread = new Thread() { Thread competingThread = new Thread() {
@ -73,7 +75,7 @@ public class TestInstrumentedLock {
public void testTryWithResourceSyntax() throws Exception { public void testTryWithResourceSyntax() throws Exception {
String testname = name.getMethodName(); String testname = name.getMethodName();
final AtomicReference<Thread> lockThread = new AtomicReference<>(null); final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
final Lock lock = new InstrumentedLock(testname, LOG, 0, 300) { final Lock lock = new InstrumentedReentrantLock(testname, LOG, 0, 300) {
@Override @Override
public void lock() { public void lock() {
super.lock(); super.lock();
@ -110,19 +112,15 @@ public class TestInstrumentedLock {
@Test(timeout=10000) @Test(timeout=10000)
public void testLockLongHoldingReport() throws Exception { public void testLockLongHoldingReport() throws Exception {
String testname = name.getMethodName(); String testname = name.getMethodName();
final AtomicLong time = new AtomicLong(0); FakeTimer mclock = new FakeTimer();
Timer mclock = new Timer() { final int warningThreshold = 500;
@Override final int minLoggingGap = warningThreshold * 10;
public long monotonicNow() {
return time.get();
}
};
Lock mlock = mock(Lock.class);
final AtomicLong wlogged = new AtomicLong(0); final AtomicLong wlogged = new AtomicLong(0);
final AtomicLong wsuppresed = new AtomicLong(0); final AtomicLong wsuppresed = new AtomicLong(0);
InstrumentedLock lock = new InstrumentedLock( InstrumentedReentrantLock lock = new InstrumentedReentrantLock(
testname, LOG, mlock, 2000, 300, mclock) { testname, LOG, new ReentrantLock(), minLoggingGap,
warningThreshold, mclock) {
@Override @Override
void logWarning(long lockHeldTime, long suppressed) { void logWarning(long lockHeldTime, long suppressed) {
wlogged.incrementAndGet(); wlogged.incrementAndGet();
@ -130,37 +128,50 @@ public class TestInstrumentedLock {
} }
}; };
// do not log warning when the lock held time is short // do not log warning when the lock held time is <= warningThreshold.
lock.lock(); // t = 0 lock.lock();
time.set(200); mclock.advance(warningThreshold);
lock.unlock(); // t = 200 lock.unlock();
assertEquals(0, wlogged.get()); assertEquals(0, wlogged.get());
assertEquals(0, wsuppresed.get()); assertEquals(0, wsuppresed.get());
lock.lock(); // t = 200 // log a warning when the lock held time exceeds the threshold.
time.set(700); lock.lock();
lock.unlock(); // t = 700 mclock.advance(warningThreshold + 1);
assertEquals(1, lock.lock.getHoldCount());
lock.unlock();
assertEquals(1, wlogged.get()); assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get()); assertEquals(0, wsuppresed.get());
// despite the lock held time is greater than threshold // despite the lock held time is greater than threshold
// suppress the log warning due to the logging gap // suppress the log warning due to the logging gap
// (not recorded in wsuppressed until next log message) // (not recorded in wsuppressed until next log message)
lock.lock(); // t = 700 lock.lock();
time.set(1100); mclock.advance(warningThreshold + 1);
lock.unlock(); // t = 1100 lock.unlock();
assertEquals(1, wlogged.get()); assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get()); assertEquals(0, wsuppresed.get());
// log a warning message when the lock held time is greater the threshold // log a warning message when the lock held time is greater the threshold
// and the logging time gap is satisfied. Also should display suppressed // and the logging time gap is satisfied. Also should display suppressed
// previous warnings. // previous warnings.
time.set(2400); lock.lock();
lock.lock(); // t = 2400 mclock.advance(minLoggingGap + 1);
time.set(2800);
lock.unlock(); // t = 2800 lock.unlock(); // t = 2800
assertEquals(2, wlogged.get()); assertEquals(2, wlogged.get());
assertEquals(1, wsuppresed.get()); assertEquals(1, wsuppresed.get());
}
// Ensure that nested acquisitions do not log.
wlogged.set(0);
wsuppresed.set(0);
lock.lock();
lock.lock();
mclock.advance(minLoggingGap + 1);
lock.unlock();
assertEquals(0, wlogged.get()); // No warnings on nested release.
assertEquals(0, wsuppresed.get());
lock.unlock();
assertEquals(1, wlogged.get()); // Last release immediately logs.
assertEquals(0, wsuppresed.get());
}
} }