diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 134ffa679de..7ea858976ae 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -219,10 +219,22 @@ public class PerformanceEvaluation extends Configured implements Tool { public RunResult(long duration, Histogram hist) { this.duration = duration; this.hist = hist; + numbOfReplyOverThreshold = 0; + numOfReplyFromReplica = 0; + } + + public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica, + Histogram hist) { + this.duration = duration; + this.hist = hist; + this.numbOfReplyOverThreshold = numbOfReplyOverThreshold; + this.numOfReplyFromReplica = numOfReplyFromReplica; } public final long duration; public final Histogram hist; + public final long numbOfReplyOverThreshold; + public final long numOfReplyFromReplica; @Override public String toString() { @@ -492,6 +504,10 @@ public class PerformanceEvaluation extends Configured implements Tool { }); LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration + "ms over " + threadOpts.perClientRunRows + " rows"); + if (opts.latencyThreshold > 0) { + LOG.info("Number of replies over latency threshold " + opts.latencyThreshold + + "(ms) is " + run.numbOfReplyOverThreshold); + } return run; } }); @@ -512,10 +528,12 @@ public class PerformanceEvaluation extends Configured implements Tool { long total = 0; float avgLatency = 0 ; float avgTPS = 0; + long replicaWins = 0; for (RunResult result : results) { total += result.duration; avgLatency += result.hist.getSnapshot().getMean(); avgTPS += opts.perClientRunRows * 1.0f / result.duration; + replicaWins += result.numOfReplyFromReplica; } avgTPS *= 1000; // ms to second avgLatency = avgLatency / results.length; @@ -525,12 +543,15 @@ public class PerformanceEvaluation extends Configured implements Tool { + "\tAvg: " + (total / results.length) + "ms"); LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); + if (opts.replicas > 1) { + LOG.info("[results from replica regions] " + replicaWins); + } + for (int i = 0; i < opts.connCount; i++) { cons[i].close(); asyncCons[i].close(); } - return results; } @@ -706,6 +727,7 @@ public class PerformanceEvaluation extends Configured implements Tool { int columns = 1; int families = 1; int caching = 30; + int latencyThreshold = 0; // in millsecond boolean addColumns = true; MemoryCompactionPolicy inMemoryCompaction = MemoryCompactionPolicy.valueOf( @@ -741,6 +763,7 @@ public class PerformanceEvaluation extends Configured implements Tool { this.useTags = that.useTags; this.noOfTags = that.noOfTags; this.reportLatency = that.reportLatency; + this.latencyThreshold = that.latencyThreshold; this.multiGet = that.multiGet; this.multiPut = that.multiPut; this.inMemoryCF = that.inMemoryCF; @@ -1130,6 +1153,7 @@ public class PerformanceEvaluation extends Configured implements Tool { private String testName; private Histogram latencyHistogram; + private Histogram replicaLatencyHistogram; private Histogram valueSizeHistogram; private Histogram rpcCallsHistogram; private Histogram remoteRpcCallsHistogram; @@ -1138,6 +1162,8 @@ public class PerformanceEvaluation extends Configured implements Tool { private Histogram bytesInResultsHistogram; private Histogram bytesInRemoteResultsHistogram; private RandomDistribution.Zipf zipf; + private long numOfReplyOverLatencyThreshold = 0; + private long numOfReplyFromReplica = 0; /** * Note that all subclasses of this class must provide a public constructor @@ -1175,13 +1201,28 @@ public class PerformanceEvaluation extends Configured implements Tool { } void updateValueSize(final Result [] rs) throws IOException { - if (rs == null || !isRandomValueSize()) return; - for (Result r: rs) updateValueSize(r); + updateValueSize(rs, 0); + } + + void updateValueSize(final Result [] rs, final long latency) throws IOException { + if (rs == null || (latency == 0)) return; + for (Result r: rs) updateValueSize(r, latency); } void updateValueSize(final Result r) throws IOException { - if (r == null || !isRandomValueSize()) return; + updateValueSize(r, 0); + } + + void updateValueSize(final Result r, final long latency) throws IOException { + if (r == null || (latency == 0)) return; int size = 0; + // update replicaHistogram + if (r.isStale()) { + replicaLatencyHistogram.update(latency / 1000); + numOfReplyFromReplica ++; + } + if (!isRandomValueSize()) return; + for (CellScanner scanner = r.cellScanner(); scanner.advance();) { size += scanner.current().getValueLength(); } @@ -1245,6 +1286,10 @@ public class PerformanceEvaluation extends Configured implements Tool { void testSetup() throws IOException { // test metrics latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); + // If it is a replica test, set up histogram for replica. + if (opts.replicas > 1) { + replicaLatencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); + } valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); // scan metrics rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); @@ -1268,6 +1313,10 @@ public class PerformanceEvaluation extends Configured implements Tool { status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport( latencyHistogram)); + if (opts.replicas > 1) { + status.setStatus("Latency (us) from Replica Regions: " + + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); + } status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); if (valueSizeHistogram.getCount() > 0) { @@ -1349,7 +1398,7 @@ public class PerformanceEvaluation extends Configured implements Tool { long startTime = System.nanoTime(); boolean requestSent = false; try (TraceScope scope = TraceUtil.createTrace("test row");){ - requestSent = testRow(i); + requestSent = testRow(i, startTime); } if ( (i - startRow) > opts.measureAfter) { // If multiget or multiput is enabled, say set to 10, testRow() returns immediately @@ -1357,7 +1406,11 @@ public class PerformanceEvaluation extends Configured implements Tool { // We should only set latency when actual request is sent because otherwise // it turns out to be 0. if (requestSent) { - latencyHistogram.update((System.nanoTime() - startTime) / 1000); + long latency = (System.nanoTime() - startTime) / 1000; + latencyHistogram.update(latency); + if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { + numOfReplyOverLatencyThreshold ++; + } } if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { status.setStatus(generateStatus(startRow, i, lastRow)); @@ -1389,7 +1442,7 @@ public class PerformanceEvaluation extends Configured implements Tool { * False if not, multiGet and multiPut e.g., the rows are sent * to server only if enough gets/puts are gathered. */ - abstract boolean testRow(final int i) throws IOException, InterruptedException; + abstract boolean testRow(final int i, final long startTime) throws IOException, InterruptedException; } static abstract class Test extends TestBase { @@ -1460,7 +1513,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException, InterruptedException { + boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { if (opts.randomSleep > 0) { Thread.sleep(rd.nextInt(opts.randomSleep)); } @@ -1569,7 +1622,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { if (this.testScanner == null) { Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) @@ -1603,7 +1656,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException, InterruptedException { + boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { Get get = new Get(format(i)); for (int family = 0; family < opts.families; family++) { byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); @@ -1645,7 +1698,7 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override @SuppressWarnings("ReturnValueIgnored") - boolean testRow(final int i) throws IOException, InterruptedException { + boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { byte[] row = generateRow(i); Put put = new Put(row); for (int family = 0; family < opts.families; family++) { @@ -1720,7 +1773,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)) .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) @@ -1768,7 +1821,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { Pair startAndStopRow = getStartAndStopRow(); Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) @@ -1871,6 +1924,7 @@ public class PerformanceEvaluation extends Configured implements Tool { private final Consistency consistency; private ArrayList gets; private Random rd = new Random(); + private long numOfReplyFromReplica = 0; RandomReadTest(Connection con, TestOptions options, Status status) { super(con, options, status); @@ -1882,7 +1936,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException, InterruptedException { + boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { if (opts.randomSleep > 0) { Thread.sleep(rd.nextInt(opts.randomSleep)); } @@ -1907,13 +1961,24 @@ public class PerformanceEvaluation extends Configured implements Tool { this.gets.add(get); if (this.gets.size() == opts.multiGet) { Result [] rs = this.table.get(this.gets); - updateValueSize(rs); + if (opts.replicas > 1) { + long latency = System.nanoTime() - startTime; + updateValueSize(rs, latency); + } else { + updateValueSize(rs); + } this.gets.clear(); } else { return false; } } else { - updateValueSize(this.table.get(get)); + if (opts.replicas > 1) { + Result r = this.table.get(get); + long latency = System.nanoTime() - startTime; + updateValueSize(r, latency); + } else { + updateValueSize(this.table.get(get)); + } } return true; } @@ -1964,7 +2029,7 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { if (this.testScanner == null) { Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) @@ -2027,7 +2092,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { Increment increment = new Increment(format(i)); // unlike checkAndXXX tests, which make most sense to do on a single value, // if multiple families are specified for an increment test we assume it is @@ -2047,7 +2112,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { byte [] bytes = format(i); Append append = new Append(bytes); // unlike checkAndXXX tests, which make most sense to do on a single value, @@ -2068,7 +2133,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { final byte [] bytes = format(i); // checkAndXXX tests operate on only a single value // Put a known value so when we go to check it, it is there. @@ -2089,7 +2154,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { final byte [] bytes = format(i); // checkAndXXX tests operate on only a single value // Put a known value so when we go to check it, it is there. @@ -2108,7 +2173,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { final byte [] bytes = format(i); // checkAndXXX tests operate on only a single value // Put a known value so when we go to check it, it is there. @@ -2129,7 +2194,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { Get get = new Get(format(i)); for (int family = 0; family < opts.families; family++) { byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); @@ -2167,7 +2232,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(final int i) throws IOException { + boolean testRow(final int i, final long startTime) throws IOException { byte[] row = generateRow(i); Put put = new Put(row); for (int family = 0; family < opts.families; family++) { @@ -2224,7 +2289,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - boolean testRow(int i) throws IOException { + boolean testRow(int i, final long startTime) throws IOException { byte[] value = generateData(this.rand, getValueLength(this.rand)); Scan scan = constructScan(value); ResultScanner scanner = null; @@ -2368,7 +2433,8 @@ public class PerformanceEvaluation extends Configured implements Tool { " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime, getAverageValueLength(opts), opts.families, opts.columns) + ")"); - return new RunResult(totalElapsedTime, t.getLatencyHistogram()); + return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, + t.numOfReplyFromReplica, t.getLatencyHistogram()); } private static int getAverageValueLength(final TestOptions opts) { @@ -2434,6 +2500,8 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); System.err.println(" latency Set to report operation latencies. Default: False"); + System.err.println(" latencyThreshold Set to report number of operations with latency " + + "over lantencyThreshold, unit in millisecond, default 0"); System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" + " rows have been treated. Default: 0"); System.err.println(" valueSize Pass value size to use: Default: " @@ -2631,6 +2699,12 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } + final String latencyThreshold = "--latencyThreshold="; + if (cmd.startsWith(latencyThreshold)) { + opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); + continue; + } + final String latency = "--latency"; if (cmd.startsWith(latency)) { opts.reportLatency = true;