From a7243de1a1e54d18c708f973e9380d3e707ab96a Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 9 Nov 2015 22:59:58 -0800 Subject: [PATCH] HADOOP-12482. Race condition in JMX cache update. (Tony Wu via lei) (cherry picked from commit 0eb9c60c5bec79f531da8cb3226d7e8b1d7e6639) Conflicts: hadoop-common-project/hadoop-common/CHANGES.txt --- .../hadoop-common/CHANGES.txt | 2 + .../metrics2/impl/MetricsSourceAdapter.java | 11 +- .../impl/TestMetricsSourceAdapter.java | 195 +++++++++++++++++- 3 files changed, 205 insertions(+), 3 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index fff26db29be..c77808e641e 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -710,6 +710,8 @@ Release 2.8.0 - UNRELEASED HADOOP-12526. there are duplicate dependency definitions in pom's (sjlee) + HADOOP-12482. Race condition in JMX cache update. (Tony Wu via lei) + OPTIMIZATIONS HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString() diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java index d27bdcb3880..706ef7eb4f9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java @@ -60,6 +60,7 @@ class MetricsSourceAdapter implements DynamicMBean { private final Iterable injectedTags; private Iterable lastRecs; + private boolean lastRecsCleared; private long jmxCacheTS = 0; private long jmxCacheTTL; private MBeanInfo infoCache; @@ -80,6 +81,9 @@ class MetricsSourceAdapter implements DynamicMBean { this.metricFilter = metricFilter; this.jmxCacheTTL = checkArg(jmxCacheTTL, jmxCacheTTL > 0, "jmxCacheTTL"); this.startMBeans = startMBeans; + // Initialize to true so we always trigger update MBeanInfo cache the first + // time calling updateJmxCache + this.lastRecsCleared = true; } MetricsSourceAdapter(String prefix, String name, String description, @@ -158,8 +162,12 @@ class MetricsSourceAdapter implements DynamicMBean { if (Time.now() - jmxCacheTS >= jmxCacheTTL) { // temporarilly advance the expiry while updating the cache jmxCacheTS = Time.now() + jmxCacheTTL; - if (lastRecs == null) { + // lastRecs might have been set to an object already by another thread. + // Track the fact that lastRecs has been reset once to make sure refresh + // is correctly triggered. + if (lastRecsCleared) { getAllMetrics = true; + lastRecsCleared = false; } } else { return; @@ -176,6 +184,7 @@ class MetricsSourceAdapter implements DynamicMBean { } jmxCacheTS = Time.now(); lastRecs = null; // in case regular interval update is not running + lastRecsCleared = true; } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSourceAdapter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSourceAdapter.java index 22b594aecf7..3fdf445d664 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSourceAdapter.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSourceAdapter.java @@ -22,7 +22,13 @@ import static org.junit.Assert.*; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; @@ -36,13 +42,14 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import static org.apache.hadoop.metrics2.lib.Interns.info; import static org.junit.Assert.assertEquals; +import org.apache.log4j.Logger; import org.junit.Test; import javax.management.MBeanAttributeInfo; import javax.management.MBeanInfo; public class TestMetricsSourceAdapter { - + private static final int RACE_TEST_RUNTIME = 10000; // 10 seconds @Test public void testPurgeOldMetrics() throws Exception { @@ -73,7 +80,7 @@ public class TestMetricsSourceAdapter { } //generate a new key per each call - class PurgableSource implements MetricsSource { + private static class PurgableSource implements MetricsSource { int nextKey = 0; String lastKeyName = null; @Override @@ -135,4 +142,188 @@ public class TestMetricsSourceAdapter { c1.incr(); } } + + /** + * Test a race condition when updating the JMX cache (HADOOP-12482): + * 1. Thread A reads the JMX metric every 2 JMX cache TTL. It marks the JMX + * cache to be updated by marking lastRecs to null. After this it adds a + * new key to the metrics. The next call to read should pick up this new + * key. + * 2. Thread B triggers JMX metric update every 1 JMX cache TTL. It assigns + * lastRecs to a new object (not null any more). + * 3. Thread A tries to read JMX metric again, sees lastRecs is not null and + * does not update JMX cache. As a result the read does not pickup the new + * metric. + * @throws Exception + */ + @Test + public void testMetricCacheUpdateRace() throws Exception { + // Create test source with a single metric counter of value 1. + TestMetricsSource source = new TestMetricsSource(); + MetricsSourceBuilder sourceBuilder = + MetricsAnnotations.newSourceBuilder(source); + + final long JMX_CACHE_TTL = 250; // ms + List injectedTags = new ArrayList<>(); + MetricsSourceAdapter sourceAdapter = + new MetricsSourceAdapter("test", "test", + "test JMX cache update race condition", sourceBuilder.build(), + injectedTags, null, null, JMX_CACHE_TTL, false); + + ScheduledExecutorService updaterExecutor = + Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build()); + ScheduledExecutorService readerExecutor = + Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build()); + + final AtomicBoolean hasError = new AtomicBoolean(false); + + // Wake up every 1 JMX cache TTL to set lastRecs before updateJmxCache() is + // called. + SourceUpdater srcUpdater = new SourceUpdater(sourceAdapter, hasError); + ScheduledFuture updaterFuture = + updaterExecutor.scheduleAtFixedRate(srcUpdater, + sourceAdapter.getJmxCacheTTL(), sourceAdapter.getJmxCacheTTL(), + TimeUnit.MILLISECONDS); + srcUpdater.setFuture(updaterFuture); + + // Wake up every 2 JMX cache TTL so updateJmxCache() will try to update + // JMX cache. + SourceReader srcReader = new SourceReader(source, sourceAdapter, hasError); + ScheduledFuture readerFuture = + readerExecutor.scheduleAtFixedRate(srcReader, + 0, // set JMX info cache at the beginning + 2 * sourceAdapter.getJmxCacheTTL(), TimeUnit.MILLISECONDS); + srcReader.setFuture(readerFuture); + + // Let the threads do their work. + Thread.sleep(RACE_TEST_RUNTIME); + + assertFalse("Hit error", hasError.get()); + + // cleanup + updaterExecutor.shutdownNow(); + readerExecutor.shutdownNow(); + updaterExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); + readerExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); + } + + /** + * Thread safe source: stores a key value pair. Allows thread safe key-value + * pair reads/writes. + */ + private static class TestMetricsSource implements MetricsSource { + private String key = "key0"; + private int val = 0; + + synchronized String getKey() { + return key; + } + + synchronized void setKV(final String newKey, final int newVal) { + key = newKey; + val = newVal; + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder rb = + collector.addRecord("TestMetricsSource").setContext("test"); + synchronized(this) { + rb.addGauge(info(key, "TestMetricsSource key"), val); + } + } + } + + /** + * An thread that updates the metrics source every 1 JMX cache TTL + */ + private static class SourceUpdater implements Runnable { + private MetricsSourceAdapter sa = null; + private ScheduledFuture future = null; + private AtomicBoolean hasError = null; + private static final Logger LOG = Logger.getLogger(SourceUpdater.class); + + public SourceUpdater(MetricsSourceAdapter sourceAdapter, + AtomicBoolean err) { + sa = sourceAdapter; + hasError = err; + } + + public void setFuture(ScheduledFuture f) { + future = f; + } + + @Override + public void run() { + MetricsCollectorImpl builder = new MetricsCollectorImpl(); + try { + // This resets lastRecs. + sa.getMetrics(builder, true); + LOG.info("reset lastRecs"); + } catch (Exception e) { + // catch all errors + hasError.set(true); + LOG.error(e.getStackTrace()); + } finally { + if (hasError.get()) { + LOG.error("Hit error, stopping now"); + future.cancel(false); + } + } + } + } + + /** + * An thread that reads the metrics source every JMX cache TTL. After each + * read it updates the metric source to report a new key. The next read must + * be able to pick up this new key. + */ + private static class SourceReader implements Runnable { + private MetricsSourceAdapter sa = null; + private TestMetricsSource src = null; + private int cnt = 0; + private ScheduledFuture future = null; + private AtomicBoolean hasError = null; + private static final Logger LOG = Logger.getLogger(SourceReader.class); + + public SourceReader( + TestMetricsSource source, MetricsSourceAdapter sourceAdapter, + AtomicBoolean err) { + src = source; + sa = sourceAdapter; + hasError = err; + } + + public void setFuture(ScheduledFuture f) { + future = f; + } + + @Override + public void run() { + try { + // This will trigger updateJmxCache(). + MBeanInfo info = sa.getMBeanInfo(); + final String key = src.getKey(); + for (MBeanAttributeInfo mBeanAttributeInfo : info.getAttributes()) { + // Found the new key, update the metric source and move on. + if (mBeanAttributeInfo.getName().equals(key)) { + LOG.info("found key/val=" + cnt + "/" + cnt); + cnt++; + src.setKV("key" + cnt, cnt); + return; + } + } + LOG.error("key=" + key + " not found. Stopping now."); + hasError.set(true); + } catch (Exception e) { + // catch other errors + hasError.set(true); + LOG.error(e.getStackTrace()); + } finally { + if (hasError.get()) { + future.cancel(false); + } + } + } + } }