HBASE-21136 NPE in MetricsTableSourceImpl.updateFlushTime

This commit is contained in:
zhangduo 2018-09-01 20:00:35 +08:00
parent 9c09efc0df
commit dc79029966
3 changed files with 64 additions and 41 deletions

View File

@ -48,20 +48,16 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
} }
private void register(MetricsTableSource source) { private void register(MetricsTableSource source) {
synchronized (this) {
source.registerMetrics(); source.registerMetrics();
} }
}
@Override @Override
public void deleteTableSource(String table) { public void deleteTableSource(String table) {
try { try {
synchronized (this) {
MetricsTableSource source = tableSources.remove(table); MetricsTableSource source = tableSources.remove(table);
if (source != null) { if (source != null) {
source.close(); source.close();
} }
}
} catch (Exception e) { } catch (Exception e) {
// Ignored. If this errors out it means that someone is double // Ignored. If this errors out it means that someone is double
// closing the user source and the user metrics is already nulled out. // closing the user source and the user metrics is already nulled out.
@ -76,17 +72,13 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
if (source != null) { if (source != null) {
return source; return source;
} }
source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class) MetricsTableSource newSource = CompatibilitySingletonFactory
.createTable(table, wrapper); .getInstance(MetricsRegionServerSourceFactory.class).createTable(table, wrapper);
MetricsTableSource prev = tableSources.putIfAbsent(table, source); return tableSources.computeIfAbsent(table, k -> {
if (prev != null) {
return prev;
} else {
// register the new metrics now // register the new metrics now
register(source); newSource.registerMetrics();
} return newSource;
return source; });
} }
/** /**

View File

@ -15,21 +15,8 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.metrics.Interns;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES; import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES;
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES_DESC; import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES_DESC;
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_OUTPUT_BYTES; import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_OUTPUT_BYTES;
@ -74,6 +61,17 @@ import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPL
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC; import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC;
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY; import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.metrics.Interns;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private @InterfaceAudience.Private
public class MetricsTableSourceImpl implements MetricsTableSource { public class MetricsTableSourceImpl implements MetricsTableSource {
@ -123,7 +121,7 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
public MetricsTableSourceImpl(String tblName, public MetricsTableSourceImpl(String tblName,
MetricsTableAggregateSourceImpl aggregate, MetricsTableWrapperAggregate tblWrapperAgg) { MetricsTableAggregateSourceImpl aggregate, MetricsTableWrapperAggregate tblWrapperAgg) {
LOG.debug("Creating new MetricsTableSourceImpl for table "); LOG.debug("Creating new MetricsTableSourceImpl for table '{}'", tblName);
this.tableName = TableName.valueOf(tblName); this.tableName = TableName.valueOf(tblName);
this.agg = aggregate; this.agg = aggregate;
@ -240,17 +238,11 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
if (!(source instanceof MetricsTableSourceImpl)) { if (!(source instanceof MetricsTableSourceImpl)) {
return -1; return -1;
} }
MetricsTableSourceImpl impl = (MetricsTableSourceImpl) source; MetricsTableSourceImpl impl = (MetricsTableSourceImpl) source;
if (impl == null) {
return -1;
}
return Long.compare(hashCode, impl.hashCode); return Long.compare(hashCode, impl.hashCode);
} }
void snapshot(MetricsRecordBuilder mrb, boolean ignored) { void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
// If there is a close that started be double extra sure // If there is a close that started be double extra sure
// that we're not getting any locks and not putting data // that we're not getting any locks and not putting data
// into the metrics that should be removed. So early out // into the metrics that should be removed. So early out
@ -263,7 +255,6 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
// This ensures that removes of the metrics // This ensures that removes of the metrics
// can't happen while we are putting them back in. // can't happen while we are putting them back in.
synchronized (this) { synchronized (this) {
// It's possible that a close happened between checking // It's possible that a close happened between checking
// the closed variable and getting the lock. // the closed variable and getting the lock.
if (closed.get()) { if (closed.get()) {

View File

@ -17,7 +17,14 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory; import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -29,15 +36,19 @@ import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({RegionServerTests.class, SmallTests.class}) @Category({ RegionServerTests.class, SmallTests.class })
public class TestMetricsTableAggregate { public class TestMetricsTableAggregate {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMetricsTableAggregate.class); HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);
public static MetricsAssertHelper HELPER = private static final Logger LOG = LoggerFactory.getLogger(TestMetricsTableAggregate.class);
private static MetricsAssertHelper HELPER =
CompatibilityFactory.getInstance(MetricsAssertHelper.class); CompatibilityFactory.getInstance(MetricsAssertHelper.class);
private String tableName = "testTableMetrics"; private String tableName = "testTableMetrics";
@ -87,6 +98,7 @@ public class TestMetricsTableAggregate {
HELPER.assertGauge(pre + "averageRegionSize", 88, agg); HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
} }
@Test
public void testFlush() { public void testFlush() {
rsm.updateFlush(tableName, 1, 2, 3); rsm.updateFlush(tableName, 1, 2, 3);
HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg); HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
@ -139,4 +151,32 @@ public class TestMetricsTableAggregate {
HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg); HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
} }
private void update(AtomicBoolean succ, int round, CyclicBarrier barrier) {
try {
for (int i = 0; i < round; i++) {
String tn = tableName + "-" + i;
barrier.await(10, TimeUnit.SECONDS);
rsm.updateFlush(tn, 100, 1000, 500);
}
} catch (Exception e) {
LOG.warn("Failed to update metrics", e);
succ.set(false);
}
}
@Test
public void testConcurrentUpdate() throws InterruptedException {
int threadNumber = 10;
int round = 100;
AtomicBoolean succ = new AtomicBoolean(true);
CyclicBarrier barrier = new CyclicBarrier(threadNumber);
Thread[] threads = IntStream.range(0, threadNumber)
.mapToObj(i -> new Thread(() -> update(succ, round, barrier), "Update-Worker-" + i))
.toArray(Thread[]::new);
Stream.of(threads).forEach(Thread::start);
for (Thread t : threads) {
t.join();
}
assertTrue(succ.get());
}
} }