HADOOP-13702. Add instrumented ReadWriteLock. Contributed by Jingcheng Du

Chris Douglas 2016-10-21 11:28:11 -07:00
parent 44eb2bd7ae
commit ae8bccd509
7 changed files with 461 additions and 15 deletions


@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.util;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -26,14 +26,12 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Timer;
 
 import com.google.common.annotations.VisibleForTesting;
 /**
  * This is a debugging class that can be used by callers to track
- * whether a specifc lock is being held for too long and periodically
+ * whether a specific lock is being held for too long and periodically
  * log a warning and stack trace, if so.
  *
  * The logged warnings are throttled so that logs are not spammed.
@@ -100,19 +98,19 @@ public class InstrumentedLock implements Lock {
   @Override
   public void lock() {
     lock.lock();
-    lockAcquireTimestamp = clock.monotonicNow();
+    startLockTiming();
   }
 
   @Override
   public void lockInterruptibly() throws InterruptedException {
     lock.lockInterruptibly();
-    lockAcquireTimestamp = clock.monotonicNow();
+    startLockTiming();
   }
 
   @Override
   public boolean tryLock() {
     if (lock.tryLock()) {
-      lockAcquireTimestamp = clock.monotonicNow();
+      startLockTiming();
       return true;
     }
     return false;
@@ -121,7 +119,7 @@ public class InstrumentedLock implements Lock {
   @Override
   public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
     if (lock.tryLock(time, unit)) {
-      lockAcquireTimestamp = clock.monotonicNow();
+      startLockTiming();
       return true;
     }
     return false;
@@ -150,6 +148,13 @@ public class InstrumentedLock implements Lock {
         StringUtils.getStackTrace(Thread.currentThread())));
   }
 
+  /**
+   * Starts timing for the instrumented lock.
+   */
+  protected void startLockTiming() {
+    lockAcquireTimestamp = clock.monotonicNow();
+  }
+
   /**
    * Log a warning if the lock was held for too long.
    *
@@ -158,7 +163,7 @@ public class InstrumentedLock implements Lock {
    * @param acquireTime - timestamp just after acquiring the lock.
    * @param releaseTime - timestamp just before releasing the lock.
    */
-  private void check(long acquireTime, long releaseTime) {
+  protected void check(long acquireTime, long releaseTime) {
     if (!logger.isWarnEnabled()) {
       return;
     }
@@ -182,4 +187,11 @@ public class InstrumentedLock implements Lock {
     }
   }
 
+  protected Lock getLock() {
+    return lock;
+  }
+
+  protected Timer getTimer() {
+    return clock;
+  }
 }
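The change above factors the inline timestamp assignment into a protected startLockTiming() hook, widens check() from private to protected, and adds getLock()/getTimer() accessors, so subclasses can decide when timing starts and when hold time is evaluated. A minimal sketch of what that enables, assuming a class in org.apache.hadoop.util so it can reach the six-argument constructor the subclasses below call through super(); the class and field names here are hypothetical and not part of this commit:

package org.apache.hadoop.util;

import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;

// Hypothetical subclass: start timing only on the outermost acquisition of a
// reentrant lock, mirroring what InstrumentedReadLock does for read locks.
class OutermostHoldInstrumentedLock extends InstrumentedLock {
  private final ReentrantLock reentrantLock;

  OutermostHoldInstrumentedLock(String name, Log logger, ReentrantLock lock,
      long minLoggingGapMs, long lockWarningThresholdMs) {
    // Assumes the (name, logger, lock, minLoggingGapMs, lockWarningThresholdMs,
    // clock) constructor visible in the subclasses' super() calls below.
    super(name, logger, lock, minLoggingGapMs, lockWarningThresholdMs,
        new Timer());
    this.reentrantLock = lock;
  }

  @Override
  protected void startLockTiming() {
    if (reentrantLock.getHoldCount() == 1) {
      super.startLockTiming();  // record the acquire timestamp once per outermost hold
    }
  }
}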


@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.annotations.VisibleForTesting;
/**
 * This is a wrapper class for a <tt>ReadLock</tt>.
 * It extends the class {@link InstrumentedLock}, and can be used to track
 * whether a specific read lock is being held for too long and log
 * warnings if so.
 *
 * The logged warnings are throttled so that logs are not spammed.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InstrumentedReadLock extends InstrumentedLock {

  private final ReentrantReadWriteLock readWriteLock;

  /**
   * Uses a ThreadLocal to keep the acquire time, since multiple threads
   * can hold the read lock concurrently.
   */
  private final ThreadLocal<Long> readLockHeldTimeStamp =
      new ThreadLocal<Long>() {
    @Override
    protected Long initialValue() {
      return Long.MAX_VALUE;
    };
  };
  public InstrumentedReadLock(String name, Log logger,
      ReentrantReadWriteLock readWriteLock,
      long minLoggingGapMs, long lockWarningThresholdMs) {
    this(name, logger, readWriteLock, minLoggingGapMs, lockWarningThresholdMs,
        new Timer());
  }

  @VisibleForTesting
  InstrumentedReadLock(String name, Log logger,
      ReentrantReadWriteLock readWriteLock,
      long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
    super(name, logger, readWriteLock.readLock(), minLoggingGapMs,
        lockWarningThresholdMs, clock);
    this.readWriteLock = readWriteLock;
  }
  @Override
  public void unlock() {
    boolean needReport = readWriteLock.getReadHoldCount() == 1;
    long localLockReleaseTime = getTimer().monotonicNow();
    long localLockAcquireTime = readLockHeldTimeStamp.get();
    getLock().unlock();
    if (needReport) {
      readLockHeldTimeStamp.remove();
      check(localLockAcquireTime, localLockReleaseTime);
    }
  }

  /**
   * Starts timing for the instrumented read lock.
   * It records the time in a ThreadLocal.
   */
  @Override
  protected void startLockTiming() {
    if (readWriteLock.getReadHoldCount() == 1) {
      readLockHeldTimeStamp.set(getTimer().monotonicNow());
    }
  }
}
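Because startLockTiming() only records a timestamp when the read hold count is 1, and unlock() only reports when the outermost hold is released, nested read acquisitions by one thread are measured as a single hold. A short illustration, not part of this commit; the lock name and the 2000/300 ms gap and threshold values are placeholders borrowed from the tests below:

  // Illustration: one thread re-entering the instrumented read lock.
  void readLockReentrancyExample(Log LOG) {
    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    InstrumentedReadLock readLock =
        new InstrumentedReadLock("sampleLock", LOG, rwLock, 2000, 300);

    readLock.lock();    // read hold count becomes 1: acquire time stored in the ThreadLocal
    readLock.lock();    // read hold count becomes 2: stored timestamp left untouched
    // ... work under the read lock ...
    readLock.unlock();  // hold count is still 2 before this call: nothing is reported
    readLock.unlock();  // outermost release: ThreadLocal cleared, check(acquire, release) runs
  }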


@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
 * This is a wrapper class for a {@link ReentrantReadWriteLock}.
 * It implements the interface {@link ReadWriteLock}, and can be used to
 * create instrumented <tt>ReadLock</tt> and <tt>WriteLock</tt>.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InstrumentedReadWriteLock implements ReadWriteLock {

  private final Lock readLock;
  private final Lock writeLock;

  InstrumentedReadWriteLock(boolean fair, String name, Log logger,
      long minLoggingGapMs, long lockWarningThresholdMs) {
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
    readLock = new InstrumentedReadLock(name, logger, readWriteLock,
        minLoggingGapMs, lockWarningThresholdMs);
    writeLock = new InstrumentedWriteLock(name, logger, readWriteLock,
        minLoggingGapMs, lockWarningThresholdMs);
  }

  @Override
  public Lock readLock() {
    return readLock;
  }

  @Override
  public Lock writeLock() {
    return writeLock;
  }
}
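A brief usage sketch, not part of this commit, showing the factory in action. The constructor is package-private, so this assumes caller code in org.apache.hadoop.util (as in the tests below); the lock name and millisecond values are placeholders:

  // Guard shared state with instrumented read/write locks.
  void guardedAccessExample(Log LOG) {
    InstrumentedReadWriteLock rwLock = new InstrumentedReadWriteLock(
        true, "datasetLock", LOG, 2000, 300);

    rwLock.writeLock().lock();
    try {
      // mutate shared state; a warning is logged if the hold exceeds the threshold
    } finally {
      rwLock.writeLock().unlock();
    }

    rwLock.readLock().lock();
    try {
      // read shared state concurrently with other readers
    } finally {
      rwLock.readLock().unlock();
    }
  }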


@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.annotations.VisibleForTesting;
/**
 * This is a wrapper class for a <tt>WriteLock</tt>.
 * It extends the class {@link InstrumentedLock}, and can be used to track
 * whether a specific write lock is being held for too long and log
 * warnings if so.
 *
 * The logged warnings are throttled so that logs are not spammed.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InstrumentedWriteLock extends InstrumentedLock {

  public InstrumentedWriteLock(String name, Log logger,
      ReentrantReadWriteLock readWriteLock,
      long minLoggingGapMs, long lockWarningThresholdMs) {
    this(name, logger, readWriteLock, minLoggingGapMs, lockWarningThresholdMs,
        new Timer());
  }

  @VisibleForTesting
  InstrumentedWriteLock(String name, Log logger,
      ReentrantReadWriteLock readWriteLock,
      long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
    super(name, logger, readWriteLock.writeLock(), minLoggingGapMs,
        lockWarningThresholdMs, clock);
  }
}
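The write lock is exclusive, so it needs no per-thread bookkeeping and inherits the base-class timing unchanged. The logWarning hook (overridden as protected in the tests below) can be redirected away from the log; a hypothetical sketch, assuming code that can see the hook as the same-package tests do:

  // Count long write-lock holds in a metric instead of only logging them.
  InstrumentedWriteLock countingWriteLock(Log LOG, AtomicLong longHolds) {
    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    return new InstrumentedWriteLock("datasetLock", LOG, rwLock, 2000, 300) {
      @Override
      protected void logWarning(long lockHeldTime, long suppressed) {
        longHolds.incrementAndGet();                 // record the event for metrics
        super.logWarning(lockHeldTime, suppressed);  // keep the throttled log line
      }
    };
  }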


@@ -15,18 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.util;
 
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 
-import org.apache.hadoop.util.AutoCloseableLock;
-import org.apache.hadoop.util.Timer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;


@@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
/**
 * A test class for InstrumentedReadLock and InstrumentedWriteLock.
 */
public class TestInstrumentedReadWriteLock {

  static final Log LOG = LogFactory.getLog(TestInstrumentedReadWriteLock.class);

  @Rule
  public TestName name = new TestName();
  /**
   * Tests exclusive access of the write lock.
   * @throws Exception
   */
  @Test(timeout=10000)
  public void testWriteLock() throws Exception {
    String testname = name.getMethodName();
    final ThreadLocal<Boolean> locked = new ThreadLocal<Boolean>();
    locked.set(Boolean.FALSE);
    InstrumentedReadWriteLock readWriteLock = new InstrumentedReadWriteLock(
        true, testname, LOG, 2000, 300);
    final AutoCloseableLock writeLock = new AutoCloseableLock(
        readWriteLock.writeLock()) {
      @Override
      public AutoCloseableLock acquire() {
        AutoCloseableLock lock = super.acquire();
        locked.set(Boolean.TRUE);
        return lock;
      }

      @Override
      public void release() {
        super.release();
        locked.set(Boolean.FALSE);
      }
    };
    final AutoCloseableLock readLock = new AutoCloseableLock(
        readWriteLock.readLock());
    try (AutoCloseableLock lock = writeLock.acquire()) {
      Thread competingWriteThread = new Thread() {
        @Override
        public void run() {
          assertFalse(writeLock.tryLock());
        }
      };
      competingWriteThread.start();
      competingWriteThread.join();
      Thread competingReadThread = new Thread() {
        @Override
        public void run() {
          assertFalse(readLock.tryLock());
        };
      };
      competingReadThread.start();
      competingReadThread.join();
    }
    assertFalse(locked.get());
    locked.remove();
  }
  /**
   * Tests the read lock.
   * @throws Exception
   */
  @Test(timeout=10000)
  public void testReadLock() throws Exception {
    String testname = name.getMethodName();
    InstrumentedReadWriteLock readWriteLock = new InstrumentedReadWriteLock(
        true, testname, LOG, 2000, 300);
    final AutoCloseableLock readLock = new AutoCloseableLock(
        readWriteLock.readLock());
    final AutoCloseableLock writeLock = new AutoCloseableLock(
        readWriteLock.writeLock());
    try (AutoCloseableLock lock = readLock.acquire()) {
      Thread competingReadThread = new Thread() {
        @Override
        public void run() {
          assertTrue(readLock.tryLock());
          readLock.release();
        }
      };
      competingReadThread.start();
      competingReadThread.join();
      Thread competingWriteThread = new Thread() {
        @Override
        public void run() {
          assertFalse(writeLock.tryLock());
        }
      };
      competingWriteThread.start();
      competingWriteThread.join();
    }
  }
  /**
   * Tests the warning when the read lock is held longer than threshold.
   * @throws Exception
   */
  @Test(timeout=10000)
  public void testReadLockLongHoldingReport() throws Exception {
    String testname = name.getMethodName();
    final AtomicLong time = new AtomicLong(0);
    Timer mclock = new Timer() {
      @Override
      public long monotonicNow() {
        return time.get();
      }
    };
    final AtomicLong wlogged = new AtomicLong(0);
    final AtomicLong wsuppresed = new AtomicLong(0);
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
    InstrumentedReadLock readLock = new InstrumentedReadLock(testname, LOG,
        readWriteLock, 2000, 300, mclock) {
      @Override
      protected void logWarning(long lockHeldTime, long suppressed) {
        wlogged.incrementAndGet();
        wsuppresed.set(suppressed);
      }
    };

    readLock.lock();   // t = 0
    time.set(100);
    readLock.unlock(); // t = 100
    assertEquals(0, wlogged.get());
    assertEquals(0, wsuppresed.get());

    readLock.lock();   // t = 100
    time.set(500);
    readLock.unlock(); // t = 500
    assertEquals(1, wlogged.get());
    assertEquals(0, wsuppresed.get());

    // The suppressed count only changes when a warning is actually logged.
    readLock.lock();   // t = 500
    time.set(900);
    readLock.unlock(); // t = 900
    assertEquals(1, wlogged.get());
    assertEquals(0, wsuppresed.get());

    readLock.lock();   // t = 900
    time.set(3000);
    readLock.unlock(); // t = 3000
    assertEquals(2, wlogged.get());
    assertEquals(1, wsuppresed.get());
  }
  /**
   * Tests the warning when the write lock is held longer than threshold.
   * @throws Exception
   */
  @Test(timeout=10000)
  public void testWriteLockLongHoldingReport() throws Exception {
    String testname = name.getMethodName();
    final AtomicLong time = new AtomicLong(0);
    Timer mclock = new Timer() {
      @Override
      public long monotonicNow() {
        return time.get();
      }
    };
    final AtomicLong wlogged = new AtomicLong(0);
    final AtomicLong wsuppresed = new AtomicLong(0);
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
    InstrumentedWriteLock writeLock = new InstrumentedWriteLock(testname, LOG,
        readWriteLock, 2000, 300, mclock) {
      @Override
      protected void logWarning(long lockHeldTime, long suppressed) {
        wlogged.incrementAndGet();
        wsuppresed.set(suppressed);
      }
    };

    writeLock.lock();   // t = 0
    time.set(100);
    writeLock.unlock(); // t = 100
    assertEquals(0, wlogged.get());
    assertEquals(0, wsuppresed.get());

    writeLock.lock();   // t = 100
    time.set(500);
    writeLock.unlock(); // t = 500
    assertEquals(1, wlogged.get());
    assertEquals(0, wsuppresed.get());

    // The suppressed count only changes when a warning is actually logged.
    writeLock.lock();   // t = 500
    time.set(900);
    writeLock.unlock(); // t = 900
    assertEquals(1, wlogged.get());
    assertEquals(0, wsuppresed.get());

    writeLock.lock();   // t = 900
    time.set(3000);
    writeLock.unlock(); // t = 3000
    assertEquals(2, wlogged.get());
    assertEquals(1, wsuppresed.get());
  }
}


@@ -58,7 +58,6 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.InstrumentedLock;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -109,6 +108,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.InstrumentedLock;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;