HBASE-20513 Collect and emit ScanMetrics in PerformanceEvaluation
This commit is contained in:
parent
78ffd7ace6
commit
291dedbf81
|
@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
|||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterAllFilter;
|
||||
|
@ -1046,6 +1047,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
private String testName;
|
||||
private Histogram latencyHistogram;
|
||||
private Histogram valueSizeHistogram;
|
||||
private Histogram rpcCallsHistogram;
|
||||
private Histogram remoteRpcCallsHistogram;
|
||||
private Histogram millisBetweenNextHistogram;
|
||||
private Histogram regionsScannedHistogram;
|
||||
private Histogram bytesInResultsHistogram;
|
||||
private Histogram bytesInRemoteResultsHistogram;
|
||||
private RandomDistribution.Zipf zipf;
|
||||
|
||||
/**
|
||||
|
@ -1102,6 +1109,34 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
this.valueSizeHistogram.update(valueSize);
|
||||
}
|
||||
|
||||
void updateScanMetrics(final ScanMetrics metrics) {
|
||||
Map<String,Long> metricsMap = metrics.getMetricsMap();
|
||||
Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
|
||||
if (rpcCalls != null) {
|
||||
this.rpcCallsHistogram.update(rpcCalls.longValue());
|
||||
}
|
||||
Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
|
||||
if (remoteRpcCalls != null) {
|
||||
this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
|
||||
}
|
||||
Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
|
||||
if (millisBetweenNext != null) {
|
||||
this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
|
||||
}
|
||||
Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
|
||||
if (regionsScanned != null) {
|
||||
this.regionsScannedHistogram.update(regionsScanned.longValue());
|
||||
}
|
||||
Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
|
||||
if (bytesInResults != null && bytesInResults.longValue() > 0) {
|
||||
this.bytesInResultsHistogram.update(bytesInResults.longValue());
|
||||
}
|
||||
Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
|
||||
if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
|
||||
this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
|
||||
}
|
||||
}
|
||||
|
||||
String generateStatus(final int sr, final int i, final int lr) {
|
||||
return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
|
||||
(!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
|
||||
|
@ -1123,10 +1158,19 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
}
|
||||
|
||||
void testSetup() throws IOException {
|
||||
createConnection();
|
||||
onStartup();
|
||||
// test metrics
|
||||
latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
||||
valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
||||
// scan metrics
|
||||
rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
||||
remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
||||
millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
||||
regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
||||
bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
||||
bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
||||
|
||||
createConnection();
|
||||
onStartup();
|
||||
}
|
||||
|
||||
abstract void createConnection() throws IOException;
|
||||
|
@ -1148,6 +1192,30 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
+ YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
|
||||
status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
|
||||
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
|
||||
if (rpcCallsHistogram.getCount() > 0) {
|
||||
status.setStatus("rpcCalls (count): " +
|
||||
YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
|
||||
}
|
||||
if (remoteRpcCallsHistogram.getCount() > 0) {
|
||||
status.setStatus("remoteRpcCalls (count): " +
|
||||
YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
|
||||
}
|
||||
if (millisBetweenNextHistogram.getCount() > 0) {
|
||||
status.setStatus("millisBetweenNext (latency): " +
|
||||
YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
|
||||
}
|
||||
if (regionsScannedHistogram.getCount() > 0) {
|
||||
status.setStatus("regionsScanned (count): " +
|
||||
YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
|
||||
}
|
||||
if (bytesInResultsHistogram.getCount() > 0) {
|
||||
status.setStatus("bytesInResults (size): " +
|
||||
YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
|
||||
}
|
||||
if (bytesInRemoteResultsHistogram.getCount() > 0) {
|
||||
status.setStatus("bytesInRemoteResults (size): " +
|
||||
YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
|
||||
}
|
||||
}
|
||||
closeConnection();
|
||||
receiverHost.closeReceivers();
|
||||
|
@ -1455,6 +1523,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
@Override
|
||||
void testTakedown() throws IOException {
|
||||
if (this.testScanner != null) {
|
||||
updateScanMetrics(this.testScanner.getScanMetrics());
|
||||
this.testScanner.close();
|
||||
}
|
||||
super.testTakedown();
|
||||
|
@ -1466,7 +1535,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
Scan scan =
|
||||
new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
|
||||
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
|
||||
.setReadType(opts.scanReadType);
|
||||
.setReadType(opts.scanReadType).setScanMetricsEnabled(true);
|
||||
if (opts.addColumns) {
|
||||
for (int column = 0; column < opts.columns; column++) {
|
||||
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
|
||||
|
@ -1577,7 +1646,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
void testRow(final int i) throws IOException {
|
||||
Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
|
||||
.setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
|
||||
.setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType);
|
||||
.setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
|
||||
.setScanMetricsEnabled(true);
|
||||
FilterList list = new FilterList();
|
||||
if (opts.addColumns) {
|
||||
for (int column = 0; column < opts.columns; column++) {
|
||||
|
@ -1593,11 +1663,15 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
list.addFilter(new WhileMatchFilter(new PageFilter(120)));
|
||||
scan.setFilter(list);
|
||||
ResultScanner s = this.table.getScanner(scan);
|
||||
try {
|
||||
for (Result rr; (rr = s.next()) != null;) {
|
||||
updateValueSize(rr);
|
||||
}
|
||||
} finally {
|
||||
updateScanMetrics(s.getScanMetrics());
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getReportingPeriod() {
|
||||
|
@ -1618,7 +1692,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
|
||||
.withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
|
||||
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
|
||||
.setReadType(opts.scanReadType);
|
||||
.setReadType(opts.scanReadType).setScanMetricsEnabled(true);
|
||||
if (opts.filterAll) {
|
||||
scan.setFilter(new FilterAllFilter());
|
||||
}
|
||||
|
@ -1633,6 +1707,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
Result r = null;
|
||||
int count = 0;
|
||||
ResultScanner s = this.table.getScanner(scan);
|
||||
try {
|
||||
for (; (r = s.next()) != null;) {
|
||||
updateValueSize(r);
|
||||
count++;
|
||||
|
@ -1642,9 +1717,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
Bytes.toString(startAndStopRow.getFirst()),
|
||||
Bytes.toString(startAndStopRow.getSecond()), count));
|
||||
}
|
||||
|
||||
} finally {
|
||||
updateScanMetrics(s.getScanMetrics());
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Pair<byte[],byte[]> getStartAndStopRow();
|
||||
|
||||
|
@ -1824,7 +1901,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
if (this.testScanner == null) {
|
||||
Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
|
||||
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
|
||||
.setReadType(opts.scanReadType);
|
||||
.setReadType(opts.scanReadType).setScanMetricsEnabled(true);
|
||||
if (opts.addColumns) {
|
||||
for (int column = 0; column < opts.columns; column++) {
|
||||
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
|
||||
|
@ -2031,7 +2108,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
updateValueSize(r);
|
||||
}
|
||||
} finally {
|
||||
if (scanner != null) scanner.close();
|
||||
if (scanner != null) {
|
||||
updateScanMetrics(scanner.getScanMetrics());
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2046,7 +2126,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
list.addFilter(new FilterAllFilter());
|
||||
}
|
||||
Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
|
||||
.setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType);
|
||||
.setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
|
||||
.setScanMetricsEnabled(true);
|
||||
if (opts.addColumns) {
|
||||
for (int column = 0; column < opts.columns; column++) {
|
||||
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
|
||||
|
|
Loading…
Reference in New Issue