HBASE-7255 KV size metric went missing from StoreScanner.
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1467485 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
089e876398
commit
1c81734163
|
@ -188,6 +188,7 @@ public interface MetricsRegionServerSource extends BaseSource {
|
|||
static final String INCREMENT_KEY = "increment";
|
||||
static final String MUTATE_KEY = "mutate";
|
||||
static final String APPEND_KEY = "append";
|
||||
static final String SCAN_NEXT_KEY = "scanNext";
|
||||
static final String SLOW_MUTATE_KEY = "slowPutCount";
|
||||
static final String SLOW_GET_KEY = "slowGetCount";
|
||||
static final String SLOW_DELETE_KEY = "slowDeleteCount";
|
||||
|
|
|
@ -25,6 +25,9 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
*/
|
||||
public interface MetricsRegionSource extends Comparable<MetricsRegionSource> {
|
||||
|
||||
public static final String OPS_SAMPLE_NAME = "ops";
|
||||
public static final String SIZE_VALUE_NAME = "size";
|
||||
|
||||
/**
|
||||
* Close the region's metrics as this region is closing.
|
||||
*/
|
||||
|
@ -41,10 +44,16 @@ public interface MetricsRegionSource extends Comparable<MetricsRegionSource> {
|
|||
void updateDelete();
|
||||
|
||||
/**
|
||||
* Update related counts of gets.
|
||||
* Update count and sizes of gets.
|
||||
* @param getSize size in bytes of the resulting key values for a get
|
||||
*/
|
||||
void updateGet();
|
||||
void updateGet(long getSize);
|
||||
|
||||
/**
|
||||
* Update the count and sizes of resultScanner.next()
|
||||
* @param scanSize Size in bytes of the resulting key values for a next()
|
||||
*/
|
||||
void updateScan(long scanSize);
|
||||
/**
|
||||
* Update related counts of increments.
|
||||
*/
|
||||
|
@ -59,4 +68,6 @@ public interface MetricsRegionSource extends Comparable<MetricsRegionSource> {
|
|||
* Get the aggregate source to which this reports.
|
||||
*/
|
||||
MetricsRegionAggregateSource getAggregateSource();
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|||
import org.apache.hadoop.metrics2.impl.JmxCacheBuster;
|
||||
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MetricMutableStat;
|
||||
|
||||
public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
||||
|
||||
|
@ -39,12 +40,15 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
private String regionGetKey;
|
||||
private String regionIncrementKey;
|
||||
private String regionAppendKey;
|
||||
private String regionScanNextKey;
|
||||
private MetricMutableCounterLong regionPut;
|
||||
private MetricMutableCounterLong regionDelete;
|
||||
private MetricMutableCounterLong regionGet;
|
||||
private MetricMutableCounterLong regionIncrement;
|
||||
private MetricMutableCounterLong regionAppend;
|
||||
|
||||
private MetricMutableStat regionGet;
|
||||
private MetricMutableStat regionScanNext;
|
||||
|
||||
public MetricsRegionSourceImpl(MetricsRegionWrapper regionWrapper,
|
||||
MetricsRegionAggregateSourceImpl aggregate) {
|
||||
this.regionWrapper = regionWrapper;
|
||||
|
@ -70,14 +74,17 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
regionDeleteKey = regionNamePrefix + MetricsRegionServerSource.DELETE_KEY + suffix;
|
||||
regionDelete = registry.getLongCounter(regionDeleteKey, 0l);
|
||||
|
||||
regionGetKey = regionNamePrefix + MetricsRegionServerSource.GET_KEY + suffix;
|
||||
regionGet = registry.getLongCounter(regionGetKey, 0l);
|
||||
|
||||
regionIncrementKey = regionNamePrefix + MetricsRegionServerSource.INCREMENT_KEY + suffix;
|
||||
regionIncrement = registry.getLongCounter(regionIncrementKey, 0l);
|
||||
|
||||
regionAppendKey = regionNamePrefix + MetricsRegionServerSource.APPEND_KEY + suffix;
|
||||
regionAppend = registry.getLongCounter(regionAppendKey, 0l);
|
||||
|
||||
regionGetKey = regionNamePrefix + MetricsRegionServerSource.GET_KEY;
|
||||
regionGet = registry.newStat(regionGetKey, "", OPS_SAMPLE_NAME, SIZE_VALUE_NAME);
|
||||
|
||||
regionScanNextKey = regionNamePrefix + MetricsRegionServerSource.SCAN_NEXT_KEY;
|
||||
regionScanNext = registry.newStat(regionScanNextKey, "", OPS_SAMPLE_NAME, SIZE_VALUE_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -88,11 +95,13 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
LOG.trace("Removing region Metrics: " + regionWrapper.getRegionName());
|
||||
registry.removeMetric(regionPutKey);
|
||||
registry.removeMetric(regionDeleteKey);
|
||||
registry.removeMetric(regionGetKey);
|
||||
registry.removeMetric(regionIncrementKey);
|
||||
|
||||
registry.removeMetric(regionAppendKey);
|
||||
|
||||
registry.removeMetric(regionGetKey);
|
||||
registry.removeMetric(regionScanNextKey);
|
||||
|
||||
JmxCacheBuster.clearJmxCache();
|
||||
}
|
||||
|
||||
|
@ -107,8 +116,13 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void updateGet() {
|
||||
regionGet.incr();
|
||||
public void updateGet(long getSize) {
|
||||
regionGet.add(getSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateScan(long scanSize) {
|
||||
regionScanNext.add(scanSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,16 +20,18 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.MetricsRegionSourceImpl;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.impl.JmxCacheBuster;
|
||||
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.Interns;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableStat;
|
||||
|
||||
public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
||||
|
||||
private final MetricsRegionWrapper regionWrapper;
|
||||
|
||||
|
||||
private boolean closed = false;
|
||||
private MetricsRegionAggregateSourceImpl agg;
|
||||
private DynamicMetricsRegistry registry;
|
||||
|
@ -41,12 +43,15 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
private String regionGetKey;
|
||||
private String regionIncrementKey;
|
||||
private String regionAppendKey;
|
||||
private String regionScanNextKey;
|
||||
private MutableCounterLong regionPut;
|
||||
private MutableCounterLong regionDelete;
|
||||
private MutableCounterLong regionGet;
|
||||
|
||||
private MutableCounterLong regionIncrement;
|
||||
private MutableCounterLong regionAppend;
|
||||
|
||||
private MutableStat regionGet;
|
||||
private MutableStat regionScanNext;
|
||||
|
||||
public MetricsRegionSourceImpl(MetricsRegionWrapper regionWrapper,
|
||||
MetricsRegionAggregateSourceImpl aggregate) {
|
||||
|
@ -72,14 +77,17 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
regionDeleteKey = regionNamePrefix + MetricsRegionServerSource.DELETE_KEY + suffix;
|
||||
regionDelete = registry.getLongCounter(regionDeleteKey, 0l);
|
||||
|
||||
regionGetKey = regionNamePrefix + MetricsRegionServerSource.GET_KEY + suffix;
|
||||
regionGet = registry.getLongCounter(regionGetKey, 0l);
|
||||
|
||||
regionIncrementKey = regionNamePrefix + MetricsRegionServerSource.INCREMENT_KEY + suffix;
|
||||
regionIncrement = registry.getLongCounter(regionIncrementKey, 0l);
|
||||
|
||||
regionAppendKey = regionNamePrefix + MetricsRegionServerSource.APPEND_KEY + suffix;
|
||||
regionAppend = registry.getLongCounter(regionAppendKey, 0l);
|
||||
|
||||
regionGetKey = regionNamePrefix + MetricsRegionServerSource.GET_KEY;
|
||||
regionGet = registry.newStat(regionGetKey, "", OPS_SAMPLE_NAME, SIZE_VALUE_NAME);
|
||||
|
||||
regionScanNextKey = regionNamePrefix + MetricsRegionServerSource.SCAN_NEXT_KEY;
|
||||
regionScanNext = registry.newStat(regionScanNextKey, "", OPS_SAMPLE_NAME, SIZE_VALUE_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,11 +98,14 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
LOG.trace("Removing region Metrics: " + regionWrapper.getRegionName());
|
||||
registry.removeMetric(regionPutKey);
|
||||
registry.removeMetric(regionDeleteKey);
|
||||
registry.removeMetric(regionGetKey);
|
||||
|
||||
registry.removeMetric(regionIncrementKey);
|
||||
|
||||
registry.removeMetric(regionAppendKey);
|
||||
|
||||
registry.removeMetric(regionGetKey);
|
||||
registry.removeMetric(regionScanNextKey);
|
||||
|
||||
JmxCacheBuster.clearJmxCache();
|
||||
}
|
||||
|
||||
|
@ -109,8 +120,13 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void updateGet() {
|
||||
regionGet.incr();
|
||||
public void updateGet(long getSize) {
|
||||
regionGet.add(getSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateScan(long scanSize) {
|
||||
regionScanNext.add(scanSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -3409,14 +3409,14 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> outResults, int limit)
|
||||
public boolean next(List<KeyValue> outResults)
|
||||
throws IOException {
|
||||
return next(outResults, limit, null);
|
||||
// apply the batching limit by default
|
||||
return next(outResults, batch);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResults, int limit,
|
||||
String metric) throws IOException {
|
||||
public synchronized boolean next(List<KeyValue> outResults, int limit) throws IOException {
|
||||
if (this.filterClosed) {
|
||||
throw new UnknownScannerException("Scanner was closed (timed out?) " +
|
||||
"after we renewed it. Could be caused by a very slow scanner " +
|
||||
|
@ -3429,7 +3429,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// This could be a new thread from the last time we called next().
|
||||
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
|
||||
|
||||
return nextRaw(outResults, limit, metric);
|
||||
return nextRaw(outResults, limit);
|
||||
} finally {
|
||||
closeRegionOperation();
|
||||
}
|
||||
|
@ -3438,49 +3438,44 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
@Override
|
||||
public boolean nextRaw(List<KeyValue> outResults)
|
||||
throws IOException {
|
||||
return nextRaw(outResults, batch, null);
|
||||
return nextRaw(outResults, batch);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<KeyValue> outResults, int limit,
|
||||
String metric) throws IOException {
|
||||
public boolean nextRaw(List<KeyValue> outResults, int limit) throws IOException {
|
||||
boolean returnResult;
|
||||
if (outResults.isEmpty()) {
|
||||
// Usually outResults is empty. This is true when next is called
|
||||
// to handle scan or get operation.
|
||||
returnResult = nextInternal(outResults, limit, metric);
|
||||
returnResult = nextInternal(outResults, limit);
|
||||
} else {
|
||||
List<KeyValue> tmpList = new ArrayList<KeyValue>();
|
||||
returnResult = nextInternal(tmpList, limit, metric);
|
||||
returnResult = nextInternal(tmpList, limit);
|
||||
outResults.addAll(tmpList);
|
||||
}
|
||||
resetFilters();
|
||||
if (isFilterDone()) {
|
||||
return false;
|
||||
}
|
||||
if (region != null && region.metricsRegion != null) {
|
||||
long totalSize = 0;
|
||||
if (outResults != null) {
|
||||
for(KeyValue kv:outResults) {
|
||||
totalSize += kv.getLength();
|
||||
}
|
||||
}
|
||||
region.metricsRegion.updateScanNext(totalSize);
|
||||
}
|
||||
return returnResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> outResults)
|
||||
throws IOException {
|
||||
// apply the batching limit by default
|
||||
return next(outResults, batch, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> outResults, String metric)
|
||||
throws IOException {
|
||||
// apply the batching limit by default
|
||||
return next(outResults, batch, metric);
|
||||
}
|
||||
|
||||
private void populateFromJoinedHeap(List<KeyValue> results, int limit, String metric)
|
||||
private void populateFromJoinedHeap(List<KeyValue> results, int limit)
|
||||
throws IOException {
|
||||
assert joinedContinuationRow != null;
|
||||
KeyValue kv = populateResult(results, this.joinedHeap, limit,
|
||||
joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(),
|
||||
joinedContinuationRow.getRowLength(), metric);
|
||||
joinedContinuationRow.getRowLength());
|
||||
if (kv != KV_LIMIT) {
|
||||
// We are done with this row, reset the continuation.
|
||||
joinedContinuationRow = null;
|
||||
|
@ -3498,14 +3493,13 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @param currentRow Byte array with key we are fetching.
|
||||
* @param offset offset for currentRow
|
||||
* @param length length for currentRow
|
||||
* @param metric Metric key to be passed into KeyValueHeap::next().
|
||||
* @return KV_LIMIT if limit reached, next KeyValue otherwise.
|
||||
*/
|
||||
private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit,
|
||||
byte[] currentRow, int offset, short length, String metric) throws IOException {
|
||||
byte[] currentRow, int offset, short length) throws IOException {
|
||||
KeyValue nextKv;
|
||||
do {
|
||||
heap.next(results, limit - results.size(), metric);
|
||||
heap.next(results, limit - results.size());
|
||||
if (limit > 0 && results.size() == limit) {
|
||||
return KV_LIMIT;
|
||||
}
|
||||
|
@ -3522,7 +3516,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return this.filter != null && this.filter.filterAllRemaining();
|
||||
}
|
||||
|
||||
private boolean nextInternal(List<KeyValue> results, int limit, String metric)
|
||||
private boolean nextInternal(List<KeyValue> results, int limit)
|
||||
throws IOException {
|
||||
if (!results.isEmpty()) {
|
||||
throw new IllegalArgumentException("First parameter should be an empty list");
|
||||
|
@ -3575,7 +3569,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
|
||||
length, metric);
|
||||
length);
|
||||
// Ok, we are good, let's try to get some results from the main heap.
|
||||
if (nextKv == KV_LIMIT) {
|
||||
if (this.filter != null && filter.hasFilterRow()) {
|
||||
|
@ -3620,12 +3614,12 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
&& joinedHeap.peek().matchingRow(currentRow, offset, length));
|
||||
if (mayHaveData) {
|
||||
joinedContinuationRow = current;
|
||||
populateFromJoinedHeap(results, limit, metric);
|
||||
populateFromJoinedHeap(results, limit);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Populating from the joined heap was stopped by limits, populate some more.
|
||||
populateFromJoinedHeap(results, limit, metric);
|
||||
populateFromJoinedHeap(results, limit);
|
||||
}
|
||||
|
||||
// We may have just called populateFromJoinedMap and hit the limits. If that is
|
||||
|
@ -4335,7 +4329,13 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
// do after lock
|
||||
if (this.metricsRegion != null) {
|
||||
this.metricsRegion.updateGet();
|
||||
long totalSize = 0l;
|
||||
if (results != null) {
|
||||
for (KeyValue kv:results) {
|
||||
totalSize += kv.getLength();
|
||||
}
|
||||
}
|
||||
this.metricsRegion.updateGet(totalSize);
|
||||
}
|
||||
|
||||
return results;
|
||||
|
|
|
@ -48,15 +48,6 @@ public interface InternalScanner extends Closeable {
|
|||
* @throws IOException e
|
||||
*/
|
||||
public boolean next(List<KeyValue> results) throws IOException;
|
||||
|
||||
/**
|
||||
* Grab the next row's worth of values.
|
||||
* @param results return output array
|
||||
* @param metric the metric name
|
||||
* @return true if more rows exist after this one, false if scanner is done
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean next(List<KeyValue> results, String metric) throws IOException;
|
||||
|
||||
/**
|
||||
* Grab the next row's worth of values with a limit on the number of values
|
||||
|
@ -67,17 +58,6 @@ public interface InternalScanner extends Closeable {
|
|||
* @throws IOException e
|
||||
*/
|
||||
public boolean next(List<KeyValue> result, int limit) throws IOException;
|
||||
|
||||
/**
|
||||
* Grab the next row's worth of values with a limit on the number of values
|
||||
* to return.
|
||||
* @param result return output array
|
||||
* @param limit limit on row count to get
|
||||
* @param metric the metric name
|
||||
* @return true if more rows exist after this one, false if scanner is done
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean next(List<KeyValue> result, int limit, String metric) throws IOException;
|
||||
|
||||
/**
|
||||
* Closes the scanner and releases any resources it has allocated
|
||||
|
|
|
@ -121,27 +121,11 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
|
|||
* @return true if there are more keys, false if all scanners are done
|
||||
*/
|
||||
public boolean next(List<KeyValue> result, int limit) throws IOException {
|
||||
return next(result, limit, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the next row of keys from the top-most scanner.
|
||||
* <p>
|
||||
* This method takes care of updating the heap.
|
||||
* <p>
|
||||
* This can ONLY be called when you are using Scanners that implement
|
||||
* InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
|
||||
* @param result output result list
|
||||
* @param limit limit on row count to get
|
||||
* @param metric the metric name
|
||||
* @return true if there are more keys, false if all scanners are done
|
||||
*/
|
||||
public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
|
||||
if (this.current == null) {
|
||||
return false;
|
||||
}
|
||||
InternalScanner currentAsInternal = (InternalScanner)this.current;
|
||||
boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
|
||||
boolean mayContainMoreRows = currentAsInternal.next(result, limit);
|
||||
KeyValue pee = this.current.peek();
|
||||
/*
|
||||
* By definition, any InternalScanner must return false only when it has no
|
||||
|
@ -173,11 +157,6 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
|
|||
return next(result, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> result, String metric) throws IOException {
|
||||
return next(result, -1, metric);
|
||||
}
|
||||
|
||||
private static class KVScannerComparator implements Comparator<KeyValueScanner> {
|
||||
private KVComparator kvComparator;
|
||||
/**
|
||||
|
|
|
@ -48,8 +48,12 @@ public class MetricsRegion {
|
|||
source.updateDelete();
|
||||
}
|
||||
|
||||
public void updateGet() {
|
||||
source.updateGet();
|
||||
public void updateGet(long getSize) {
|
||||
source.updateGet(getSize);
|
||||
}
|
||||
|
||||
public void updateScanNext(long scanSize) {
|
||||
source.updateScan(scanSize);
|
||||
}
|
||||
|
||||
public void updateAppend() {
|
||||
|
|
|
@ -69,7 +69,7 @@ public interface RegionScanner extends InternalScanner {
|
|||
* to return.
|
||||
* This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
|
||||
* Caller must set the thread's readpoint, start and close a region operation, an synchronize on the scanner object.
|
||||
* See {@link #nextRaw(List, int, String)}
|
||||
* See {@link #nextRaw(List, int)}
|
||||
* @param result return output array
|
||||
* @return true if more rows exist after this one, false if scanner is done
|
||||
* @throws IOException e
|
||||
|
@ -99,9 +99,8 @@ public interface RegionScanner extends InternalScanner {
|
|||
* </pre></code>
|
||||
* @param result return output array
|
||||
* @param limit limit on row count to get
|
||||
* @param metric the metric name
|
||||
* @return true if more rows exist after this one, false if scanner is done
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException;
|
||||
public boolean nextRaw(List<KeyValue> result, int limit) throws IOException;
|
||||
}
|
||||
|
|
|
@ -353,19 +353,6 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
|||
*/
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
|
||||
return next(outResult, limit, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next row of values from this Store.
|
||||
* @param outResult
|
||||
* @param limit
|
||||
* @return true if there are more rows, false if scanner is done
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResult, int limit,
|
||||
String metric) throws IOException {
|
||||
|
||||
if (checkReseek()) {
|
||||
return true;
|
||||
}
|
||||
|
@ -401,104 +388,94 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
|||
KeyValue.KVComparator comparator =
|
||||
store != null ? store.getComparator() : null;
|
||||
|
||||
long cumulativeMetric = 0;
|
||||
int count = 0;
|
||||
try {
|
||||
LOOP: while((kv = this.heap.peek()) != null) {
|
||||
// Check that the heap gives us KVs in an increasing order.
|
||||
assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
|
||||
"Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;
|
||||
prevKV = kv;
|
||||
LOOP: while((kv = this.heap.peek()) != null) {
|
||||
// Check that the heap gives us KVs in an increasing order.
|
||||
assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
|
||||
"Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;
|
||||
prevKV = kv;
|
||||
|
||||
ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
|
||||
switch(qcode) {
|
||||
case INCLUDE:
|
||||
case INCLUDE_AND_SEEK_NEXT_ROW:
|
||||
case INCLUDE_AND_SEEK_NEXT_COL:
|
||||
ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
|
||||
switch(qcode) {
|
||||
case INCLUDE:
|
||||
case INCLUDE_AND_SEEK_NEXT_ROW:
|
||||
case INCLUDE_AND_SEEK_NEXT_COL:
|
||||
|
||||
Filter f = matcher.getFilter();
|
||||
if (f != null) {
|
||||
kv = f.transform(kv);
|
||||
}
|
||||
Filter f = matcher.getFilter();
|
||||
if (f != null) {
|
||||
kv = f.transform(kv);
|
||||
}
|
||||
|
||||
this.countPerRow++;
|
||||
if (storeLimit > -1 &&
|
||||
this.countPerRow > (storeLimit + storeOffset)) {
|
||||
// do what SEEK_NEXT_ROW does.
|
||||
if (!matcher.moreRowsMayExistAfter(kv)) {
|
||||
return false;
|
||||
}
|
||||
reseek(matcher.getKeyForNextRow(kv));
|
||||
break LOOP;
|
||||
}
|
||||
|
||||
// add to results only if we have skipped #storeOffset kvs
|
||||
// also update metric accordingly
|
||||
if (this.countPerRow > storeOffset) {
|
||||
if (metric != null) {
|
||||
cumulativeMetric += kv.getLength();
|
||||
}
|
||||
outResult.add(kv);
|
||||
count++;
|
||||
}
|
||||
|
||||
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
|
||||
if (!matcher.moreRowsMayExistAfter(kv)) {
|
||||
return false;
|
||||
}
|
||||
reseek(matcher.getKeyForNextRow(kv));
|
||||
} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
|
||||
reseek(matcher.getKeyForNextColumn(kv));
|
||||
} else {
|
||||
this.heap.next();
|
||||
}
|
||||
|
||||
if (limit > 0 && (count == limit)) {
|
||||
break LOOP;
|
||||
}
|
||||
continue;
|
||||
|
||||
case DONE:
|
||||
return true;
|
||||
|
||||
case DONE_SCAN:
|
||||
close();
|
||||
return false;
|
||||
|
||||
case SEEK_NEXT_ROW:
|
||||
// This is just a relatively simple end of scan fix, to short-cut end
|
||||
// us if there is an endKey in the scan.
|
||||
this.countPerRow++;
|
||||
if (storeLimit > -1 &&
|
||||
this.countPerRow > (storeLimit + storeOffset)) {
|
||||
// do what SEEK_NEXT_ROW does.
|
||||
if (!matcher.moreRowsMayExistAfter(kv)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
reseek(matcher.getKeyForNextRow(kv));
|
||||
break;
|
||||
break LOOP;
|
||||
}
|
||||
|
||||
case SEEK_NEXT_COL:
|
||||
reseek(matcher.getKeyForNextColumn(kv));
|
||||
break;
|
||||
// add to results only if we have skipped #storeOffset kvs
|
||||
// also update metric accordingly
|
||||
if (this.countPerRow > storeOffset) {
|
||||
outResult.add(kv);
|
||||
count++;
|
||||
}
|
||||
|
||||
case SKIP:
|
||||
this.heap.next();
|
||||
break;
|
||||
|
||||
case SEEK_NEXT_USING_HINT:
|
||||
KeyValue nextKV = matcher.getNextKeyHint(kv);
|
||||
if (nextKV != null) {
|
||||
reseek(nextKV);
|
||||
} else {
|
||||
heap.next();
|
||||
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
|
||||
if (!matcher.moreRowsMayExistAfter(kv)) {
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
reseek(matcher.getKeyForNextRow(kv));
|
||||
} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
|
||||
reseek(matcher.getKeyForNextColumn(kv));
|
||||
} else {
|
||||
this.heap.next();
|
||||
}
|
||||
|
||||
default:
|
||||
throw new RuntimeException("UNEXPECTED");
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (cumulativeMetric > 0 && metric != null) {
|
||||
if (limit > 0 && (count == limit)) {
|
||||
break LOOP;
|
||||
}
|
||||
continue;
|
||||
|
||||
case DONE:
|
||||
return true;
|
||||
|
||||
case DONE_SCAN:
|
||||
close();
|
||||
return false;
|
||||
|
||||
case SEEK_NEXT_ROW:
|
||||
// This is just a relatively simple end of scan fix, to short-cut end
|
||||
// us if there is an endKey in the scan.
|
||||
if (!matcher.moreRowsMayExistAfter(kv)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
reseek(matcher.getKeyForNextRow(kv));
|
||||
break;
|
||||
|
||||
case SEEK_NEXT_COL:
|
||||
reseek(matcher.getKeyForNextColumn(kv));
|
||||
break;
|
||||
|
||||
case SKIP:
|
||||
this.heap.next();
|
||||
break;
|
||||
|
||||
case SEEK_NEXT_USING_HINT:
|
||||
KeyValue nextKV = matcher.getNextKeyHint(kv);
|
||||
if (nextKV != null) {
|
||||
reseek(nextKV);
|
||||
} else {
|
||||
heap.next();
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new RuntimeException("UNEXPECTED");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -513,13 +490,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
|||
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResult) throws IOException {
|
||||
return next(outResult, -1, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean next(List<KeyValue> outResult, String metric)
|
||||
throws IOException {
|
||||
return next(outResult, -1, metric);
|
||||
return next(outResult, -1);
|
||||
}
|
||||
|
||||
// Implementation of ChangedReadersObserver
|
||||
|
|
|
@ -77,35 +77,23 @@ public class TestCoprocessorInterface extends HBaseTestCase {
|
|||
return delegate.next(results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results, String metric)
|
||||
throws IOException {
|
||||
return delegate.next(results, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> result, int limit) throws IOException {
|
||||
return delegate.next(result, limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> result, int limit, String metric)
|
||||
throws IOException {
|
||||
return delegate.next(result, limit, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<KeyValue> result, int limit, String metric)
|
||||
throws IOException {
|
||||
return delegate.nextRaw(result, limit, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<KeyValue> result)
|
||||
throws IOException {
|
||||
return delegate.nextRaw(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nextRaw(List<KeyValue> result, int limit)
|
||||
throws IOException {
|
||||
return delegate.nextRaw(result, limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
delegate.close();
|
||||
|
|
|
@ -324,25 +324,13 @@ public class TestRegionObserverInterface {
|
|||
return next(results, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results, String metric)
|
||||
throws IOException {
|
||||
return next(results, -1, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results, int limit)
|
||||
throws IOException{
|
||||
return next(results, limit, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next(List<KeyValue> results, int limit, String metric)
|
||||
throws IOException {
|
||||
List<KeyValue> internalResults = new ArrayList<KeyValue>();
|
||||
boolean hasMore;
|
||||
do {
|
||||
hasMore = scanner.next(internalResults, limit, metric);
|
||||
hasMore = scanner.next(internalResults, limit);
|
||||
if (!internalResults.isEmpty()) {
|
||||
long row = Bytes.toLong(internalResults.get(0).getRow());
|
||||
if (row % 2 == 0) {
|
||||
|
|
|
@ -31,6 +31,9 @@ import org.junit.AfterClass;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
||||
@Category(MediumTests.class)
|
||||
|
@ -139,7 +142,7 @@ public class TestRegionServerMetrics {
|
|||
.getSource()
|
||||
.getAggregateSource();
|
||||
String prefix = "table."+tableNameString + ".region." + i.getEncodedName();
|
||||
metricsHelper.assertCounter(prefix + ".getCount", 10, agg);
|
||||
metricsHelper.assertCounter(prefix + ".getNumOps", 10, agg);
|
||||
metricsHelper.assertCounter(prefix + ".mutateCount", 30, agg);
|
||||
}
|
||||
|
||||
|
@ -309,4 +312,43 @@ public class TestRegionServerMetrics {
|
|||
|
||||
t.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScanNext() throws IOException {
|
||||
String tableNameString = "testScanNext";
|
||||
byte[] tableName = Bytes.toBytes(tableNameString);
|
||||
byte[] cf = Bytes.toBytes("d");
|
||||
byte[] qualifier = Bytes.toBytes("qual");
|
||||
byte[] val = Bytes.toBytes("One");
|
||||
|
||||
|
||||
TEST_UTIL.createTable(tableName, cf);
|
||||
HTable t = new HTable(conf, tableName);
|
||||
t.setAutoFlush(false);
|
||||
for (int insertCount =0; insertCount < 100; insertCount++) {
|
||||
Put p = new Put(Bytes.toBytes("" + insertCount + "row"));
|
||||
p.add(cf, qualifier, val);
|
||||
t.put(p);
|
||||
}
|
||||
t.flushCommits();
|
||||
|
||||
Scan s = new Scan();
|
||||
s.setBatch(1);
|
||||
s.setCaching(1);
|
||||
ResultScanner resultScanners = t.getScanner(s);
|
||||
|
||||
for (int nextCount = 0; nextCount < 30; nextCount++) {
|
||||
Result result = resultScanners.next();
|
||||
assertNotNull(result);
|
||||
assertEquals(1, result.size());
|
||||
}
|
||||
for ( HRegionInfo i:t.getRegionLocations().keySet()) {
|
||||
MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName())
|
||||
.getMetrics()
|
||||
.getSource()
|
||||
.getAggregateSource();
|
||||
String prefix = "table."+tableNameString + ".region." + i.getEncodedName();
|
||||
metricsHelper.assertCounter(prefix + ".scanNextNumOps", 30, agg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue