HBASE-21136 NPE in MetricsTableSourceImpl.updateFlushTime

This commit is contained in:
zhangduo 2018-09-01 20:00:35 +08:00
parent 1a324e3a71
commit 68c5313ca4
3 changed files with 64 additions and 41 deletions

View File

@ -48,19 +48,15 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
}
private void register(MetricsTableSource source) {
synchronized (this) {
source.registerMetrics();
}
source.registerMetrics();
}
@Override
public void deleteTableSource(String table) {
try {
synchronized (this) {
MetricsTableSource source = tableSources.remove(table);
if (source != null) {
source.close();
}
MetricsTableSource source = tableSources.remove(table);
if (source != null) {
source.close();
}
} catch (Exception e) {
// Ignored. If this errors out it means that someone is double
@ -76,17 +72,13 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
if (source != null) {
return source;
}
source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.createTable(table, wrapper);
MetricsTableSource prev = tableSources.putIfAbsent(table, source);
if (prev != null) {
return prev;
} else {
MetricsTableSource newSource = CompatibilitySingletonFactory
.getInstance(MetricsRegionServerSourceFactory.class).createTable(table, wrapper);
return tableSources.computeIfAbsent(table, k -> {
// register the new metrics now
register(source);
}
return source;
newSource.registerMetrics();
return newSource;
});
}
/**

View File

@ -15,21 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.metrics.Interns;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES;
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES_DESC;
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_OUTPUT_BYTES;
@ -74,6 +61,17 @@ import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPL
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC;
import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.metrics.Interns;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class MetricsTableSourceImpl implements MetricsTableSource {
@ -123,7 +121,7 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
public MetricsTableSourceImpl(String tblName,
MetricsTableAggregateSourceImpl aggregate, MetricsTableWrapperAggregate tblWrapperAgg) {
LOG.debug("Creating new MetricsTableSourceImpl for table ");
LOG.debug("Creating new MetricsTableSourceImpl for table '{}'", tblName);
this.tableName = TableName.valueOf(tblName);
this.agg = aggregate;
@ -240,17 +238,11 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
if (!(source instanceof MetricsTableSourceImpl)) {
return -1;
}
MetricsTableSourceImpl impl = (MetricsTableSourceImpl) source;
if (impl == null) {
return -1;
}
return Long.compare(hashCode, impl.hashCode);
}
void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
// If there is a close that started be double extra sure
// that we're not getting any locks and not putting data
// into the metrics that should be removed. So early out
@ -263,7 +255,6 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
// This ensures that removes of the metrics
// can't happen while we are putting them back in.
synchronized (this) {
// It's possible that a close happened between checking
// the closed variable and getting the lock.
if (closed.get()) {

View File

@ -17,7 +17,14 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -29,15 +36,19 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({RegionServerTests.class, SmallTests.class})
@Category({ RegionServerTests.class, SmallTests.class })
public class TestMetricsTableAggregate {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);
public static MetricsAssertHelper HELPER =
private static final Logger LOG = LoggerFactory.getLogger(TestMetricsTableAggregate.class);
private static MetricsAssertHelper HELPER =
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
private String tableName = "testTableMetrics";
@ -87,6 +98,7 @@ public class TestMetricsTableAggregate {
HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
}
@Test
public void testFlush() {
rsm.updateFlush(tableName, 1, 2, 3);
HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
@ -139,4 +151,32 @@ public class TestMetricsTableAggregate {
HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
}
private void update(AtomicBoolean succ, int round, CyclicBarrier barrier) {
try {
for (int i = 0; i < round; i++) {
String tn = tableName + "-" + i;
barrier.await(10, TimeUnit.SECONDS);
rsm.updateFlush(tn, 100, 1000, 500);
}
} catch (Exception e) {
LOG.warn("Failed to update metrics", e);
succ.set(false);
}
}
@Test
public void testConcurrentUpdate() throws InterruptedException {
int threadNumber = 10;
int round = 100;
AtomicBoolean succ = new AtomicBoolean(true);
CyclicBarrier barrier = new CyclicBarrier(threadNumber);
Thread[] threads = IntStream.range(0, threadNumber)
.mapToObj(i -> new Thread(() -> update(succ, round, barrier), "Update-Worker-" + i))
.toArray(Thread[]::new);
Stream.of(threads).forEach(Thread::start);
for (Thread t : threads) {
t.join();
}
assertTrue(succ.get());
}
}