HBASE-27532 Add block bytes scanned metrics (#5067)

Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
Bryan Beaudreault 2023-03-27 16:03:15 -04:00 committed by GitHub
parent a6dad700db
commit c825c960d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 354 additions and 188 deletions

View File

@ -28,6 +28,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
* Provides server side metrics related to scan operations.
*/
@InterfaceAudience.Public
@SuppressWarnings("checkstyle:VisibilityModifier") // See HBASE-27757
public class ServerSideScanMetrics {
/**
* Hash to hold the String -&gt; Atomic Long mappings for each metric
@ -47,6 +48,8 @@ public class ServerSideScanMetrics {
public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
public static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED";
public static final String BLOCK_BYTES_SCANNED_KEY_METRIC_NAME = "BLOCK_BYTES_SCANNED";
/**
* number of rows filtered during scan RPC
*/
@ -59,6 +62,9 @@ public class ServerSideScanMetrics {
*/
public final AtomicLong countOfRowsScanned = createCounter(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
public final AtomicLong countOfBlockBytesScanned =
createCounter(BLOCK_BYTES_SCANNED_KEY_METRIC_NAME);
public void setCounter(String counterName, long value) {
AtomicLong c = this.counters.get(counterName);
if (c != null) {

View File

@ -85,27 +85,32 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
/**
* Update checkAndMutate histogram
* @param t time it took
* @param time time it took
* @param blockBytesScanned how many block bytes were scanned for the check portion of the request
*/
void updateCheckAndMutate(long t);
void updateCheckAndMutate(long time, long blockBytesScanned);
/**
* Update the Get time histogram .
* @param t time it took
* @param time time it took
* @param blockBytesScanned how many block bytes were scanned for the request
*/
void updateGet(long t);
void updateGet(long time, long blockBytesScanned);
/**
* Update the Increment time histogram.
* @param t time it took
* @param time time it took
* @param blockBytesScanned how many block bytes were scanned fetching the current value to
* increment
*/
void updateIncrement(long t);
void updateIncrement(long time, long blockBytesScanned);
/**
* Update the Append time histogram.
* @param t time it took
* @param time time it took
* @param blockBytesScanned how many block bytes were scanned fetching the current value to append
*/
void updateAppend(long t);
void updateAppend(long time, long blockBytesScanned);
/**
* Update the Replay time histogram.
@ -114,15 +119,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
void updateReplay(long t);
/**
* Update the scan size.
* @param scanSize size of the scan
* Update the scan metrics.
* @param time response time of scan
* @param responseCellSize size of the scan resposne
* @param blockBytesScanned size of block bytes scanned to retrieve the response
*/
void updateScanSize(long scanSize);
/**
* Update the scan time.
*/
void updateScanTime(long t);
void updateScan(long time, long responseCellSize, long blockBytesScanned);
/**
* Increment the number of slow Puts that have happened.
@ -445,6 +447,13 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
String SCAN_SIZE_KEY = "scanSize";
String SCAN_TIME_KEY = "scanTime";
String BLOCK_BYTES_SCANNED_KEY = "blockBytesScannedCount";
String BLOCK_BYTES_SCANNED_DESC = "Count of block bytes scanned by read requests";
String GET_BLOCK_BYTES_SCANNED_KEY = "getBlockBytesScanned";
String SCAN_BLOCK_BYTES_SCANNED_KEY = "scanBlockBytesScanned";
String CHECK_AND_MUTATE_BLOCK_BYTES_SCANNED_KEY = "checkAndMutateBlockBytesScanned";
String INCREMENT_BLOCK_BYTES_SCANNED_KEY = "incrementBlockBytesScanned";
String APPEND_BLOCK_BYTES_SCANNED_KEY = "appendBlockBytesScanned";
String SLOW_PUT_KEY = "slowPutCount";
String SLOW_GET_KEY = "slowGetCount";
String SLOW_DELETE_KEY = "slowDeleteCount";

View File

@ -48,6 +48,13 @@ public class MetricsRegionServerSourceImpl extends BaseSourceImpl
private final MetricHistogram scanSizeHisto;
private final MetricHistogram scanTimeHisto;
private final MutableFastCounter blockBytesScannedCount;
private final MetricHistogram checkAndMutateBlockBytesScanned;
private final MetricHistogram getBlockBytesScanned;
private final MetricHistogram incrementBlockBytesScanned;
private final MetricHistogram appendBlockBytesScanned;
private final MetricHistogram scanBlockBytesScanned;
private final MutableFastCounter slowPut;
private final MutableFastCounter slowDelete;
private final MutableFastCounter slowGet;
@ -125,6 +132,16 @@ public class MetricsRegionServerSourceImpl extends BaseSourceImpl
scanSizeHisto = getMetricsRegistry().newSizeHistogram(SCAN_SIZE_KEY);
scanTimeHisto = getMetricsRegistry().newTimeHistogram(SCAN_TIME_KEY);
blockBytesScannedCount =
getMetricsRegistry().newCounter(BLOCK_BYTES_SCANNED_KEY, BLOCK_BYTES_SCANNED_DESC, 0L);
checkAndMutateBlockBytesScanned =
getMetricsRegistry().newSizeHistogram(CHECK_AND_MUTATE_BLOCK_BYTES_SCANNED_KEY);
getBlockBytesScanned = getMetricsRegistry().newSizeHistogram(GET_BLOCK_BYTES_SCANNED_KEY);
incrementBlockBytesScanned =
getMetricsRegistry().newSizeHistogram(INCREMENT_BLOCK_BYTES_SCANNED_KEY);
appendBlockBytesScanned = getMetricsRegistry().newSizeHistogram(APPEND_BLOCK_BYTES_SCANNED_KEY);
scanBlockBytesScanned = getMetricsRegistry().newSizeHistogram(SCAN_BLOCK_BYTES_SCANNED_KEY);
flushTimeHisto = getMetricsRegistry().newTimeHistogram(FLUSH_TIME, FLUSH_TIME_DESC);
flushMemstoreSizeHisto =
getMetricsRegistry().newSizeHistogram(FLUSH_MEMSTORE_SIZE, FLUSH_MEMSTORE_SIZE_DESC);
@ -192,18 +209,30 @@ public class MetricsRegionServerSourceImpl extends BaseSourceImpl
}
@Override
public void updateGet(long t) {
public void updateGet(long t, long blockBytesScanned) {
getHisto.add(t);
if (blockBytesScanned > 0) {
blockBytesScannedCount.incr(blockBytesScanned);
getBlockBytesScanned.add(blockBytesScanned);
}
}
@Override
public void updateIncrement(long t) {
public void updateIncrement(long t, long blockBytesScanned) {
incrementHisto.add(t);
if (blockBytesScanned > 0) {
blockBytesScannedCount.incr(blockBytesScanned);
incrementBlockBytesScanned.add(blockBytesScanned);
}
}
@Override
public void updateAppend(long t) {
public void updateAppend(long t, long blockBytesScanned) {
appendHisto.add(t);
if (blockBytesScanned > 0) {
blockBytesScannedCount.incr(blockBytesScanned);
appendBlockBytesScanned.add(blockBytesScanned);
}
}
@Override
@ -212,13 +241,13 @@ public class MetricsRegionServerSourceImpl extends BaseSourceImpl
}
@Override
public void updateScanSize(long scanSize) {
scanSizeHisto.add(scanSize);
}
@Override
public void updateScanTime(long t) {
scanTimeHisto.add(t);
public void updateScan(long time, long responseSize, long blockBytesScanned) {
scanTimeHisto.add(time);
scanSizeHisto.add(responseSize);
if (blockBytesScanned > 0) {
blockBytesScannedCount.incr(blockBytesScanned);
scanBlockBytesScanned.add(blockBytesScanned);
}
}
@Override
@ -646,8 +675,12 @@ public class MetricsRegionServerSourceImpl extends BaseSourceImpl
}
@Override
public void updateCheckAndMutate(long t) {
checkAndMutateHisto.add(t);
public void updateCheckAndMutate(long time, long blockBytesScanned) {
checkAndMutateHisto.add(time);
if (blockBytesScanned > 0) {
blockBytesScannedCount.incr(blockBytesScanned);
checkAndMutateBlockBytesScanned.add(blockBytesScanned);
}
}
@Override

View File

@ -51,15 +51,17 @@ public interface MetricsUserSource extends Comparable<MetricsUserSource> {
void updateDelete(long t);
void updateGet(long t);
void updateGet(long time, long blockBytesScanned);
void updateIncrement(long t);
void updateIncrement(long time, long blockBytesScanned);
void updateAppend(long t);
void updateAppend(long time, long blockBytesScanned);
void updateReplay(long t);
void updateScanTime(long t);
void updateScan(long time, long blockBytesScanned);
void updateCheckAndMutate(long blockBytesScanned);
void getMetrics(MetricsCollector metricsCollector, boolean all);

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,6 +46,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
private final String userIncrementKey;
private final String userAppendKey;
private final String userReplayKey;
private final String userBlockBytesScannedKey;
private MetricHistogram getHisto;
private MetricHistogram scanTimeHisto;
@ -53,6 +55,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
private MetricHistogram incrementHisto;
private MetricHistogram appendHisto;
private MetricHistogram replayHisto;
private MutableFastCounter blockBytesScannedCount;
private final int hashCode;
@ -116,7 +119,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
this.user = user;
this.registry = agg.getMetricsRegistry();
this.userNamePrefix = "user_" + user + "_metric_";
this.userNamePrefix = "User_" + user + "_metric_";
hashCode = userNamePrefix.hashCode();
@ -127,6 +130,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
userBlockBytesScannedKey = userNamePrefix + MetricsRegionServerSource.BLOCK_BYTES_SCANNED_KEY;
clientMetricsMap = new ConcurrentHashMap<>();
agg.register(this);
}
@ -141,6 +145,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
incrementHisto = registry.newTimeHistogram(userIncrementKey);
appendHisto = registry.newTimeHistogram(userAppendKey);
replayHisto = registry.newTimeHistogram(userReplayKey);
blockBytesScannedCount = registry.newCounter(userBlockBytesScannedKey, "", 0);
}
}
@ -165,6 +170,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
registry.removeMetric(userIncrementKey);
registry.removeMetric(userAppendKey);
registry.removeMetric(userReplayKey);
registry.removeMetric(userBlockBytesScannedKey);
}
}
@ -231,18 +237,21 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
}
@Override
public void updateGet(long t) {
getHisto.add(t);
public void updateGet(long time, long blockBytesScanned) {
getHisto.add(time);
blockBytesScannedCount.incr(blockBytesScanned);
}
@Override
public void updateIncrement(long t) {
incrementHisto.add(t);
public void updateIncrement(long time, long blockBytesScanned) {
incrementHisto.add(time);
blockBytesScannedCount.incr(blockBytesScanned);
}
@Override
public void updateAppend(long t) {
appendHisto.add(t);
public void updateAppend(long time, long blockBytesScanned) {
appendHisto.add(time);
blockBytesScannedCount.incr(blockBytesScanned);
}
@Override
@ -251,8 +260,14 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
}
@Override
public void updateScanTime(long t) {
scanTimeHisto.add(t);
public void updateScan(long time, long blockBytesScanned) {
scanTimeHisto.add(time);
blockBytesScannedCount.incr(blockBytesScanned);
}
@Override
public void updateCheckAndMutate(long blockBytesScanned) {
blockBytesScannedCount.incr(blockBytesScanned);
}
@Override

View File

@ -87,9 +87,18 @@ public interface RpcCallContext {
*/
void incrementResponseCellSize(long cellSize);
long getResponseBlockSize();
/**
* Get the number of block bytes scanned by the current call. In order to serve a response, 1 or
* more lower level blocks must be loaded (from disk or cache) and scanned for the requested
* cells. This value includes the total block size for each block loaded for the request.
*/
long getBlockBytesScanned();
void incrementResponseBlockSize(long blockSize);
/**
* Increment the number of block bytes scanned by the current call. See
* {@link #getBlockBytesScanned()} for details.
*/
void incrementBlockBytesScanned(long blockSize);
long getResponseExceptionSize();

View File

@ -428,7 +428,7 @@ public abstract class RpcServer implements RpcServerInterface, ConfigurationObse
// Use the raw request call size for now.
long requestSize = call.getSize();
long responseSize = result.getSerializedSize();
long responseBlockSize = call.getResponseBlockSize();
long responseBlockSize = call.getBlockBytesScanned();
if (call.isClientCellBlockSupported()) {
// Include the payload size in HBaseRpcController
responseSize += call.getResponseCellSize();

View File

@ -424,12 +424,12 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
}
@Override
public long getResponseBlockSize() {
public long getBlockBytesScanned() {
return responseBlockSize;
}
@Override
public void incrementResponseBlockSize(long blockSize) {
public void incrementBlockBytesScanned(long blockSize) {
responseBlockSize += blockSize;
}

View File

@ -150,44 +150,45 @@ public class MetricsRegionServer {
serverSource.updateCheckAndPut(t);
}
public void updateCheckAndMutate(HRegion region, long t) {
public void updateCheckAndMutate(HRegion region, long time, long blockBytesScanned) {
if (region.getMetricsTableRequests() != null) {
region.getMetricsTableRequests().updateCheckAndMutate(t);
region.getMetricsTableRequests().updateCheckAndMutate(time, blockBytesScanned);
}
serverSource.updateCheckAndMutate(t);
serverSource.updateCheckAndMutate(time, blockBytesScanned);
userAggregate.updateCheckAndMutate(blockBytesScanned);
}
public void updateGet(HRegion region, long t) {
public void updateGet(HRegion region, long time, long blockBytesScanned) {
if (region.getMetricsTableRequests() != null) {
region.getMetricsTableRequests().updateGet(t);
region.getMetricsTableRequests().updateGet(time, blockBytesScanned);
}
if (t > slowMetricTime) {
if (time > slowMetricTime) {
serverSource.incrSlowGet();
}
serverSource.updateGet(t);
userAggregate.updateGet(t);
serverSource.updateGet(time, blockBytesScanned);
userAggregate.updateGet(time, blockBytesScanned);
}
public void updateIncrement(HRegion region, long t) {
public void updateIncrement(HRegion region, long time, long blockBytesScanned) {
if (region.getMetricsTableRequests() != null) {
region.getMetricsTableRequests().updateIncrement(t);
region.getMetricsTableRequests().updateIncrement(time, blockBytesScanned);
}
if (t > slowMetricTime) {
if (time > slowMetricTime) {
serverSource.incrSlowIncrement();
}
serverSource.updateIncrement(t);
userAggregate.updateIncrement(t);
serverSource.updateIncrement(time, blockBytesScanned);
userAggregate.updateIncrement(time, blockBytesScanned);
}
public void updateAppend(HRegion region, long t) {
public void updateAppend(HRegion region, long time, long blockBytesScanned) {
if (region.getMetricsTableRequests() != null) {
region.getMetricsTableRequests().updateAppend(t);
region.getMetricsTableRequests().updateAppend(time, blockBytesScanned);
}
if (t > slowMetricTime) {
if (time > slowMetricTime) {
serverSource.incrSlowAppend();
}
serverSource.updateAppend(t);
userAggregate.updateAppend(t);
serverSource.updateAppend(time, blockBytesScanned);
userAggregate.updateAppend(time, blockBytesScanned);
}
public void updateReplay(long t) {
@ -195,19 +196,12 @@ public class MetricsRegionServer {
userAggregate.updateReplay(t);
}
public void updateScanSize(HRegion region, long scanSize) {
public void updateScan(HRegion region, long time, long responseCellSize, long blockBytesScanned) {
if (region.getMetricsTableRequests() != null) {
region.getMetricsTableRequests().updateScanSize(scanSize);
region.getMetricsTableRequests().updateScan(time, responseCellSize, blockBytesScanned);
}
serverSource.updateScanSize(scanSize);
}
public void updateScanTime(HRegion region, long t) {
if (region.getMetricsTableRequests() != null) {
region.getMetricsTableRequests().updateScanTime(t);
}
serverSource.updateScanTime(t);
userAggregate.updateScanTime(t);
serverSource.updateScan(time, responseCellSize, blockBytesScanned);
userAggregate.updateScan(time, blockBytesScanned);
}
public void updateSplitTime(long t) {
@ -300,4 +294,5 @@ public class MetricsRegionServer {
public void incrScannerLeaseExpired() {
serverSource.incrScannerLeaseExpired();
}
}

View File

@ -29,17 +29,20 @@ public interface MetricsUserAggregate {
void updateDelete(long t);
void updateGet(long t);
void updateGet(long time, long blockBytesScanned);
void updateIncrement(long t);
void updateIncrement(long time, long blockBytesScanned);
void updateAppend(long t);
void updateAppend(long time, long blockBytesScanned);
void updateReplay(long t);
void updateScanTime(long t);
void updateScan(long time, long blockBytesScanned);
void updateCheckAndMutate(long blockBytesScanned);
void updateFilteredReadRequests();
void updateReadRequestCount();
}

View File

@ -51,17 +51,17 @@ public class MetricsUserAggregateFactory {
}
@Override
public void updateGet(long t) {
public void updateGet(long time, long blockBytesScanned) {
}
@Override
public void updateIncrement(long t) {
public void updateIncrement(long time, long blockBytesScanned) {
}
@Override
public void updateAppend(long t) {
public void updateAppend(long time, long blockBytesScanned) {
}
@ -71,7 +71,12 @@ public class MetricsUserAggregateFactory {
}
@Override
public void updateScanTime(long t) {
public void updateScan(long time, long blockBytesScanned) {
}
@Override
public void updateCheckAndMutate(long blockBytesScanned) {
}

View File

@ -114,30 +114,30 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate {
}
@Override
public void updateGet(long t) {
public void updateGet(long time, long blockBytesScanned) {
String user = getActiveUser();
if (user != null) {
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateGet(t);
userSource.updateGet(time, blockBytesScanned);
}
}
@Override
public void updateIncrement(long t) {
public void updateIncrement(long time, long blockBytesScanned) {
String user = getActiveUser();
if (user != null) {
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateIncrement(t);
userSource.updateIncrement(time, blockBytesScanned);
incrementClientWriteMetrics(userSource);
}
}
@Override
public void updateAppend(long t) {
public void updateAppend(long time, long blockBytesScanned) {
String user = getActiveUser();
if (user != null) {
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateAppend(t);
userSource.updateAppend(time, blockBytesScanned);
incrementClientWriteMetrics(userSource);
}
}
@ -153,11 +153,20 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate {
}
@Override
public void updateScanTime(long t) {
public void updateScan(long time, long blockBytesScanned) {
String user = getActiveUser();
if (user != null) {
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateScanTime(t);
userSource.updateScan(time, blockBytesScanned);
}
}
@Override
public void updateCheckAndMutate(long blockBytesScanned) {
String user = getActiveUser();
if (user != null) {
MetricsUserSource userSource = getOrCreateMetricsUser(user);
userSource.updateCheckAndMutate(blockBytesScanned);
}
}

View File

@ -650,16 +650,20 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
*/
private Result append(final HRegion region, final OperationQuota quota,
final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
ActivePolicyEnforcement spaceQuota) throws IOException {
ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
Append append = ProtobufUtil.toAppend(mutation, cellScanner);
checkCellSizeLimit(region, append);
spaceQuota.getPolicyEnforcement(region).check(append);
quota.addMutation(append);
long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
Result r = region.append(append, nonceGroup, nonce);
if (server.getMetrics() != null) {
server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before);
long blockBytesScanned =
context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before,
blockBytesScanned);
}
return r == null ? Result.EMPTY_RESULT : r;
}
@ -669,17 +673,21 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
*/
private Result increment(final HRegion region, final OperationQuota quota,
final MutationProto mutation, final CellScanner cells, long nonceGroup,
ActivePolicyEnforcement spaceQuota) throws IOException {
ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
Increment increment = ProtobufUtil.toIncrement(mutation, cells);
checkCellSizeLimit(region, increment);
spaceQuota.getPolicyEnforcement(region).check(increment);
quota.addMutation(increment);
long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
Result r = region.increment(increment, nonceGroup, nonce);
final MetricsRegionServer metricsRegionServer = server.getMetrics();
if (metricsRegionServer != null) {
metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before);
long blockBytesScanned =
context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
blockBytesScanned);
}
return r == null ? Result.EMPTY_RESULT : r;
}
@ -714,12 +722,11 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
resultOrExceptionBuilder.clear();
try {
Result r = null;
long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
if (
context != null && context.isRetryImmediatelySupported()
&& (context.getResponseCellSize() > maxQuotaResultSize
|| context.getResponseBlockSize() + context.getResponseExceptionSize()
> maxQuotaResultSize)
|| blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
) {
// We're storing the exception since the exception and reason string won't
@ -730,7 +737,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
//
// Instead just create the exception and then store it.
sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
+ context.getResponseCellSize() + " BlockSize: " + context.getResponseBlockSize());
+ context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);
// Only report the exception once since there's only one request that
// caused the exception. Otherwise this number will dominate the exceptions count.
@ -771,7 +778,10 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
} finally {
final MetricsRegionServer metricsRegionServer = server.getMetrics();
if (metricsRegionServer != null) {
metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before);
long blockBytesScanned =
context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
blockBytesScanned);
}
}
} else if (action.hasServiceCall()) {
@ -797,11 +807,11 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
switch (type) {
case APPEND:
r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
spaceQuotaEnforcement);
spaceQuotaEnforcement, context);
break;
case INCREMENT:
r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
spaceQuotaEnforcement);
spaceQuotaEnforcement, context);
break;
case PUT:
case DELETE:
@ -2425,6 +2435,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
long before = EnvironmentEdgeManager.currentTime();
OperationQuota quota = null;
HRegion region = null;
RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
try {
checkOpen();
requestCount.increment();
@ -2445,7 +2456,6 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
}
Boolean existence = null;
Result r = null;
RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
Get clientGet = ProtobufUtil.toGet(get);
@ -2495,11 +2505,10 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
throw new ServiceException(ie);
} finally {
final MetricsRegionServer metricsRegionServer = server.getMetrics();
if (metricsRegionServer != null) {
TableDescriptor td = region != null ? region.getTableDescriptor() : null;
if (td != null) {
metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before);
}
if (metricsRegionServer != null && region != null) {
long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
blockBytesScanned);
}
if (quota != null) {
quota.close();
@ -2751,7 +2760,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
if (regionAction.getActionCount() == 1) {
CheckAndMutateResult result =
checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
regionActionResultBuilder.setProcessed(result.isSuccess());
resultOrExceptionOrBuilder.setIndex(0);
if (result.getResult() != null) {
@ -2916,7 +2925,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
if (request.hasCondition()) {
CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
request.getCondition(), nonceGroup, spaceQuotaEnforcement);
request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
builder.setProcessed(result.isSuccess());
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, result.getResult(), controller, clientCellBlockSupported);
@ -2930,11 +2939,13 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
switch (type) {
case APPEND:
// TODO: this doesn't actually check anything.
r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
context);
break;
case INCREMENT:
// TODO: this doesn't actually check anything.
r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
context);
break;
case PUT:
put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
@ -3001,8 +3012,9 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
ActivePolicyEnforcement spaceQuota) throws IOException {
ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
@ -3022,7 +3034,9 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
MetricsRegionServer metricsRegionServer = server.getMetrics();
if (metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime();
metricsRegionServer.updateCheckAndMutate(region, after - before);
long blockBytesScanned =
context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);
MutationType type = mutation.getMutateType();
switch (type) {
@ -3313,7 +3327,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
long maxCellSize = maxResultSize;
long maxBlockSize = maxQuotaResultSize;
if (rpcCall != null) {
maxBlockSize -= rpcCall.getResponseBlockSize();
maxBlockSize -= rpcCall.getBlockBytesScanned();
maxCellSize -= rpcCall.getResponseCellSize();
}
@ -3431,6 +3445,10 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
// Check to see if the client requested that we track metrics server side. If the
// client requested metrics, retrieve the metrics from the scanner context.
if (trackMetrics) {
// rather than increment yet another counter in StoreScanner, just set the value here
// from block size progress before writing into the response
scannerContext.getMetrics().countOfBlockBytesScanned
.set(scannerContext.getBlockSizeProgress());
Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
@ -3448,12 +3466,16 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
region.closeRegionOperation();
// Update serverside metrics, even on error.
long end = EnvironmentEdgeManager.currentTime();
long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0;
long responseCellSize = 0;
long blockBytesScanned = 0;
if (rpcCall != null) {
responseCellSize = rpcCall.getResponseCellSize();
blockBytesScanned = rpcCall.getBlockBytesScanned();
}
region.getMetrics().updateScanTime(end - before);
final MetricsRegionServer metricsRegionServer = server.getMetrics();
if (metricsRegionServer != null) {
metricsRegionServer.updateScanSize(region, responseCellSize);
metricsRegionServer.updateScanTime(region, end - before);
metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
}
}

View File

@ -621,7 +621,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
heap.recordBlockSize(blockSize -> {
if (rpcCall.isPresent()) {
rpcCall.get().incrementResponseBlockSize(blockSize);
rpcCall.get().incrementBlockBytesScanned(blockSize);
}
scannerContext.incrementBlockProgress(blockSize);
});

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.metrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.metrics.Histogram;
import org.apache.hadoop.hbase.metrics.Meter;
import org.apache.hadoop.hbase.metrics.MetricRegistries;
@ -72,6 +73,12 @@ public class MetricsTableRequests {
private final static String CHECK_AND_DELETE_TIME = "checkAndDeleteTime";
private final static String CHECK_AND_PUT_TIME = "checkAndPutTime";
private final static String CHECK_AND_MUTATE_TIME = "checkAndMutateTime";
String BLOCK_BYTES_SCANNED_KEY = "blockBytesScannedCount";
String GET_BLOCK_BYTES_SCANNED_KEY = "getBlockBytesScanned";
String SCAN_BLOCK_BYTES_SCANNED_KEY = "scanBlockBytesScanned";
String CHECK_AND_MUTATE_BLOCK_BYTES_SCANNED_KEY = "checkAndMutateBlockBytesScanned";
String INCREMENT_BLOCK_BYTES_SCANNED_KEY = "incrementBlockBytesScanned";
String APPEND_BLOCK_BYTES_SCANNED_KEY = "appendBlockBytesScanned";
private final static String TABLE_READ_QUERY_PER_SECOND = "tableReadQueryPerSecond";
private final static String TABLE_WRITE_QUERY_PER_SECOND = "tableWriteQueryPerSecond";
@ -87,6 +94,12 @@ public class MetricsTableRequests {
private Histogram checkAndDeleteTimeHistogram;
private Histogram checkAndPutTimeHistogram;
private Histogram checkAndMutateTimeHistogram;
private Counter blockBytesScannedCount;
private Histogram checkAndMutateBlockBytesScanned;
private Histogram getBlockBytesScanned;
private Histogram incrementBlockBytesScanned;
private Histogram appendBlockBytesScanned;
private Histogram scanBlockBytesScanned;
private Meter readMeter;
private Meter writeMeter;
@ -133,6 +146,13 @@ public class MetricsTableRequests {
checkAndDeleteTimeHistogram = registry.histogram(CHECK_AND_DELETE_TIME);
checkAndPutTimeHistogram = registry.histogram(CHECK_AND_PUT_TIME);
checkAndMutateTimeHistogram = registry.histogram(CHECK_AND_MUTATE_TIME);
blockBytesScannedCount = registry.counter(BLOCK_BYTES_SCANNED_KEY);
checkAndMutateBlockBytesScanned =
registry.histogram(CHECK_AND_MUTATE_BLOCK_BYTES_SCANNED_KEY);
getBlockBytesScanned = registry.histogram(GET_BLOCK_BYTES_SCANNED_KEY);
incrementBlockBytesScanned = registry.histogram(INCREMENT_BLOCK_BYTES_SCANNED_KEY);
appendBlockBytesScanned = registry.histogram(APPEND_BLOCK_BYTES_SCANNED_KEY);
scanBlockBytesScanned = registry.histogram(SCAN_BLOCK_BYTES_SCANNED_KEY);
}
if (enabTableQueryMeterMetrics) {
@ -208,51 +228,63 @@ public class MetricsTableRequests {
/**
* Update the Get time histogram .
* @param t time it took
* @param time time it took
* @param blockBytesScanned size of block bytes scanned to retrieve the response
*/
public void updateGet(long t) {
public void updateGet(long time, long blockBytesScanned) {
if (isEnableTableLatenciesMetrics()) {
getTimeHistogram.update(t);
getTimeHistogram.update(time);
if (blockBytesScanned > 0) {
blockBytesScannedCount.increment(blockBytesScanned);
getBlockBytesScanned.update(blockBytesScanned);
}
}
}
/**
* Update the Increment time histogram.
* @param t time it took
* @param time time it took
* @param blockBytesScanned size of block bytes scanned to retrieve the response
*/
public void updateIncrement(long t) {
public void updateIncrement(long time, long blockBytesScanned) {
if (isEnableTableLatenciesMetrics()) {
incrementTimeHistogram.update(t);
incrementTimeHistogram.update(time);
if (blockBytesScanned > 0) {
blockBytesScannedCount.increment(blockBytesScanned);
incrementBlockBytesScanned.update(blockBytesScanned);
}
}
}
/**
* Update the Append time histogram.
* @param t time it took
* @param time time it took
* @param blockBytesScanned size of block bytes scanned to retrieve the response
*/
public void updateAppend(long t) {
public void updateAppend(long time, long blockBytesScanned) {
if (isEnableTableLatenciesMetrics()) {
appendTimeHistogram.update(t);
appendTimeHistogram.update(time);
if (blockBytesScanned > 0) {
blockBytesScannedCount.increment(blockBytesScanned);
appendBlockBytesScanned.update(blockBytesScanned);
}
}
}
/**
* Update the scan size.
* @param scanSize size of the scan
* Update the scan metrics.
* @param time response time of scan
* @param responseCellSize size of the scan resposne
* @param blockBytesScanned size of block bytes scanned to retrieve the response
*/
public void updateScanSize(long scanSize) {
public void updateScan(long time, long responseCellSize, long blockBytesScanned) {
if (isEnableTableLatenciesMetrics()) {
scanSizeHistogram.update(scanSize);
}
}
/**
* Update the scan time.
* @param t time it took
*/
public void updateScanTime(long t) {
if (isEnableTableLatenciesMetrics()) {
scanTimeHistogram.update(t);
scanTimeHistogram.update(time);
scanSizeHistogram.update(responseCellSize);
if (blockBytesScanned > 0) {
blockBytesScannedCount.increment(blockBytesScanned);
scanBlockBytesScanned.update(blockBytesScanned);
}
}
}
@ -280,9 +312,13 @@ public class MetricsTableRequests {
* Update the CheckAndMutate time histogram.
* @param time time it took
*/
public void updateCheckAndMutate(long time) {
public void updateCheckAndMutate(long time, long blockBytesScanned) {
if (isEnableTableLatenciesMetrics()) {
checkAndMutateTimeHistogram.update(time);
if (blockBytesScanned > 0) {
blockBytesScannedCount.increment(blockBytesScanned);
checkAndMutateBlockBytesScanned.update(blockBytesScanned);
}
}
}

View File

@ -669,12 +669,12 @@ public class TestNamedQueueRecorder {
}
@Override
public long getResponseBlockSize() {
public long getBlockBytesScanned() {
return 0;
}
@Override
public void incrementResponseBlockSize(long blockSize) {
public void incrementBlockBytesScanned(long blockSize) {
}
@Override

View File

@ -287,12 +287,12 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
}
@Override
public long getResponseBlockSize() {
public long getBlockBytesScanned() {
return 0;
}
@Override
public void incrementResponseBlockSize(long blockSize) {
public void incrementBlockBytesScanned(long blockSize) {
}
@Override

View File

@ -154,20 +154,20 @@ public class TestMetricsRegionServer {
when(metricsTableRequests.isEnableTableLatenciesMetrics()).thenReturn(false);
when(metricsTableRequests.isEnabTableQueryMeterMetrics()).thenReturn(false);
for (int i = 0; i < 12; i++) {
rsm.updateAppend(region, 12);
rsm.updateAppend(region, 1002);
rsm.updateAppend(region, 12, 120);
rsm.updateAppend(region, 1002, 10020);
}
for (int i = 0; i < 13; i++) {
rsm.updateDeleteBatch(region, 13);
rsm.updateDeleteBatch(region, 1003);
}
for (int i = 0; i < 14; i++) {
rsm.updateGet(region, 14);
rsm.updateGet(region, 1004);
rsm.updateGet(region, 14, 140);
rsm.updateGet(region, 1004, 10040);
}
for (int i = 0; i < 15; i++) {
rsm.updateIncrement(region, 15);
rsm.updateIncrement(region, 1005);
rsm.updateIncrement(region, 15, 150);
rsm.updateIncrement(region, 1005, 10050);
}
for (int i = 0; i < 16; i++) {
rsm.updatePutBatch(region, 16);
@ -181,19 +181,24 @@ public class TestMetricsRegionServer {
rsm.updateDelete(region, 1003);
rsm.updateCheckAndDelete(region, 17);
rsm.updateCheckAndPut(region, 17);
rsm.updateCheckAndMutate(region, 17);
rsm.updateCheckAndMutate(region, 17, 170);
}
HELPER.assertCounter("blockBytesScannedCount", 420090, serverSource);
HELPER.assertCounter("appendNumOps", 24, serverSource);
HELPER.assertCounter("appendBlockBytesScannedNumOps", 24, serverSource);
HELPER.assertCounter("deleteBatchNumOps", 26, serverSource);
HELPER.assertCounter("getNumOps", 28, serverSource);
HELPER.assertCounter("getBlockBytesScannedNumOps", 28, serverSource);
HELPER.assertCounter("incrementNumOps", 30, serverSource);
HELPER.assertCounter("incrementBlockBytesScannedNumOps", 30, serverSource);
HELPER.assertCounter("putBatchNumOps", 32, serverSource);
HELPER.assertCounter("putNumOps", 34, serverSource);
HELPER.assertCounter("deleteNumOps", 34, serverSource);
HELPER.assertCounter("checkAndDeleteNumOps", 17, serverSource);
HELPER.assertCounter("checkAndPutNumOps", 17, serverSource);
HELPER.assertCounter("checkAndMutateNumOps", 17, serverSource);
HELPER.assertCounter("checkAndMutateBlockBytesScannedNumOps", 17, serverSource);
HELPER.assertCounter("slowAppendCount", 12, serverSource);
HELPER.assertCounter("slowDeleteCount", 17, serverSource);

View File

@ -64,20 +64,26 @@ public class TestMetricsTableRequests {
Optional<MetricRegistry> registry2 = MetricRegistries.global().get(info2);
assertTrue(registry2.isPresent());
requests1.updateGet(500L);
requests1.updateGet(500L, 5000L);
Snapshot latencies1SnapshotGet =
((HistogramImpl) registry1.get().get("getTime").get()).snapshot();
assertEquals(500, latencies1SnapshotGet.get999thPercentile());
Snapshot blockBytesScanned1SnapshotGet =
((HistogramImpl) registry1.get().get("getBlockBytesScanned").get()).snapshot();
assertEquals(5000, blockBytesScanned1SnapshotGet.get999thPercentile());
requests1.updatePut(50L);
Snapshot latencies1SnapshotPut =
((HistogramImpl) registry1.get().get("putTime").get()).snapshot();
assertEquals(50, latencies1SnapshotPut.get99thPercentile());
requests2.updateGet(300L);
requests2.updateGet(300L, 3000L);
Snapshot latencies2SnapshotGet =
((HistogramImpl) registry2.get().get("getTime").get()).snapshot();
assertEquals(300, latencies2SnapshotGet.get999thPercentile());
Snapshot blockBytesScanned2SnapshotGet =
((HistogramImpl) registry2.get().get("getBlockBytesScanned").get()).snapshot();
assertEquals(3000, blockBytesScanned2SnapshotGet.get999thPercentile());
requests2.updatePut(75L);
Snapshot latencies2SnapshotPut =

View File

@ -63,6 +63,7 @@ public class TestMetricsUserAggregate {
public void setUp() {
wrapper = new MetricsRegionServerWrapperStub();
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF, true);
rsm = new MetricsRegionServer(wrapper, conf, null);
userAgg = (MetricsUserAggregate) rsm.getMetricsUserAggregate();
}
@ -74,10 +75,10 @@ public class TestMetricsUserAggregate {
when(metricsTableRequests.isEnableTableLatenciesMetrics()).thenReturn(false);
when(metricsTableRequests.isEnabTableQueryMeterMetrics()).thenReturn(false);
for (int i = 0; i < 10; i++) {
rsm.updateGet(region, 10);
rsm.updateGet(region, 10, 10);
}
for (int i = 0; i < 11; i++) {
rsm.updateScanTime(region, 11);
rsm.updateScan(region, 11, 111, 1111);
}
for (int i = 0; i < 12; i++) {
rsm.updatePut(region, 12);
@ -86,10 +87,10 @@ public class TestMetricsUserAggregate {
rsm.updateDelete(region, 13);
}
for (int i = 0; i < 14; i++) {
rsm.updateIncrement(region, 14);
rsm.updateIncrement(region, 14, 140);
}
for (int i = 0; i < 15; i++) {
rsm.updateAppend(region, 15);
rsm.updateAppend(region, 15, 150);
}
for (int i = 0; i < 16; i++) {
rsm.updateReplay(16);
@ -99,13 +100,6 @@ public class TestMetricsUserAggregate {
@Test
public void testPerUserOperations() {
Configuration conf = HBaseConfiguration.create();
// If metrics for users is not enabled, this test doesn't make sense.
if (
!conf.getBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF,
MetricsUserAggregateFactory.DEFAULT_METRIC_USER_ENABLED_CONF)
) {
return;
}
User userFoo = User.createUserForTesting(conf, "FOO", new String[0]);
User userBar = User.createUserForTesting(conf, "BAR", new String[0]);
@ -132,6 +126,7 @@ public class TestMetricsUserAggregate {
HELPER.assertCounter("userfoometricincrementnumops", 14, userAgg.getSource());
HELPER.assertCounter("userfoometricappendnumops", 15, userAgg.getSource());
HELPER.assertCounter("userfoometricreplaynumops", 16, userAgg.getSource());
HELPER.assertCounter("userfoometricblockbytesscannedcount", 16531, userAgg.getSource());
HELPER.assertCounter("userbarmetricgetnumops", 10, userAgg.getSource());
HELPER.assertCounter("userbarmetricscantimenumops", 11, userAgg.getSource());
@ -140,18 +135,12 @@ public class TestMetricsUserAggregate {
HELPER.assertCounter("userbarmetricincrementnumops", 14, userAgg.getSource());
HELPER.assertCounter("userbarmetricappendnumops", 15, userAgg.getSource());
HELPER.assertCounter("userbarmetricreplaynumops", 16, userAgg.getSource());
HELPER.assertCounter("userbarmetricblockbytesscannedcount", 16531, userAgg.getSource());
}
@Test
public void testLossyCountingOfUserMetrics() {
Configuration conf = HBaseConfiguration.create();
// If metrics for users is not enabled, this test doesn't make sense.
if (
!conf.getBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF,
MetricsUserAggregateFactory.DEFAULT_METRIC_USER_ENABLED_CONF)
) {
return;
}
int noOfUsers = 10000;
for (int i = 1; i <= noOfUsers; i++) {
User.createUserForTesting(conf, "FOO" + i, new String[0]).getUGI()
@ -163,7 +152,7 @@ public class TestMetricsUserAggregate {
when(region.getMetricsTableRequests()).thenReturn(metricsTableRequests);
when(metricsTableRequests.isEnableTableLatenciesMetrics()).thenReturn(false);
when(metricsTableRequests.isEnabTableQueryMeterMetrics()).thenReturn(false);
rsm.updateGet(region, 10);
rsm.updateGet(region, 10, 100);
return null;
}
});
@ -173,7 +162,11 @@ public class TestMetricsUserAggregate {
for (int i = 1; i <= noOfUsers / 10; i++) {
assertFalse(
HELPER.checkCounterExists("userfoo" + i + "metricgetnumops", userAgg.getSource()));
assertFalse(HELPER.checkCounterExists("userfoo" + i + "metricblockbytesscannedcount",
userAgg.getSource()));
}
HELPER.assertCounter("userfoo" + noOfUsers + "metricgetnumops", 1, userAgg.getSource());
HELPER.assertCounter("userfoo" + noOfUsers + "metricblockbytesscannedcount", 100,
userAgg.getSource());
}
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SkipFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@ -69,11 +70,8 @@ public class TestScannerBlockSizeLimits {
private static final byte[] COLUMN1 = Bytes.toBytes(0);
private static final byte[] COLUMN2 = Bytes.toBytes(1);
private static final byte[] COLUMN3 = Bytes.toBytes(2);
private static final byte[] COLUMN4 = Bytes.toBytes(4);
private static final byte[] COLUMN5 = Bytes.toBytes(5);
private static final byte[][] COLUMNS = new byte[][] { COLUMN1, COLUMN2 };
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
@ -83,6 +81,15 @@ public class TestScannerBlockSizeLimits {
createTestData();
}
@Before
public void setupEach() throws Exception {
HRegionServer regionServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
for (HRegion region : regionServer.getRegions(TABLE)) {
System.out.println("Clearing cache for region " + region.getRegionInfo().getEncodedName());
regionServer.clearRegionBlockCache(region);
}
}
private static void createTestData() throws IOException, InterruptedException {
RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
@ -91,12 +98,13 @@ public class TestScannerBlockSizeLimits {
for (int i = 1; i < 10; i++) {
// 5 columns per row, in 2 families
// Each column value is 1000 bytes, which is enough to fill a full block with row and header.
// So 5 blocks per row.
// So 5 blocks per row in FAMILY1
Put put = new Put(Bytes.toBytes(i));
for (int j = 0; j < 6; j++) {
put.addColumn(FAMILY1, Bytes.toBytes(j), DATA);
}
// Additional block in FAMILY2 (notably smaller than block size)
put.addColumn(FAMILY2, COLUMN1, DATA);
region.put(put);
@ -128,6 +136,8 @@ public class TestScannerBlockSizeLimits {
scanner.next(100);
// we fetch 2 columns from 1 row, so about 2 blocks
assertEquals(4120, metrics.countOfBlockBytesScanned.get());
assertEquals(1, metrics.countOfRowsScanned.get());
assertEquals(1, metrics.countOfRPCcalls.get());
}
@ -150,6 +160,8 @@ public class TestScannerBlockSizeLimits {
.addColumn(FAMILY1, COLUMN2).addColumn(FAMILY1, COLUMN3).addFamily(FAMILY2)
.setFilter(new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(2)))));
ScanMetrics metrics = scanner.getScanMetrics();
boolean foundRow3 = false;
for (Result result : scanner) {
Set<Integer> rows = new HashSet<>();
@ -163,10 +175,11 @@ public class TestScannerBlockSizeLimits {
foundRow3 = true;
}
}
ScanMetrics metrics = scanner.getScanMetrics();
// 4 blocks per row, so 36 blocks. We can scan 3 blocks per RPC, which is 12 RPCs. But we can
// skip 1 row, so skip 2 RPCs.
// 22 blocks, last one is 1030 bytes (approx 3 per row for 8 rows, but some compaction happens
// in family2 since each row only has 1 cell there and 2 can fit per block)
assertEquals(44290, metrics.countOfBlockBytesScanned.get());
// We can return 22 blocks in 9 RPCs, but need an extra one to check for more rows at end
assertEquals(10, metrics.countOfRPCcalls.get());
}
@ -192,6 +205,7 @@ public class TestScannerBlockSizeLimits {
ScanMetrics metrics = scanner.getScanMetrics();
// scanning over 9 rows, filtering on 2 contiguous columns each, so 9 blocks total
assertEquals(18540, metrics.countOfBlockBytesScanned.get());
// limited to 4200 bytes per which is enough for 3 blocks (exceed limit after loading 3rd)
// so that's 3 RPC and the last RPC pulls the cells loaded by the last block
assertEquals(4, metrics.countOfRPCcalls.get());
@ -216,9 +230,11 @@ public class TestScannerBlockSizeLimits {
}
}
ScanMetrics metrics = scanner.getScanMetrics();
System.out.println(metrics.countOfBlockBytesScanned.get());
// We will return 9 rows, but also 2 cursors because we exceed the scan size limit partway
// through. So that accounts for 11 rpcs.
// 9 rows, total of 32 blocks (last one is 1030)
assertEquals(64890, metrics.countOfBlockBytesScanned.get());
// We can return 32 blocks in approx 11 RPCs but we need 2 cursors due to the narrow filter
assertEquals(2, cursors);
assertEquals(11, metrics.countOfRPCcalls.get());
}
@ -247,8 +263,9 @@ public class TestScannerBlockSizeLimits {
ScanMetrics metrics = scanner.getScanMetrics();
// Our filter causes us to read the first column of each row, then INCLUDE_AND_SEEK_NEXT_ROW.
// So we load 1 block per row, and there are 9 rows. Our max scan size is large enough to
// return 2 full blocks, with some overflow. So we are able to squeeze this all into 4 RPCs.
// So we load 1 block per row, and there are 9 rows. So 9 blocks
assertEquals(18540, metrics.countOfBlockBytesScanned.get());
// We can return 9 blocks in 3 RPCs, but need 1 more to check for more results (returns 0)
assertEquals(4, metrics.countOfRPCcalls.get());
}
@ -266,8 +283,9 @@ public class TestScannerBlockSizeLimits {
ScanMetrics metrics = scanner.getScanMetrics();
// We have to read the first cell/block of each row, then can skip to the last block. So that's
// 2 blocks per row to read (18 total). Our max scan size is enough to read 3 blocks per RPC,
// plus one final RPC to finish region.
// 2 blocks per row to read (18 blocks total)
assertEquals(37080, metrics.countOfBlockBytesScanned.get());
// Our max scan size is enough to read 3 blocks per RPC, plus one final RPC to finish region.
assertEquals(7, metrics.countOfRPCcalls.get());
}