HBASE-14166 Per-Region metrics can be stale
This commit is contained in:
parent
737f264509
commit
ad2c7c6336
|
@ -46,6 +46,9 @@ public interface MetricsRegionAggregateSource extends BaseSource {
|
|||
*/
|
||||
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
|
||||
|
||||
String NUM_REGIONS = "numRegions";
|
||||
String NUMBER_OF_REGIONS_DESC = "Number of regions in the metrics system";
|
||||
|
||||
/**
|
||||
* Register a MetricsRegionSource as being open.
|
||||
*
|
||||
|
|
|
@ -84,6 +84,8 @@ public interface MetricsRegionWrapper {
|
|||
|
||||
long getNumCompactionsCompleted();
|
||||
|
||||
int getRegionHashCode();
|
||||
|
||||
/**
|
||||
* Get the time spent by coprocessors in this region.
|
||||
*/
|
||||
|
|
|
@ -18,23 +18,28 @@
|
|||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.util.TreeSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.impl.JmxCacheBuster;
|
||||
import org.apache.hadoop.metrics2.lib.Interns;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsExecutorImpl;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsRegionAggregateSourceImpl extends BaseSourceImpl
|
||||
implements MetricsRegionAggregateSource {
|
||||
|
||||
// lock to guard against concurrent access to regionSources
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final MetricsExecutorImpl executor = new MetricsExecutorImpl();
|
||||
|
||||
private final TreeSet<MetricsRegionSourceImpl> regionSources =
|
||||
new TreeSet<MetricsRegionSourceImpl>();
|
||||
private final HashSet<MetricsRegionSource> regionSources =
|
||||
new HashSet<MetricsRegionSource>();
|
||||
|
||||
public MetricsRegionAggregateSourceImpl() {
|
||||
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
|
||||
|
@ -46,28 +51,49 @@ public class MetricsRegionAggregateSourceImpl extends BaseSourceImpl
|
|||
String metricsContext,
|
||||
String metricsJmxContext) {
|
||||
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
|
||||
|
||||
// Every few mins clean the JMX cache.
|
||||
executor.getExecutor().scheduleWithFixedDelay(new Runnable() {
|
||||
public void run() {
|
||||
JmxCacheBuster.clearJmxCache(true);
|
||||
}
|
||||
}, 5, 5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(MetricsRegionSource source) {
|
||||
lock.writeLock().lock();
|
||||
Lock l = lock.writeLock();
|
||||
l.lock();
|
||||
try {
|
||||
regionSources.add((MetricsRegionSourceImpl) source);
|
||||
regionSources.add(source);
|
||||
clearCache();
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
l.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deregister(MetricsRegionSource source) {
|
||||
lock.writeLock().lock();
|
||||
public void deregister(MetricsRegionSource toRemove) {
|
||||
Lock l = lock.writeLock();
|
||||
l.lock();
|
||||
try {
|
||||
regionSources.remove(source);
|
||||
regionSources.remove(toRemove);
|
||||
clearCache();
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
l.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void clearCache() {
|
||||
JmxCacheBuster.clearJmxCache(true);
|
||||
executor.getExecutor().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
JmxCacheBuster.clearJmxCache();
|
||||
}
|
||||
}, 5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
/**
|
||||
* Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all
|
||||
* expectations of java programmers. Instead of returning anything Hadoop metrics expects
|
||||
|
@ -78,21 +104,22 @@ public class MetricsRegionAggregateSourceImpl extends BaseSourceImpl
|
|||
*/
|
||||
@Override
|
||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||
|
||||
|
||||
MetricsRecordBuilder mrb = collector.addRecord(metricsName);
|
||||
|
||||
if (regionSources != null) {
|
||||
lock.readLock().lock();
|
||||
Lock l = lock.readLock();
|
||||
l.lock();
|
||||
try {
|
||||
for (MetricsRegionSourceImpl regionMetricSource : regionSources) {
|
||||
regionMetricSource.snapshot(mrb, all);
|
||||
for (MetricsRegionSource regionMetricSource : regionSources) {
|
||||
if (regionMetricSource instanceof MetricsRegionSourceImpl) {
|
||||
((MetricsRegionSourceImpl) regionMetricSource).snapshot(mrb, all);
|
||||
}
|
||||
}
|
||||
mrb.addGauge(Interns.info(NUM_REGIONS, NUMBER_OF_REGIONS_DESC), regionSources.size());
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
l.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
metricsRegistry.snapshot(mrb, all);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,9 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -34,29 +37,38 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
|
|||
@InterfaceAudience.Private
|
||||
public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
||||
|
||||
private final MetricsRegionWrapper regionWrapper;
|
||||
|
||||
|
||||
private boolean closed = false;
|
||||
private MetricsRegionAggregateSourceImpl agg;
|
||||
private DynamicMetricsRegistry registry;
|
||||
private static final Log LOG = LogFactory.getLog(MetricsRegionSourceImpl.class);
|
||||
|
||||
private String regionNamePrefix;
|
||||
private String regionPutKey;
|
||||
private String regionDeleteKey;
|
||||
private String regionGetKey;
|
||||
private String regionIncrementKey;
|
||||
private String regionAppendKey;
|
||||
private String regionScanNextKey;
|
||||
private MutableCounterLong regionPut;
|
||||
private MutableCounterLong regionDelete;
|
||||
private boolean closed = false;
|
||||
|
||||
private MutableCounterLong regionIncrement;
|
||||
private MutableCounterLong regionAppend;
|
||||
// lock to ensure that lock and pushing metrics can't race.
|
||||
// When changing or acting on the closed boolean this lock must be held.
|
||||
// The write lock must be held when changing closed.
|
||||
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
|
||||
|
||||
private MutableHistogram regionGet;
|
||||
private MutableHistogram regionScanNext;
|
||||
// Non-final so that we can null out the wrapper
|
||||
// This is just paranoia. We really really don't want to
|
||||
// leak a whole region by way of keeping the
|
||||
// regionWrapper around too long.
|
||||
private MetricsRegionWrapper regionWrapper;
|
||||
|
||||
private final MetricsRegionAggregateSourceImpl agg;
|
||||
private final DynamicMetricsRegistry registry;
|
||||
|
||||
private final String regionNamePrefix;
|
||||
private final String regionPutKey;
|
||||
private final String regionDeleteKey;
|
||||
private final String regionGetKey;
|
||||
private final String regionIncrementKey;
|
||||
private final String regionAppendKey;
|
||||
private final String regionScanNextKey;
|
||||
|
||||
private final MutableCounterLong regionPut;
|
||||
private final MutableCounterLong regionDelete;
|
||||
private final MutableCounterLong regionIncrement;
|
||||
private final MutableCounterLong regionAppend;
|
||||
private final MutableHistogram regionGet;
|
||||
private final MutableHistogram regionScanNext;
|
||||
|
||||
public MetricsRegionSourceImpl(MetricsRegionWrapper regionWrapper,
|
||||
MetricsRegionAggregateSourceImpl aggregate) {
|
||||
|
@ -77,16 +89,16 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
String suffix = "Count";
|
||||
|
||||
regionPutKey = regionNamePrefix + MetricsRegionServerSource.MUTATE_KEY + suffix;
|
||||
regionPut = registry.getLongCounter(regionPutKey, 0l);
|
||||
regionPut = registry.getLongCounter(regionPutKey, 0L);
|
||||
|
||||
regionDeleteKey = regionNamePrefix + MetricsRegionServerSource.DELETE_KEY + suffix;
|
||||
regionDelete = registry.getLongCounter(regionDeleteKey, 0l);
|
||||
regionDelete = registry.getLongCounter(regionDeleteKey, 0L);
|
||||
|
||||
regionIncrementKey = regionNamePrefix + MetricsRegionServerSource.INCREMENT_KEY + suffix;
|
||||
regionIncrement = registry.getLongCounter(regionIncrementKey, 0l);
|
||||
regionIncrement = registry.getLongCounter(regionIncrementKey, 0L);
|
||||
|
||||
regionAppendKey = regionNamePrefix + MetricsRegionServerSource.APPEND_KEY + suffix;
|
||||
regionAppend = registry.getLongCounter(regionAppendKey, 0l);
|
||||
regionAppend = registry.getLongCounter(regionAppendKey, 0L);
|
||||
|
||||
regionGetKey = regionNamePrefix + MetricsRegionServerSource.GET_KEY;
|
||||
regionGet = registry.newHistogram(regionGetKey);
|
||||
|
@ -97,21 +109,35 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
Lock lock = readWriteLock.writeLock();
|
||||
lock.lock();
|
||||
try {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
closed = true;
|
||||
agg.deregister(this);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Removing region Metrics: " + regionWrapper.getRegionName());
|
||||
}
|
||||
|
||||
registry.removeMetric(regionPutKey);
|
||||
registry.removeMetric(regionDeleteKey);
|
||||
|
||||
registry.removeMetric(regionIncrementKey);
|
||||
|
||||
registry.removeMetric(regionAppendKey);
|
||||
|
||||
registry.removeMetric(regionGetKey);
|
||||
registry.removeMetric(regionScanNextKey);
|
||||
registry.removeHistogramMetrics(regionGetKey);
|
||||
registry.removeHistogramMetrics(regionScanNextKey);
|
||||
|
||||
regionWrapper = null;
|
||||
|
||||
JmxCacheBuster.clearJmxCache();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -151,7 +177,6 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
|
||||
@Override
|
||||
public int compareTo(MetricsRegionSource source) {
|
||||
|
||||
if (!(source instanceof MetricsRegionSourceImpl))
|
||||
return -1;
|
||||
|
||||
|
@ -160,20 +185,16 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
.compareTo(impl.regionWrapper.getRegionName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return this.regionWrapper.getRegionName().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) return true;
|
||||
if (!(obj instanceof MetricsRegionSourceImpl)) return false;
|
||||
return compareTo((MetricsRegionSourceImpl)obj) == 0;
|
||||
}
|
||||
|
||||
void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
|
||||
if (closed) return;
|
||||
Lock lock = readWriteLock.readLock();
|
||||
|
||||
// Grab the read lock.
|
||||
// This ensures that
|
||||
lock.lock();
|
||||
try {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
mrb.addGauge(
|
||||
Interns.info(regionNamePrefix + MetricsRegionServerSource.STORE_COUNT,
|
||||
|
@ -197,19 +218,29 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
mrb.addCounter(Interns.info(regionNamePrefix + MetricsRegionSource.NUM_FILES_COMPACTED_COUNT,
|
||||
MetricsRegionSource.NUM_FILES_COMPACTED_DESC),
|
||||
this.regionWrapper.getNumFilesCompacted());
|
||||
mrb.addCounter(Interns.info(regionNamePrefix + MetricsRegionServerSource.READ_REQUEST_COUNT,
|
||||
MetricsRegionServerSource.READ_REQUEST_COUNT_DESC),
|
||||
this.regionWrapper.getReadRequestCount());
|
||||
mrb.addCounter(Interns.info(regionNamePrefix + MetricsRegionServerSource.WRITE_REQUEST_COUNT,
|
||||
MetricsRegionServerSource.WRITE_REQUEST_COUNT_DESC),
|
||||
this.regionWrapper.getWriteRequestCount());
|
||||
|
||||
for (Map.Entry<String, DescriptiveStatistics> entry : this.regionWrapper
|
||||
.getCoprocessorExecutionStatistics()
|
||||
.entrySet()) {
|
||||
DescriptiveStatistics ds = entry.getValue();
|
||||
mrb.addGauge(Interns.info(regionNamePrefix + " " + entry.getKey() + " "
|
||||
+ MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS,
|
||||
MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS_DESC + "Min: "), ds.getMin() / 1000);
|
||||
MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS_DESC + "Min: "),
|
||||
ds.getMin() / 1000);
|
||||
mrb.addGauge(Interns.info(regionNamePrefix + " " + entry.getKey() + " "
|
||||
+ MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS,
|
||||
MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS_DESC + "Mean: "), ds.getMean() / 1000);
|
||||
MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS_DESC + "Mean: "),
|
||||
ds.getMean() / 1000);
|
||||
mrb.addGauge(Interns.info(regionNamePrefix + " " + entry.getKey() + " "
|
||||
+ MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS,
|
||||
MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS_DESC + "Max: "), ds.getMax() / 1000);
|
||||
MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS_DESC + "Max: "),
|
||||
ds.getMax() / 1000);
|
||||
mrb.addGauge(Interns.info(regionNamePrefix + " " + entry.getKey() + " "
|
||||
+ MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS,
|
||||
MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS_DESC + "90th percentile: "), ds
|
||||
|
@ -223,6 +254,19 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
MetricsRegionSource.COPROCESSOR_EXECUTION_STATISTICS_DESC + "99th percentile: "), ds
|
||||
.getPercentile(99d) / 1000);
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return regionWrapper.getRegionHashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) return true;
|
||||
return obj instanceof MetricsRegionSourceImpl && compareTo((MetricsRegionSourceImpl) obj) == 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,11 +15,11 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.metrics2.impl;
|
||||
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -36,44 +36,53 @@ import org.apache.hadoop.metrics2.lib.MetricsExecutorImpl;
|
|||
* This class need to be in the o.a.h.metrics2.impl namespace as many of the variables/calls used
|
||||
* are package private.
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
|
||||
value="LI_LAZY_INIT_STATIC",
|
||||
justification="Yeah, its weird but its what we want")
|
||||
@InterfaceAudience.Private
|
||||
public class JmxCacheBuster {
|
||||
private static final Log LOG = LogFactory.getLog(JmxCacheBuster.class);
|
||||
private static Object lock = new Object();
|
||||
private static ScheduledFuture fut = null;
|
||||
private static AtomicReference<ScheduledFuture> fut = new AtomicReference<>(null);
|
||||
private static MetricsExecutor executor = new MetricsExecutorImpl();
|
||||
|
||||
private JmxCacheBuster() {
|
||||
// Static only cache.
|
||||
}
|
||||
|
||||
/**
|
||||
* For JMX to forget about all previously exported metrics.
|
||||
*/
|
||||
public static void clearJmxCache() {
|
||||
|
||||
//If there are more then 100 ms before the executor will run then everything should be merged.
|
||||
synchronized (lock) {
|
||||
if (fut == null || (!fut.isDone() && fut.getDelay(TimeUnit.MILLISECONDS) > 100)) return;
|
||||
fut = executor.getExecutor().schedule(new JmxCacheBusterRunnable(), 5, TimeUnit.SECONDS);
|
||||
public static void clearJmxCache() {
|
||||
clearJmxCache(false);
|
||||
}
|
||||
|
||||
public static synchronized void clearJmxCache(boolean force) {
|
||||
//If there are more then 100 ms before the executor will run then everything should be merged.
|
||||
ScheduledFuture future = fut.get();
|
||||
if (!force &&
|
||||
(future == null || (!future.isDone() && future.getDelay(TimeUnit.MILLISECONDS) > 100))) {
|
||||
// BAIL OUT
|
||||
return;
|
||||
}
|
||||
future = executor.getExecutor().schedule(new JmxCacheBusterRunnable(), 5, TimeUnit.SECONDS);
|
||||
fut.set(future);
|
||||
}
|
||||
|
||||
static class JmxCacheBusterRunnable implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Clearing JMX mbean cache.");
|
||||
}
|
||||
|
||||
// This is pretty extreme but it's the best way that
|
||||
// I could find to get metrics to be removed.
|
||||
try {
|
||||
if (DefaultMetricsSystem.instance() != null ) {
|
||||
if (DefaultMetricsSystem.instance() != null) {
|
||||
DefaultMetricsSystem.instance().stop();
|
||||
DefaultMetricsSystem.instance().start();
|
||||
}
|
||||
|
||||
} catch (Exception exception ) {
|
||||
LOG.debug("error clearing the jmx it appears the metrics system hasn't been started", exception);
|
||||
} catch (Exception exception) {
|
||||
LOG.debug("error clearing the jmx it appears the metrics system hasn't been started",
|
||||
exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.metrics2.lib;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
public class DefaultMetricsSystemHelper {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(DefaultMetricsSystemHelper.class);
|
||||
private final Method removeObjectMethod;
|
||||
|
||||
public DefaultMetricsSystemHelper() {
|
||||
Method m;
|
||||
try {
|
||||
Class<? extends DefaultMetricsSystem> clazz = DefaultMetricsSystem.INSTANCE.getClass();
|
||||
m = clazz.getDeclaredMethod("removeObjectName", String.class);
|
||||
m.setAccessible(true);
|
||||
} catch (NoSuchMethodException e) {
|
||||
m = null;
|
||||
}
|
||||
removeObjectMethod = m;
|
||||
}
|
||||
|
||||
public boolean removeObjectName(final String name) {
|
||||
if (removeObjectMethod != null) {
|
||||
try {
|
||||
removeObjectMethod.invoke(DefaultMetricsSystem.INSTANCE, name);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Unable to remove object name from cache: " + name, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -21,6 +21,8 @@ package org.apache.hadoop.metrics2.lib;
|
|||
import java.util.Collection;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.metrics2.MetricsException;
|
||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||
|
@ -44,18 +46,29 @@ import com.google.common.collect.Maps;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DynamicMetricsRegistry {
|
||||
private static final Log LOG = LogFactory.getLog(DynamicMetricsRegistry.class);
|
||||
|
||||
private final ConcurrentMap<String, MutableMetric> metricsMap =
|
||||
Maps.newConcurrentMap();
|
||||
private final ConcurrentMap<String, MetricsTag> tagsMap =
|
||||
Maps.newConcurrentMap();
|
||||
private final MetricsInfo metricsInfo;
|
||||
private final DefaultMetricsSystemHelper helper = new DefaultMetricsSystemHelper();
|
||||
private final static String[] histogramSuffixes = new String[]{
|
||||
"_num_ops",
|
||||
"_min",
|
||||
"_max",
|
||||
"_median",
|
||||
"_75th_percentile",
|
||||
"_95th_percentile",
|
||||
"_99th_percentile"};
|
||||
|
||||
/**
|
||||
* Construct the registry with a record name
|
||||
* @param name of the record of the metrics
|
||||
*/
|
||||
public DynamicMetricsRegistry(String name) {
|
||||
metricsInfo = Interns.info(name, name);
|
||||
this(Interns.info(name,name));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -405,9 +418,16 @@ public class DynamicMetricsRegistry {
|
|||
* @param name name of the metric to remove
|
||||
*/
|
||||
public void removeMetric(String name) {
|
||||
helper.removeObjectName(name);
|
||||
metricsMap.remove(name);
|
||||
}
|
||||
|
||||
public void removeHistogramMetrics(String baseName) {
|
||||
for (String suffix:histogramSuffixes) {
|
||||
removeMetric(baseName+suffix);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a MetricMutableGaugeLong from the storage. If it is not there atomically put it.
|
||||
*
|
||||
|
@ -540,6 +560,9 @@ public class DynamicMetricsRegistry {
|
|||
}
|
||||
|
||||
public void clearMetrics() {
|
||||
for (String name:metricsMap.keySet()) {
|
||||
helper.removeObjectName(name);
|
||||
}
|
||||
metricsMap.clear();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.metrics2.MetricsExecutor;
|
||||
|
||||
/**
|
||||
* Class to handle the ScheduledExecutorService{@link ScheduledExecutorService} used by MetricMutableQuantiles{@link MetricMutableQuantiles}
|
||||
* Class to handle the ScheduledExecutorService{@link ScheduledExecutorService} used by
|
||||
* MetricMutableQuantiles{@link MetricMutableQuantiles}, MetricsRegionAggregateSourceImpl, and
|
||||
* JmxCacheBuster
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsExecutorImpl implements MetricsExecutor {
|
||||
|
@ -46,11 +48,11 @@ public class MetricsExecutorImpl implements MetricsExecutor {
|
|||
|
||||
private enum ExecutorSingleton {
|
||||
INSTANCE;
|
||||
|
||||
private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1, new ThreadPoolExecutorThreadFactory("HBase-Metrics2-"));
|
||||
private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1,
|
||||
new ThreadPoolExecutorThreadFactory("HBase-Metrics2-"));
|
||||
}
|
||||
|
||||
private static class ThreadPoolExecutorThreadFactory implements ThreadFactory {
|
||||
private final static class ThreadPoolExecutorThreadFactory implements ThreadFactory {
|
||||
private final String name;
|
||||
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||
|
||||
|
|
|
@ -124,6 +124,11 @@ public class TestMetricsRegionSourceImpl {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRegionHashCode() {
|
||||
return regionName.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
|
||||
return null;
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MetricsRegion {
|
||||
|
||||
private final MetricsRegionSource source;
|
||||
private MetricsRegionWrapper regionWrapper;
|
||||
|
||||
|
|
|
@ -135,6 +135,11 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
return this.region.compactionsFinished.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRegionHashCode() {
|
||||
return this.region.hashCode();
|
||||
}
|
||||
|
||||
public class HRegionMetricsWrapperRunnable implements Runnable {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -85,6 +85,11 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRegionHashCode() {
|
||||
return 42;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
|
||||
return new HashMap<String, DescriptiveStatistics>();
|
||||
|
|
|
@ -0,0 +1,137 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CompatibilityFactory;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@Category({RegionServerTests.class, LargeTests.class})
|
||||
public class TestRemoveRegionMetrics {
|
||||
|
||||
private static MiniHBaseCluster cluster;
|
||||
private static Configuration conf;
|
||||
private static HBaseTestingUtility TEST_UTIL;
|
||||
private static MetricsAssertHelper metricsHelper;
|
||||
|
||||
@BeforeClass
|
||||
public static void startCluster() throws Exception {
|
||||
metricsHelper = CompatibilityFactory.getInstance(MetricsAssertHelper.class);
|
||||
TEST_UTIL = new HBaseTestingUtility();
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.getLong("hbase.splitlog.max.resubmit", 0);
|
||||
// Make the failure test faster
|
||||
conf.setInt("zookeeper.recovery.retry", 0);
|
||||
conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
|
||||
|
||||
TEST_UTIL.startMiniCluster(1, 2);
|
||||
cluster = TEST_UTIL.getHBaseCluster();
|
||||
|
||||
cluster.waitForActiveAndReadyMaster();
|
||||
|
||||
while (cluster.getLiveRegionServerThreads().size() < 2) {
|
||||
Threads.sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMoveRegion() throws IOException, InterruptedException {
|
||||
String tableNameString = "testMoveRegion";
|
||||
TableName tableName = TableName.valueOf(tableNameString);
|
||||
Table t = TEST_UTIL.createTable(tableName, Bytes.toBytes("D"));
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(t.getName());
|
||||
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
|
||||
HRegionInfo regionInfo;
|
||||
byte[] row = Bytes.toBytes("r1");
|
||||
|
||||
|
||||
for (int i = 0; i < 30; i++) {
|
||||
boolean moved = false;
|
||||
try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
|
||||
regionInfo = locator.getRegionLocation(row, true).getRegionInfo();
|
||||
}
|
||||
|
||||
int currentServerIdx = cluster.getServerWith(regionInfo.getRegionName());
|
||||
int destServerIdx = (currentServerIdx +1)% cluster.getLiveRegionServerThreads().size();
|
||||
HRegionServer currentServer = cluster.getRegionServer(currentServerIdx);
|
||||
HRegionServer destServer = cluster.getRegionServer(destServerIdx);
|
||||
byte[] destServerName = Bytes.toBytes(destServer.getServerName().getServerName());
|
||||
|
||||
|
||||
// Do a put. The counters should be non-zero now
|
||||
Put p = new Put(row);
|
||||
p.addColumn(Bytes.toBytes("D"), Bytes.toBytes("Zero"), Bytes.toBytes("VALUE"));
|
||||
t.put(p);
|
||||
|
||||
|
||||
MetricsRegionAggregateSource currentAgg = currentServer.getRegion(regionInfo.getRegionName())
|
||||
.getMetrics()
|
||||
.getSource()
|
||||
.getAggregateSource();
|
||||
|
||||
String prefix = "namespace_"+ NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+
|
||||
"_table_"+tableNameString +
|
||||
"_region_" + regionInfo.getEncodedName()+
|
||||
"_metric";
|
||||
|
||||
metricsHelper.assertCounter(prefix + "_mutateCount", 1, currentAgg);
|
||||
|
||||
|
||||
try {
|
||||
admin.move(regionInfo.getEncodedNameAsBytes(), destServerName);
|
||||
moved = true;
|
||||
Thread.sleep(5000);
|
||||
} catch (IOException ioe) {
|
||||
moved = false;
|
||||
}
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(t.getName());
|
||||
|
||||
if (moved) {
|
||||
MetricsRegionAggregateSource destAgg = destServer.getRegion(regionInfo.getRegionName())
|
||||
.getMetrics()
|
||||
.getSource()
|
||||
.getAggregateSource();
|
||||
metricsHelper.assertCounter(prefix + "_mutateCount", 0, destAgg);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_UTIL.deleteTable(tableName);
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue