HBASE-24824 Add more stats in PE for read replica (#2205)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
huaxiangsun 2020-08-07 09:55:32 -07:00 committed by GitHub
parent 0b604d921a
commit 6ef90aad7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 100 additions and 26 deletions

View File

@ -219,10 +219,22 @@ public class PerformanceEvaluation extends Configured implements Tool {
public RunResult(long duration, Histogram hist) { public RunResult(long duration, Histogram hist) {
this.duration = duration; this.duration = duration;
this.hist = hist; this.hist = hist;
numbOfReplyOverThreshold = 0;
numOfReplyFromReplica = 0;
}
public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
Histogram hist) {
this.duration = duration;
this.hist = hist;
this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
this.numOfReplyFromReplica = numOfReplyFromReplica;
} }
public final long duration; public final long duration;
public final Histogram hist; public final Histogram hist;
public final long numbOfReplyOverThreshold;
public final long numOfReplyFromReplica;
@Override @Override
public String toString() { public String toString() {
@ -492,6 +504,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
}); });
LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration + LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
"ms over " + threadOpts.perClientRunRows + " rows"); "ms over " + threadOpts.perClientRunRows + " rows");
if (opts.latencyThreshold > 0) {
LOG.info("Number of replies over latency threshold " + opts.latencyThreshold +
"(ms) is " + run.numbOfReplyOverThreshold);
}
return run; return run;
} }
}); });
@ -512,10 +528,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
long total = 0; long total = 0;
float avgLatency = 0 ; float avgLatency = 0 ;
float avgTPS = 0; float avgTPS = 0;
long replicaWins = 0;
for (RunResult result : results) { for (RunResult result : results) {
total += result.duration; total += result.duration;
avgLatency += result.hist.getSnapshot().getMean(); avgLatency += result.hist.getSnapshot().getMean();
avgTPS += opts.perClientRunRows * 1.0f / result.duration; avgTPS += opts.perClientRunRows * 1.0f / result.duration;
replicaWins += result.numOfReplyFromReplica;
} }
avgTPS *= 1000; // ms to second avgTPS *= 1000; // ms to second
avgLatency = avgLatency / results.length; avgLatency = avgLatency / results.length;
@ -525,12 +543,15 @@ public class PerformanceEvaluation extends Configured implements Tool {
+ "\tAvg: " + (total / results.length) + "ms"); + "\tAvg: " + (total / results.length) + "ms");
LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
if (opts.replicas > 1) {
LOG.info("[results from replica regions] " + replicaWins);
}
for (int i = 0; i < opts.connCount; i++) { for (int i = 0; i < opts.connCount; i++) {
cons[i].close(); cons[i].close();
asyncCons[i].close(); asyncCons[i].close();
} }
return results; return results;
} }
@ -706,6 +727,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
int columns = 1; int columns = 1;
int families = 1; int families = 1;
int caching = 30; int caching = 30;
int latencyThreshold = 0; // in milliseconds
boolean addColumns = true; boolean addColumns = true;
MemoryCompactionPolicy inMemoryCompaction = MemoryCompactionPolicy inMemoryCompaction =
MemoryCompactionPolicy.valueOf( MemoryCompactionPolicy.valueOf(
@ -741,6 +763,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.useTags = that.useTags; this.useTags = that.useTags;
this.noOfTags = that.noOfTags; this.noOfTags = that.noOfTags;
this.reportLatency = that.reportLatency; this.reportLatency = that.reportLatency;
this.latencyThreshold = that.latencyThreshold;
this.multiGet = that.multiGet; this.multiGet = that.multiGet;
this.multiPut = that.multiPut; this.multiPut = that.multiPut;
this.inMemoryCF = that.inMemoryCF; this.inMemoryCF = that.inMemoryCF;
@ -1130,6 +1153,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
private String testName; private String testName;
private Histogram latencyHistogram; private Histogram latencyHistogram;
private Histogram replicaLatencyHistogram;
private Histogram valueSizeHistogram; private Histogram valueSizeHistogram;
private Histogram rpcCallsHistogram; private Histogram rpcCallsHistogram;
private Histogram remoteRpcCallsHistogram; private Histogram remoteRpcCallsHistogram;
@ -1138,6 +1162,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
private Histogram bytesInResultsHistogram; private Histogram bytesInResultsHistogram;
private Histogram bytesInRemoteResultsHistogram; private Histogram bytesInRemoteResultsHistogram;
private RandomDistribution.Zipf zipf; private RandomDistribution.Zipf zipf;
private long numOfReplyOverLatencyThreshold = 0;
private long numOfReplyFromReplica = 0;
/** /**
* Note that all subclasses of this class must provide a public constructor * Note that all subclasses of this class must provide a public constructor
@ -1175,13 +1201,28 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
void updateValueSize(final Result [] rs) throws IOException { void updateValueSize(final Result [] rs) throws IOException {
if (rs == null || !isRandomValueSize()) return; updateValueSize(rs, 0);
for (Result r: rs) updateValueSize(r); }
void updateValueSize(final Result [] rs, final long latency) throws IOException {
if (rs == null || (latency == 0)) return;
for (Result r: rs) updateValueSize(r, latency);
} }
void updateValueSize(final Result r) throws IOException { void updateValueSize(final Result r) throws IOException {
if (r == null || !isRandomValueSize()) return; updateValueSize(r, 0);
}
void updateValueSize(final Result r, final long latency) throws IOException {
if (r == null || (latency == 0)) return;
int size = 0; int size = 0;
// update replicaHistogram
if (r.isStale()) {
replicaLatencyHistogram.update(latency / 1000);
numOfReplyFromReplica ++;
}
if (!isRandomValueSize()) return;
for (CellScanner scanner = r.cellScanner(); scanner.advance();) { for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
size += scanner.current().getValueLength(); size += scanner.current().getValueLength();
} }
@ -1245,6 +1286,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testSetup() throws IOException { void testSetup() throws IOException {
// test metrics // test metrics
latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
// If it is a replica test, set up histogram for replica.
if (opts.replicas > 1) {
replicaLatencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
}
valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
// scan metrics // scan metrics
rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
@ -1268,6 +1313,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport( status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
latencyHistogram)); latencyHistogram));
if (opts.replicas > 1) {
status.setStatus("Latency (us) from Replica Regions: " +
YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
}
status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
if (valueSizeHistogram.getCount() > 0) { if (valueSizeHistogram.getCount() > 0) {
@ -1349,7 +1398,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
boolean requestSent = false; boolean requestSent = false;
try (TraceScope scope = TraceUtil.createTrace("test row");){ try (TraceScope scope = TraceUtil.createTrace("test row");){
requestSent = testRow(i); requestSent = testRow(i, startTime);
} }
if ( (i - startRow) > opts.measureAfter) { if ( (i - startRow) > opts.measureAfter) {
// If multiget or multiput is enabled, say set to 10, testRow() returns immediately // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
@ -1357,7 +1406,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
// We should only set latency when actual request is sent because otherwise // We should only set latency when actual request is sent because otherwise
// it turns out to be 0. // it turns out to be 0.
if (requestSent) { if (requestSent) {
latencyHistogram.update((System.nanoTime() - startTime) / 1000); long latency = (System.nanoTime() - startTime) / 1000;
latencyHistogram.update(latency);
if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
numOfReplyOverLatencyThreshold ++;
}
} }
if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
status.setStatus(generateStatus(startRow, i, lastRow)); status.setStatus(generateStatus(startRow, i, lastRow));
@ -1389,7 +1442,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
* False if not, multiGet and multiPut e.g., the rows are sent * False if not, multiGet and multiPut e.g., the rows are sent
* to server only if enough gets/puts are gathered. * to server only if enough gets/puts are gathered.
*/ */
abstract boolean testRow(final int i) throws IOException, InterruptedException; abstract boolean testRow(final int i, final long startTime) throws IOException, InterruptedException;
} }
static abstract class Test extends TestBase { static abstract class Test extends TestBase {
@ -1460,7 +1513,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException, InterruptedException { boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
if (opts.randomSleep > 0) { if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep)); Thread.sleep(rd.nextInt(opts.randomSleep));
} }
@ -1569,7 +1622,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
if (this.testScanner == null) { if (this.testScanner == null) {
Scan scan = Scan scan =
new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
@ -1603,7 +1656,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException, InterruptedException { boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
Get get = new Get(format(i)); Get get = new Get(format(i));
for (int family = 0; family < opts.families; family++) { for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
@ -1645,7 +1698,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override @Override
@SuppressWarnings("ReturnValueIgnored") @SuppressWarnings("ReturnValueIgnored")
boolean testRow(final int i) throws IOException, InterruptedException { boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
byte[] row = generateRow(i); byte[] row = generateRow(i);
Put put = new Put(row); Put put = new Put(row);
for (int family = 0; family < opts.families; family++) { for (int family = 0; family < opts.families; family++) {
@ -1720,7 +1773,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)) Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
.setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
.setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
@ -1768,7 +1821,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
.withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
@ -1871,6 +1924,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
private final Consistency consistency; private final Consistency consistency;
private ArrayList<Get> gets; private ArrayList<Get> gets;
private Random rd = new Random(); private Random rd = new Random();
private long numOfReplyFromReplica = 0;
RandomReadTest(Connection con, TestOptions options, Status status) { RandomReadTest(Connection con, TestOptions options, Status status) {
super(con, options, status); super(con, options, status);
@ -1882,7 +1936,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException, InterruptedException { boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
if (opts.randomSleep > 0) { if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep)); Thread.sleep(rd.nextInt(opts.randomSleep));
} }
@ -1907,13 +1961,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.gets.add(get); this.gets.add(get);
if (this.gets.size() == opts.multiGet) { if (this.gets.size() == opts.multiGet) {
Result [] rs = this.table.get(this.gets); Result [] rs = this.table.get(this.gets);
updateValueSize(rs); if (opts.replicas > 1) {
long latency = System.nanoTime() - startTime;
updateValueSize(rs, latency);
} else {
updateValueSize(rs);
}
this.gets.clear(); this.gets.clear();
} else { } else {
return false; return false;
} }
} else { } else {
updateValueSize(this.table.get(get)); if (opts.replicas > 1) {
Result r = this.table.get(get);
long latency = System.nanoTime() - startTime;
updateValueSize(r, latency);
} else {
updateValueSize(this.table.get(get));
}
} }
return true; return true;
} }
@ -1964,7 +2029,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
if (this.testScanner == null) { if (this.testScanner == null) {
Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
@ -2027,7 +2092,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
Increment increment = new Increment(format(i)); Increment increment = new Increment(format(i));
// unlike checkAndXXX tests, which make most sense to do on a single value, // unlike checkAndXXX tests, which make most sense to do on a single value,
// if multiple families are specified for an increment test we assume it is // if multiple families are specified for an increment test we assume it is
@ -2047,7 +2112,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
byte [] bytes = format(i); byte [] bytes = format(i);
Append append = new Append(bytes); Append append = new Append(bytes);
// unlike checkAndXXX tests, which make most sense to do on a single value, // unlike checkAndXXX tests, which make most sense to do on a single value,
@ -2068,7 +2133,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i); final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value // checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there. // Put a known value so when we go to check it, it is there.
@ -2089,7 +2154,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i); final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value // checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there. // Put a known value so when we go to check it, it is there.
@ -2108,7 +2173,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i); final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value // checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there. // Put a known value so when we go to check it, it is there.
@ -2129,7 +2194,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
Get get = new Get(format(i)); Get get = new Get(format(i));
for (int family = 0; family < opts.families; family++) { for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
@ -2167,7 +2232,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(final int i) throws IOException { boolean testRow(final int i, final long startTime) throws IOException {
byte[] row = generateRow(i); byte[] row = generateRow(i);
Put put = new Put(row); Put put = new Put(row);
for (int family = 0; family < opts.families; family++) { for (int family = 0; family < opts.families; family++) {
@ -2224,7 +2289,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
@Override @Override
boolean testRow(int i) throws IOException { boolean testRow(int i, final long startTime) throws IOException {
byte[] value = generateData(this.rand, getValueLength(this.rand)); byte[] value = generateData(this.rand, getValueLength(this.rand));
Scan scan = constructScan(value); Scan scan = constructScan(value);
ResultScanner scanner = null; ResultScanner scanner = null;
@ -2368,7 +2433,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime, " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
getAverageValueLength(opts), opts.families, opts.columns) + ")"); getAverageValueLength(opts), opts.families, opts.columns) + ")");
return new RunResult(totalElapsedTime, t.getLatencyHistogram()); return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
t.numOfReplyFromReplica, t.getLatencyHistogram());
} }
private static int getAverageValueLength(final TestOptions opts) { private static int getAverageValueLength(final TestOptions opts) {
@ -2434,6 +2500,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " + System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
"Default: 0"); "Default: 0");
System.err.println(" latency Set to report operation latencies. Default: False"); System.err.println(" latency Set to report operation latencies. Default: False");
System.err.println(" latencyThreshold Set to report number of operations with latency " +
"over lantencyThreshold, unit in millisecond, default 0");
System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" + System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" +
" rows have been treated. Default: 0"); " rows have been treated. Default: 0");
System.err.println(" valueSize Pass value size to use: Default: " System.err.println(" valueSize Pass value size to use: Default: "
@ -2631,6 +2699,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue; continue;
} }
final String latencyThreshold = "--latencyThreshold=";
if (cmd.startsWith(latencyThreshold)) {
opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
continue;
}
final String latency = "--latency"; final String latency = "--latency";
if (cmd.startsWith(latency)) { if (cmd.startsWith(latency)) {
opts.reportLatency = true; opts.reportLatency = true;