HBASE-24824 Add more stats in PE for read replica (#2205) (#2212)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
huaxiangsun 2020-08-10 09:58:50 -07:00 committed by GitHub
parent 22bf9a38c9
commit bd30555ac8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 100 additions and 26 deletions

View File

@ -216,10 +216,22 @@ public class PerformanceEvaluation extends Configured implements Tool {
public RunResult(long duration, Histogram hist) {
this.duration = duration;
this.hist = hist;
numbOfReplyOverThreshold = 0;
numOfReplyFromReplica = 0;
}
public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
Histogram hist) {
this.duration = duration;
this.hist = hist;
this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
this.numOfReplyFromReplica = numOfReplyFromReplica;
}
public final long duration;
public final Histogram hist;
public final long numbOfReplyOverThreshold;
public final long numOfReplyFromReplica;
@Override
public String toString() {
@ -482,6 +494,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
});
LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
"ms over " + threadOpts.perClientRunRows + " rows");
if (opts.latencyThreshold > 0) {
LOG.info("Number of replies over latency threshold " + opts.latencyThreshold +
"(ms) is " + run.numbOfReplyOverThreshold);
}
return run;
}
});
@ -502,10 +518,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
long total = 0;
float avgLatency = 0 ;
float avgTPS = 0;
long replicaWins = 0;
for (RunResult result : results) {
total += result.duration;
avgLatency += result.hist.getSnapshot().getMean();
avgTPS += opts.perClientRunRows * 1.0f / result.duration;
replicaWins += result.numOfReplyFromReplica;
}
avgTPS *= 1000; // ms to second
avgLatency = avgLatency / results.length;
@ -515,12 +533,15 @@ public class PerformanceEvaluation extends Configured implements Tool {
+ "\tAvg: " + (total / results.length) + "ms");
LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
if (opts.replicas > 1) {
LOG.info("[results from replica regions] " + replicaWins);
}
for (int i = 0; i < opts.connCount; i++) {
cons[i].close();
asyncCons[i].close();
}
return results;
}
@ -696,6 +717,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
int columns = 1;
int families = 1;
int caching = 30;
int latencyThreshold = 0; // in millsecond
boolean addColumns = true;
MemoryCompactionPolicy inMemoryCompaction =
MemoryCompactionPolicy.valueOf(
@ -731,6 +753,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.useTags = that.useTags;
this.noOfTags = that.noOfTags;
this.reportLatency = that.reportLatency;
this.latencyThreshold = that.latencyThreshold;
this.multiGet = that.multiGet;
this.multiPut = that.multiPut;
this.inMemoryCF = that.inMemoryCF;
@ -1120,6 +1143,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
private String testName;
private Histogram latencyHistogram;
private Histogram replicaLatencyHistogram;
private Histogram valueSizeHistogram;
private Histogram rpcCallsHistogram;
private Histogram remoteRpcCallsHistogram;
@ -1128,6 +1152,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
private Histogram bytesInResultsHistogram;
private Histogram bytesInRemoteResultsHistogram;
private RandomDistribution.Zipf zipf;
private long numOfReplyOverLatencyThreshold = 0;
private long numOfReplyFromReplica = 0;
/**
* Note that all subclasses of this class must provide a public constructor
@ -1165,13 +1191,28 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
void updateValueSize(final Result [] rs) throws IOException {
if (rs == null || !isRandomValueSize()) return;
for (Result r: rs) updateValueSize(r);
updateValueSize(rs, 0);
}
void updateValueSize(final Result [] rs, final long latency) throws IOException {
if (rs == null || (latency == 0)) return;
for (Result r: rs) updateValueSize(r, latency);
}
void updateValueSize(final Result r) throws IOException {
if (r == null || !isRandomValueSize()) return;
updateValueSize(r, 0);
}
void updateValueSize(final Result r, final long latency) throws IOException {
if (r == null || (latency == 0)) return;
int size = 0;
// update replicaHistogram
if (r.isStale()) {
replicaLatencyHistogram.update(latency / 1000);
numOfReplyFromReplica ++;
}
if (!isRandomValueSize()) return;
for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
size += scanner.current().getValueLength();
}
@ -1235,6 +1276,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testSetup() throws IOException {
// test metrics
latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
// If it is a replica test, set up histogram for replica.
if (opts.replicas > 1) {
replicaLatencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
}
valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
// scan metrics
rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
@ -1258,6 +1303,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
latencyHistogram));
if (opts.replicas > 1) {
status.setStatus("Latency (us) from Replica Regions: " +
YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
}
status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
if (valueSizeHistogram.getCount() > 0) {
@ -1339,7 +1388,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
long startTime = System.nanoTime();
boolean requestSent = false;
try (TraceScope scope = TraceUtil.createTrace("test row");){
requestSent = testRow(i);
requestSent = testRow(i, startTime);
}
if ( (i - startRow) > opts.measureAfter) {
// If multiget or multiput is enabled, say set to 10, testRow() returns immediately
@ -1347,7 +1396,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
// We should only set latency when actual request is sent because otherwise
// it turns out to be 0.
if (requestSent) {
latencyHistogram.update((System.nanoTime() - startTime) / 1000);
long latency = (System.nanoTime() - startTime) / 1000;
latencyHistogram.update(latency);
if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
numOfReplyOverLatencyThreshold ++;
}
}
if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
status.setStatus(generateStatus(startRow, i, lastRow));
@ -1379,7 +1432,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
* False if not, multiGet and multiPut e.g., the rows are sent
* to server only if enough gets/puts are gathered.
*/
abstract boolean testRow(final int i) throws IOException, InterruptedException;
abstract boolean testRow(final int i, final long startTime) throws IOException, InterruptedException;
}
static abstract class Test extends TestBase {
@ -1450,7 +1503,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep));
}
@ -1559,7 +1612,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
if (this.testScanner == null) {
Scan scan =
new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
@ -1593,7 +1646,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
Get get = new Get(format(i));
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
@ -1635,7 +1688,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override
@SuppressWarnings("ReturnValueIgnored")
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
byte[] row = generateRow(i);
Put put = new Put(row);
for (int family = 0; family < opts.families; family++) {
@ -1710,7 +1763,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
.setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
.setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
@ -1758,7 +1811,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
.withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
@ -1861,6 +1914,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
private final Consistency consistency;
private ArrayList<Get> gets;
private Random rd = new Random();
private long numOfReplyFromReplica = 0;
RandomReadTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
@ -1872,7 +1926,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep));
}
@ -1897,13 +1951,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.gets.add(get);
if (this.gets.size() == opts.multiGet) {
Result [] rs = this.table.get(this.gets);
updateValueSize(rs);
if (opts.replicas > 1) {
long latency = System.nanoTime() - startTime;
updateValueSize(rs, latency);
} else {
updateValueSize(rs);
}
this.gets.clear();
} else {
return false;
}
} else {
updateValueSize(this.table.get(get));
if (opts.replicas > 1) {
Result r = this.table.get(get);
long latency = System.nanoTime() - startTime;
updateValueSize(r, latency);
} else {
updateValueSize(this.table.get(get));
}
}
return true;
}
@ -1954,7 +2019,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
if (this.testScanner == null) {
Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
@ -2017,7 +2082,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Increment increment = new Increment(format(i));
// unlike checkAndXXX tests, which make most sense to do on a single value,
// if multiple families are specified for an increment test we assume it is
@ -2037,7 +2102,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
byte [] bytes = format(i);
Append append = new Append(bytes);
// unlike checkAndXXX tests, which make most sense to do on a single value,
@ -2058,7 +2123,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
@ -2079,7 +2144,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
@ -2098,7 +2163,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
@ -2119,7 +2184,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Get get = new Get(format(i));
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
@ -2157,7 +2222,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
byte[] row = generateRow(i);
Put put = new Put(row);
for (int family = 0; family < opts.families; family++) {
@ -2214,7 +2279,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
boolean testRow(int i) throws IOException {
boolean testRow(int i, final long startTime) throws IOException {
byte[] value = generateData(this.rand, getValueLength(this.rand));
Scan scan = constructScan(value);
ResultScanner scanner = null;
@ -2358,7 +2423,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
getAverageValueLength(opts), opts.families, opts.columns) + ")");
return new RunResult(totalElapsedTime, t.getLatencyHistogram());
return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
t.numOfReplyFromReplica, t.getLatencyHistogram());
}
private static int getAverageValueLength(final TestOptions opts) {
@ -2424,6 +2490,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
"Default: 0");
System.err.println(" latency Set to report operation latencies. Default: False");
System.err.println(" latencyThreshold Set to report number of operations with latency " +
"over lantencyThreshold, unit in millisecond, default 0");
System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" +
" rows have been treated. Default: 0");
System.err.println(" valueSize Pass value size to use: Default: "
@ -2621,6 +2689,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
final String latencyThreshold = "--latencyThreshold=";
if (cmd.startsWith(latencyThreshold)) {
opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
continue;
}
final String latency = "--latency";
if (cmd.startsWith(latency)) {
opts.reportLatency = true;