HBASE-20513 Collect and emit ScanMetrics in PerformanceEvaluation
This commit is contained in:
parent
fd24083e26
commit
23b9054089
@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
|||||||
import org.apache.hadoop.hbase.client.RowMutations;
|
import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||||
import org.apache.hadoop.hbase.filter.CompareFilter;
|
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||||
@ -973,6 +974,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
private String testName;
|
private String testName;
|
||||||
private Histogram latencyHistogram;
|
private Histogram latencyHistogram;
|
||||||
private Histogram valueSizeHistogram;
|
private Histogram valueSizeHistogram;
|
||||||
|
private Histogram rpcCallsHistogram;
|
||||||
|
private Histogram remoteRpcCallsHistogram;
|
||||||
|
private Histogram millisBetweenNextHistogram;
|
||||||
|
private Histogram regionsScannedHistogram;
|
||||||
|
private Histogram bytesInResultsHistogram;
|
||||||
|
private Histogram bytesInRemoteResultsHistogram;
|
||||||
private RandomDistribution.Zipf zipf;
|
private RandomDistribution.Zipf zipf;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1030,6 +1037,34 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
this.valueSizeHistogram.update(valueSize);
|
this.valueSizeHistogram.update(valueSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void updateScanMetrics(final ScanMetrics metrics) {
|
||||||
|
Map<String,Long> metricsMap = metrics.getMetricsMap();
|
||||||
|
Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
|
||||||
|
if (rpcCalls != null) {
|
||||||
|
this.rpcCallsHistogram.update(rpcCalls.longValue());
|
||||||
|
}
|
||||||
|
Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
|
||||||
|
if (remoteRpcCalls != null) {
|
||||||
|
this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
|
||||||
|
}
|
||||||
|
Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
|
||||||
|
if (millisBetweenNext != null) {
|
||||||
|
this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
|
||||||
|
}
|
||||||
|
Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
|
||||||
|
if (regionsScanned != null) {
|
||||||
|
this.regionsScannedHistogram.update(regionsScanned.longValue());
|
||||||
|
}
|
||||||
|
Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
|
||||||
|
if (bytesInResults != null && bytesInResults.longValue() > 0) {
|
||||||
|
this.bytesInResultsHistogram.update(bytesInResults.longValue());
|
||||||
|
}
|
||||||
|
Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
|
||||||
|
if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
|
||||||
|
this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
String generateStatus(final int sr, final int i, final int lr) {
|
String generateStatus(final int sr, final int i, final int lr) {
|
||||||
return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
|
return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
|
||||||
(!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
|
(!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
|
||||||
@ -1051,12 +1086,22 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void testSetup() throws IOException {
|
void testSetup() throws IOException {
|
||||||
|
// test metrics
|
||||||
|
latencyHistogram = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
||||||
|
valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
||||||
|
// scan metrics
|
||||||
|
rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
||||||
|
remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
||||||
|
millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
||||||
|
regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
||||||
|
bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
||||||
|
bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
||||||
|
|
||||||
if (!opts.oneCon) {
|
if (!opts.oneCon) {
|
||||||
this.connection = ConnectionFactory.createConnection(conf);
|
this.connection = ConnectionFactory.createConnection(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
onStartup();
|
onStartup();
|
||||||
latencyHistogram = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
|
||||||
valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract void onStartup() throws IOException;
|
abstract void onStartup() throws IOException;
|
||||||
@ -1076,6 +1121,30 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
+ YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
|
+ YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
|
||||||
status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.count());
|
status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.count());
|
||||||
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
|
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
|
||||||
|
if (rpcCallsHistogram.count() > 0) {
|
||||||
|
status.setStatus("rpcCalls (count): " +
|
||||||
|
YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
|
||||||
|
}
|
||||||
|
if (remoteRpcCallsHistogram.count() > 0) {
|
||||||
|
status.setStatus("remoteRpcCalls (count): " +
|
||||||
|
YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
|
||||||
|
}
|
||||||
|
if (millisBetweenNextHistogram.count() > 0) {
|
||||||
|
status.setStatus("millisBetweenNext (latency): " +
|
||||||
|
YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
|
||||||
|
}
|
||||||
|
if (regionsScannedHistogram.count() > 0) {
|
||||||
|
status.setStatus("regionsScanned (count): " +
|
||||||
|
YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
|
||||||
|
}
|
||||||
|
if (bytesInResultsHistogram.count() > 0) {
|
||||||
|
status.setStatus("bytesInResults (size): " +
|
||||||
|
YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
|
||||||
|
}
|
||||||
|
if (bytesInRemoteResultsHistogram.count() > 0) {
|
||||||
|
status.setStatus("bytesInRemoteResults (size): " +
|
||||||
|
YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (!opts.oneCon) {
|
if (!opts.oneCon) {
|
||||||
connection.close();
|
connection.close();
|
||||||
@ -1203,7 +1272,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
@Override
|
@Override
|
||||||
void testRow(final int i) throws IOException {
|
void testRow(final int i) throws IOException {
|
||||||
Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
|
Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
|
||||||
scan.setCaching(opts.caching);
|
|
||||||
FilterList list = new FilterList();
|
FilterList list = new FilterList();
|
||||||
if (opts.addColumns) {
|
if (opts.addColumns) {
|
||||||
for (int column = 0; column < opts.columns; column++) {
|
for (int column = 0; column < opts.columns; column++) {
|
||||||
@ -1218,11 +1286,19 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
}
|
}
|
||||||
list.addFilter(new WhileMatchFilter(new PageFilter(120)));
|
list.addFilter(new WhileMatchFilter(new PageFilter(120)));
|
||||||
scan.setFilter(list);
|
scan.setFilter(list);
|
||||||
|
scan.setCaching(opts.caching);
|
||||||
|
scan.setScanMetricsEnabled(true);
|
||||||
ResultScanner s = this.table.getScanner(scan);
|
ResultScanner s = this.table.getScanner(scan);
|
||||||
for (Result rr; (rr = s.next()) != null;) {
|
try {
|
||||||
updateValueSize(rr);
|
for (Result rr; (rr = s.next()) != null;) {
|
||||||
|
updateValueSize(rr);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (s != null) {
|
||||||
|
updateScanMetrics(scan.getScanMetrics());
|
||||||
|
s.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
s.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -1242,7 +1318,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
void testRow(final int i) throws IOException {
|
void testRow(final int i) throws IOException {
|
||||||
Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
|
Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
|
||||||
Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
|
Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
|
||||||
scan.setCaching(opts.caching);
|
|
||||||
if (opts.filterAll) {
|
if (opts.filterAll) {
|
||||||
scan.setFilter(new FilterAllFilter());
|
scan.setFilter(new FilterAllFilter());
|
||||||
}
|
}
|
||||||
@ -1254,20 +1329,27 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
} else {
|
} else {
|
||||||
scan.addFamily(FAMILY_NAME);
|
scan.addFamily(FAMILY_NAME);
|
||||||
}
|
}
|
||||||
|
scan.setCaching(opts.caching);
|
||||||
|
scan.setScanMetricsEnabled(true);
|
||||||
Result r = null;
|
Result r = null;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
ResultScanner s = this.table.getScanner(scan);
|
ResultScanner s = this.table.getScanner(scan);
|
||||||
for (; (r = s.next()) != null;) {
|
try {
|
||||||
updateValueSize(r);
|
for (; (r = s.next()) != null;) {
|
||||||
count++;
|
updateValueSize(r);
|
||||||
}
|
count++;
|
||||||
if (i % 100 == 0) {
|
}
|
||||||
LOG.info(String.format("Scan for key range %s - %s returned %s rows",
|
if (i % 100 == 0) {
|
||||||
|
LOG.info(String.format("Scan for key range %s - %s returned %s rows",
|
||||||
Bytes.toString(startAndStopRow.getFirst()),
|
Bytes.toString(startAndStopRow.getFirst()),
|
||||||
Bytes.toString(startAndStopRow.getSecond()), count));
|
Bytes.toString(startAndStopRow.getSecond()), count));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (s != null) {
|
||||||
|
updateScanMetrics(scan.getScanMetrics());
|
||||||
|
s.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract Pair<byte[],byte[]> getStartAndStopRow();
|
protected abstract Pair<byte[],byte[]> getStartAndStopRow();
|
||||||
@ -1427,6 +1509,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static class ScanTest extends TableTest {
|
static class ScanTest extends TableTest {
|
||||||
|
private Scan scan;
|
||||||
private ResultScanner testScanner;
|
private ResultScanner testScanner;
|
||||||
|
|
||||||
ScanTest(Connection con, TestOptions options, Status status) {
|
ScanTest(Connection con, TestOptions options, Status status) {
|
||||||
@ -1436,6 +1519,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
@Override
|
@Override
|
||||||
void testTakedown() throws IOException {
|
void testTakedown() throws IOException {
|
||||||
if (this.testScanner != null) {
|
if (this.testScanner != null) {
|
||||||
|
updateScanMetrics(scan.getScanMetrics());
|
||||||
this.testScanner.close();
|
this.testScanner.close();
|
||||||
}
|
}
|
||||||
super.testTakedown();
|
super.testTakedown();
|
||||||
@ -1446,7 +1530,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
void testRow(final int i) throws IOException {
|
void testRow(final int i) throws IOException {
|
||||||
if (this.testScanner == null) {
|
if (this.testScanner == null) {
|
||||||
Scan scan = new Scan(format(opts.startRow));
|
Scan scan = new Scan(format(opts.startRow));
|
||||||
scan.setCaching(opts.caching);
|
|
||||||
if (opts.addColumns) {
|
if (opts.addColumns) {
|
||||||
for (int column = 0; column < opts.columns; column++) {
|
for (int column = 0; column < opts.columns; column++) {
|
||||||
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
|
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
|
||||||
@ -1458,7 +1541,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
if (opts.filterAll) {
|
if (opts.filterAll) {
|
||||||
scan.setFilter(new FilterAllFilter());
|
scan.setFilter(new FilterAllFilter());
|
||||||
}
|
}
|
||||||
this.testScanner = table.getScanner(scan);
|
scan.setCaching(opts.caching);
|
||||||
|
scan.setScanMetricsEnabled(true);
|
||||||
|
this.testScanner = table.getScanner(scan);
|
||||||
}
|
}
|
||||||
Result r = testScanner.next();
|
Result r = testScanner.next();
|
||||||
updateValueSize(r);
|
updateValueSize(r);
|
||||||
@ -1650,7 +1735,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
updateValueSize(r);
|
updateValueSize(r);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (scanner != null) scanner.close();
|
if (scanner != null) {
|
||||||
|
updateScanMetrics(scan.getScanMetrics());
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1665,7 +1753,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
list.addFilter(new FilterAllFilter());
|
list.addFilter(new FilterAllFilter());
|
||||||
}
|
}
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
scan.setCaching(opts.caching);
|
|
||||||
if (opts.addColumns) {
|
if (opts.addColumns) {
|
||||||
for (int column = 0; column < opts.columns; column++) {
|
for (int column = 0; column < opts.columns; column++) {
|
||||||
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
|
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
|
||||||
@ -1675,6 +1762,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
scan.addFamily(FAMILY_NAME);
|
scan.addFamily(FAMILY_NAME);
|
||||||
}
|
}
|
||||||
scan.setFilter(list);
|
scan.setFilter(list);
|
||||||
|
scan.setCaching(opts.caching);
|
||||||
|
scan.setScanMetricsEnabled(true);
|
||||||
return scan;
|
return scan;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user