HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.

Arpit Agarwal 2016-09-10 18:01:37 -07:00
parent d5ea508ca2
commit 04f620c4d0
6 changed files with 394 additions and 8 deletions

File: AutoCloseableLock.java

@@ -17,22 +17,33 @@
  */
 package org.apache.hadoop.util;
 
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This is a wrap class of a ReentrantLock. Extending AutoCloseable
  * interface such that the users can use a try-with-resource syntax.
  */
 public class AutoCloseableLock implements AutoCloseable {
 
-  private final ReentrantLock lock;
+  private final Lock lock;
 
   /**
    * Creates an instance of {@code AutoCloseableLock}, initializes
-   * the underlying {@code ReentrantLock} object.
+   * the underlying lock instance with a new {@code ReentrantLock}.
    */
   public AutoCloseableLock() {
-    this.lock = new ReentrantLock();
+    this(new ReentrantLock());
+  }
+
+  /**
+   * Wrap provided Lock instance.
+   * @param lock Lock instance to wrap in AutoCloseable API.
+   */
+  public AutoCloseableLock(Lock lock) {
+    this.lock = lock;
   }
 
   /**
@@ -86,7 +97,7 @@ public class AutoCloseableLock implements AutoCloseable {
   /**
    * A wrapper method that makes a call to {@code tryLock()} of
-   * the underlying {@code ReentrantLock} object.
+   * the underlying {@code Lock} object.
    *
    * If the lock is not held by another thread, acquires the lock, set the
    * hold count to one and returns {@code true}.
@@ -116,7 +127,12 @@ public class AutoCloseableLock implements AutoCloseable {
    * @return {@code true} if any thread holds this lock and
    *         {@code false} otherwise
    */
-  public boolean isLocked() {
-    return lock.isLocked();
+  @VisibleForTesting
+  boolean isLocked() {
+    if (lock instanceof ReentrantLock) {
+      return ((ReentrantLock)lock).isLocked();
+    }
+    throw new UnsupportedOperationException();
   }
 
 }
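The new AutoCloseableLock(Lock) constructor lets callers wrap an arbitrary java.util.concurrent.locks.Lock (such as the InstrumentedLock added below) while keeping the try-with-resource pattern the class javadoc describes. A minimal usage sketch, not part of the patch; the class and method names here are hypothetical:

import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.util.AutoCloseableLock;

// Illustrative only: acquire() locks the wrapped Lock and returns this
// AutoCloseableLock, so close() (i.e. unlock) runs when the try block exits.
public class AutoCloseableLockDemo {
  private final AutoCloseableLock lock =
      new AutoCloseableLock(new ReentrantLock());

  public void doCriticalWork() {
    try (AutoCloseableLock l = lock.acquire()) {
      // ... touch shared state while holding the lock ...
    }
  }
}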

File: DFSConfigKeys.java

@@ -383,6 +383,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY =
       "dfs.namenode.read-lock-reporting-threshold-ms";
   public static final long DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
+  // Threshold for how long the lock warnings must be suppressed
+  public static final String DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY =
+      "dfs.lock.suppress.warning.interval";
+  public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
+      10000; //ms
   public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
   public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;

File: InstrumentedLock.java (new file)

@@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.commons.logging.Log;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Timer;
import com.google.common.annotations.VisibleForTesting;
/**
* This is a debugging class that can be used by callers to track
* whether a specific lock is being held for too long and periodically
* log a warning and stack trace, if so.
*
* The logged warnings are throttled so that logs are not spammed.
*
* A new instance of InstrumentedLock can be created for each object
* that needs to be instrumented.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InstrumentedLock implements Lock {
private final Lock lock;
private final Log logger;
private final String name;
private final Timer clock;
/** Minimum gap between two lock warnings. */
private final long minLoggingGap;
/** Threshold for detecting long lock held time. */
private final long lockWarningThreshold;
// Tracking counters for lock statistics.
private volatile long lockAcquireTimestamp;
private final AtomicLong lastLogTimestamp;
private final AtomicLong warningsSuppressed = new AtomicLong(0);
/**
* Create an instrumented lock instance which logs a warning message
* when lock held time is above given threshold.
*
* @param name the identifier of the lock object
* @param logger this class does not have its own logger, will log to the
* given logger instead
* @param minLoggingGapMs the minimum time gap between two log messages,
* this is to avoid spamming too many logs
* @param lockWarningThresholdMs the time threshold to view lock held
* time as being "too long"
*/
public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
long lockWarningThresholdMs) {
this(name, logger, new ReentrantLock(),
minLoggingGapMs, lockWarningThresholdMs);
}
public InstrumentedLock(String name, Log logger, Lock lock,
long minLoggingGapMs, long lockWarningThresholdMs) {
this(name, logger, lock,
minLoggingGapMs, lockWarningThresholdMs, new Timer());
}
@VisibleForTesting
InstrumentedLock(String name, Log logger, Lock lock,
long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
this.name = name;
this.lock = lock;
this.clock = clock;
this.logger = logger;
minLoggingGap = minLoggingGapMs;
lockWarningThreshold = lockWarningThresholdMs;
lastLogTimestamp = new AtomicLong(
clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
}
@Override
public void lock() {
lock.lock();
lockAcquireTimestamp = clock.monotonicNow();
}
@Override
public void lockInterruptibly() throws InterruptedException {
lock.lockInterruptibly();
lockAcquireTimestamp = clock.monotonicNow();
}
@Override
public boolean tryLock() {
if (lock.tryLock()) {
lockAcquireTimestamp = clock.monotonicNow();
return true;
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (lock.tryLock(time, unit)) {
lockAcquireTimestamp = clock.monotonicNow();
return true;
}
return false;
}
@Override
public void unlock() {
long localLockReleaseTime = clock.monotonicNow();
long localLockAcquireTime = lockAcquireTimestamp;
lock.unlock();
check(localLockAcquireTime, localLockReleaseTime);
}
@Override
public Condition newCondition() {
return lock.newCondition();
}
@VisibleForTesting
void logWarning(long lockHeldTime, long suppressed) {
logger.warn(String.format("Lock held time above threshold: " +
"lock identifier: %s " +
"lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
"The stack trace is: %s" ,
name, lockHeldTime, suppressed,
StringUtils.getStackTrace(Thread.currentThread())));
}
/**
* Log a warning if the lock was held for too long.
*
* Should be invoked by the caller immediately AFTER releasing the lock.
*
* @param acquireTime - timestamp just after acquiring the lock.
* @param releaseTime - timestamp just before releasing the lock.
*/
private void check(long acquireTime, long releaseTime) {
if (!logger.isWarnEnabled()) {
return;
}
final long lockHeldTime = releaseTime - acquireTime;
if (lockWarningThreshold - lockHeldTime < 0) {
long now;
long localLastLogTs;
do {
now = clock.monotonicNow();
localLastLogTs = lastLogTimestamp.get();
long deltaSinceLastLog = now - localLastLogTs;
// check should print log or not
if (deltaSinceLastLog - minLoggingGap < 0) {
warningsSuppressed.incrementAndGet();
return;
}
} while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
long suppressed = warningsSuppressed.getAndSet(0);
logWarning(lockHeldTime, suppressed);
}
}
}
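The class javadoc above describes the intended behavior: a lock held past lockWarningThresholdMs produces a warning with a stack trace on unlock(), and further long holds within minLoggingGapMs are only counted and reported with the next warning. A rough standalone sketch of that behavior, not taken from the patch; the class name and the 10000/300 arguments are just example values:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.hdfs.InstrumentedLock;

// Illustrative only: 300 ms held-time threshold, warnings throttled to
// at most one per 10 seconds.
public class InstrumentedLockDemo {
  private static final Log LOG = LogFactory.getLog(InstrumentedLockDemo.class);

  public static void main(String[] args) throws InterruptedException {
    InstrumentedLock lock = new InstrumentedLock("demoLock", LOG, 10000, 300);

    lock.lock();
    try {
      Thread.sleep(500);  // held longer than the 300 ms threshold
    } finally {
      lock.unlock();      // check() fires here and logs a warning
    }

    lock.lock();
    try {
      Thread.sleep(500);  // long again, but within the 10 s logging gap
    } finally {
      lock.unlock();      // this warning is suppressed and counted instead
    }
  }
}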

File: FsDatasetImpl.java

@@ -42,6 +42,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.*;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -62,7 +63,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.InstrumentedLock;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -280,7 +281,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
-    this.datasetLock = new AutoCloseableLock();
+    this.datasetLock = new AutoCloseableLock(
+        new InstrumentedLock(getClass().getName(), LOG,
+          conf.getTimeDuration(
+            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS),
+          300));
 
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
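With this change the FsDatasetImpl dataset lock is an AutoCloseableLock wrapping an InstrumentedLock, so any critical section that holds it longer than 300 ms is reported, subject to the suppression interval. A hedged sketch of the typical acquire pattern around such a lock; exampleOperation() is illustrative and not a method from the patch:

// Illustrative only: not code from this commit.
void exampleOperation() {
  try (AutoCloseableLock lock = datasetLock.acquire()) {
    // Work done here is timed by the wrapped InstrumentedLock; if the
    // lock is held for more than 300 ms, a warning with a stack trace
    // is logged on release (unless suppressed by the warning interval).
  }
}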

File: hdfs-default.xml

@@ -3046,4 +3046,11 @@
     Keytab based login can be enabled with dfs.balancer.keytab.enabled.
   </description>
 </property>
 
+<property>
+  <name>dfs.lock.suppress.warning.interval</name>
+  <value>10s</value>
+  <description>Instrumentation reporting long critical sections will suppress
+    consecutive warnings within this interval.</description>
+</property>
+
 </configuration>
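Note that hdfs-default.xml gives the default as "10s" while DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT is 10000 ms; the two agree because the value is read through Configuration.getTimeDuration(), which parses unit suffixes and converts to the requested TimeUnit (as the FsDatasetImpl hunk above does). A small hedged sketch; the class and variable names are illustrative:

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// Illustrative only: HdfsConfiguration loads hdfs-default.xml/hdfs-site.xml,
// so a configured "10s" (or the code default of 10000 ms) comes back as
// milliseconds here.
public class SuppressIntervalDemo {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    long suppressIntervalMs = conf.getTimeDuration(
        DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
        DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);
    System.out.println("suppress interval = " + suppressIntervalMs + " ms");
  }
}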

File: TestInstrumentedLock.java (new file)

@@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Timer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
/**
* A test class for InstrumentedLock.
*/
public class TestInstrumentedLock {
static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
@Rule public TestName name = new TestName();
/**
* Test exclusive access of the lock.
* @throws Exception
*/
@Test(timeout=10000)
public void testMultipleThread() throws Exception {
String testname = name.getMethodName();
final InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
lock.lock();
try {
Thread competingThread = new Thread() {
@Override
public void run() {
assertFalse(lock.tryLock());
}
};
competingThread.start();
competingThread.join();
} finally {
lock.unlock();
}
}
/**
* Test the correctness with try-with-resource syntax.
* @throws Exception
*/
@Test(timeout=10000)
public void testTryWithResourceSyntax() throws Exception {
String testname = name.getMethodName();
final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
final Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
@Override
public void lock() {
super.lock();
lockThread.set(Thread.currentThread());
}
@Override
public void unlock() {
super.unlock();
lockThread.set(null);
}
};
AutoCloseableLock acl = new AutoCloseableLock(lock);
try (AutoCloseable localLock = acl.acquire()) {
assertEquals(acl, localLock);
Thread competingThread = new Thread() {
@Override
public void run() {
assertNotEquals(Thread.currentThread(), lockThread.get());
assertFalse(lock.tryLock());
}
};
competingThread.start();
competingThread.join();
assertEquals(Thread.currentThread(), lockThread.get());
}
assertNull(lockThread.get());
}
/**
* Test the lock logs warning when lock held time is greater than threshold
* and not log warning otherwise.
* @throws Exception
*/
@Test(timeout=10000)
public void testLockLongHoldingReport() throws Exception {
String testname = name.getMethodName();
final AtomicLong time = new AtomicLong(0);
Timer mclock = new Timer() {
@Override
public long monotonicNow() {
return time.get();
}
};
Lock mlock = mock(Lock.class);
final AtomicLong wlogged = new AtomicLong(0);
final AtomicLong wsuppresed = new AtomicLong(0);
InstrumentedLock lock = new InstrumentedLock(
testname, LOG, mlock, 2000, 300, mclock) {
@Override
void logWarning(long lockHeldTime, long suppressed) {
wlogged.incrementAndGet();
wsuppresed.set(suppressed);
}
};
// do not log warning when the lock held time is short
lock.lock(); // t = 0
time.set(200);
lock.unlock(); // t = 200
assertEquals(0, wlogged.get());
assertEquals(0, wsuppresed.get());
lock.lock(); // t = 200
time.set(700);
lock.unlock(); // t = 700
assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get());
// even though the lock held time is greater than the threshold,
// suppress the log warning due to the logging gap
// (not recorded in wsuppressed until next log message)
lock.lock(); // t = 700
time.set(1100);
lock.unlock(); // t = 1100
assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get());
// log a warning message when the lock held time is greater than the threshold
// and the logging time gap is satisfied. Also should display suppressed
// previous warnings.
time.set(2400);
lock.lock(); // t = 2400
time.set(2800);
lock.unlock(); // t = 2800
assertEquals(2, wlogged.get());
assertEquals(1, wsuppresed.get());
}
}