[jira] [HBASE-5292] Prevent counting getSize on compactions
Author: Zhiqiu Kong Summary: Added two separate metrics for both get() and next(). This is done by refactoring on internal next() API. To be more specific, only Get.get() and ResultScanner.next() passes the metric name ("getsize" and "nextsize" repectively) to HRegion::RegionScanner::next(List<KeyValue>, String) This will eventually hit StoreScanner()::next((List<KeyValue>, int, String) where the metrics are counted. And their call paths are: 1) Get HTable::get(final Get get) => HRegionServer::get(byte [] regionName, Get get) => HRegion::get(final Get get, final Integer lockid) => HRegion::get(final Get get) [pass METRIC_GETSIZE to the callee] => HRegion::RegionScanner::next(List<KeyValue> outResults, String metric) => HRegion::RegionScanner::next(List<KeyValue> outResults, int limit, String metric) => HRegion::RegionScanner::nextInternal(int limit, String metric) => KeyValueHeap::next(List<KeyValue> result, int limit, String metric) => StoreScanner::next(List<KeyValue> outResult, int limit, String metric) 2) Next HTable::ClientScanner::next() => ScannerCallable::call() => HRegionServer::next(long scannerId) => HRegionServer::next(final long scannerId, int nbRows) [pass METRIC_NEXTSIZE to the callee] => HRegion::RegionScanner::next(List<KeyValue> outResults, String metric) => HRegion::RegionScanner::next(List<KeyValue> outResults, int limit, String metric) => HRegion::RegionScanner::nextInternal(int limit, String metric) => KeyValueHeap::next(List<KeyValue> result, int limit, String metric) => StoreScanner::next(List<KeyValue> outResult, int limit, String metric) Test Plan: 1. Passed unit tests. 2. Created a testcase TestRegionServerMetrics::testGetNextSize to guarantee: * Get/Next contributes to getsize/nextsize metrics * Both getsize/nextsize are per Column Family * Flush/compaction won't affect these two metrics Reviewed By: mbautin Reviewers: Kannan, mbautin, Liyin, JIRA CC: Kannan, mbautin, Liyin, zhiqiu Differential Revision: https://reviews.facebook.net/D1617 git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1299147 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e4c43d72bf
commit
be78e3c4ee
|
@ -33,6 +33,7 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
|
@ -200,7 +201,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// Registered region protocol handlers
|
||||
private ClassToInstanceMap<CoprocessorProtocol>
|
||||
protocolHandlers = MutableClassToInstanceMap.create();
|
||||
|
||||
|
||||
private Map<String, Class<? extends CoprocessorProtocol>>
|
||||
protocolHandlerNames = Maps.newHashMap();
|
||||
|
||||
|
@ -333,6 +334,9 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// for simple numeric metrics (# of blocks read from block cache)
|
||||
public static final ConcurrentMap<String, AtomicLong> numericMetrics = new ConcurrentHashMap<String, AtomicLong>();
|
||||
|
||||
public static final String METRIC_GETSIZE = "getsize";
|
||||
public static final String METRIC_NEXTSIZE = "nextsize";
|
||||
|
||||
// for simple numeric metrics (current block cache size)
|
||||
// These ones are not reset to zero when queried, unlike the previous.
|
||||
public static final ConcurrentMap<String, AtomicLong> numericPersistentMetrics = new ConcurrentHashMap<String, AtomicLong>();
|
||||
|
@ -342,7 +346,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* number of operations.
|
||||
*/
|
||||
public static final ConcurrentMap<String, Pair<AtomicLong, AtomicInteger>>
|
||||
timeVaryingMetrics = new ConcurrentHashMap<String,
|
||||
timeVaryingMetrics = new ConcurrentHashMap<String,
|
||||
Pair<AtomicLong, AtomicInteger>>();
|
||||
|
||||
public static void incrNumericMetric(String key, long amount) {
|
||||
|
@ -958,7 +962,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
CompletionService<ImmutableList<StoreFile>> completionService =
|
||||
new ExecutorCompletionService<ImmutableList<StoreFile>>(
|
||||
storeCloserThreadPool);
|
||||
|
||||
|
||||
// close each store in parallel
|
||||
for (final Store store : stores.values()) {
|
||||
completionService
|
||||
|
@ -2903,7 +2907,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return currentEditSeqId;
|
||||
} finally {
|
||||
status.cleanup();
|
||||
if (reader != null) {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
|
@ -3345,6 +3349,12 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResults, int limit)
|
||||
throws IOException {
|
||||
return next(outResults, limit, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResults, int limit,
|
||||
String metric) throws IOException {
|
||||
if (this.filterClosed) {
|
||||
throw new UnknownScannerException("Scanner was closed (timed out?) " +
|
||||
"after we renewed it. Could be caused by a very slow scanner " +
|
||||
|
@ -3359,7 +3369,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
results.clear();
|
||||
|
||||
boolean returnResult = nextInternal(limit);
|
||||
boolean returnResult = nextInternal(limit, metric);
|
||||
|
||||
outResults.addAll(results);
|
||||
resetFilters();
|
||||
|
@ -3376,7 +3386,14 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
public synchronized boolean next(List<KeyValue> outResults)
|
||||
throws IOException {
|
||||
// apply the batching limit by default
|
||||
return next(outResults, batch);
|
||||
return next(outResults, batch, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResults, String metric)
|
||||
throws IOException {
|
||||
// apply the batching limit by default
|
||||
return next(outResults, batch, metric);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -3386,7 +3403,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return this.filter != null && this.filter.filterAllRemaining();
|
||||
}
|
||||
|
||||
private boolean nextInternal(int limit) throws IOException {
|
||||
private boolean nextInternal(int limit, String metric) throws IOException {
|
||||
while (true) {
|
||||
byte [] currentRow = peekRow();
|
||||
if (isStopRow(currentRow)) {
|
||||
|
@ -3403,7 +3420,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
} else {
|
||||
byte [] nextRow;
|
||||
do {
|
||||
this.storeHeap.next(results, limit - results.size());
|
||||
this.storeHeap.next(results, limit - results.size(), metric);
|
||||
if (limit > 0 && results.size() == limit) {
|
||||
if (this.filter != null && filter.hasFilterRow()) {
|
||||
throw new IncompatibleFilterException(
|
||||
|
@ -4160,7 +4177,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
RegionScanner scanner = null;
|
||||
try {
|
||||
scanner = getScanner(scan);
|
||||
scanner.next(results);
|
||||
scanner.next(results, HRegion.METRIC_GETSIZE);
|
||||
} finally {
|
||||
if (scanner != null)
|
||||
scanner.close();
|
||||
|
|
|
@ -2383,7 +2383,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
|||
&& currentScanResultSize < maxScannerResultSize; i++) {
|
||||
requestCount.incrementAndGet();
|
||||
// Collect values to be returned here
|
||||
boolean moreRows = s.next(values);
|
||||
boolean moreRows = s.next(values, HRegion.METRIC_NEXTSIZE);
|
||||
if (!values.isEmpty()) {
|
||||
for (KeyValue kv : values) {
|
||||
currentScanResultSize += kv.heapSize();
|
||||
|
|
|
@ -49,6 +49,15 @@ public interface InternalScanner extends Closeable {
|
|||
* @throws IOException e
|
||||
*/
|
||||
public boolean next(List<KeyValue> results) throws IOException;
|
||||
|
||||
/**
|
||||
* Grab the next row's worth of values.
|
||||
* @param results return output array
|
||||
* @param metric the metric name
|
||||
* @return true if more rows exist after this one, false if scanner is done
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean next(List<KeyValue> results, String metric) throws IOException;
|
||||
|
||||
/**
|
||||
* Grab the next row's worth of values with a limit on the number of values
|
||||
|
@ -59,6 +68,17 @@ public interface InternalScanner extends Closeable {
|
|||
* @throws IOException e
|
||||
*/
|
||||
public boolean next(List<KeyValue> result, int limit) throws IOException;
|
||||
|
||||
/**
|
||||
* Grab the next row's worth of values with a limit on the number of values
|
||||
* to return.
|
||||
* @param result return output array
|
||||
* @param limit limit on row count to get
|
||||
* @param metric the metric name
|
||||
* @return true if more rows exist after this one, false if scanner is done
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean next(List<KeyValue> result, int limit, String metric) throws IOException;
|
||||
|
||||
/**
|
||||
* Closes the scanner and releases any resources it has allocated
|
||||
|
|
|
@ -122,11 +122,27 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
|
|||
* @return true if there are more keys, false if all scanners are done
|
||||
*/
|
||||
public boolean next(List<KeyValue> result, int limit) throws IOException {
|
||||
return next(result, limit, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the next row of keys from the top-most scanner.
|
||||
* <p>
|
||||
* This method takes care of updating the heap.
|
||||
* <p>
|
||||
* This can ONLY be called when you are using Scanners that implement
|
||||
* InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
|
||||
* @param result output result list
|
||||
* @param limit limit on row count to get
|
||||
* @param metric the metric name
|
||||
* @return true if there are more keys, false if all scanners are done
|
||||
*/
|
||||
public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
|
||||
if (this.current == null) {
|
||||
return false;
|
||||
}
|
||||
InternalScanner currentAsInternal = (InternalScanner)this.current;
|
||||
boolean mayContainMoreRows = currentAsInternal.next(result, limit);
|
||||
boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
|
||||
KeyValue pee = this.current.peek();
|
||||
/*
|
||||
* By definition, any InternalScanner must return false only when it has no
|
||||
|
@ -158,6 +174,11 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
|
|||
return next(result, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> result, String metric) throws IOException {
|
||||
return next(result, -1, metric);
|
||||
}
|
||||
|
||||
private static class KVScannerComparator implements Comparator<KeyValueScanner> {
|
||||
private KVComparator kvComparator;
|
||||
/**
|
||||
|
|
|
@ -50,8 +50,8 @@ class StoreScanner extends NonLazyKeyValueScanner
|
|||
private KeyValueHeap heap;
|
||||
private boolean cacheBlocks;
|
||||
|
||||
private String metricNameGetSize;
|
||||
|
||||
private String metricNamePrefix;
|
||||
// Used to indicate that the scanner has closed (see HBASE-1107)
|
||||
// Doesnt need to be volatile because it's always accessed via synchronized methods
|
||||
private boolean closing = false;
|
||||
|
@ -198,7 +198,7 @@ class StoreScanner extends NonLazyKeyValueScanner
|
|||
/**
|
||||
* Method used internally to initialize metric names throughout the
|
||||
* constructors.
|
||||
*
|
||||
*
|
||||
* To be called after the store variable has been initialized!
|
||||
*/
|
||||
private void initializeMetricNames() {
|
||||
|
@ -208,8 +208,8 @@ class StoreScanner extends NonLazyKeyValueScanner
|
|||
tableName = store.getTableName();
|
||||
family = Bytes.toString(store.getFamily().getName());
|
||||
}
|
||||
metricNameGetSize = SchemaMetrics.generateSchemaMetricsPrefix(
|
||||
tableName, family) + "getsize";
|
||||
this.metricNamePrefix =
|
||||
SchemaMetrics.generateSchemaMetricsPrefix(tableName, family);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -308,6 +308,18 @@ class StoreScanner extends NonLazyKeyValueScanner
|
|||
*/
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
|
||||
return next(outResult, limit, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next row of values from this Store.
|
||||
* @param outResult
|
||||
* @param limit
|
||||
* @return true if there are more rows, false if scanner is done
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResult, int limit,
|
||||
String metric) throws IOException {
|
||||
|
||||
if (checkReseek()) {
|
||||
return true;
|
||||
|
@ -355,7 +367,15 @@ class StoreScanner extends NonLazyKeyValueScanner
|
|||
case INCLUDE_AND_SEEK_NEXT_COL:
|
||||
|
||||
Filter f = matcher.getFilter();
|
||||
results.add(f == null ? kv : f.transform(kv));
|
||||
if (f != null) {
|
||||
kv = f.transform(kv);
|
||||
}
|
||||
results.add(kv);
|
||||
|
||||
if (metric != null) {
|
||||
HRegion.incrNumericMetric(this.metricNamePrefix + metric,
|
||||
kv.getLength());
|
||||
}
|
||||
|
||||
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
|
||||
if (!matcher.moreRowsMayExistAfter(kv)) {
|
||||
|
@ -369,7 +389,6 @@ class StoreScanner extends NonLazyKeyValueScanner
|
|||
this.heap.next();
|
||||
}
|
||||
|
||||
HRegion.incrNumericMetric(metricNameGetSize, kv.getLength());
|
||||
if (limit > 0 && (results.size() == limit)) {
|
||||
break LOOP;
|
||||
}
|
||||
|
@ -434,7 +453,13 @@ class StoreScanner extends NonLazyKeyValueScanner
|
|||
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResult) throws IOException {
|
||||
return next(outResult, -1);
|
||||
return next(outResult, -1, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResult, String metric)
|
||||
throws IOException {
|
||||
return next(outResult, -1, metric);
|
||||
}
|
||||
|
||||
// Implementation of ChangedReadersObserver
|
||||
|
|
|
@ -63,11 +63,23 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
public boolean next(List<KeyValue> results) throws IOException {
|
||||
return delegate.next(results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results, String metric)
|
||||
throws IOException {
|
||||
return delegate.next(results, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> result, int limit) throws IOException {
|
||||
return delegate.next(result, limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> result, int limit, String metric)
|
||||
throws IOException {
|
||||
return delegate.next(result, limit, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
|
|
@ -299,13 +299,26 @@ public class TestRegionObserverInterface {
|
|||
public boolean next(List<KeyValue> results) throws IOException {
|
||||
return next(results, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results, String metric)
|
||||
throws IOException {
|
||||
return next(results, -1, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results, int limit) throws IOException {
|
||||
public boolean next(List<KeyValue> results, int limit)
|
||||
throws IOException{
|
||||
return next(results, limit, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results, int limit, String metric)
|
||||
throws IOException {
|
||||
List<KeyValue> internalResults = new ArrayList<KeyValue>();
|
||||
boolean hasMore;
|
||||
do {
|
||||
hasMore = scanner.next(internalResults, limit);
|
||||
hasMore = scanner.next(internalResults, limit, metric);
|
||||
if (!internalResults.isEmpty()) {
|
||||
long row = Bytes.toLong(internalResults.get(0).getRow());
|
||||
if (row % 2 == 0) {
|
||||
|
|
|
@ -27,7 +27,13 @@ import java.util.Map;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
|
||||
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.
|
||||
StoreMetricType;
|
||||
|
@ -57,7 +63,7 @@ public class TestRegionServerMetrics {
|
|||
private static final SchemaMetrics ALL_METRICS =
|
||||
SchemaMetrics.ALL_SCHEMA_METRICS;
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL =
|
||||
private final HBaseTestingUtility TEST_UTIL =
|
||||
new HBaseTestingUtility();
|
||||
|
||||
private Map<String, Long> startingMetrics;
|
||||
|
@ -131,5 +137,80 @@ public class TestRegionServerMetrics {
|
|||
@org.junit.Rule
|
||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||
|
||||
private void assertSizeMetric(String table, String[] cfs, int[] metrics) {
|
||||
// we have getsize & nextsize for each column family
|
||||
assertEquals(cfs.length * 2, metrics.length);
|
||||
|
||||
for (int i =0; i < cfs.length; ++i) {
|
||||
String prefix = SchemaMetrics.generateSchemaMetricsPrefix(table, cfs[i]);
|
||||
String getMetric = prefix + HRegion.METRIC_GETSIZE;
|
||||
String nextMetric = prefix + HRegion.METRIC_NEXTSIZE;
|
||||
|
||||
// verify getsize and nextsize matches
|
||||
int getSize = HRegion.numericMetrics.containsKey(getMetric) ?
|
||||
HRegion.numericMetrics.get(getMetric).intValue() : 0;
|
||||
int nextSize = HRegion.numericMetrics.containsKey(nextMetric) ?
|
||||
HRegion.numericMetrics.get(nextMetric).intValue() : 0;
|
||||
|
||||
assertEquals(metrics[i], getSize);
|
||||
assertEquals(metrics[cfs.length + i], nextSize);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetNextSize() throws IOException, InterruptedException {
|
||||
String rowName = "row1";
|
||||
byte[] ROW = Bytes.toBytes(rowName);
|
||||
String tableName = "SizeMetricTest";
|
||||
byte[] TABLE = Bytes.toBytes(tableName);
|
||||
String cf1Name = "cf1";
|
||||
String cf2Name = "cf2";
|
||||
String[] cfs = new String[] {cf1Name, cf2Name};
|
||||
byte[] CF1 = Bytes.toBytes(cf1Name);
|
||||
byte[] CF2 = Bytes.toBytes(cf2Name);
|
||||
|
||||
long ts = 1234;
|
||||
HTable hTable = TEST_UTIL.createTable(TABLE, new byte[][]{CF1, CF2});
|
||||
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
|
||||
|
||||
Put p = new Put(ROW);
|
||||
p.add(CF1, CF1, ts, CF1);
|
||||
p.add(CF2, CF2, ts, CF2);
|
||||
hTable.put(p);
|
||||
|
||||
KeyValue kv1 = new KeyValue(ROW, CF1, CF1, ts, CF1);
|
||||
KeyValue kv2 = new KeyValue(ROW, CF2, CF2, ts, CF2);
|
||||
int kvLength = kv1.getLength();
|
||||
assertEquals(kvLength, kv2.getLength());
|
||||
|
||||
// only cf1.getsize is set on Get
|
||||
hTable.get(new Get(ROW).addFamily(CF1));
|
||||
assertSizeMetric(tableName, cfs, new int[] {kvLength, 0, 0, 0});
|
||||
|
||||
// only cf2.getsize is set on Get
|
||||
hTable.get(new Get(ROW).addFamily(CF2));
|
||||
assertSizeMetric(tableName, cfs, new int[] {kvLength, kvLength, 0, 0});
|
||||
|
||||
// only cf2.nextsize is set
|
||||
for (Result res : hTable.getScanner(CF2)) {
|
||||
}
|
||||
assertSizeMetric(tableName, cfs,
|
||||
new int[] {kvLength, kvLength, 0, kvLength});
|
||||
|
||||
// only cf2.nextsize is set
|
||||
for (Result res : hTable.getScanner(CF1)) {
|
||||
}
|
||||
assertSizeMetric(tableName, cfs,
|
||||
new int[] {kvLength, kvLength, kvLength, kvLength});
|
||||
|
||||
// getsize/nextsize should not be set on flush or compaction
|
||||
for (HRegion hr : TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE)) {
|
||||
hr.flushcache();
|
||||
hr.compactStores();
|
||||
}
|
||||
assertSizeMetric(tableName, cfs,
|
||||
new int[] {kvLength, kvLength, kvLength, kvLength});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue