HBASE-12335 IntegrationTestRegionReplicaPerf is flaky
This commit is contained in:
parent
a62f543c65
commit
f5d6314c87
@ -19,13 +19,13 @@
|
|||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
import com.google.common.base.Objects;
|
import com.google.common.base.Objects;
|
||||||
|
import com.yammer.metrics.core.Histogram;
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
|
import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
|
||||||
import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingTableAction;
|
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
|
||||||
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
|
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
|
||||||
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
|
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
|
||||||
import org.apache.hadoop.hbase.chaos.policies.Policy;
|
import org.apache.hadoop.hbase.chaos.policies.Policy;
|
||||||
@ -33,20 +33,18 @@ import org.apache.hadoop.hbase.client.Admin;
|
|||||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||||
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
|
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
|
||||||
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||||
|
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
import static java.lang.String.format;
|
import static java.lang.String.format;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -73,6 +71,24 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
|
|||||||
private static final String NUM_RS_KEY = "numRs";
|
private static final String NUM_RS_KEY = "numRs";
|
||||||
private static final String NUM_RS_DEFAULT = "" + 3;
|
private static final String NUM_RS_DEFAULT = "" + 3;
|
||||||
|
|
||||||
|
/** Extract a descriptive statistic from a {@link com.yammer.metrics.core.Histogram}. */
|
||||||
|
private enum Stat {
|
||||||
|
STDEV {
|
||||||
|
@Override
|
||||||
|
double apply(Histogram hist) {
|
||||||
|
return hist.stdDev();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
FOUR_9S {
|
||||||
|
@Override
|
||||||
|
double apply(Histogram hist) {
|
||||||
|
return hist.getSnapshot().getValue(0.9999);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
abstract double apply(Histogram hist);
|
||||||
|
}
|
||||||
|
|
||||||
private TableName tableName;
|
private TableName tableName;
|
||||||
private long sleepTime;
|
private long sleepTime;
|
||||||
private int replicaCount;
|
private int replicaCount;
|
||||||
@ -97,17 +113,21 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
|
|||||||
public TimingResult call() throws Exception {
|
public TimingResult call() throws Exception {
|
||||||
PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
|
PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
|
||||||
PerformanceEvaluation.checkTable(admin, opts);
|
PerformanceEvaluation.checkTable(admin, opts);
|
||||||
|
PerformanceEvaluation.RunResult results[] = null;
|
||||||
long numRows = opts.totalRows;
|
long numRows = opts.totalRows;
|
||||||
long elapsedTime;
|
long elapsedTime = 0;
|
||||||
if (opts.nomapred) {
|
if (opts.nomapred) {
|
||||||
elapsedTime = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
|
results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
|
||||||
|
for (PerformanceEvaluation.RunResult r : results) {
|
||||||
|
elapsedTime = Math.max(elapsedTime, r.duration);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
|
Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
|
||||||
Counters counters = job.getCounters();
|
Counters counters = job.getCounters();
|
||||||
numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
|
numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
|
||||||
elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
|
elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
|
||||||
}
|
}
|
||||||
return new TimingResult(numRows, elapsedTime);
|
return new TimingResult(numRows, elapsedTime, results);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -115,12 +135,14 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
|
|||||||
* Record the results from a single {@link PerformanceEvaluation} job run.
|
* Record the results from a single {@link PerformanceEvaluation} job run.
|
||||||
*/
|
*/
|
||||||
static class TimingResult {
|
static class TimingResult {
|
||||||
public long numRows;
|
public final long numRows;
|
||||||
public long elapsedTime;
|
public final long elapsedTime;
|
||||||
|
public final PerformanceEvaluation.RunResult results[];
|
||||||
|
|
||||||
public TimingResult(long numRows, long elapsedTime) {
|
public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult results[]) {
|
||||||
this.numRows = numRows;
|
this.numRows = numRows;
|
||||||
this.elapsedTime = elapsedTime;
|
this.elapsedTime = elapsedTime;
|
||||||
|
this.results = results;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -162,7 +184,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
|
|||||||
@Override
|
@Override
|
||||||
public void setUpMonkey() throws Exception {
|
public void setUpMonkey() throws Exception {
|
||||||
Policy p = new PeriodicRandomActionPolicy(sleepTime,
|
Policy p = new PeriodicRandomActionPolicy(sleepTime,
|
||||||
new RestartRsHoldingTableAction(sleepTime, tableName.getNameAsString()),
|
new RestartRandomRsExceptMetaAction(sleepTime),
|
||||||
new MoveRandomRegionOfTableAction(tableName));
|
new MoveRandomRegionOfTableAction(tableName));
|
||||||
this.monkey = new PolicyBasedChaosMonkey(util, p);
|
this.monkey = new PolicyBasedChaosMonkey(util, p);
|
||||||
// don't start monkey right away
|
// don't start monkey right away
|
||||||
@ -215,6 +237,22 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Compute the mean of the given {@code stat} from a timing results. */
|
||||||
|
private static double calcMean(String desc, Stat stat, List<TimingResult> results) {
|
||||||
|
double sum = 0;
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
for (TimingResult tr : results) {
|
||||||
|
for (PerformanceEvaluation.RunResult r : tr.results) {
|
||||||
|
assertNotNull("One of the run results is missing detailed run data.", r.hist);
|
||||||
|
sum += stat.apply(r.hist);
|
||||||
|
count += 1;
|
||||||
|
LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sum / count;
|
||||||
|
}
|
||||||
|
|
||||||
public void test() throws Exception {
|
public void test() throws Exception {
|
||||||
int maxIters = 3;
|
int maxIters = 3;
|
||||||
String replicas = "--replicas=" + replicaCount;
|
String replicas = "--replicas=" + replicaCount;
|
||||||
@ -226,8 +264,8 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
|
|||||||
format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName);
|
format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName);
|
||||||
String replicaReadOpts = format("%s %s", replicas, readOpts);
|
String replicaReadOpts = format("%s %s", replicas, readOpts);
|
||||||
|
|
||||||
ArrayList<TimingResult> resultsWithoutReplica = new ArrayList<TimingResult>(maxIters);
|
ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<TimingResult>(maxIters);
|
||||||
ArrayList<TimingResult> resultsWithReplica = new ArrayList<TimingResult>(maxIters);
|
ArrayList<TimingResult> resultsWithReplicas = new ArrayList<TimingResult>(maxIters);
|
||||||
|
|
||||||
// create/populate the table, replicas disabled
|
// create/populate the table, replicas disabled
|
||||||
LOG.debug("Populating table.");
|
LOG.debug("Populating table.");
|
||||||
@ -242,7 +280,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
|
|||||||
// collect a baseline without region replicas.
|
// collect a baseline without region replicas.
|
||||||
for (int i = 0; i < maxIters; i++) {
|
for (int i = 0; i < maxIters; i++) {
|
||||||
LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
|
LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
|
||||||
resultsWithoutReplica.add(new PerfEvalCallable(util.getHBaseAdmin(), readOpts).call());
|
resultsWithoutReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), readOpts).call());
|
||||||
// TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
|
// TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
|
||||||
Thread.sleep(5000l);
|
Thread.sleep(5000l);
|
||||||
}
|
}
|
||||||
@ -257,32 +295,41 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
|
|||||||
// run test with region replicas.
|
// run test with region replicas.
|
||||||
for (int i = 0; i < maxIters; i++) {
|
for (int i = 0; i < maxIters; i++) {
|
||||||
LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
|
LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
|
||||||
resultsWithReplica.add(new PerfEvalCallable(util.getHBaseAdmin(), replicaReadOpts).call());
|
resultsWithReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), replicaReadOpts).call());
|
||||||
// TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
|
// TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
|
||||||
Thread.sleep(5000l);
|
Thread.sleep(5000l);
|
||||||
}
|
}
|
||||||
|
|
||||||
DescriptiveStatistics withoutReplicaStats = new DescriptiveStatistics();
|
// compare the average of the stdev and 99.99pct across runs to determine if region replicas
|
||||||
for (TimingResult tr : resultsWithoutReplica) {
|
// are having an overall improvement on response variance experienced by clients.
|
||||||
withoutReplicaStats.addValue(tr.elapsedTime);
|
double withoutReplicasStdevMean =
|
||||||
}
|
calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas);
|
||||||
DescriptiveStatistics withReplicaStats = new DescriptiveStatistics();
|
double withoutReplicas9999Mean =
|
||||||
for (TimingResult tr : resultsWithReplica) {
|
calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas);
|
||||||
withReplicaStats.addValue(tr.elapsedTime);
|
double withReplicasStdevMean =
|
||||||
}
|
calcMean("withReplicas", Stat.STDEV, resultsWithReplicas);
|
||||||
|
double withReplicas9999Mean =
|
||||||
|
calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas);
|
||||||
|
|
||||||
LOG.info(Objects.toStringHelper("testName")
|
LOG.info(Objects.toStringHelper(this)
|
||||||
.add("withoutReplicas", resultsWithoutReplica)
|
.add("withoutReplicas", resultsWithoutReplicas)
|
||||||
.add("withReplicas", resultsWithReplica)
|
.add("withReplicas", resultsWithReplicas)
|
||||||
.add("withoutReplicasMean", withoutReplicaStats.getMean())
|
.add("withoutReplicasStdevMean", withoutReplicasStdevMean)
|
||||||
.add("withReplicasMean", withReplicaStats.getMean())
|
.add("withoutReplicas99.99Mean", withoutReplicas9999Mean)
|
||||||
|
.add("withReplicasStdevMean", withReplicasStdevMean)
|
||||||
|
.add("withReplicas99.99Mean", withReplicas9999Mean)
|
||||||
.toString());
|
.toString());
|
||||||
|
|
||||||
assertTrue(
|
assertTrue(
|
||||||
"Running with region replicas under chaos should be as fast or faster than without. "
|
"Running with region replicas under chaos should have less request variance than without. "
|
||||||
+ "withReplicas.mean: " + withReplicaStats.getMean() + "ms "
|
+ "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms "
|
||||||
+ "withoutReplicas.mean: " + withoutReplicaStats.getMean() + "ms.",
|
+ "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.",
|
||||||
withReplicaStats.getMean() <= withoutReplicaStats.getMean());
|
withReplicasStdevMean <= withoutReplicasStdevMean);
|
||||||
|
assertTrue(
|
||||||
|
"Running with region replicas under chaos should improve 99.99pct latency. "
|
||||||
|
+ "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms "
|
||||||
|
+ "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.",
|
||||||
|
withReplicas9999Mean <= withoutReplicas9999Mean);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
@ -94,7 +94,7 @@ public abstract class AbstractHBaseTool implements Tool {
|
|||||||
cmd = parseArgs(args);
|
cmd = parseArgs(args);
|
||||||
cmdLineArgs = args;
|
cmdLineArgs = args;
|
||||||
} catch (ParseException e) {
|
} catch (ParseException e) {
|
||||||
LOG.error("Error when parsing command-line arguemnts", e);
|
LOG.error("Error when parsing command-line arguments", e);
|
||||||
printUsage();
|
printUsage();
|
||||||
return EXIT_FAILURE;
|
return EXIT_FAILURE;
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,80 @@
|
|||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import com.yammer.metrics.core.Histogram;
|
||||||
|
import com.yammer.metrics.stats.Sample;
|
||||||
|
import com.yammer.metrics.stats.Snapshot;
|
||||||
|
|
||||||
|
import java.lang.reflect.Constructor;
|
||||||
|
import java.text.DecimalFormat;
|
||||||
|
|
||||||
|
/** Utility functions for working with Yammer Metrics. */
|
||||||
|
public final class YammerHistogramUtils {
|
||||||
|
|
||||||
|
// not for public consumption
|
||||||
|
private YammerHistogramUtils() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used formatting doubles so only two places after decimal point.
|
||||||
|
*/
|
||||||
|
private static DecimalFormat DOUBLE_FORMAT = new DecimalFormat("#0.00");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link com.yammer.metrics.core.Histogram} instance. These constructors are
|
||||||
|
* not public in 2.2.0, so we use reflection to find them.
|
||||||
|
*/
|
||||||
|
public static Histogram newHistogram(Sample sample) {
|
||||||
|
try {
|
||||||
|
Constructor<?> ctor =
|
||||||
|
Histogram.class.getDeclaredConstructor(Sample.class);
|
||||||
|
ctor.setAccessible(true);
|
||||||
|
return (Histogram) ctor.newInstance(sample);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @return an abbreviated summary of {@code hist}. */
|
||||||
|
public static String getShortHistogramReport(final Histogram hist) {
|
||||||
|
Snapshot sn = hist.getSnapshot();
|
||||||
|
return "mean=" + DOUBLE_FORMAT.format(hist.mean()) +
|
||||||
|
", min=" + DOUBLE_FORMAT.format(hist.min()) +
|
||||||
|
", max=" + DOUBLE_FORMAT.format(hist.max()) +
|
||||||
|
", stdDev=" + DOUBLE_FORMAT.format(hist.stdDev()) +
|
||||||
|
", 95th=" + DOUBLE_FORMAT.format(sn.get95thPercentile()) +
|
||||||
|
", 99th=" + DOUBLE_FORMAT.format(sn.get99thPercentile());
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @return a summary of {@code hist}. */
|
||||||
|
public static String getHistogramReport(final Histogram hist) {
|
||||||
|
Snapshot sn = hist.getSnapshot();
|
||||||
|
return ", mean=" + DOUBLE_FORMAT.format(hist.mean()) +
|
||||||
|
", min=" + DOUBLE_FORMAT.format(hist.min()) +
|
||||||
|
", max=" + DOUBLE_FORMAT.format(hist.max()) +
|
||||||
|
", stdDev=" + DOUBLE_FORMAT.format(hist.stdDev()) +
|
||||||
|
", 50th=" + DOUBLE_FORMAT.format(sn.getMedian()) +
|
||||||
|
", 75th=" + DOUBLE_FORMAT.format(sn.get75thPercentile()) +
|
||||||
|
", 95th=" + DOUBLE_FORMAT.format(sn.get95thPercentile()) +
|
||||||
|
", 99th=" + DOUBLE_FORMAT.format(sn.get99thPercentile()) +
|
||||||
|
", 99.9th=" + DOUBLE_FORMAT.format(sn.get999thPercentile()) +
|
||||||
|
", 99.99th=" + DOUBLE_FORMAT.format(sn.getValue(0.9999)) +
|
||||||
|
", 99.999th=" + DOUBLE_FORMAT.format(sn.getValue(0.99999));
|
||||||
|
}
|
||||||
|
}
|
@ -74,10 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
|
|||||||
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
||||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||||
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
|
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.*;
|
||||||
import org.apache.hadoop.hbase.util.Hash;
|
|
||||||
import org.apache.hadoop.hbase.util.MurmurHash;
|
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
@ -179,6 +176,25 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
ROWS
|
ROWS
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static class RunResult implements Comparable<RunResult> {
|
||||||
|
public RunResult(long duration, Histogram hist) {
|
||||||
|
this.duration = duration;
|
||||||
|
this.hist = hist;
|
||||||
|
}
|
||||||
|
|
||||||
|
public final long duration;
|
||||||
|
public final Histogram hist;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return Long.toString(duration);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public int compareTo(RunResult o) {
|
||||||
|
return Long.compare(this.duration, o.duration);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param conf Configuration object
|
* @param conf Configuration object
|
||||||
@ -258,12 +274,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
final Connection con = ConnectionFactory.createConnection(conf);
|
final Connection con = ConnectionFactory.createConnection(conf);
|
||||||
|
|
||||||
// Evaluation task
|
// Evaluation task
|
||||||
long elapsedTime = runOneClient(this.cmd, conf, con, opts, status);
|
RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, opts, status);
|
||||||
// Collect how much time the thing took. Report as map output and
|
// Collect how much time the thing took. Report as map output and
|
||||||
// to the ELAPSED_TIME counter.
|
// to the ELAPSED_TIME counter.
|
||||||
context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
|
context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
|
||||||
context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
|
context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
|
||||||
context.write(new LongWritable(opts.startRow), new LongWritable(elapsedTime));
|
context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
|
||||||
context.progress();
|
context.progress();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -368,34 +384,32 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* Run all clients in this vm each to its own thread.
|
* Run all clients in this vm each to its own thread.
|
||||||
* @param cmd Command to run.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
static long doLocalClients(final TestOptions opts, final Configuration conf)
|
static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
|
final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
|
||||||
assert cmd != null;
|
assert cmd != null;
|
||||||
Future<Long>[] threads = new Future[opts.numClientThreads];
|
Future<RunResult>[] threads = new Future[opts.numClientThreads];
|
||||||
long[] timings = new long[opts.numClientThreads];
|
RunResult[] results = new RunResult[opts.numClientThreads];
|
||||||
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
|
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
|
||||||
new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
|
new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
|
||||||
final Connection con = ConnectionFactory.createConnection(conf);
|
final Connection con = ConnectionFactory.createConnection(conf);
|
||||||
for (int i = 0; i < threads.length; i++) {
|
for (int i = 0; i < threads.length; i++) {
|
||||||
final int index = i;
|
final int index = i;
|
||||||
threads[i] = pool.submit(new Callable<Long>() {
|
threads[i] = pool.submit(new Callable<RunResult>() {
|
||||||
@Override
|
@Override
|
||||||
public Long call() throws Exception {
|
public RunResult call() throws Exception {
|
||||||
TestOptions threadOpts = new TestOptions(opts);
|
TestOptions threadOpts = new TestOptions(opts);
|
||||||
if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
|
if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
|
||||||
long elapsedTime = runOneClient(cmd, conf, con, threadOpts, new Status() {
|
RunResult run = runOneClient(cmd, conf, con, threadOpts, new Status() {
|
||||||
@Override
|
@Override
|
||||||
public void setStatus(final String msg) throws IOException {
|
public void setStatus(final String msg) throws IOException {
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
LOG.info("Finished in " + elapsedTime +
|
LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
|
||||||
"ms over " + threadOpts.perClientRunRows + " rows");
|
"ms over " + threadOpts.perClientRunRows + " rows");
|
||||||
return elapsedTime;
|
return run;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -403,27 +417,27 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
|
|
||||||
for (int i = 0; i < threads.length; i++) {
|
for (int i = 0; i < threads.length; i++) {
|
||||||
try {
|
try {
|
||||||
timings[i] = threads[i].get();
|
results[i] = threads[i].get();
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
throw new IOException(e.getCause());
|
throw new IOException(e.getCause());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final String test = cmd.getSimpleName();
|
final String test = cmd.getSimpleName();
|
||||||
LOG.info("[" + test + "] Summary of timings (ms): "
|
LOG.info("[" + test + "] Summary of timings (ms): "
|
||||||
+ Arrays.toString(timings));
|
+ Arrays.toString(results));
|
||||||
Arrays.sort(timings);
|
Arrays.sort(results);
|
||||||
long total = 0;
|
long total = 0;
|
||||||
for (long timing : timings) {
|
for (RunResult result : results) {
|
||||||
total += timing;
|
total += result.duration;
|
||||||
}
|
}
|
||||||
LOG.info("[" + test + "]"
|
LOG.info("[" + test + "]"
|
||||||
+ "\tMin: " + timings[0] + "ms"
|
+ "\tMin: " + results[0] + "ms"
|
||||||
+ "\tMax: " + timings[timings.length - 1] + "ms"
|
+ "\tMax: " + results[results.length - 1] + "ms"
|
||||||
+ "\tAvg: " + (total / timings.length) + "ms");
|
+ "\tAvg: " + (total / results.length) + "ms");
|
||||||
|
|
||||||
con.close();
|
con.close();
|
||||||
|
|
||||||
return total;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -993,23 +1007,21 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
return opts.period;
|
return opts.period;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Populated by testTakedown. Only implemented by RandomReadTest at the moment.
|
||||||
|
*/
|
||||||
|
public Histogram getLatency() {
|
||||||
|
return latency;
|
||||||
|
}
|
||||||
|
|
||||||
void testSetup() throws IOException {
|
void testSetup() throws IOException {
|
||||||
if (!opts.oneCon) {
|
if (!opts.oneCon) {
|
||||||
this.connection = ConnectionFactory.createConnection(conf);
|
this.connection = ConnectionFactory.createConnection(conf);
|
||||||
}
|
}
|
||||||
this.table = new HTable(TableName.valueOf(opts.tableName), connection);
|
this.table = new HTable(TableName.valueOf(opts.tableName), connection);
|
||||||
this.table.setAutoFlushTo(opts.autoFlush);
|
this.table.setAutoFlushTo(opts.autoFlush);
|
||||||
|
latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
||||||
try {
|
valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
|
||||||
Constructor<?> ctor =
|
|
||||||
Histogram.class.getDeclaredConstructor(com.yammer.metrics.stats.Sample.class);
|
|
||||||
ctor.setAccessible(true);
|
|
||||||
latency = (Histogram) ctor.newInstance(new UniformSample(1024 * 500));
|
|
||||||
valueSize = (Histogram) ctor.newInstance(new UniformSample(1024 * 500));
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void testTakedown() throws IOException {
|
void testTakedown() throws IOException {
|
||||||
@ -1090,6 +1102,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
status.setStatus(testName + " Avg = " + h.mean());
|
status.setStatus(testName + " Avg = " + h.mean());
|
||||||
status.setStatus(testName + " StdDev = " + h.stdDev());
|
status.setStatus(testName + " StdDev = " + h.stdDev());
|
||||||
status.setStatus(testName + " 50th = " + sn.getMedian());
|
status.setStatus(testName + " 50th = " + sn.getMedian());
|
||||||
|
status.setStatus(testName + " 75th = " + sn.get75thPercentile());
|
||||||
status.setStatus(testName + " 95th = " + sn.get95thPercentile());
|
status.setStatus(testName + " 95th = " + sn.get95thPercentile());
|
||||||
status.setStatus(testName + " 99th = " + sn.get99thPercentile());
|
status.setStatus(testName + " 99th = " + sn.get99thPercentile());
|
||||||
status.setStatus(testName + " 99.9th = " + sn.get999thPercentile());
|
status.setStatus(testName + " 99.9th = " + sn.get999thPercentile());
|
||||||
@ -1098,33 +1111,18 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
status.setStatus(testName + " Max = " + h.max());
|
status.setStatus(testName + " Max = " + h.max());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Used formating doubles so only two places after decimal point.
|
|
||||||
*/
|
|
||||||
private static DecimalFormat DOUBLE_FORMAT = new DecimalFormat("#0.00");
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Subset of the histograms' calculation.
|
* @return Subset of the histograms' calculation.
|
||||||
*/
|
*/
|
||||||
private String getShortLatencyReport() {
|
public String getShortLatencyReport() {
|
||||||
return getShortHistogramReport(this.latency);
|
return YammerHistogramUtils.getShortHistogramReport(this.latency);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Subset of the histograms' calculation.
|
* @return Subset of the histograms' calculation.
|
||||||
*/
|
*/
|
||||||
private String getShortValueSizeReport() {
|
public String getShortValueSizeReport() {
|
||||||
return getShortHistogramReport(this.valueSize);
|
return YammerHistogramUtils.getShortHistogramReport(this.valueSize);
|
||||||
}
|
|
||||||
|
|
||||||
private String getShortHistogramReport(final Histogram h) {
|
|
||||||
Snapshot sn = h.getSnapshot();
|
|
||||||
return "mean=" + DOUBLE_FORMAT.format(h.mean()) +
|
|
||||||
", min=" + DOUBLE_FORMAT.format(h.min()) +
|
|
||||||
", max=" + DOUBLE_FORMAT.format(h.max()) +
|
|
||||||
", stdDev=" + DOUBLE_FORMAT.format(h.stdDev()) +
|
|
||||||
", 95th=" + DOUBLE_FORMAT.format(sn.get95thPercentile()) +
|
|
||||||
", 99th=" + DOUBLE_FORMAT.format(sn.get99thPercentile());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1528,7 +1526,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
|
return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
static long runOneClient(final Class<? extends Test> cmd, Configuration conf, Connection con,
|
static RunResult runOneClient(final Class<? extends Test> cmd, Configuration conf, Connection con,
|
||||||
TestOptions opts, final Status status)
|
TestOptions opts, final Status status)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
|
status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
|
||||||
@ -1555,7 +1553,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||||||
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
|
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
|
||||||
getAverageValueLength(opts)) + ")");
|
getAverageValueLength(opts)) + ")");
|
||||||
|
|
||||||
return totalElapsedTime;
|
return new RunResult(totalElapsedTime, t.getLatency());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int getAverageValueLength(final TestOptions opts) {
|
private static int getAverageValueLength(final TestOptions opts) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user