HADOOP-12482. Race condition in JMX cache update. (Tony Wu via lei)
(cherry picked from commit 0eb9c60c5b
)
Conflicts:
hadoop-common-project/hadoop-common/CHANGES.txt
This commit is contained in:
parent
e5420feb09
commit
a7243de1a1
|
@ -710,6 +710,8 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
HADOOP-12526. there are duplicate dependency definitions in pom's (sjlee)
|
||||
|
||||
HADOOP-12482. Race condition in JMX cache update. (Tony Wu via lei)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()
|
||||
|
|
|
@ -60,6 +60,7 @@ class MetricsSourceAdapter implements DynamicMBean {
|
|||
private final Iterable<MetricsTag> injectedTags;
|
||||
|
||||
private Iterable<MetricsRecordImpl> lastRecs;
|
||||
private boolean lastRecsCleared;
|
||||
private long jmxCacheTS = 0;
|
||||
private long jmxCacheTTL;
|
||||
private MBeanInfo infoCache;
|
||||
|
@ -80,6 +81,9 @@ class MetricsSourceAdapter implements DynamicMBean {
|
|||
this.metricFilter = metricFilter;
|
||||
this.jmxCacheTTL = checkArg(jmxCacheTTL, jmxCacheTTL > 0, "jmxCacheTTL");
|
||||
this.startMBeans = startMBeans;
|
||||
// Initialize to true so we always trigger update MBeanInfo cache the first
|
||||
// time calling updateJmxCache
|
||||
this.lastRecsCleared = true;
|
||||
}
|
||||
|
||||
MetricsSourceAdapter(String prefix, String name, String description,
|
||||
|
@ -158,8 +162,12 @@ class MetricsSourceAdapter implements DynamicMBean {
|
|||
if (Time.now() - jmxCacheTS >= jmxCacheTTL) {
|
||||
// temporarilly advance the expiry while updating the cache
|
||||
jmxCacheTS = Time.now() + jmxCacheTTL;
|
||||
if (lastRecs == null) {
|
||||
// lastRecs might have been set to an object already by another thread.
|
||||
// Track the fact that lastRecs has been reset once to make sure refresh
|
||||
// is correctly triggered.
|
||||
if (lastRecsCleared) {
|
||||
getAllMetrics = true;
|
||||
lastRecsCleared = false;
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
|
@ -176,6 +184,7 @@ class MetricsSourceAdapter implements DynamicMBean {
|
|||
}
|
||||
jmxCacheTS = Time.now();
|
||||
lastRecs = null; // in case regular interval update is not running
|
||||
lastRecsCleared = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,13 @@ import static org.junit.Assert.*;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
|
@ -36,13 +42,14 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
|||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.management.MBeanAttributeInfo;
|
||||
import javax.management.MBeanInfo;
|
||||
|
||||
public class TestMetricsSourceAdapter {
|
||||
|
||||
private static final int RACE_TEST_RUNTIME = 10000; // 10 seconds
|
||||
|
||||
@Test
|
||||
public void testPurgeOldMetrics() throws Exception {
|
||||
|
@ -73,7 +80,7 @@ public class TestMetricsSourceAdapter {
|
|||
}
|
||||
|
||||
//generate a new key per each call
|
||||
class PurgableSource implements MetricsSource {
|
||||
private static class PurgableSource implements MetricsSource {
|
||||
int nextKey = 0;
|
||||
String lastKeyName = null;
|
||||
@Override
|
||||
|
@ -135,4 +142,188 @@ public class TestMetricsSourceAdapter {
|
|||
c1.incr();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a race condition when updating the JMX cache (HADOOP-12482):
|
||||
* 1. Thread A reads the JMX metric every 2 JMX cache TTL. It marks the JMX
|
||||
* cache to be updated by marking lastRecs to null. After this it adds a
|
||||
* new key to the metrics. The next call to read should pick up this new
|
||||
* key.
|
||||
* 2. Thread B triggers JMX metric update every 1 JMX cache TTL. It assigns
|
||||
* lastRecs to a new object (not null any more).
|
||||
* 3. Thread A tries to read JMX metric again, sees lastRecs is not null and
|
||||
* does not update JMX cache. As a result the read does not pickup the new
|
||||
* metric.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testMetricCacheUpdateRace() throws Exception {
|
||||
// Create test source with a single metric counter of value 1.
|
||||
TestMetricsSource source = new TestMetricsSource();
|
||||
MetricsSourceBuilder sourceBuilder =
|
||||
MetricsAnnotations.newSourceBuilder(source);
|
||||
|
||||
final long JMX_CACHE_TTL = 250; // ms
|
||||
List<MetricsTag> injectedTags = new ArrayList<>();
|
||||
MetricsSourceAdapter sourceAdapter =
|
||||
new MetricsSourceAdapter("test", "test",
|
||||
"test JMX cache update race condition", sourceBuilder.build(),
|
||||
injectedTags, null, null, JMX_CACHE_TTL, false);
|
||||
|
||||
ScheduledExecutorService updaterExecutor =
|
||||
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
|
||||
ScheduledExecutorService readerExecutor =
|
||||
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
|
||||
|
||||
final AtomicBoolean hasError = new AtomicBoolean(false);
|
||||
|
||||
// Wake up every 1 JMX cache TTL to set lastRecs before updateJmxCache() is
|
||||
// called.
|
||||
SourceUpdater srcUpdater = new SourceUpdater(sourceAdapter, hasError);
|
||||
ScheduledFuture<?> updaterFuture =
|
||||
updaterExecutor.scheduleAtFixedRate(srcUpdater,
|
||||
sourceAdapter.getJmxCacheTTL(), sourceAdapter.getJmxCacheTTL(),
|
||||
TimeUnit.MILLISECONDS);
|
||||
srcUpdater.setFuture(updaterFuture);
|
||||
|
||||
// Wake up every 2 JMX cache TTL so updateJmxCache() will try to update
|
||||
// JMX cache.
|
||||
SourceReader srcReader = new SourceReader(source, sourceAdapter, hasError);
|
||||
ScheduledFuture<?> readerFuture =
|
||||
readerExecutor.scheduleAtFixedRate(srcReader,
|
||||
0, // set JMX info cache at the beginning
|
||||
2 * sourceAdapter.getJmxCacheTTL(), TimeUnit.MILLISECONDS);
|
||||
srcReader.setFuture(readerFuture);
|
||||
|
||||
// Let the threads do their work.
|
||||
Thread.sleep(RACE_TEST_RUNTIME);
|
||||
|
||||
assertFalse("Hit error", hasError.get());
|
||||
|
||||
// cleanup
|
||||
updaterExecutor.shutdownNow();
|
||||
readerExecutor.shutdownNow();
|
||||
updaterExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
readerExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread safe source: stores a key value pair. Allows thread safe key-value
|
||||
* pair reads/writes.
|
||||
*/
|
||||
private static class TestMetricsSource implements MetricsSource {
|
||||
private String key = "key0";
|
||||
private int val = 0;
|
||||
|
||||
synchronized String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
synchronized void setKV(final String newKey, final int newVal) {
|
||||
key = newKey;
|
||||
val = newVal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||
MetricsRecordBuilder rb =
|
||||
collector.addRecord("TestMetricsSource").setContext("test");
|
||||
synchronized(this) {
|
||||
rb.addGauge(info(key, "TestMetricsSource key"), val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An thread that updates the metrics source every 1 JMX cache TTL
|
||||
*/
|
||||
private static class SourceUpdater implements Runnable {
|
||||
private MetricsSourceAdapter sa = null;
|
||||
private ScheduledFuture<?> future = null;
|
||||
private AtomicBoolean hasError = null;
|
||||
private static final Logger LOG = Logger.getLogger(SourceUpdater.class);
|
||||
|
||||
public SourceUpdater(MetricsSourceAdapter sourceAdapter,
|
||||
AtomicBoolean err) {
|
||||
sa = sourceAdapter;
|
||||
hasError = err;
|
||||
}
|
||||
|
||||
public void setFuture(ScheduledFuture<?> f) {
|
||||
future = f;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
MetricsCollectorImpl builder = new MetricsCollectorImpl();
|
||||
try {
|
||||
// This resets lastRecs.
|
||||
sa.getMetrics(builder, true);
|
||||
LOG.info("reset lastRecs");
|
||||
} catch (Exception e) {
|
||||
// catch all errors
|
||||
hasError.set(true);
|
||||
LOG.error(e.getStackTrace());
|
||||
} finally {
|
||||
if (hasError.get()) {
|
||||
LOG.error("Hit error, stopping now");
|
||||
future.cancel(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An thread that reads the metrics source every JMX cache TTL. After each
|
||||
* read it updates the metric source to report a new key. The next read must
|
||||
* be able to pick up this new key.
|
||||
*/
|
||||
private static class SourceReader implements Runnable {
|
||||
private MetricsSourceAdapter sa = null;
|
||||
private TestMetricsSource src = null;
|
||||
private int cnt = 0;
|
||||
private ScheduledFuture<?> future = null;
|
||||
private AtomicBoolean hasError = null;
|
||||
private static final Logger LOG = Logger.getLogger(SourceReader.class);
|
||||
|
||||
public SourceReader(
|
||||
TestMetricsSource source, MetricsSourceAdapter sourceAdapter,
|
||||
AtomicBoolean err) {
|
||||
src = source;
|
||||
sa = sourceAdapter;
|
||||
hasError = err;
|
||||
}
|
||||
|
||||
public void setFuture(ScheduledFuture<?> f) {
|
||||
future = f;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// This will trigger updateJmxCache().
|
||||
MBeanInfo info = sa.getMBeanInfo();
|
||||
final String key = src.getKey();
|
||||
for (MBeanAttributeInfo mBeanAttributeInfo : info.getAttributes()) {
|
||||
// Found the new key, update the metric source and move on.
|
||||
if (mBeanAttributeInfo.getName().equals(key)) {
|
||||
LOG.info("found key/val=" + cnt + "/" + cnt);
|
||||
cnt++;
|
||||
src.setKV("key" + cnt, cnt);
|
||||
return;
|
||||
}
|
||||
}
|
||||
LOG.error("key=" + key + " not found. Stopping now.");
|
||||
hasError.set(true);
|
||||
} catch (Exception e) {
|
||||
// catch other errors
|
||||
hasError.set(true);
|
||||
LOG.error(e.getStackTrace());
|
||||
} finally {
|
||||
if (hasError.get()) {
|
||||
future.cancel(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue