HADOOP-12482. Race condition in JMX cache update. (Tony Wu via lei)

This commit is contained in:
Lei Xu 2015-11-09 22:59:58 -08:00
parent 2fda45b9dc
commit 0eb9c60c5b
3 changed files with 205 additions and 3 deletions

View File

@ -1329,6 +1329,8 @@ Release 2.8.0 - UNRELEASED
fails intermittently due to assumption that a lease error will be thrown.
(Gaurav Kanade via cnauroth)
HADOOP-12482. Race condition in JMX cache update. (Tony Wu via lei)
OPTIMIZATIONS
HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()

View File

@ -60,6 +60,7 @@ class MetricsSourceAdapter implements DynamicMBean {
private final Iterable<MetricsTag> injectedTags;
private Iterable<MetricsRecordImpl> lastRecs;
private boolean lastRecsCleared;
private long jmxCacheTS = 0;
private long jmxCacheTTL;
private MBeanInfo infoCache;
@ -80,6 +81,9 @@ class MetricsSourceAdapter implements DynamicMBean {
this.metricFilter = metricFilter;
this.jmxCacheTTL = checkArg(jmxCacheTTL, jmxCacheTTL > 0, "jmxCacheTTL");
this.startMBeans = startMBeans;
// Initialize to true so we always trigger update MBeanInfo cache the first
// time calling updateJmxCache
this.lastRecsCleared = true;
}
MetricsSourceAdapter(String prefix, String name, String description,
@ -158,8 +162,12 @@ class MetricsSourceAdapter implements DynamicMBean {
if (Time.now() - jmxCacheTS >= jmxCacheTTL) {
// temporarilly advance the expiry while updating the cache
jmxCacheTS = Time.now() + jmxCacheTTL;
if (lastRecs == null) {
// lastRecs might have been set to an object already by another thread.
// Track the fact that lastRecs has been reset once to make sure refresh
// is correctly triggered.
if (lastRecsCleared) {
getAllMetrics = true;
lastRecsCleared = false;
}
} else {
return;
@ -176,6 +184,7 @@ class MetricsSourceAdapter implements DynamicMBean {
}
jmxCacheTS = Time.now();
lastRecs = null; // in case regular interval update is not running
lastRecsCleared = true;
}
}

View File

@ -22,7 +22,13 @@ import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
@ -36,13 +42,14 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import static org.apache.hadoop.metrics2.lib.Interns.info;
import static org.junit.Assert.assertEquals;
import org.apache.log4j.Logger;
import org.junit.Test;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
public class TestMetricsSourceAdapter {
private static final int RACE_TEST_RUNTIME = 10000; // 10 seconds
@Test
public void testPurgeOldMetrics() throws Exception {
@ -73,7 +80,7 @@ public class TestMetricsSourceAdapter {
}
//generate a new key per each call
class PurgableSource implements MetricsSource {
private static class PurgableSource implements MetricsSource {
int nextKey = 0;
String lastKeyName = null;
@Override
@ -135,4 +142,188 @@ public class TestMetricsSourceAdapter {
c1.incr();
}
}
/**
* Test a race condition when updating the JMX cache (HADOOP-12482):
* 1. Thread A reads the JMX metric every 2 JMX cache TTL. It marks the JMX
* cache to be updated by marking lastRecs to null. After this it adds a
* new key to the metrics. The next call to read should pick up this new
* key.
* 2. Thread B triggers JMX metric update every 1 JMX cache TTL. It assigns
* lastRecs to a new object (not null any more).
* 3. Thread A tries to read JMX metric again, sees lastRecs is not null and
* does not update JMX cache. As a result the read does not pickup the new
* metric.
* @throws Exception
*/
@Test
public void testMetricCacheUpdateRace() throws Exception {
// Create test source with a single metric counter of value 1.
TestMetricsSource source = new TestMetricsSource();
MetricsSourceBuilder sourceBuilder =
MetricsAnnotations.newSourceBuilder(source);
final long JMX_CACHE_TTL = 250; // ms
List<MetricsTag> injectedTags = new ArrayList<>();
MetricsSourceAdapter sourceAdapter =
new MetricsSourceAdapter("test", "test",
"test JMX cache update race condition", sourceBuilder.build(),
injectedTags, null, null, JMX_CACHE_TTL, false);
ScheduledExecutorService updaterExecutor =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
ScheduledExecutorService readerExecutor =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
final AtomicBoolean hasError = new AtomicBoolean(false);
// Wake up every 1 JMX cache TTL to set lastRecs before updateJmxCache() is
// called.
SourceUpdater srcUpdater = new SourceUpdater(sourceAdapter, hasError);
ScheduledFuture<?> updaterFuture =
updaterExecutor.scheduleAtFixedRate(srcUpdater,
sourceAdapter.getJmxCacheTTL(), sourceAdapter.getJmxCacheTTL(),
TimeUnit.MILLISECONDS);
srcUpdater.setFuture(updaterFuture);
// Wake up every 2 JMX cache TTL so updateJmxCache() will try to update
// JMX cache.
SourceReader srcReader = new SourceReader(source, sourceAdapter, hasError);
ScheduledFuture<?> readerFuture =
readerExecutor.scheduleAtFixedRate(srcReader,
0, // set JMX info cache at the beginning
2 * sourceAdapter.getJmxCacheTTL(), TimeUnit.MILLISECONDS);
srcReader.setFuture(readerFuture);
// Let the threads do their work.
Thread.sleep(RACE_TEST_RUNTIME);
assertFalse("Hit error", hasError.get());
// cleanup
updaterExecutor.shutdownNow();
readerExecutor.shutdownNow();
updaterExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
readerExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
}
/**
* Thread safe source: stores a key value pair. Allows thread safe key-value
* pair reads/writes.
*/
private static class TestMetricsSource implements MetricsSource {
private String key = "key0";
private int val = 0;
synchronized String getKey() {
return key;
}
synchronized void setKV(final String newKey, final int newVal) {
key = newKey;
val = newVal;
}
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder rb =
collector.addRecord("TestMetricsSource").setContext("test");
synchronized(this) {
rb.addGauge(info(key, "TestMetricsSource key"), val);
}
}
}
/**
* An thread that updates the metrics source every 1 JMX cache TTL
*/
private static class SourceUpdater implements Runnable {
private MetricsSourceAdapter sa = null;
private ScheduledFuture<?> future = null;
private AtomicBoolean hasError = null;
private static final Logger LOG = Logger.getLogger(SourceUpdater.class);
public SourceUpdater(MetricsSourceAdapter sourceAdapter,
AtomicBoolean err) {
sa = sourceAdapter;
hasError = err;
}
public void setFuture(ScheduledFuture<?> f) {
future = f;
}
@Override
public void run() {
MetricsCollectorImpl builder = new MetricsCollectorImpl();
try {
// This resets lastRecs.
sa.getMetrics(builder, true);
LOG.info("reset lastRecs");
} catch (Exception e) {
// catch all errors
hasError.set(true);
LOG.error(e.getStackTrace());
} finally {
if (hasError.get()) {
LOG.error("Hit error, stopping now");
future.cancel(false);
}
}
}
}
/**
* An thread that reads the metrics source every JMX cache TTL. After each
* read it updates the metric source to report a new key. The next read must
* be able to pick up this new key.
*/
private static class SourceReader implements Runnable {
private MetricsSourceAdapter sa = null;
private TestMetricsSource src = null;
private int cnt = 0;
private ScheduledFuture<?> future = null;
private AtomicBoolean hasError = null;
private static final Logger LOG = Logger.getLogger(SourceReader.class);
public SourceReader(
TestMetricsSource source, MetricsSourceAdapter sourceAdapter,
AtomicBoolean err) {
src = source;
sa = sourceAdapter;
hasError = err;
}
public void setFuture(ScheduledFuture<?> f) {
future = f;
}
@Override
public void run() {
try {
// This will trigger updateJmxCache().
MBeanInfo info = sa.getMBeanInfo();
final String key = src.getKey();
for (MBeanAttributeInfo mBeanAttributeInfo : info.getAttributes()) {
// Found the new key, update the metric source and move on.
if (mBeanAttributeInfo.getName().equals(key)) {
LOG.info("found key/val=" + cnt + "/" + cnt);
cnt++;
src.setKV("key" + cnt, cnt);
return;
}
}
LOG.error("key=" + key + " not found. Stopping now.");
hasError.set(true);
} catch (Exception e) {
// catch other errors
hasError.set(true);
LOG.error(e.getStackTrace());
} finally {
if (hasError.get()) {
future.cancel(false);
}
}
}
}
}