HDFS-10923. Make InstrumentedLock require ReentrantLock.
This commit is contained in:
parent
3a3697deab
commit
c7ce6fdc20
|
@ -33,7 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a debugging class that can be used by callers to track
|
* This is a debugging class that can be used by callers to track
|
||||||
* whether a specifc lock is being held for too long and periodically
|
* whether a specific lock is being held for too long and periodically
|
||||||
* log a warning and stack trace, if so.
|
* log a warning and stack trace, if so.
|
||||||
*
|
*
|
||||||
* The logged warnings are throttled so that logs are not spammed.
|
* The logged warnings are throttled so that logs are not spammed.
|
||||||
|
@ -43,9 +43,10 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class InstrumentedLock implements Lock {
|
public class InstrumentedReentrantLock implements Lock {
|
||||||
|
|
||||||
private final Lock lock;
|
@VisibleForTesting
|
||||||
|
final ReentrantLock lock;
|
||||||
private final Log logger;
|
private final Log logger;
|
||||||
private final String name;
|
private final String name;
|
||||||
private final Timer clock;
|
private final Timer clock;
|
||||||
|
@ -72,20 +73,23 @@ public class InstrumentedLock implements Lock {
|
||||||
* @param lockWarningThresholdMs the time threshold to view lock held
|
* @param lockWarningThresholdMs the time threshold to view lock held
|
||||||
* time as being "too long"
|
* time as being "too long"
|
||||||
*/
|
*/
|
||||||
public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
|
public InstrumentedReentrantLock(
|
||||||
|
String name, Log logger, long minLoggingGapMs,
|
||||||
long lockWarningThresholdMs) {
|
long lockWarningThresholdMs) {
|
||||||
this(name, logger, new ReentrantLock(),
|
this(name, logger, new ReentrantLock(),
|
||||||
minLoggingGapMs, lockWarningThresholdMs);
|
minLoggingGapMs, lockWarningThresholdMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
public InstrumentedLock(String name, Log logger, Lock lock,
|
public InstrumentedReentrantLock(
|
||||||
|
String name, Log logger, ReentrantLock lock,
|
||||||
long minLoggingGapMs, long lockWarningThresholdMs) {
|
long minLoggingGapMs, long lockWarningThresholdMs) {
|
||||||
this(name, logger, lock,
|
this(name, logger, lock,
|
||||||
minLoggingGapMs, lockWarningThresholdMs, new Timer());
|
minLoggingGapMs, lockWarningThresholdMs, new Timer());
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
InstrumentedLock(String name, Log logger, Lock lock,
|
InstrumentedReentrantLock(
|
||||||
|
String name, Log logger, ReentrantLock lock,
|
||||||
long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
|
long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.lock = lock;
|
this.lock = lock;
|
||||||
|
@ -100,18 +104,22 @@ public class InstrumentedLock implements Lock {
|
||||||
@Override
|
@Override
|
||||||
public void lock() {
|
public void lock() {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
|
if (lock.getHoldCount() == 1) {
|
||||||
lockAcquireTimestamp = clock.monotonicNow();
|
lockAcquireTimestamp = clock.monotonicNow();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void lockInterruptibly() throws InterruptedException {
|
public void lockInterruptibly() throws InterruptedException {
|
||||||
lock.lockInterruptibly();
|
lock.lockInterruptibly();
|
||||||
|
if (lock.getHoldCount() == 1) {
|
||||||
lockAcquireTimestamp = clock.monotonicNow();
|
lockAcquireTimestamp = clock.monotonicNow();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean tryLock() {
|
public boolean tryLock() {
|
||||||
if (lock.tryLock()) {
|
if (lock.tryLock() && lock.getHoldCount() == 1) {
|
||||||
lockAcquireTimestamp = clock.monotonicNow();
|
lockAcquireTimestamp = clock.monotonicNow();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -120,7 +128,7 @@ public class InstrumentedLock implements Lock {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
|
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
|
||||||
if (lock.tryLock(time, unit)) {
|
if (lock.tryLock(time, unit) && lock.getHoldCount() == 1) {
|
||||||
lockAcquireTimestamp = clock.monotonicNow();
|
lockAcquireTimestamp = clock.monotonicNow();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -129,11 +137,14 @@ public class InstrumentedLock implements Lock {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void unlock() {
|
public void unlock() {
|
||||||
|
final boolean needReport = (lock.getHoldCount() == 1);
|
||||||
long localLockReleaseTime = clock.monotonicNow();
|
long localLockReleaseTime = clock.monotonicNow();
|
||||||
long localLockAcquireTime = lockAcquireTimestamp;
|
long localLockAcquireTime = lockAcquireTimestamp;
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
if (needReport) {
|
||||||
check(localLockAcquireTime, localLockReleaseTime);
|
check(localLockAcquireTime, localLockReleaseTime);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Condition newCondition() {
|
public Condition newCondition() {
|
||||||
|
@ -181,5 +192,4 @@ public class InstrumentedLock implements Lock {
|
||||||
logWarning(lockHeldTime, suppressed);
|
logWarning(lockHeldTime, suppressed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -58,7 +58,7 @@ import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
import org.apache.hadoop.hdfs.InstrumentedLock;
|
import org.apache.hadoop.hdfs.InstrumentedReentrantLock;
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
import org.apache.hadoop.util.AutoCloseableLock;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
|
@ -266,7 +266,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
|
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
|
||||||
this.datasetLock = new AutoCloseableLock(
|
this.datasetLock = new AutoCloseableLock(
|
||||||
new InstrumentedLock(getClass().getName(), LOG,
|
new InstrumentedReentrantLock(getClass().getName(), LOG,
|
||||||
conf.getTimeDuration(
|
conf.getTimeDuration(
|
||||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
|
||||||
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
|
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
|
||||||
|
|
|
@ -20,9 +20,10 @@ package org.apache.hadoop.hdfs;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
import org.apache.hadoop.util.AutoCloseableLock;
|
||||||
import org.apache.hadoop.util.Timer;
|
import org.apache.hadoop.util.FakeTimer;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -34,11 +35,11 @@ import static org.mockito.Mockito.*;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A test class for InstrumentedLock.
|
* A test class for {@link InstrumentedReentrantLock}.
|
||||||
*/
|
*/
|
||||||
public class TestInstrumentedLock {
|
public class TestInstrumentedReentrantLock {
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
|
static final Log LOG = LogFactory.getLog(TestInstrumentedReentrantLock.class);
|
||||||
|
|
||||||
@Rule public TestName name = new TestName();
|
@Rule public TestName name = new TestName();
|
||||||
|
|
||||||
|
@ -49,7 +50,8 @@ public class TestInstrumentedLock {
|
||||||
@Test(timeout=10000)
|
@Test(timeout=10000)
|
||||||
public void testMultipleThread() throws Exception {
|
public void testMultipleThread() throws Exception {
|
||||||
String testname = name.getMethodName();
|
String testname = name.getMethodName();
|
||||||
InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
|
InstrumentedReentrantLock lock =
|
||||||
|
new InstrumentedReentrantLock(testname, LOG, 0, 300);
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
Thread competingThread = new Thread() {
|
Thread competingThread = new Thread() {
|
||||||
|
@ -73,7 +75,7 @@ public class TestInstrumentedLock {
|
||||||
public void testTryWithResourceSyntax() throws Exception {
|
public void testTryWithResourceSyntax() throws Exception {
|
||||||
String testname = name.getMethodName();
|
String testname = name.getMethodName();
|
||||||
final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
|
final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
|
||||||
Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
|
Lock lock = new InstrumentedReentrantLock(testname, LOG, 0, 300) {
|
||||||
@Override
|
@Override
|
||||||
public void lock() {
|
public void lock() {
|
||||||
super.lock();
|
super.lock();
|
||||||
|
@ -110,19 +112,15 @@ public class TestInstrumentedLock {
|
||||||
@Test(timeout=10000)
|
@Test(timeout=10000)
|
||||||
public void testLockLongHoldingReport() throws Exception {
|
public void testLockLongHoldingReport() throws Exception {
|
||||||
String testname = name.getMethodName();
|
String testname = name.getMethodName();
|
||||||
final AtomicLong time = new AtomicLong(0);
|
FakeTimer mclock = new FakeTimer();
|
||||||
Timer mclock = new Timer() {
|
final int warningThreshold = 500;
|
||||||
@Override
|
final int minLoggingGap = warningThreshold * 10;
|
||||||
public long monotonicNow() {
|
|
||||||
return time.get();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Lock mlock = mock(Lock.class);
|
|
||||||
|
|
||||||
final AtomicLong wlogged = new AtomicLong(0);
|
final AtomicLong wlogged = new AtomicLong(0);
|
||||||
final AtomicLong wsuppresed = new AtomicLong(0);
|
final AtomicLong wsuppresed = new AtomicLong(0);
|
||||||
InstrumentedLock lock = new InstrumentedLock(
|
InstrumentedReentrantLock lock = new InstrumentedReentrantLock(
|
||||||
testname, LOG, mlock, 2000, 300, mclock) {
|
testname, LOG, new ReentrantLock(), minLoggingGap,
|
||||||
|
warningThreshold, mclock) {
|
||||||
@Override
|
@Override
|
||||||
void logWarning(long lockHeldTime, long suppressed) {
|
void logWarning(long lockHeldTime, long suppressed) {
|
||||||
wlogged.incrementAndGet();
|
wlogged.incrementAndGet();
|
||||||
|
@ -130,37 +128,50 @@ public class TestInstrumentedLock {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// do not log warning when the lock held time is short
|
// do not log warning when the lock held time is <= warningThreshold.
|
||||||
lock.lock(); // t = 0
|
lock.lock();
|
||||||
time.set(200);
|
mclock.advance(warningThreshold);
|
||||||
lock.unlock(); // t = 200
|
lock.unlock();
|
||||||
assertEquals(0, wlogged.get());
|
assertEquals(0, wlogged.get());
|
||||||
assertEquals(0, wsuppresed.get());
|
assertEquals(0, wsuppresed.get());
|
||||||
|
|
||||||
lock.lock(); // t = 200
|
// log a warning when the lock held time exceeds the threshold.
|
||||||
time.set(700);
|
lock.lock();
|
||||||
lock.unlock(); // t = 700
|
mclock.advance(warningThreshold + 1);
|
||||||
|
assertEquals(1, lock.lock.getHoldCount());
|
||||||
|
lock.unlock();
|
||||||
assertEquals(1, wlogged.get());
|
assertEquals(1, wlogged.get());
|
||||||
assertEquals(0, wsuppresed.get());
|
assertEquals(0, wsuppresed.get());
|
||||||
|
|
||||||
// despite the lock held time is greater than threshold
|
// despite the lock held time is greater than threshold
|
||||||
// suppress the log warning due to the logging gap
|
// suppress the log warning due to the logging gap
|
||||||
// (not recorded in wsuppressed until next log message)
|
// (not recorded in wsuppressed until next log message)
|
||||||
lock.lock(); // t = 700
|
lock.lock();
|
||||||
time.set(1100);
|
mclock.advance(warningThreshold + 1);
|
||||||
lock.unlock(); // t = 1100
|
lock.unlock();
|
||||||
assertEquals(1, wlogged.get());
|
assertEquals(1, wlogged.get());
|
||||||
assertEquals(0, wsuppresed.get());
|
assertEquals(0, wsuppresed.get());
|
||||||
|
|
||||||
// log a warning message when the lock held time is greater the threshold
|
// log a warning message when the lock held time is greater the threshold
|
||||||
// and the logging time gap is satisfied. Also should display suppressed
|
// and the logging time gap is satisfied. Also should display suppressed
|
||||||
// previous warnings.
|
// previous warnings.
|
||||||
time.set(2400);
|
lock.lock();
|
||||||
lock.lock(); // t = 2400
|
mclock.advance(minLoggingGap + 1);
|
||||||
time.set(2800);
|
|
||||||
lock.unlock(); // t = 2800
|
lock.unlock(); // t = 2800
|
||||||
assertEquals(2, wlogged.get());
|
assertEquals(2, wlogged.get());
|
||||||
assertEquals(1, wsuppresed.get());
|
assertEquals(1, wsuppresed.get());
|
||||||
}
|
|
||||||
|
|
||||||
|
// Ensure that nested acquisitions do not log.
|
||||||
|
wlogged.set(0);
|
||||||
|
wsuppresed.set(0);
|
||||||
|
lock.lock();
|
||||||
|
lock.lock();
|
||||||
|
mclock.advance(minLoggingGap + 1);
|
||||||
|
lock.unlock();
|
||||||
|
assertEquals(0, wlogged.get()); // No warnings on nested release.
|
||||||
|
assertEquals(0, wsuppresed.get());
|
||||||
|
lock.unlock();
|
||||||
|
assertEquals(1, wlogged.get()); // Last release immediately logs.
|
||||||
|
assertEquals(0, wsuppresed.get());
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue