HADOOP-13702. Add instrumented ReadWriteLock. Contributed by Jingcheng Du
(cherry picked from commit ae8bccd509
)
This commit is contained in:
parent
385c1daa46
commit
25f4327f0b
|
@ -15,7 +15,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -26,14 +26,12 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
import org.apache.hadoop.util.Timer;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a debugging class that can be used by callers to track
|
* This is a debugging class that can be used by callers to track
|
||||||
* whether a specifc lock is being held for too long and periodically
|
* whether a specific lock is being held for too long and periodically
|
||||||
* log a warning and stack trace, if so.
|
* log a warning and stack trace, if so.
|
||||||
*
|
*
|
||||||
* The logged warnings are throttled so that logs are not spammed.
|
* The logged warnings are throttled so that logs are not spammed.
|
||||||
|
@ -100,19 +98,19 @@ public class InstrumentedLock implements Lock {
|
||||||
@Override
|
@Override
|
||||||
public void lock() {
|
public void lock() {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
lockAcquireTimestamp = clock.monotonicNow();
|
startLockTiming();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void lockInterruptibly() throws InterruptedException {
|
public void lockInterruptibly() throws InterruptedException {
|
||||||
lock.lockInterruptibly();
|
lock.lockInterruptibly();
|
||||||
lockAcquireTimestamp = clock.monotonicNow();
|
startLockTiming();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean tryLock() {
|
public boolean tryLock() {
|
||||||
if (lock.tryLock()) {
|
if (lock.tryLock()) {
|
||||||
lockAcquireTimestamp = clock.monotonicNow();
|
startLockTiming();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -121,7 +119,7 @@ public class InstrumentedLock implements Lock {
|
||||||
@Override
|
@Override
|
||||||
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
|
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
|
||||||
if (lock.tryLock(time, unit)) {
|
if (lock.tryLock(time, unit)) {
|
||||||
lockAcquireTimestamp = clock.monotonicNow();
|
startLockTiming();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -150,6 +148,13 @@ public class InstrumentedLock implements Lock {
|
||||||
StringUtils.getStackTrace(Thread.currentThread())));
|
StringUtils.getStackTrace(Thread.currentThread())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts timing for the instrumented lock.
|
||||||
|
*/
|
||||||
|
protected void startLockTiming() {
|
||||||
|
lockAcquireTimestamp = clock.monotonicNow();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Log a warning if the lock was held for too long.
|
* Log a warning if the lock was held for too long.
|
||||||
*
|
*
|
||||||
|
@ -158,7 +163,7 @@ public class InstrumentedLock implements Lock {
|
||||||
* @param acquireTime - timestamp just after acquiring the lock.
|
* @param acquireTime - timestamp just after acquiring the lock.
|
||||||
* @param releaseTime - timestamp just before releasing the lock.
|
* @param releaseTime - timestamp just before releasing the lock.
|
||||||
*/
|
*/
|
||||||
private void check(long acquireTime, long releaseTime) {
|
protected void check(long acquireTime, long releaseTime) {
|
||||||
if (!logger.isWarnEnabled()) {
|
if (!logger.isWarnEnabled()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -182,4 +187,11 @@ public class InstrumentedLock implements Lock {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Lock getLock() {
|
||||||
|
return lock;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Timer getTimer() {
|
||||||
|
return clock;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,92 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a wrap class of a <tt>ReadLock</tt>.
|
||||||
|
* It extends the class {@link InstrumentedLock}, and can be used to track
|
||||||
|
* whether a specific read lock is being held for too long and log
|
||||||
|
* warnings if so.
|
||||||
|
*
|
||||||
|
* The logged warnings are throttled so that logs are not spammed.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class InstrumentedReadLock extends InstrumentedLock {
|
||||||
|
|
||||||
|
private final ReentrantReadWriteLock readWriteLock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Uses the ThreadLocal to keep the time of acquiring locks since
|
||||||
|
* there can be multiple threads that hold the read lock concurrently.
|
||||||
|
*/
|
||||||
|
private final ThreadLocal<Long> readLockHeldTimeStamp =
|
||||||
|
new ThreadLocal<Long>() {
|
||||||
|
@Override
|
||||||
|
protected Long initialValue() {
|
||||||
|
return Long.MAX_VALUE;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
public InstrumentedReadLock(String name, Log logger,
|
||||||
|
ReentrantReadWriteLock readWriteLock,
|
||||||
|
long minLoggingGapMs, long lockWarningThresholdMs) {
|
||||||
|
this(name, logger, readWriteLock, minLoggingGapMs, lockWarningThresholdMs,
|
||||||
|
new Timer());
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
InstrumentedReadLock(String name, Log logger,
|
||||||
|
ReentrantReadWriteLock readWriteLock,
|
||||||
|
long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
|
||||||
|
super(name, logger, readWriteLock.readLock(), minLoggingGapMs,
|
||||||
|
lockWarningThresholdMs, clock);
|
||||||
|
this.readWriteLock = readWriteLock;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unlock() {
|
||||||
|
boolean needReport = readWriteLock.getReadHoldCount() == 1;
|
||||||
|
long localLockReleaseTime = getTimer().monotonicNow();
|
||||||
|
long localLockAcquireTime = readLockHeldTimeStamp.get();
|
||||||
|
getLock().unlock();
|
||||||
|
if (needReport) {
|
||||||
|
readLockHeldTimeStamp.remove();
|
||||||
|
check(localLockAcquireTime, localLockReleaseTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts timing for the instrumented read lock.
|
||||||
|
* It records the time to ThreadLocal.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void startLockTiming() {
|
||||||
|
if (readWriteLock.getReadHoldCount() == 1) {
|
||||||
|
readLockHeldTimeStamp.set(getTimer().monotonicNow());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a wrap class of a {@link ReentrantReadWriteLock}.
|
||||||
|
* It implements the interface {@link ReadWriteLock}, and can be used to
|
||||||
|
* create instrumented <tt>ReadLock</tt> and <tt>WriteLock</tt>.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class InstrumentedReadWriteLock implements ReadWriteLock {
|
||||||
|
|
||||||
|
private final Lock readLock;
|
||||||
|
private final Lock writeLock;
|
||||||
|
|
||||||
|
InstrumentedReadWriteLock(boolean fair, String name, Log logger,
|
||||||
|
long minLoggingGapMs, long lockWarningThresholdMs) {
|
||||||
|
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
|
||||||
|
readLock = new InstrumentedReadLock(name, logger, readWriteLock,
|
||||||
|
minLoggingGapMs, lockWarningThresholdMs);
|
||||||
|
writeLock = new InstrumentedWriteLock(name, logger, readWriteLock,
|
||||||
|
minLoggingGapMs, lockWarningThresholdMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Lock readLock() {
|
||||||
|
return readLock;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Lock writeLock() {
|
||||||
|
return writeLock;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,54 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a wrap class of a <tt>WriteLock</tt>.
|
||||||
|
* It extends the class {@link InstrumentedLock}, and can be used to track
|
||||||
|
* whether a specific write lock is being held for too long and log
|
||||||
|
* warnings if so.
|
||||||
|
*
|
||||||
|
* The logged warnings are throttled so that logs are not spammed.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class InstrumentedWriteLock extends InstrumentedLock {
|
||||||
|
|
||||||
|
public InstrumentedWriteLock(String name, Log logger,
|
||||||
|
ReentrantReadWriteLock readWriteLock,
|
||||||
|
long minLoggingGapMs, long lockWarningThresholdMs) {
|
||||||
|
this(name, logger, readWriteLock, minLoggingGapMs, lockWarningThresholdMs,
|
||||||
|
new Timer());
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
InstrumentedWriteLock(String name, Log logger,
|
||||||
|
ReentrantReadWriteLock readWriteLock,
|
||||||
|
long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
|
||||||
|
super(name, logger, readWriteLock.writeLock(), minLoggingGapMs,
|
||||||
|
lockWarningThresholdMs, clock);
|
||||||
|
}
|
||||||
|
}
|
|
@ -15,18 +15,14 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
|
||||||
import org.apache.hadoop.util.Timer;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
|
@ -0,0 +1,234 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A test class for InstrumentedReadLock and InstrumentedWriteLock.
|
||||||
|
*/
|
||||||
|
public class TestInstrumentedReadWriteLock {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(TestInstrumentedReadWriteLock.class);
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests exclusive access of the write lock.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testWriteLock() throws Exception {
|
||||||
|
String testname = name.getMethodName();
|
||||||
|
final ThreadLocal<Boolean> locked = new ThreadLocal<Boolean>();
|
||||||
|
locked.set(Boolean.FALSE);
|
||||||
|
InstrumentedReadWriteLock readWriteLock = new InstrumentedReadWriteLock(
|
||||||
|
true, testname, LOG, 2000, 300);
|
||||||
|
final AutoCloseableLock writeLock = new AutoCloseableLock(
|
||||||
|
readWriteLock.writeLock()) {
|
||||||
|
@Override
|
||||||
|
public AutoCloseableLock acquire() {
|
||||||
|
AutoCloseableLock lock = super.acquire();
|
||||||
|
locked.set(Boolean.TRUE);
|
||||||
|
return lock;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void release() {
|
||||||
|
super.release();
|
||||||
|
locked.set(Boolean.FALSE);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final AutoCloseableLock readLock = new AutoCloseableLock(
|
||||||
|
readWriteLock.readLock());
|
||||||
|
try (AutoCloseableLock lock = writeLock.acquire()) {
|
||||||
|
Thread competingWriteThread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
assertFalse(writeLock.tryLock());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
competingWriteThread.start();
|
||||||
|
competingWriteThread.join();
|
||||||
|
Thread competingReadThread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
assertFalse(readLock.tryLock());
|
||||||
|
};
|
||||||
|
};
|
||||||
|
competingReadThread.start();
|
||||||
|
competingReadThread.join();
|
||||||
|
}
|
||||||
|
assertFalse(locked.get());
|
||||||
|
locked.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the read lock.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testReadLock() throws Exception {
|
||||||
|
String testname = name.getMethodName();
|
||||||
|
InstrumentedReadWriteLock readWriteLock = new InstrumentedReadWriteLock(
|
||||||
|
true, testname, LOG, 2000, 300);
|
||||||
|
final AutoCloseableLock readLock = new AutoCloseableLock(
|
||||||
|
readWriteLock.readLock());
|
||||||
|
final AutoCloseableLock writeLock = new AutoCloseableLock(
|
||||||
|
readWriteLock.writeLock());
|
||||||
|
try (AutoCloseableLock lock = readLock.acquire()) {
|
||||||
|
Thread competingReadThread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
assertTrue(readLock.tryLock());
|
||||||
|
readLock.release();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
competingReadThread.start();
|
||||||
|
competingReadThread.join();
|
||||||
|
Thread competingWriteThread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
assertFalse(writeLock.tryLock());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
competingWriteThread.start();
|
||||||
|
competingWriteThread.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the warning when the read lock is held longer than threshold.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testReadLockLongHoldingReport() throws Exception {
|
||||||
|
String testname = name.getMethodName();
|
||||||
|
final AtomicLong time = new AtomicLong(0);
|
||||||
|
Timer mclock = new Timer() {
|
||||||
|
@Override
|
||||||
|
public long monotonicNow() {
|
||||||
|
return time.get();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
final AtomicLong wlogged = new AtomicLong(0);
|
||||||
|
final AtomicLong wsuppresed = new AtomicLong(0);
|
||||||
|
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
|
||||||
|
InstrumentedReadLock readLock = new InstrumentedReadLock(testname, LOG,
|
||||||
|
readWriteLock, 2000, 300, mclock) {
|
||||||
|
@Override
|
||||||
|
protected void logWarning(long lockHeldTime, long suppressed) {
|
||||||
|
wlogged.incrementAndGet();
|
||||||
|
wsuppresed.set(suppressed);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
readLock.lock(); // t = 0
|
||||||
|
time.set(100);
|
||||||
|
readLock.unlock(); // t = 100
|
||||||
|
assertEquals(0, wlogged.get());
|
||||||
|
assertEquals(0, wsuppresed.get());
|
||||||
|
|
||||||
|
readLock.lock(); // t = 100
|
||||||
|
time.set(500);
|
||||||
|
readLock.unlock(); // t = 500
|
||||||
|
assertEquals(1, wlogged.get());
|
||||||
|
assertEquals(0, wsuppresed.get());
|
||||||
|
|
||||||
|
// the suppress counting is only changed when
|
||||||
|
// log is needed in the test
|
||||||
|
readLock.lock(); // t = 500
|
||||||
|
time.set(900);
|
||||||
|
readLock.unlock(); // t = 900
|
||||||
|
assertEquals(1, wlogged.get());
|
||||||
|
assertEquals(0, wsuppresed.get());
|
||||||
|
|
||||||
|
readLock.lock(); // t = 900
|
||||||
|
time.set(3000);
|
||||||
|
readLock.unlock(); // t = 3000
|
||||||
|
assertEquals(2, wlogged.get());
|
||||||
|
assertEquals(1, wsuppresed.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the warning when the write lock is held longer than threshold.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testWriteLockLongHoldingReport() throws Exception {
|
||||||
|
String testname = name.getMethodName();
|
||||||
|
final AtomicLong time = new AtomicLong(0);
|
||||||
|
Timer mclock = new Timer() {
|
||||||
|
@Override
|
||||||
|
public long monotonicNow() {
|
||||||
|
return time.get();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
final AtomicLong wlogged = new AtomicLong(0);
|
||||||
|
final AtomicLong wsuppresed = new AtomicLong(0);
|
||||||
|
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
|
||||||
|
InstrumentedWriteLock writeLock = new InstrumentedWriteLock(testname, LOG,
|
||||||
|
readWriteLock, 2000, 300, mclock) {
|
||||||
|
@Override
|
||||||
|
protected void logWarning(long lockHeldTime, long suppressed) {
|
||||||
|
wlogged.incrementAndGet();
|
||||||
|
wsuppresed.set(suppressed);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
writeLock.lock(); // t = 0
|
||||||
|
time.set(100);
|
||||||
|
writeLock.unlock(); // t = 100
|
||||||
|
assertEquals(0, wlogged.get());
|
||||||
|
assertEquals(0, wsuppresed.get());
|
||||||
|
|
||||||
|
writeLock.lock(); // t = 100
|
||||||
|
time.set(500);
|
||||||
|
writeLock.unlock(); // t = 500
|
||||||
|
assertEquals(1, wlogged.get());
|
||||||
|
assertEquals(0, wsuppresed.get());
|
||||||
|
|
||||||
|
// the suppress counting is only changed when
|
||||||
|
// log is needed in the test
|
||||||
|
writeLock.lock(); // t = 500
|
||||||
|
time.set(900);
|
||||||
|
writeLock.unlock(); // t = 900
|
||||||
|
assertEquals(1, wlogged.get());
|
||||||
|
assertEquals(0, wsuppresed.get());
|
||||||
|
|
||||||
|
writeLock.lock(); // t = 900
|
||||||
|
time.set(3000);
|
||||||
|
writeLock.unlock(); // t = 3000
|
||||||
|
assertEquals(2, wlogged.get());
|
||||||
|
assertEquals(1, wsuppresed.get());
|
||||||
|
}
|
||||||
|
}
|
|
@ -64,7 +64,6 @@ import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||||
import org.apache.hadoop.hdfs.InstrumentedLock;
|
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
import org.apache.hadoop.util.AutoCloseableLock;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
|
@ -121,6 +120,7 @@ import org.apache.hadoop.util.Daemon;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||||
|
import org.apache.hadoop.util.InstrumentedLock;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.Timer;
|
import org.apache.hadoop.util.Timer;
|
||||||
|
|
Loading…
Reference in New Issue