HBASE-5788 Move Dynamic Metrics storage off of HRegion

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1327316 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2012-04-17 23:01:30 +00:00
parent 183e36e43b
commit b10f06129b
8 changed files with 180 additions and 118 deletions

HRegion.java

@@ -109,6 +109,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -336,86 +337,6 @@ public class HRegion implements HeapSize { // , Writable{
private HTableDescriptor htableDescriptor = null;
private RegionSplitPolicy splitPolicy;
// for simple numeric metrics (# of blocks read from block cache)
public static final ConcurrentMap<String, AtomicLong> numericMetrics = new ConcurrentHashMap<String, AtomicLong>();
public static final String METRIC_GETSIZE = "getsize";
public static final String METRIC_NEXTSIZE = "nextsize";
// for simple numeric metrics (current block cache size)
// These are not reset to zero when queried, unlike the previous map.
public static final ConcurrentMap<String, AtomicLong> numericPersistentMetrics = new ConcurrentHashMap<String, AtomicLong>();
/**
* Used for metrics where we want to track a metric (such as latency) over a
* number of operations.
*/
public static final ConcurrentMap<String, Pair<AtomicLong, AtomicInteger>>
timeVaryingMetrics = new ConcurrentHashMap<String,
Pair<AtomicLong, AtomicInteger>>();
public static void incrNumericMetric(String key, long amount) {
AtomicLong oldVal = numericMetrics.get(key);
if (oldVal == null) {
oldVal = numericMetrics.putIfAbsent(key, new AtomicLong(amount));
if (oldVal == null)
return;
}
oldVal.addAndGet(amount);
}
public static void setNumericMetric(String key, long amount) {
numericMetrics.put(key, new AtomicLong(amount));
}
public static void incrTimeVaryingMetric(String key, long amount) {
Pair<AtomicLong, AtomicInteger> oldVal = timeVaryingMetrics.get(key);
if (oldVal == null) {
oldVal = timeVaryingMetrics.putIfAbsent(key,
new Pair<AtomicLong, AtomicInteger>(new AtomicLong(amount),
new AtomicInteger(1)));
if (oldVal == null)
return;
}
oldVal.getFirst().addAndGet(amount); // total time
oldVal.getSecond().incrementAndGet(); // increment ops by 1
}
public static void incrNumericPersistentMetric(String key, long amount) {
AtomicLong oldVal = numericPersistentMetrics.get(key);
if (oldVal == null) {
oldVal = numericPersistentMetrics
.putIfAbsent(key, new AtomicLong(amount));
if (oldVal == null)
return;
}
oldVal.addAndGet(amount);
}
public static long getNumericMetric(String key) {
AtomicLong m = numericMetrics.get(key);
if (m == null)
return 0;
return m.get();
}
public static Pair<Long, Integer> getTimeVaryingMetric(String key) {
Pair<AtomicLong, AtomicInteger> pair = timeVaryingMetrics.get(key);
if (pair == null) {
return new Pair<Long, Integer>(0L, 0);
}
return new Pair<Long, Integer>(pair.getFirst().get(),
pair.getSecond().get());
}
static long getNumericPersistentMetric(String key) {
AtomicLong m = numericPersistentMetrics.get(key);
if (m == null)
return 0;
return m.get();
}
/**
* Should only be used for testing purposes
*/
@@ -1890,7 +1811,7 @@ public class HRegion implements HeapSize { // , Writable{
final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
getTableDesc().getNameAsString(), familyMap.keySet());
if (!metricPrefix.isEmpty()) {
HRegion.incrTimeVaryingMetric(metricPrefix + "delete_", after - now);
RegionMetricsStorage.incrTimeVaryingMetric(metricPrefix + "delete_", after - now);
}
if (flush) {
@@ -2281,7 +2202,7 @@ public class HRegion implements HeapSize { // , Writable{
if (metricPrefix == null) {
metricPrefix = SchemaMetrics.CF_BAD_FAMILY_PREFIX;
}
HRegion.incrTimeVaryingMetric(metricPrefix + "multiput_",
RegionMetricsStorage.incrTimeVaryingMetric(metricPrefix + "multiput_",
endTimeMs - startTimeMs);
if (!success) {
@@ -2540,7 +2461,7 @@ public class HRegion implements HeapSize { // , Writable{
final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
this.getTableDesc().getNameAsString(), familyMap.keySet());
if (!metricPrefix.isEmpty()) {
HRegion.incrTimeVaryingMetric(metricPrefix + "put_", after - now);
RegionMetricsStorage.incrTimeVaryingMetric(metricPrefix + "put_", after - now);
}
if (flush) {
@@ -4155,7 +4076,7 @@ public class HRegion implements HeapSize { // , Writable{
RegionScanner scanner = null;
try {
scanner = getScanner(scan);
scanner.next(results, HRegion.METRIC_GETSIZE);
scanner.next(results, SchemaMetrics.METRIC_GETSIZE);
} finally {
if (scanner != null)
scanner.close();
@@ -4171,7 +4092,7 @@ public class HRegion implements HeapSize { // , Writable{
final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
this.getTableDesc().getNameAsString(), get.familySet());
if (!metricPrefix.isEmpty()) {
HRegion.incrTimeVaryingMetric(metricPrefix + "get_", after - now);
RegionMetricsStorage.incrTimeVaryingMetric(metricPrefix + "get_", after - now);
}
return results;
@@ -4245,14 +4166,14 @@ public class HRegion implements HeapSize { // , Writable{
processor.postProcess(this, walEdit);
} catch (IOException e) {
long endNanoTime = System.nanoTime();
HRegion.incrTimeVaryingMetric(metricsName + ".error.nano",
RegionMetricsStorage.incrTimeVaryingMetric(metricsName + ".error.nano",
endNanoTime - startNanoTime);
throw e;
} finally {
closeRegionOperation();
}
final long endNanoTime = System.nanoTime();
HRegion.incrTimeVaryingMetric(metricsName + ".nano",
RegionMetricsStorage.incrTimeVaryingMetric(metricsName + ".nano",
endNanoTime - startNanoTime);
return;
}
@@ -4362,7 +4283,7 @@ public class HRegion implements HeapSize { // , Writable{
} catch (IOException e) {
long endNanoTime = System.nanoTime();
HRegion.incrTimeVaryingMetric(metricsName + ".error.nano",
RegionMetricsStorage.incrTimeVaryingMetric(metricsName + ".error.nano",
endNanoTime - startNanoTime);
throw e;
} finally {
@@ -4374,19 +4295,19 @@ public class HRegion implements HeapSize { // , Writable{
}
// Populate all metrics
long endNanoTime = System.nanoTime();
HRegion.incrTimeVaryingMetric(metricsName + ".nano",
RegionMetricsStorage.incrTimeVaryingMetric(metricsName + ".nano",
endNanoTime - startNanoTime);
HRegion.incrTimeVaryingMetric(metricsName + ".acquirelock.nano",
RegionMetricsStorage.incrTimeVaryingMetric(metricsName + ".acquirelock.nano",
lockedNanoTime - startNanoTime);
HRegion.incrTimeVaryingMetric(metricsName + ".process.nano",
RegionMetricsStorage.incrTimeVaryingMetric(metricsName + ".process.nano",
processDoneNanoTime - lockedNanoTime);
HRegion.incrTimeVaryingMetric(metricsName + ".occupylock.nano",
RegionMetricsStorage.incrTimeVaryingMetric(metricsName + ".occupylock.nano",
unlockedNanoTime - lockedNanoTime);
HRegion.incrTimeVaryingMetric(metricsName + ".sync.nano",
RegionMetricsStorage.incrTimeVaryingMetric(metricsName + ".sync.nano",
endNanoTime - unlockedNanoTime);
}
@@ -4783,7 +4704,7 @@ public class HRegion implements HeapSize { // , Writable{
long after = EnvironmentEdgeManager.currentTimeMillis();
String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
getTableDesc().getName(), family);
HRegion.incrTimeVaryingMetric(metricPrefix + "increment_", after - before);
RegionMetricsStorage.incrTimeVaryingMetric(metricPrefix + "increment_", after - before);
if (flush) {
// Request a cache flush. Do it outside update lock.

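The helpers removed here reappear unchanged in the new RegionMetricsStorage class below, and their lock-free shape is worth spelling out: putIfAbsent returns null when the caller's fresh counter wins the installation race, so the early return avoids double-counting the initial amount, while a loser gets the winner's AtomicLong back and folds its amount into it. A minimal standalone sketch of the same pattern (class and key names here are illustrative, not part of the commit):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class MetricRaceSketch {
  static final ConcurrentMap<String, AtomicLong> metrics =
      new ConcurrentHashMap<String, AtomicLong>();

  static void incr(String key, long amount) {
    AtomicLong val = metrics.get(key);
    if (val == null) {
      // putIfAbsent returns null if our counter was installed, or the
      // counter another thread installed first if we lost the race.
      val = metrics.putIfAbsent(key, new AtomicLong(amount));
      if (val == null) {
        return; // our AtomicLong already carries the amount
      }
    }
    val.addAndGet(amount); // fold into the existing counter
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int t = 0; t < 8; t++) {
      pool.submit(new Runnable() {
        public void run() {
          for (int i = 0; i < 100000; i++) {
            incr("getsize", 1);
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println(metrics.get("getsize").get()); // always 800000
  }
}

The early return matters: the winner's amount is already inside the freshly installed AtomicLong, so adding it again would double-count.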
HRegionServer.java

@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerDynamicMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
@@ -1256,7 +1257,7 @@ public class HRegionServer extends RegionServer
}
for (Entry<String, MutableDouble> e : tempVals.entrySet()) {
HRegion.setNumericMetric(e.getKey(), e.getValue().longValue());
RegionMetricsStorage.setNumericMetric(e.getKey(), e.getValue().longValue());
}
this.metrics.stores.set(stores);
@@ -2245,7 +2246,7 @@ public class HRegionServer extends RegionServer
&& currentScanResultSize < maxScannerResultSize; i++) {
requestCount.incrementAndGet();
// Collect values to be returned here
boolean moreRows = s.next(values, HRegion.METRIC_NEXTSIZE);
boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE);
if (!values.isEmpty()) {
for (KeyValue kv : values) {
currentScanResultSize += kv.heapSize();

RegionServer.java

@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.regionserver.HRegionServer.QosPriority;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
@@ -681,7 +682,7 @@ public abstract class RegionServer implements
for (int i = 0; i < rows
&& currentScanResultSize < maxScannerResultSize; i++) {
// Collect values to be returned here
boolean moreRows = scanner.next(values, HRegion.METRIC_NEXTSIZE);
boolean moreRows = scanner.next(values, SchemaMetrics.METRIC_NEXTSIZE);
if (!values.isEmpty()) {
for (KeyValue kv : values) {
currentScanResultSize += kv.heapSize();

StoreScanner.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -373,7 +374,7 @@ class StoreScanner extends NonLazyKeyValueScanner
results.add(kv);
if (metric != null) {
HRegion.incrNumericMetric(this.metricNamePrefix + metric,
RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric,
kv.getLength());
}

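StoreScanner.next() charges the length of each returned KeyValue to a schema-qualified counter, with the caller naming the read path: METRIC_GETSIZE for point gets and METRIC_NEXTSIZE for scans (see the HRegion and RegionServer call sites above). A standalone sketch of that accounting, using an illustrative prefix in place of a real SchemaMetrics.generateSchemaMetricsPrefix() result:

import java.util.concurrent.atomic.AtomicLong;

public class ScanSizeSketch {
  public static void main(String[] args) {
    String prefix = "tbl.t1.cf.f1.";        // illustrative; real prefixes come from
                                            // SchemaMetrics.generateSchemaMetricsPrefix()
    String key = prefix + "nextsize";       // scan path; point gets use "getsize"
    AtomicLong counter = new AtomicLong();  // what the metrics map holds under that key
    long[] cellLengths = {120L, 48L, 310L}; // stand-ins for KeyValue.getLength()
    for (long len : cellLengths) {
      counter.addAndGet(len);               // one increment per returned KeyValue
    }
    System.out.println(key + " = " + counter.get()); // tbl.t1.cf.f1.nextsize = 478
  }
}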
RegionMetricsStorage.java (new file)

@@ -0,0 +1,130 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver.metrics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Pair;
/**
* This class is for maintaining the maps used to power metrics for hfiles,
* regions, and regionservers. It has methods to mutate and read the metric
* values. These values are exposed to Hadoop metrics through
* RegionServerDynamicMetrics.
*/
@InterfaceAudience.Private
public class RegionMetricsStorage {
// for simple numeric metrics (# of blocks read from block cache)
private static final ConcurrentMap<String, AtomicLong> numericMetrics =
new ConcurrentHashMap<String, AtomicLong>();
// for simple numeric metrics (current block cache size)
// These are not reset to zero when queried, unlike the previous map.
private static final ConcurrentMap<String, AtomicLong> numericPersistentMetrics =
new ConcurrentHashMap<String, AtomicLong>();
/**
* Used for metrics where we want to track a metric (such as latency) over a
* number of operations.
*/
private static final ConcurrentMap<String, Pair<AtomicLong, AtomicInteger>> timeVaryingMetrics =
new ConcurrentHashMap<String, Pair<AtomicLong, AtomicInteger>>();
public static Map<String, AtomicLong> getNumericMetrics() {
return numericMetrics;
}
public static Map<String, AtomicLong> getNumericPersistentMetrics() {
return numericPersistentMetrics;
}
public static Map<String, Pair<AtomicLong, AtomicInteger>> getTimeVaryingMetrics() {
return timeVaryingMetrics;
}
public static void incrNumericMetric(String key, long amount) {
AtomicLong oldVal = numericMetrics.get(key);
if (oldVal == null) {
oldVal = numericMetrics.putIfAbsent(key, new AtomicLong(amount));
if (oldVal == null)
return;
}
oldVal.addAndGet(amount);
}
public static void incrTimeVaryingMetric(String key, long amount) {
Pair<AtomicLong, AtomicInteger> oldVal = timeVaryingMetrics.get(key);
if (oldVal == null) {
oldVal =
timeVaryingMetrics.putIfAbsent(key,
new Pair<AtomicLong, AtomicInteger>(
new AtomicLong(amount),
new AtomicInteger(1)));
if (oldVal == null)
return;
}
oldVal.getFirst().addAndGet(amount); // total time
oldVal.getSecond().incrementAndGet(); // increment ops by 1
}
public static void incrNumericPersistentMetric(String key, long amount) {
AtomicLong oldVal = numericPersistentMetrics.get(key);
if (oldVal == null) {
oldVal = numericPersistentMetrics.putIfAbsent(key, new AtomicLong(amount));
if (oldVal == null)
return;
}
oldVal.addAndGet(amount);
}
public static void setNumericMetric(String key, long amount) {
numericMetrics.put(key, new AtomicLong(amount));
}
public static long getNumericMetric(String key) {
AtomicLong m = numericMetrics.get(key);
if (m == null)
return 0;
return m.get();
}
public static Pair<Long, Integer> getTimeVaryingMetric(String key) {
Pair<AtomicLong, AtomicInteger> pair = timeVaryingMetrics.get(key);
if (pair == null) {
return new Pair<Long, Integer>(0L, 0);
}
return new Pair<Long, Integer>(pair.getFirst().get(), pair.getSecond().get());
}
public static long getNumericPersistentMetric(String key) {
AtomicLong m = numericPersistentMetrics.get(key);
if (m == null)
return 0;
return m.get();
}
}

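A time-varying metric stores a running (total, count) pair instead of a precomputed average, keeping writers to a pair of atomic updates and leaving the mean to the reader. A usage sketch against the class above; the key is illustrative, since real keys carry a schema prefix generated by SchemaMetrics:

import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
import org.apache.hadoop.hbase.util.Pair;

public class TimeVaryingSketch {
  public static void main(String[] args) {
    String key = "tbl.t1.cf.f1.put_"; // illustrative schema-prefixed key
    RegionMetricsStorage.incrTimeVaryingMetric(key, 12); // three observed latencies
    RegionMetricsStorage.incrTimeVaryingMetric(key, 20);
    RegionMetricsStorage.incrTimeVaryingMetric(key, 4);

    Pair<Long, Integer> totalAndCount = RegionMetricsStorage.getTimeVaryingMetric(key);
    long avg = totalAndCount.getSecond() == 0
        ? 0 : totalAndCount.getFirst() / totalAndCount.getSecond();
    // prints: total=36 ops=3 avg=12
    System.out.println("total=" + totalAndCount.getFirst()
        + " ops=" + totalAndCount.getSecond() + " avg=" + avg);
  }
}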
RegionServerDynamicMetrics.java

@@ -133,17 +133,17 @@ public class RegionServerDynamicMetrics implements Updater {
*/
public void doUpdates(MetricsContext context) {
/* get dynamically created numeric metrics, and push the metrics */
for (Entry<String, AtomicLong> entry : HRegion.numericMetrics.entrySet()) {
for (Entry<String, AtomicLong> entry : RegionMetricsStorage.getNumericMetrics().entrySet()) {
this.setNumericMetric(entry.getKey(), entry.getValue().getAndSet(0));
}
/* get dynamically created numeric metrics, and push the metrics.
* These are not reset; they are cumulative. */
for (Entry<String, AtomicLong> entry : HRegion.numericPersistentMetrics.entrySet()) {
for (Entry<String, AtomicLong> entry : RegionMetricsStorage.getNumericPersistentMetrics().entrySet()) {
this.setNumericMetric(entry.getKey(), entry.getValue().get());
}
/* get dynamically created time varying metrics, and push the metrics */
for (Entry<String, Pair<AtomicLong, AtomicInteger>> entry :
HRegion.timeVaryingMetrics.entrySet()) {
RegionMetricsStorage.getTimeVaryingMetrics().entrySet()) {
Pair<AtomicLong, AtomicInteger> value = entry.getValue();
this.incrTimeVaryingMetric(entry.getKey(),
value.getFirst().getAndSet(0),

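doUpdates() reads the two numeric maps differently: interval metrics are drained with getAndSet(0) so each metrics period reports only activity since the previous poll, while persistent metrics are read with get() and stay cumulative. A minimal sketch of the distinction:

import java.util.concurrent.atomic.AtomicLong;

public class ResetOnReadSketch {
  public static void main(String[] args) {
    AtomicLong interval = new AtomicLong();   // e.g. blocks read this period
    AtomicLong persistent = new AtomicLong(); // e.g. current block cache size

    interval.addAndGet(5);
    persistent.addAndGet(5);
    // poll 1: both report 5, but only the interval counter is drained
    System.out.println(interval.getAndSet(0) + " / " + persistent.get()); // 5 / 5

    interval.addAndGet(3);
    persistent.addAndGet(3);
    // poll 2: the interval metric shows just the new activity,
    // while the persistent one keeps accumulating
    System.out.println(interval.getAndSet(0) + " / " + persistent.get()); // 3 / 8
  }
}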
SchemaMetrics.java

@@ -29,6 +29,9 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -39,12 +42,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
/**
* A collection of metric names in a given column family or a (table, column
* family) combination. The following "dimensions" are supported:
* <ul>
* <li>Table name (optional; enabled based on configuration)</li>
@@ -303,6 +305,11 @@ public class SchemaMetrics {
}
}
public static final String METRIC_GETSIZE = "getsize";
public static final String METRIC_NEXTSIZE = "nextsize";
/**
* Returns a {@link SchemaMetrics} object for the given table and column
* family, instantiating it if necessary.
@@ -366,7 +373,7 @@ public class SchemaMetrics {
if (blockCategory == null) {
blockCategory = BlockCategory.UNKNOWN; // So that we see this in stats.
}
HRegion.incrNumericMetric(getBlockMetricName(blockCategory,
RegionMetricsStorage.incrNumericMetric(getBlockMetricName(blockCategory,
isCompaction, metricType), 1);
if (blockCategory != BlockCategory.ALL_CATEGORIES) {
@@ -377,7 +384,7 @@ public class SchemaMetrics {
private void addToReadTime(BlockCategory blockCategory,
boolean isCompaction, long timeMs) {
HRegion.incrTimeVaryingMetric(getBlockMetricName(blockCategory,
RegionMetricsStorage.incrTimeVaryingMetric(getBlockMetricName(blockCategory,
isCompaction, BlockMetricType.READ_TIME), timeMs);
// Also update the read time aggregated across all block categories
@@ -433,7 +440,7 @@ public class SchemaMetrics {
*/
public void updatePersistentStoreMetric(StoreMetricType storeMetricType,
long value) {
HRegion.incrNumericPersistentMetric(
RegionMetricsStorage.incrNumericPersistentMetric(
storeMetricNames[storeMetricType.ordinal()], value);
}
@@ -478,7 +485,7 @@ public class SchemaMetrics {
if (category == null) {
category = BlockCategory.ALL_CATEGORIES;
}
HRegion.incrNumericPersistentMetric(getBlockMetricName(category, false,
RegionMetricsStorage.incrNumericPersistentMetric(getBlockMetricName(category, false,
BlockMetricType.CACHE_SIZE), cacheSizeDelta);
if (category != BlockCategory.ALL_CATEGORIES) {
@@ -502,7 +509,7 @@ public class SchemaMetrics {
* positives/negatives as specified by the argument.
*/
public void updateBloomMetrics(boolean isInBloom) {
HRegion.incrNumericMetric(getBloomMetricName(isInBloom), 1);
RegionMetricsStorage.incrNumericMetric(getBloomMetricName(isInBloom), 1);
if (this != ALL_SCHEMA_METRICS) {
ALL_SCHEMA_METRICS.updateBloomMetrics(isInBloom);
}
@@ -731,11 +738,11 @@ public class SchemaMetrics {
long metricValue;
if (isTimeVaryingKey(metricName)) {
Pair<Long, Integer> totalAndCount =
HRegion.getTimeVaryingMetric(stripTimeVaryingSuffix(metricName));
RegionMetricsStorage.getTimeVaryingMetric(stripTimeVaryingSuffix(metricName));
metricValue = metricName.endsWith(TOTAL_SUFFIX) ?
totalAndCount.getFirst() : totalAndCount.getSecond();
} else {
metricValue = HRegion.getNumericMetric(metricName);
metricValue = RegionMetricsStorage.getNumericMetric(metricName);
}
metricsSnapshot.put(metricName, metricValue);

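Several updaters in this file share the shape of updateBloomMetrics(): record under this schema's name, then recurse once into ALL_SCHEMA_METRICS so the cross-schema aggregate keeps pace, with the this != ALL_SCHEMA_METRICS guard stopping the aggregate from counting itself twice. A self-contained sketch of that shape (class and metric names are illustrative):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class SchemaAggregateSketch {
  static final ConcurrentMap<String, AtomicLong> metrics =
      new ConcurrentHashMap<String, AtomicLong>();
  static final SchemaAggregateSketch ALL = new SchemaAggregateSketch("all.");

  final String prefix;

  SchemaAggregateSketch(String prefix) {
    this.prefix = prefix;
  }

  void updateBloom(boolean isInBloom) {
    String key = prefix + (isInBloom ? "bloomHit" : "bloomMiss");
    // Install a counter already holding the first hit, or bump the existing one.
    AtomicLong val = metrics.putIfAbsent(key, new AtomicLong(1));
    if (val != null) {
      val.incrementAndGet();
    }
    if (this != ALL) {
      ALL.updateBloom(isInBloom); // keep the cross-schema aggregate in sync
    }
  }

  public static void main(String[] args) {
    SchemaAggregateSketch cf = new SchemaAggregateSketch("tbl.t1.cf.f1.");
    cf.updateBloom(true);
    cf.updateBloom(false);
    System.out.println(metrics); // one hit and one miss under both prefixes
  }
}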
TestRegionServerMetrics.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.
StoreMetricType;
@@ -90,7 +91,7 @@ public class TestRegionServerMetrics {
Long startValue = startingMetrics.get(storeMetricName);
assertEquals("Invalid value for store metric " + storeMetricName
+ " (type " + storeMetricType + ")", expected,
HRegion.getNumericMetric(storeMetricName)
RegionMetricsStorage.getNumericMetric(storeMetricName)
- (startValue != null ? startValue : 0));
}
@@ -130,7 +131,7 @@ public class TestRegionServerMetrics {
final String storeMetricName = ALL_METRICS
.getStoreMetricNameMax(StoreMetricType.STORE_FILE_COUNT);
assertEquals("Invalid value for store metric " + storeMetricName,
NUM_FLUSHES, HRegion.getNumericMetric(storeMetricName));
NUM_FLUSHES, RegionMetricsStorage.getNumericMetric(storeMetricName));
}
@@ -144,14 +145,14 @@ public class TestRegionServerMetrics {
for (int i =0; i < cfs.length; ++i) {
String prefix = SchemaMetrics.generateSchemaMetricsPrefix(table, cfs[i]);
String getMetric = prefix + HRegion.METRIC_GETSIZE;
String nextMetric = prefix + HRegion.METRIC_NEXTSIZE;
String getMetric = prefix + SchemaMetrics.METRIC_GETSIZE;
String nextMetric = prefix + SchemaMetrics.METRIC_NEXTSIZE;
// verify getsize and nextsize matches
int getSize = HRegion.numericMetrics.containsKey(getMetric) ?
HRegion.numericMetrics.get(getMetric).intValue() : 0;
int nextSize = HRegion.numericMetrics.containsKey(nextMetric) ?
HRegion.numericMetrics.get(nextMetric).intValue() : 0;
int getSize = RegionMetricsStorage.getNumericMetrics().containsKey(getMetric) ?
RegionMetricsStorage.getNumericMetrics().get(getMetric).intValue() : 0;
int nextSize = RegionMetricsStorage.getNumericMetrics().containsKey(nextMetric) ?
RegionMetricsStorage.getNumericMetrics().get(nextMetric).intValue() : 0;
assertEquals(metrics[i], getSize);
assertEquals(metrics[cfs.length + i], nextSize);
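
The assertions here compare against startingMetrics rather than raw counter values: the maps in RegionMetricsStorage are static and outlive any single test, so only the delta since the test began is meaningful. A condensed sketch of the pattern with made-up values:

import java.util.HashMap;
import java.util.Map;

public class DeltaAssertSketch {
  public static void main(String[] args) {
    Map<String, Long> startingMetrics = new HashMap<String, Long>();
    startingMetrics.put("tbl.t1.cf.f1.put_", 7L); // counter was non-zero at test start

    long observed = 12L; // value read back after the operation under test
    Long startValue = startingMetrics.get("tbl.t1.cf.f1.put_");
    long delta = observed - (startValue != null ? startValue : 0);
    System.out.println("delta = " + delta); // 5: the operation's own contribution
  }
}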