HBASE-3614 Expose per-region request rate metrics

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1328140 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-04-19 22:45:30 +00:00
parent 13b35ce82c
commit f9fb38e31c
5 changed files with 378 additions and 42 deletions

View File

@ -38,6 +38,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
@ -109,6 +110,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.metrics.OperationMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -336,6 +338,7 @@ public class HRegion implements HeapSize { // , Writable{
public final static String REGIONINFO_FILE = ".regioninfo";
private HTableDescriptor htableDescriptor = null;
private RegionSplitPolicy splitPolicy;
private final OperationMetrics opMetrics;
/**
* Should only be used for testing purposes
@ -358,6 +361,8 @@ public class HRegion implements HeapSize { // , Writable{
this.threadWakeFrequency = 0L;
this.coprocessorHost = null;
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
this.opMetrics = new OperationMetrics();
}
/**
@ -419,6 +424,8 @@ public class HRegion implements HeapSize { // , Writable{
setHTableSpecificConf();
this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
this.opMetrics = new OperationMetrics(conf, this.regionInfo);
/*
* timestamp.slop provides a server-side constraint on the timestamp. This
@ -1817,11 +1824,7 @@ public class HRegion implements HeapSize { // , Writable{
coprocessorHost.postDelete(delete, walEdit, writeToWAL);
}
final long after = EnvironmentEdgeManager.currentTimeMillis();
final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
getTableDesc().getNameAsString(), familyMap.keySet());
if (!metricPrefix.isEmpty()) {
RegionMetricsStorage.incrTimeVaryingMetric(metricPrefix + "delete_", after - now);
}
this.opMetrics.updateDeleteMetrics(familyMap.keySet(), after-now);
if (flush) {
// Request a cache flush. Do it outside update lock.
@ -1967,11 +1970,15 @@ public class HRegion implements HeapSize { // , Writable{
@SuppressWarnings("unchecked")
private long doMiniBatchPut(
BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
String metricPrefix = null;
final String tableName = getTableDesc().getNameAsString();
// variable to note if all Put items are for the same CF -- metrics related
boolean cfSetConsistent = true;
//The set of columnFamilies first seen.
Set<byte[]> cfSet = null;
long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
WALEdit walEdit = new WALEdit();
@ -2051,19 +2058,13 @@ public class HRegion implements HeapSize { // , Writable{
lastIndexExclusive++;
numReadyToWrite++;
// If first time around, designate a prefix for metrics based on the CF
// set. After that, watch for inconsistencies.
final String curMetricPrefix =
SchemaMetrics.generateSchemaMetricsPrefix(tableName,
put.getFamilyMap().keySet());
if (metricPrefix == null) {
metricPrefix = curMetricPrefix;
} else if (cfSetConsistent && !metricPrefix.equals(curMetricPrefix)) {
// The column family set for this batch put is undefined.
cfSetConsistent = false;
metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(tableName,
SchemaMetrics.UNKNOWN);
//If Column Families stay consistent throughout all of the
//individual puts then metrics can be reported as a multiput across
//column families in the first put.
if (cfSet == null) {
cfSet = put.getFamilyMap().keySet();
} else {
cfSetConsistent = cfSetConsistent && put.getFamilyMap().keySet().equals(cfSet);
}
}
@ -2208,11 +2209,12 @@ public class HRegion implements HeapSize { // , Writable{
// do after lock
final long endTimeMs = EnvironmentEdgeManager.currentTimeMillis();
if (metricPrefix == null) {
metricPrefix = SchemaMetrics.CF_BAD_FAMILY_PREFIX;
}
RegionMetricsStorage.incrTimeVaryingMetric(metricPrefix + "multiput_",
endTimeMs - startTimeMs);
//See if the column families were consistent through the whole thing.
//If they were, keep them; if not, pass null.
//Null will be treated as unknown.
final Set<byte[]> keptCfs = cfSetConsistent ? cfSet : null;
this.opMetrics.updateMultiPutMetrics(keptCfs, endTimeMs - startTimeMs);
if (!success) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
@ -2467,11 +2469,8 @@ public class HRegion implements HeapSize { // , Writable{
// do after lock
final long after = EnvironmentEdgeManager.currentTimeMillis();
final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
this.getTableDesc().getNameAsString(), familyMap.keySet());
if (!metricPrefix.isEmpty()) {
RegionMetricsStorage.incrTimeVaryingMetric(metricPrefix + "put_", after - now);
}
this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
if (flush) {
// Request a cache flush. Do it outside update lock.
@ -4098,11 +4097,7 @@ public class HRegion implements HeapSize { // , Writable{
// do after lock
final long after = EnvironmentEdgeManager.currentTimeMillis();
final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
this.getTableDesc().getNameAsString(), get.familySet());
if (!metricPrefix.isEmpty()) {
RegionMetricsStorage.incrTimeVaryingMetric(metricPrefix + "get_", after - now);
}
this.opMetrics.updateGetMetrics(get.familySet(), after - now);
return results;
}
@ -4503,11 +4498,16 @@ public class HRegion implements HeapSize { // , Writable{
} finally {
closeRegionOperation();
}
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - now);
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
}
return append.isReturnResults() ? new Result(allKVs) : null;
}
@ -4614,7 +4614,10 @@ public class HRegion implements HeapSize { // , Writable{
} finally {
closeRegionOperation();
}
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - now);
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@ -4711,10 +4714,8 @@ public class HRegion implements HeapSize { // , Writable{
// do after lock
long after = EnvironmentEdgeManager.currentTimeMillis();
String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
getTableDesc().getName(), family);
RegionMetricsStorage.incrTimeVaryingMetric(metricPrefix + "increment_", after - before);
this.opMetrics.updateIncrementColumnValueMetrics(family, after - before);
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@ -4743,7 +4744,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
32 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
33 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
(6 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);

View File

@ -0,0 +1,207 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver.metrics;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
/**
* This class provides a simplified interface to expose time varying metrics
* about GET/DELETE/PUT/ICV operations on a region and on Column Families. All
* metrics are stored in {@link RegionMetricsStorage} and exposed to hadoop
* metrics through {@link RegionServerDynamicMetrics}.
*/
@InterfaceAudience.Private
public class OperationMetrics {
private static final String DELETE_KEY = "delete_";
private static final String PUT_KEY = "put_";
private static final String GET_KEY = "get_";
private static final String ICV_KEY = "incrementColumnValue_";
private static final String INCREMENT_KEY = "increment_";
private static final String MULTIPUT_KEY = "multiput_";
private static final String APPEND_KEY = "append_";
/** Conf key controlling whether we should expose metrics.*/
private static final String CONF_KEY =
"hbase.metrics.exposeOperationTimes";
private String tableName = null;
private String regionName = null;
private String regionMetrixPrefix = null;
private Configuration conf = null;
/**
* Create a new OperationMetrics
* @param conf The Configuration of the HRegion reporting operations coming in.
* @param regionInfo The region info
*/
public OperationMetrics(Configuration conf, HRegionInfo regionInfo) {
// Configure SchemaMetrics before trying to create a RegionOperationMetrics instance as
// RegionOperationMetrics relies on SchemaMetrics to do naming.
if (conf != null) {
SchemaMetrics.configureGlobally(conf);
this.conf = conf;
if (regionInfo != null) {
this.tableName = regionInfo.getTableNameAsString();
this.regionName = regionInfo.getEncodedName();
} else {
this.tableName = SchemaMetrics.UNKNOWN;
this.regionName = SchemaMetrics.UNKNOWN;
}
this.regionMetrixPrefix =
SchemaMetrics.generateRegionMetricsPrefix(this.tableName, this.regionName);
}
}
/**
* This is used in creating a testing HRegion where the regionInfo is unknown
* @param conf
*/
public OperationMetrics() {
this(null, null);
}
/**
* Update the stats associated with {@link HTable#put(java.util.List)}.
*
* @param columnFamilies Set of CF's this multiput is associated with
* @param value the time
*/
public void updateMultiPutMetrics(Set<byte[]> columnFamilies, long value) {
doUpdateTimeVarying(columnFamilies, MULTIPUT_KEY, value);
}
/**
* Update the metrics associated with a {@link Get}
*
* @param columnFamilies
* Set of Column Families in this get.
* @param value
* the time
*/
public void updateGetMetrics(Set<byte[]> columnFamilies, long value) {
doUpdateTimeVarying(columnFamilies, GET_KEY, value);
}
/**
* Update metrics associated with an {@link Increment}
* @param columnFamilies
* @param value
*/
public void updateIncrementMetrics(Set<byte[]> columnFamilies, long value) {
doUpdateTimeVarying(columnFamilies, INCREMENT_KEY, value);
}
/**
* Update the metrics associated with an {@link Append}
* @param columnFamilies
* @param value
*/
public void updateAppendMetrics(Set<byte[]> columnFamilies, long value) {
doUpdateTimeVarying(columnFamilies, APPEND_KEY, value);
}
/**
* Update the metrics associated with
* {@link HTable#incrementColumnValue(byte[], byte[], byte[], long)}
*
* @param columnFamily
* The single column family associated with an ICV
* @param value
* the time
*/
public void updateIncrementColumnValueMetrics(byte[] columnFamily, long value) {
String cfMetricPrefix =
SchemaMetrics.generateSchemaMetricsPrefix(this.tableName, Bytes.toString(columnFamily));
doSafeIncTimeVarying(cfMetricPrefix, ICV_KEY, value);
doSafeIncTimeVarying(this.regionMetrixPrefix, ICV_KEY, value);
}
/**
* update metrics associated with a {@link Put}
*
* @param columnFamilies
* Set of column families involved.
* @param value
* the time.
*/
public void updatePutMetrics(Set<byte[]> columnFamilies, long value) {
doUpdateTimeVarying(columnFamilies, PUT_KEY, value);
}
/**
* update metrics associated with a {@link Delete}
*
* @param columnFamilies
* @param value
* the time.
*/
public void updateDeleteMetrics(Set<byte[]> columnFamilies, long value) {
doUpdateTimeVarying(columnFamilies, DELETE_KEY, value);
}
/**
* Method to send updates for cf and region metrics. This is the normal method
* used if the naming of stats and CF's are in line with put/delete/multiput.
*
* @param columnFamilies
* the set of column families involved.
* @param key
* the metric name.
* @param value
* the time.
*/
private void doUpdateTimeVarying(Set<byte[]> columnFamilies, String key, long value) {
String cfPrefix = null;
if (columnFamilies != null) {
cfPrefix = SchemaMetrics.generateSchemaMetricsPrefix(tableName, columnFamilies);
} else {
cfPrefix = SchemaMetrics.generateSchemaMetricsPrefix(tableName, SchemaMetrics.UNKNOWN);
}
doSafeIncTimeVarying(cfPrefix, key, value);
doSafeIncTimeVarying(this.regionMetrixPrefix, key, value);
}
private void doSafeIncTimeVarying(String prefix, String key, long value) {
if (conf.getBoolean(CONF_KEY, true)) {
if (prefix != null && !prefix.isEmpty() && key != null && !key.isEmpty()) {
RegionMetricsStorage.incrTimeVaryingMetric(prefix + key, value);
}
}
}
}

View File

@ -169,9 +169,10 @@ public class SchemaMetrics {
*/
public static final String UNKNOWN = "__unknown";
private static final String TABLE_PREFIX = "tbl.";
public static final String TABLE_PREFIX = "tbl.";
public static final String CF_PREFIX = "cf.";
public static final String BLOCK_TYPE_PREFIX = "bt.";
public static final String REGION_PREFIX = "region.";
public static final String CF_UNKNOWN_PREFIX = CF_PREFIX + UNKNOWN + ".";
public static final String CF_BAD_FAMILY_PREFIX = CF_PREFIX + "__badfamily.";
@ -622,6 +623,23 @@ public class SchemaMetrics {
return SchemaMetrics.generateSchemaMetricsPrefix(tableName, sb.toString());
}
/**
* Get the prefix for metrics generated about a single region.
* @param tableName the table name or {@link #TOTAL_KEY} for all tables
* @param regionName regionName
* @return the prefix for this table/region combination.
*/
static String generateRegionMetricsPrefix(String tableName, String regionName) {
tableName = getEffectiveTableName(tableName);
String schemaMetricPrefix =
tableName.equals(TOTAL_KEY) ? "" : TABLE_PREFIX + tableName + ".";
schemaMetricPrefix +=
regionName.equals(TOTAL_KEY) ? "" : REGION_PREFIX + regionName + ".";
return schemaMetricPrefix;
}
/**
* Sets the flag of whether to use table name in metric names. This flag

View File

@ -870,4 +870,13 @@
</description>
</property>
<property>
<name>hbase.metrics.exposeOperationTimes</name>
<value>true</value>
<description>Whether to report metrics about time taken performing an
operation on the region server. Get, Put, Delete, Increment, and Append can all
have their times exposed through Hadoop metrics per CF and per region.
</description>
</property>
</configuration>

View File

@ -23,12 +23,16 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
@ -39,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.
StoreMetricType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -84,6 +89,29 @@ public class TestRegionServerMetrics {
SchemaMetrics.validateMetricChanges(startingMetrics);
}
/**
 * Asserts that the time-varying metric named {@code metricPrefix} has been
 * updated the expected number of times at the column-family and/or region
 * level. Pass {@code null} for {@code cf} or {@code regionName} to skip the
 * corresponding check.
 */
private void assertTimeVaryingMetricCount(int expectedCount, String table, String cf,
    String regionName, String metricPrefix) {
  // Integer.valueOf over the deprecated boxing constructor new Integer(int).
  Integer expectedCountInteger = Integer.valueOf(expectedCount);

  if (cf != null) {
    String cfKey =
        SchemaMetrics.TABLE_PREFIX + table + "." +
        SchemaMetrics.CF_PREFIX + cf + "." + metricPrefix;
    Pair<Long, Integer> cfPair = RegionMetricsStorage.getTimeVaryingMetric(cfKey);
    assertEquals(expectedCountInteger, cfPair.getSecond());
  }

  if (regionName != null) {
    String rKey =
        SchemaMetrics.TABLE_PREFIX + table + "." +
        SchemaMetrics.REGION_PREFIX + regionName + "." + metricPrefix;
    Pair<Long, Integer> regionPair = RegionMetricsStorage.getTimeVaryingMetric(rKey);
    assertEquals(expectedCountInteger, regionPair.getSecond());
  }
}
private void assertStoreMetricEquals(long expected,
SchemaMetrics schemaMetrics, StoreMetricType storeMetricType) {
final String storeMetricName =
@ -94,6 +122,79 @@ public class TestRegionServerMetrics {
RegionMetricsStorage.getNumericMetric(storeMetricName)
- (startValue != null ? startValue : 0));
}
@Test
public void testOperationMetrics() throws IOException {
  final String cf = "OPCF";
  final String otherCf = "otherCF";
  final String rk = "testRK";
  final String icvCol = "icvCol";
  final String appendCol = "appendCol";

  HTable table =
      TEST_UTIL.createTable(TABLE_NAME.getBytes(),
          new byte[][] { cf.getBytes(), otherCf.getBytes() });

  // Grab the encoded name of the single region backing the table so the
  // per-region metrics can be verified below.
  Set<HRegionInfo> regionInfos = table.getRegionLocations().keySet();
  String regionName =
      regionInfos.toArray(new HRegionInfo[regionInfos.size()])[0].getEncodedName();

  // Multiput touching one cf. Row keys differ, so each row is locked
  // separately, but everything is still applied in a single multiput.
  Put firstPut = new Put(rk.getBytes());
  firstPut.add(cf.getBytes(), icvCol.getBytes(), Bytes.toBytes(0L));
  Put secondPut = new Put("ignored1RK".getBytes());
  secondPut.add(cf.getBytes(), "ignored".getBytes(), Bytes.toBytes(0L));
  table.put(Arrays.asList(firstPut, secondPut));

  // Multiput where the cf does NOT stay consistent across the puts.
  Put thirdPut = new Put("ignored2RK".getBytes());
  thirdPut.add(cf.getBytes(), "ignored".getBytes(), Bytes.toBytes("TEST1"));
  Put fourthPut = new Put("ignored3RK".getBytes());
  fourthPut.add(otherCf.getBytes(), "ignored".getBytes(), Bytes.toBytes(0L));
  table.put(Arrays.asList(thirdPut, fourthPut));

  table.incrementColumnValue(rk.getBytes(), cf.getBytes(), icvCol.getBytes(), 1L);

  Get get = new Get(rk.getBytes());
  get.addColumn(cf.getBytes(), appendCol.getBytes());
  table.get(get);

  Append append = new Append(rk.getBytes());
  append.add(cf.getBytes(), appendCol.getBytes(), Bytes.toBytes("-APPEND"));
  table.append(append);

  Delete familyDelete = new Delete(rk.getBytes());
  familyDelete.deleteFamily(cf.getBytes());
  table.delete(familyDelete);
  Delete rowDelete = new Delete(rk.getBytes());
  table.delete(rowDelete);

  // One multiput where the cf set stayed consistent.
  assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "multiput_");
  // Two multiputs hit the region overall.
  assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "multiput_");
  // One multiput where the cf set was not consistent.
  assertTimeVaryingMetricCount(1, TABLE_NAME, "__unknown", null, "multiput_");
  // One increment and one append.
  assertTimeVaryingMetricCount(1, TABLE_NAME, cf, regionName, "increment_");
  assertTimeVaryingMetricCount(1, TABLE_NAME, cf, regionName, "append_");
  // One delete where the cf is known.
  assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "delete_");
  // Two deletes in the region.
  assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "delete_");
  // Three gets: one explicit, one for the append, one for the increment.
  assertTimeVaryingMetricCount(3, TABLE_NAME, cf, regionName, "get_");
}
@Test
public void testMultipleRegions() throws IOException, InterruptedException {