diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java index 363ddd2ed54..5133a96db10 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java @@ -48,19 +48,15 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl } private void register(MetricsTableSource source) { - synchronized (this) { - source.registerMetrics(); - } + source.registerMetrics(); } @Override public void deleteTableSource(String table) { try { - synchronized (this) { - MetricsTableSource source = tableSources.remove(table); - if (source != null) { - source.close(); - } + MetricsTableSource source = tableSources.remove(table); + if (source != null) { + source.close(); } } catch (Exception e) { // Ignored. If this errors out it means that someone is double @@ -76,17 +72,13 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl if (source != null) { return source; } - source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class) - .createTable(table, wrapper); - MetricsTableSource prev = tableSources.putIfAbsent(table, source); - - if (prev != null) { - return prev; - } else { + MetricsTableSource newSource = CompatibilitySingletonFactory + .getInstance(MetricsRegionServerSourceFactory.class).createTable(table, wrapper); + return tableSources.computeIfAbsent(table, k -> { // register the new metrics now - register(source); - } - return source; + newSource.registerMetrics(); + return newSource; + }); } /** diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java index da78a2c8f97..08074790f92 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java @@ -15,21 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.regionserver; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.metrics.Interns; -import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import org.apache.hadoop.metrics2.MetricHistogram; -import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; -import org.apache.hadoop.metrics2.lib.MutableFastCounter; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES; import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES_DESC; import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_OUTPUT_BYTES; @@ -74,6 +61,17 @@ import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPL import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC; import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.metrics.Interns; +import org.apache.hadoop.metrics2.MetricHistogram; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + @InterfaceAudience.Private public class MetricsTableSourceImpl implements MetricsTableSource { @@ -123,7 +121,7 @@ public class MetricsTableSourceImpl implements MetricsTableSource { public MetricsTableSourceImpl(String tblName, MetricsTableAggregateSourceImpl aggregate, MetricsTableWrapperAggregate tblWrapperAgg) { - LOG.debug("Creating new MetricsTableSourceImpl for table "); + LOG.debug("Creating new MetricsTableSourceImpl for table '{}'", tblName); this.tableName = TableName.valueOf(tblName); this.agg = aggregate; @@ -240,17 +238,11 @@ public class MetricsTableSourceImpl implements MetricsTableSource { if (!(source instanceof MetricsTableSourceImpl)) { return -1; } - MetricsTableSourceImpl impl = (MetricsTableSourceImpl) source; - if (impl == null) { - return -1; - } - return Long.compare(hashCode, impl.hashCode); } void snapshot(MetricsRecordBuilder mrb, boolean ignored) { - // If there is a close that started be double extra sure // that we're not getting any locks and not putting data // into the metrics that should be removed. So early out @@ -263,7 +255,6 @@ public class MetricsTableSourceImpl implements MetricsTableSource { // This ensures that removes of the metrics // can't happen while we are putting them back in. synchronized (this) { - // It's possible that a close happened between checking // the closed variable and getting the lock. if (closed.get()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java index b19d4b080ae..71c3e71882f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java @@ -17,7 +17,14 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.assertTrue; + import java.io.IOException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompatibilityFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -29,15 +36,19 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -@Category({RegionServerTests.class, SmallTests.class}) +@Category({ RegionServerTests.class, SmallTests.class }) public class TestMetricsTableAggregate { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMetricsTableAggregate.class); - public static MetricsAssertHelper HELPER = + private static final Logger LOG = LoggerFactory.getLogger(TestMetricsTableAggregate.class); + + private static MetricsAssertHelper HELPER = CompatibilityFactory.getInstance(MetricsAssertHelper.class); private String tableName = "testTableMetrics"; @@ -87,6 +98,7 @@ public class TestMetricsTableAggregate { HELPER.assertGauge(pre + "averageRegionSize", 88, agg); } + @Test public void testFlush() { rsm.updateFlush(tableName, 1, 2, 3); HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg); @@ -139,4 +151,32 @@ public class TestMetricsTableAggregate { HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg); } + private void update(AtomicBoolean succ, int round, CyclicBarrier barrier) { + try { + for (int i = 0; i < round; i++) { + String tn = tableName + "-" + i; + barrier.await(10, TimeUnit.SECONDS); + rsm.updateFlush(tn, 100, 1000, 500); + } + } catch (Exception e) { + LOG.warn("Failed to update metrics", e); + succ.set(false); + } + } + + @Test + public void testConcurrentUpdate() throws InterruptedException { + int threadNumber = 10; + int round = 100; + AtomicBoolean succ = new AtomicBoolean(true); + CyclicBarrier barrier = new CyclicBarrier(threadNumber); + Thread[] threads = IntStream.range(0, threadNumber) + .mapToObj(i -> new Thread(() -> update(succ, round, barrier), "Update-Worker-" + i)) + .toArray(Thread[]::new); + Stream.of(threads).forEach(Thread::start); + for (Thread t : threads) { + t.join(); + } + assertTrue(succ.get()); + } }