HBASE-10791 Add integration test to demonstrate performance improvement

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1585807 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
ndimiduk 2014-04-08 18:18:41 +00:00 committed by Enis Soztutar
parent 7d247524b3
commit d313103aeb
2 changed files with 694 additions and 274 deletions

View File

@ -0,0 +1,338 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import com.google.common.base.Objects;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingTableAction;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test for comparing the performance impact of region replicas. Uses
* components of {@link PerformanceEvaluation}. Does not run from
* {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible
* with the JUnit runner. Hence no @Test annotations either. See {@code -help}
* for full list of options.
*/
@Category(IntegrationTests.class)
public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
// Logger shared by this test and its nested helper classes.
private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class);
// Command-line option names and their defaults. The *_DEFAULT strings are passed as the
// fallback value to CommandLine#getOptionValue in processOptions and are also echoed in
// the help text built by addOptions.
private static final String SLEEP_TIME_KEY = "sleeptime";
// short default interval because tests don't run very long.
private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l);
private static final String TABLE_NAME_KEY = "tableName";
private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
// Flag option (takes no argument); when present, clients run without mapreduce.
private static final String NOMAPRED_KEY = "nomapred";
private static final boolean NOMAPRED_DEFAULT = false;
private static final String REPLICA_COUNT_KEY = "replicas";
private static final String REPLICA_COUNT_DEFAULT = "" + 3;
private static final String PRIMARY_TIMEOUT_KEY = "timeout";
// NOTE(review): the value is 10000 yet it is labelled "10 ms", so the unit is presumably
// microseconds -- confirm against the hbase.client.primaryCallTimeout.* settings.
private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms
private static final String NUM_RS_KEY = "numRs";
private static final String NUM_RS_DEFAULT = "" + 3;
// Parsed option values; all of these are assigned in processOptions(CommandLine).
// Table that is populated, altered and read by the test.
private TableName tableName;
// Passed to the chaos monkey policy and to the restart action in setUpMonkey().
private long sleepTime;
private boolean nomapred = NOMAPRED_DEFAULT;
// Region replica count applied to the table before the second round of reads.
private int replicaCount;
// Written to hbase.client.primaryCallTimeout.get / .multiget in setUp().
private int primaryTimeout;
// Value handed to initializeCluster() in setUpCluster().
private int clusterSize;
/**
 * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
 *
 * <p>The option string is tokenized once, at construction time, into a queue that is
 * handed to {@code PerformanceEvaluation.parseOpts} when {@link #call()} runs. Because
 * the queue is consumed by parsing, an instance is intended to be called once.
 */
static class PerfEvalCallable implements Callable<TimingResult> {
  private final Queue<String> argv = new LinkedList<String>();
  private final HBaseAdmin admin;

  /**
   * @param admin admin used to check/create the table and to obtain the configuration
   * @param argv whitespace-separated PerformanceEvaluation options followed by the
   *          command name and client count
   */
  public PerfEvalCallable(HBaseAdmin admin, String argv) {
    // TODO: this API is awkward, should take HConnection, not HBaseAdmin
    this.admin = admin;
    // Callers build the option string with format("%s %s ..."), where a leading "%s" may
    // be empty. Splitting on a single space would then enqueue empty tokens, so trim first
    // and split on runs of whitespace instead.
    String trimmed = argv.trim();
    if (!trimmed.isEmpty()) {
      this.argv.addAll(Arrays.asList(trimmed.split("\\s+")));
    }
    LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
  }

  /**
   * Parses the options, ensures the table is in the expected state and runs the command.
   *
   * @return the row count and elapsed time of the run; for mapreduce runs both values are
   *         read from the job counters, otherwise the row count is {@code opts.totalRows}
   *         and the elapsed time is whatever {@code doLocalClients} returns
   */
  @Override
  public TimingResult call() throws Exception {
    PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
    PerformanceEvaluation.checkTable(admin, opts);
    long numRows = opts.totalRows;
    long elapsedTime;
    if (opts.nomapred) {
      elapsedTime = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
    } else {
      Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
      Counters counters = job.getCounters();
      numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
      elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
    }
    return new TimingResult(numRows, elapsedTime);
  }
}
/**
* Record the results from a single {@link PerformanceEvaluation} job run.
*/
static class TimingResult {
public long numRows;
public long elapsedTime;
public TimingResult(long numRows, long elapsedTime) {
this.numRows = numRows;
this.elapsedTime = elapsedTime;
}
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("numRows", numRows)
.add("elapsedTime", elapsedTime)
.toString();
}
}
/**
 * Runs the base-class setup, verifies that the configuration satisfies the test's
 * preconditions and then applies the client-side settings used by the read jobs.
 */
@Override
public void setUp() throws Exception {
  super.setUp();
  final Configuration conf = util.getConfiguration();

  // sanity check cluster
  // TODO: this should reach out to master and verify online state instead
  final String balancerClass = conf.get("hbase.master.loadbalancer.class");
  assertEquals("Master must be configured with StochasticLoadBalancer",
    "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer", balancerClass);

  // TODO: this should reach out to master and verify online state instead
  final long refreshPeriod = conf.getLong("hbase.regionserver.storefile.refresh.period", 0);
  assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
    refreshPeriod > 0);

  // enable client-side settings
  conf.setBoolean(RpcClient.ALLOWS_INTERRUPTS, true);
  // TODO: expose these settings to CLI override
  conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
  conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
}
/** Creates the testing utility from this tool's configuration and sizes the cluster. */
@Override
public void setUpCluster() throws Exception {
  final Configuration toolConf = getConf();
  util = getTestingUtil(toolConf);
  util.initializeCluster(clusterSize);
}
/**
 * Builds a chaos monkey that periodically picks between restarting a region server
 * holding the test table and moving one of the table's regions. The monkey is only
 * constructed here, not started.
 */
@Override
public void setUpMonkey() throws Exception {
  final String table = tableName.getNameAsString();
  final RestartRsHoldingTableAction restartAction =
    new RestartRsHoldingTableAction(sleepTime, table);
  final MoveRandomRegionOfTableAction moveAction = new MoveRandomRegionOfTableAction(table);
  final Policy policy = new PeriodicRandomActionPolicy(sleepTime, restartAction, moveAction);
  // don't start monkey right away
  this.monkey = new PolicyBasedChaosMonkey(util, policy);
}
/** Registers the command-line options understood by this test, with their help text. */
@Override
protected void addOptions() {
  final String tableHelp = "Alternate table name. Default: '" + TABLE_NAME_DEFAULT + "'";
  addOptWithArg(TABLE_NAME_KEY, tableHelp);

  final String sleepHelp =
    "How long the monkey sleeps between actions. Default: " + SLEEP_TIME_DEFAULT;
  addOptWithArg(SLEEP_TIME_KEY, sleepHelp);

  final String nomapredHelp = "Run multiple clients using threads (rather than use mapreduce)";
  addOptNoArg(NOMAPRED_KEY, nomapredHelp);

  final String replicaHelp = "Number of region replicas. Default: " + REPLICA_COUNT_DEFAULT;
  addOptWithArg(REPLICA_COUNT_KEY, replicaHelp);

  final String timeoutHelp =
    "Overrides hbase.client.primaryCallTimeout. Default: " + PRIMARY_TIMEOUT_DEFAULT + " (10ms)";
  addOptWithArg(PRIMARY_TIMEOUT_KEY, timeoutHelp);

  final String numRsHelp =
    "Specify the number of RegionServers to use. Default: " + NUM_RS_DEFAULT;
  addOptWithArg(NUM_RS_KEY, numRsHelp);
}
/**
 * Reads every supported option from the parsed command line, falling back to the
 * corresponding default, stores the values in this instance and logs them at debug level.
 *
 * @param cmd the parsed command line
 */
@Override
protected void processOptions(CommandLine cmd) {
  final String tableArg = cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
  tableName = TableName.valueOf(tableArg);

  final String sleepArg = cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
  sleepTime = Long.parseLong(sleepArg);

  nomapred = cmd.hasOption(NOMAPRED_KEY);

  final String replicaArg = cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT);
  replicaCount = Integer.parseInt(replicaArg);

  final String timeoutArg = cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT);
  primaryTimeout = Integer.parseInt(timeoutArg);

  final String numRsArg = cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT);
  clusterSize = Integer.parseInt(numRsArg);

  final Objects.ToStringHelper parsed = Objects.toStringHelper("Parsed Options");
  parsed.add(TABLE_NAME_KEY, tableName);
  parsed.add(SLEEP_TIME_KEY, sleepTime);
  parsed.add(NOMAPRED_KEY, nomapred);
  parsed.add(REPLICA_COUNT_KEY, replicaCount);
  parsed.add(PRIMARY_TIMEOUT_KEY, primaryTimeout);
  parsed.add(NUM_RS_KEY, clusterSize);
  LOG.debug(parsed.toString());
}
/**
 * Command-line entry into the test body.
 *
 * @return always 0; a failing run surfaces as an exception thrown out of {@link #test()}
 */
@Override
public int runTestFromCommandLine() throws Exception {
  final int success = 0;
  test();
  return success;
}
/** @return the name of the table under test, as a string */
@Override
public String getTablename() {
  final String name = tableName.getNameAsString();
  return name;
}
/** @return always {@code null}; this test does not name specific column families */
@Override
protected Set<String> getColumnFamilies() {
  final Set<String> families = null;
  return families;
}
/**
 * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}.
 *
 * <p>After submitting the modification, the alter status is polled once per second, for at
 * most 500 attempts, until it reports that no regions are left to update.
 *
 * @param admin admin used to submit the change and to poll its progress
 * @param desc the new descriptor; its table name identifies the table to modify
 * @throws IllegalStateException if regions are still pending after all attempts
 * @throws Exception if the modification or a status poll fails, or the wait is interrupted
 */
private static void modifyTableSync(HBaseAdmin admin, HTableDescriptor desc) throws Exception {
  final int maxAttempts = 500; // one attempt per second => wait up to 500 seconds
  admin.modifyTable(desc.getTableName(), desc);
  // The status must be fetched before it is tested. Seeding it with (0, 0) and testing
  // first would skip the wait entirely and report success without ever polling.
  for (int attempt = 0; attempt < maxAttempts; attempt++) {
    Pair<Integer, Integer> status = admin.getAlterStatus(desc.getTableName());
    // first: regions yet to be updated; second: total regions of the table
    int pending = status.getFirst();
    int total = status.getSecond();
    if (pending == 0) {
      LOG.debug("All regions updated.");
      return;
    }
    LOG.debug((total - pending) + "/" + total + " regions updated.");
    Thread.sleep(1 * 1000L);
  }
  throw new IllegalStateException("Failed to update replica count after 500 seconds.");
}
/**
 * Changes the region replica count of a table. The table is disabled, its descriptor is
 * updated through {@code modifyTableSync}, and the table is enabled again.
 *
 * @param admin admin used for all table operations
 * @param table the table to alter
 * @param replicaCount the region replication value to set on the descriptor
 */
private static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount)
    throws Exception {
  admin.disableTable(table);

  final HTableDescriptor descriptor = admin.getTableDescriptor(table);
  descriptor.setRegionReplication(replicaCount);
  modifyTableSync(admin, descriptor);

  admin.enableTable(table);
}
/**
 * Populates the table, then runs the same random-read job several times under chaos:
 * first against the table as created, then again after the table's replica count has been
 * raised and the reads are issued with the {@code --replicas} option. Fails when the mean
 * elapsed time with replicas exceeds the mean without.
 */
public void test() throws Exception {
  final int maxIters = 3;
  final String mr = nomapred ? "--nomapred" : "";
  // TODO: splits disabled until "phase 2" is complete.
  final String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
  final String writeOpts = format("%s %s --table=%s --presplit=16 sequentialWrite 4",
    mr, splitPolicy, tableName);
  final String readOpts =
    format("%s --table=%s --latency --sampleRate=0.1 randomRead 4", mr, tableName);
  final String replicaReadOpts = format("%s %s", "--replicas=" + replicaCount, readOpts);

  // create/populate the table, replicas disabled
  LOG.debug("Populating table.");
  new PerfEvalCallable(util.getHBaseAdmin(), writeOpts).call();

  // one last sanity check, then send in the clowns!
  final String actualSplitPolicy =
    util.getHBaseAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName();
  assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
    DisabledRegionSplitPolicy.class.getName(), actualSplitPolicy);
  startMonkey();

  // collect a baseline without region replicas.
  final ArrayList<TimingResult> resultsWithoutReplica =
    runReadJobs("Launching non-replica job ", readOpts, maxIters);

  // disable monkey, enable region replicas, enable monkey
  cleanUpMonkey("Altering table.");
  LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
  setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
  setUpMonkey();
  startMonkey();

  // run test with region replicas.
  final ArrayList<TimingResult> resultsWithReplica =
    runReadJobs("Launching replica job ", replicaReadOpts, maxIters);

  final double withoutReplicasMean = elapsedTimeStats(resultsWithoutReplica).getMean();
  final double withReplicasMean = elapsedTimeStats(resultsWithReplica).getMean();

  LOG.info(Objects.toStringHelper("testName")
    .add("withoutReplicas", resultsWithoutReplica)
    .add("withReplicas", resultsWithReplica)
    .add("withoutReplicasMean", withoutReplicasMean)
    .add("withReplicasMean", withReplicasMean)
    .toString());

  final String failureMessage =
    "Running with region replicas under chaos should be as fast or faster than without. "
      + "withReplicas.mean: " + withReplicasMean + "ms "
      + "withoutReplicas.mean: " + withoutReplicasMean + "ms.";
  assertTrue(failureMessage, withReplicasMean <= withoutReplicasMean);
}

/** Runs one PerformanceEvaluation job per iteration, pausing 5 seconds after each run. */
private ArrayList<TimingResult> runReadJobs(String logPrefix, String opts, int iterations)
    throws Exception {
  final ArrayList<TimingResult> results = new ArrayList<TimingResult>(iterations);
  int run = 0;
  while (run < iterations) {
    LOG.debug(logPrefix + (run + 1) + "/" + iterations);
    results.add(new PerfEvalCallable(util.getHBaseAdmin(), opts).call());
    // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
    Thread.sleep(5000l);
    run++;
  }
  return results;
}

/** Collects the elapsed time of every result into a statistics accumulator. */
private static DescriptiveStatistics elapsedTimeStats(ArrayList<TimingResult> results) {
  final DescriptiveStatistics stats = new DescriptiveStatistics();
  for (TimingResult result : results) {
    stats.addValue(result.elapsedTime);
  }
  return stats;
}
/**
 * Runs this test as a tool against a distributed cluster and exits the JVM with the
 * tool's status code.
 */
public static void main(String[] args) throws Exception {
  final Configuration conf = HBaseConfiguration.create();
  IntegrationTestingUtility.setUseDistributedCluster(conf);
  System.exit(ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args));
}
}

View File

@ -30,7 +30,9 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
@ -39,12 +41,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@ -84,7 +89,6 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.stats.UniformSample;
import com.yammer.metrics.stats.Snapshot;
@ -99,16 +103,17 @@ import org.htrace.impl.ProbabilitySampler;
* client that steps through one of a set of hardcoded tests or 'experiments'
* (e.g. a random reads test, a random writes test, etc.). Pass on the
* command-line which test to run and how many clients are participating in
* this experiment. Run <code>java PerformanceEvaluation --help</code> to
* obtain usage.
* this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
*
* <p>This class sets up and runs the evaluation programs described in
* Section 7, <i>Performance Evaluation</i>, of the <a
* href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
* paper, pages 8-10.
*
* <p>If number of clients > 1, we start up a MapReduce job. Each map task
* runs an individual client. Each client does about 1GB of data.
* <p>By default, runs as a mapreduce job where each mapper runs a single test
* client. Can also run as a non-mapreduce, multithreaded application by
* specifying {@code --nomapred}. Each client does about 1GB of data, unless
* specified otherwise.
*/
public class PerformanceEvaluation extends Configured implements Tool {
protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
@ -133,28 +138,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
private static final TestOptions DEFAULT_OPTS = new TestOptions();
protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<String, CmdDescriptor>();
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
/**
* Enum for map metrics. Keep it out here rather than inside in the Map
* inner-class so we can find associated properties.
*/
protected static enum Counter {
/** elapsed time */
ELAPSED_TIME,
/** number of rows */
ROWS
}
/**
* Constructor
* @param conf Configuration object
*/
public PerformanceEvaluation(final Configuration conf) {
super(conf);
static {
addCommandDescriptor(RandomReadTest.class, "randomRead",
"Run random read test");
addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
@ -180,11 +167,29 @@ public class PerformanceEvaluation extends Configured implements Tool {
"(make sure to use --rows=20)");
}
protected void addCommandDescriptor(Class<? extends Test> cmdClass,
/**
* Enum for map metrics. Keep it out here rather than inside in the Map
* inner-class so we can find associated properties.
*/
protected static enum Counter {
/** elapsed time */
ELAPSED_TIME,
/** number of rows */
ROWS
}
/**
* Constructor
* @param conf Configuration object
*/
public PerformanceEvaluation(final Configuration conf) {
super(conf);
}
protected static void addCommandDescriptor(Class<? extends Test> cmdClass,
String name, String description) {
CmdDescriptor cmdDescriptor =
new CmdDescriptor(cmdClass, name, description);
commands.put(name, cmdDescriptor);
CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
COMMANDS.put(name, cmdDescriptor);
}
/**
@ -235,10 +240,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
@Override
protected void map(LongWritable key, Text value, final Context context)
throws IOException, InterruptedException {
Status status = new Status() {
@Override
public void setStatus(String msg) {
context.setStatus(msg);
}
@ -260,35 +267,62 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
/*
* If table does not already exist, create.
* @param c Client to use checking.
* @return True if we created the table.
* @throws IOException
* If table does not already exist, create. Also create a table when
* {@code opts.presplitRegions} is specified or when the existing table's
* region replica count doesn't match {@code opts.replicas}.
*/
private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
HTableDescriptor tableDescriptor = getTableDescriptor(opts);
if (opts.presplitRegions > 0) {
// presplit requested
if (admin.tableExists(tableDescriptor.getTableName())) {
admin.disableTable(tableDescriptor.getTableName());
admin.deleteTable(tableDescriptor.getTableName());
static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
TableName tableName = TableName.valueOf(opts.tableName);
boolean needsDelete = false, exists = admin.tableExists(tableName);
boolean isReadCmd = opts.cmdName.toLowerCase().contains("read")
|| opts.cmdName.toLowerCase().contains("scan");
if (!exists && isReadCmd) {
throw new IllegalStateException(
"Must specify an existing table for read commands. Run a write command first.");
}
HTableDescriptor desc =
exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
byte[][] splits = getSplits(opts);
// recreate the table when user has requested presplit or when existing
// {RegionSplitPolicy,replica count} does not match requested.
if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
|| (!isReadCmd && desc != null && desc.getRegionSplitPolicyClassName() != opts.splitPolicy)
|| (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)) {
needsDelete = true;
// wait, why did it delete my table?!?
LOG.debug(Objects.toStringHelper("needsDelete")
.add("needsDelete", needsDelete)
.add("isReadCmd", isReadCmd)
.add("exists", exists)
.add("desc", desc)
.add("presplit", opts.presplitRegions)
.add("splitPolicy", opts.splitPolicy)
.add("replicas", opts.replicas));
}
byte[][] splits = getSplits(opts);
for (int i=0; i < splits.length; i++) {
// remove an existing table
if (needsDelete) {
if (admin.isTableEnabled(tableName)) {
admin.disableTable(tableName);
}
admin.deleteTable(tableName);
}
// table creation is necessary
if (!exists || needsDelete) {
desc = getTableDescriptor(opts);
if (splits != null) {
if (LOG.isDebugEnabled()) {
for (int i = 0; i < splits.length; i++) {
LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
}
admin.createTable(tableDescriptor, splits);
LOG.info ("Table created with " + opts.presplitRegions + " splits");
}
else {
boolean tableExists = admin.tableExists(tableDescriptor.getTableName());
if (!tableExists) {
admin.createTable(tableDescriptor);
LOG.info("Table " + tableDescriptor + " created");
}
}
return admin.tableExists(tableDescriptor.getTableName());
admin.createTable(desc, splits);
LOG.info("Table " + desc + " created");
}
return admin.tableExists(tableName);
}
/**
@ -304,6 +338,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
family.setInMemory(true);
}
desc.addFamily(family);
if (opts.replicas != DEFAULT_OPTS.replicas) {
desc.setRegionReplication(opts.replicas);
}
if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) {
desc.setRegionSplitPolicyClassName(opts.splitPolicy);
}
return desc;
}
@ -311,8 +351,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
* generates splits based on total number of rows and specified split regions
*/
protected static byte[][] getSplits(TestOptions opts) {
if (opts.presplitRegions == 0)
return new byte [0][];
if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
return null;
int numSplitPoints = opts.presplitRegions - 1;
byte[][] splits = new byte[numSplitPoints][];
@ -329,8 +369,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @param cmd Command to run.
* @throws IOException
*/
private void doLocalClients(final Class<? extends Test> cmd, final TestOptions opts)
static long doLocalClients(final TestOptions opts, final Configuration conf)
throws IOException, InterruptedException {
final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
assert cmd != null;
Future<Long>[] threads = new Future[opts.numClientThreads];
long[] timings = new long[opts.numClientThreads];
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
@ -342,7 +384,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
public Long call() throws Exception {
TestOptions threadOpts = new TestOptions(opts);
if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() {
long elapsedTime = runOneClient(cmd, conf, threadOpts, new Status() {
@Override
public void setStatus(final String msg) throws IOException {
LOG.info(msg);
}
@ -373,6 +416,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
+ "\tMin: " + timings[0] + "ms"
+ "\tMax: " + timings[timings.length - 1] + "ms"
+ "\tAvg: " + (total / timings.length) + "ms");
return total;
}
/*
@ -382,15 +426,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @param cmd Command to run.
* @throws IOException
*/
private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = getConf();
static Job doMapReduce(TestOptions opts, final Configuration conf)
throws IOException, InterruptedException, ClassNotFoundException {
final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
assert cmd != null;
Path inputDir = writeInputFile(conf, opts);
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
Job job = new Job(conf);
job.setJarByClass(PerformanceEvaluation.class);
job.setJobName("HBase Performance Evaluation");
job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setInputPaths(job, inputDir);
@ -416,6 +461,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
TableMapReduceUtil.initCredentials(job);
job.waitForCompletion(true);
return job;
}
/*
@ -424,7 +470,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @return Directory that contains file written.
* @throws IOException
*/
private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
Path inputDir = new Path(jobdir, "inputs");
@ -491,6 +537,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
* This makes tracking all these arguments a little easier.
*/
static class TestOptions {
String cmdName = null;
boolean nomapred = false;
boolean filterAll = false;
int startRow = 0;
@ -511,6 +558,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
int multiGet = 0;
boolean inMemoryCF = false;
int presplitRegions = 0;
int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
String splitPolicy = null;
Compression.Algorithm compression = Compression.Algorithm.NONE;
BloomType bloomType = BloomType.ROW;
DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
@ -521,6 +570,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
public TestOptions() {}
public TestOptions(TestOptions that) {
this.cmdName = that.cmdName;
this.nomapred = that.nomapred;
this.startRow = that.startRow;
this.size = that.size;
@ -540,6 +590,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.multiGet = that.multiGet;
this.inMemoryCF = that.inMemoryCF;
this.presplitRegions = that.presplitRegions;
this.replicas = that.replicas;
this.splitPolicy = that.splitPolicy;
this.compression = that.compression;
this.blockEncoding = that.blockEncoding;
this.filterAll = that.filterAll;
@ -997,10 +1049,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static class RandomReadTest extends Test {
private final Consistency consistency;
private ArrayList<Get> gets;
RandomReadTest(HConnection con, TestOptions options, Status status) {
super(con, options, status);
consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
if (opts.multiGet > 0) {
LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
this.gets = new ArrayList<Get>(opts.multiGet);
@ -1014,6 +1068,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
if (opts.filterAll) {
get.setFilter(new FilterAllFilter());
}
get.setConsistency(consistency);
if (LOG.isTraceEnabled()) LOG.trace(get.toString());
if (opts.multiGet > 0) {
this.gets.add(get);
@ -1313,9 +1368,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
if (admin != null) admin.close();
}
if (opts.nomapred) {
doLocalClients(cmd, opts);
doLocalClients(opts, getConf());
} else {
doMapReduce(cmd, opts);
doMapReduce(opts, getConf());
}
}
@ -1368,6 +1423,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
"'valueSize'; set on read for stats on size: Default: Not set.");
System.err.println(" period Report every 'period' rows: " +
"Default: opts.perClientRunRows / 10");
System.err.println(" multiGet Batch gets together into groups of N. Only supported " +
"by randomRead. Default: disabled");
System.err.println(" replicas Enable region replica testing. Defaults: 1.");
System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
System.err.println();
System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" For example: ");
@ -1375,7 +1434,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" -Dmapreduce.task.timeout=60000");
System.err.println();
System.err.println("Command:");
for (CmdDescriptor command : commands.values()) {
for (CmdDescriptor command : COMMANDS.values()) {
System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
}
System.err.println();
@ -1389,40 +1448,20 @@ public class PerformanceEvaluation extends Configured implements Tool {
+ " sequentialWrite 1");
}
private static int getNumClients(final int start, final String[] args) {
if(start + 1 > args.length) {
throw new IllegalArgumentException("must supply the number of clients");
}
int N = Integer.parseInt(args[start]);
if (N < 1) {
throw new IllegalArgumentException("Number of clients must be > 1");
}
return N;
}
public int run(String[] args) throws Exception {
// Process command-line args. TODO: Better cmd-line processing
// (but hopefully something not as painful as cli options).
int errCode = -1;
if (args.length < 1) {
printUsage();
return errCode;
}
try {
// MR-NOTE: if you are adding a property that is used to control an operation
// like put(), get(), scan(), ... you must also add it as part of the MR
// input, take a look at writeInputFile().
// Then you must adapt the LINE_PATTERN input regex,
// and parse the argument, take a look at PEInputFormat.getSplits().
/**
* Parse options passed in via an arguments array. Assumes that array has been split
* on white-space and placed into a {@code Queue}. Any unknown arguments will remain
* in the queue at the conclusion of this method call. It's up to the caller to deal
* with these unrecognized arguments.
*/
static TestOptions parseOpts(Queue<String> args) {
TestOptions opts = new TestOptions();
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
String cmd = null;
while ((cmd = args.poll()) != null) {
if (cmd.equals("-h") || cmd.startsWith("--h")) {
printUsage();
errCode = 0;
// place item back onto queue so that caller knows parsing was incomplete
args.add(cmd);
break;
}
@ -1438,36 +1477,36 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
final String startRow = "--startRow=";
if (cmd.startsWith(startRow)) {
opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
continue;
}
final String sampleRate = "--sampleRate=";
if (cmd.startsWith(sampleRate)) {
opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
continue;
}
final String traceRate = "--traceRate=";
if (cmd.startsWith(traceRate)) {
opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
continue;
}
final String table = "--table=";
if (cmd.startsWith(table)) {
opts.tableName = cmd.substring(table.length());
continue;
}
final String startRow = "--startRow=";
if (cmd.startsWith(startRow)) {
opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
continue;
}
final String compress = "--compress=";
if (cmd.startsWith(compress)) {
opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
continue;
}
final String traceRate = "--traceRate=";
if (cmd.startsWith(traceRate)) {
opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
continue;
}
final String blockEncoding = "--blockEncoding=";
if (cmd.startsWith(blockEncoding)) {
opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
@ -1486,18 +1525,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
final String autoFlush = "--autoFlush=";
if (cmd.startsWith(autoFlush)) {
opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
continue;
}
final String onceCon = "--oneCon=";
if (cmd.startsWith(onceCon)) {
opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
continue;
}
final String presplit = "--presplit=";
if (cmd.startsWith(presplit)) {
opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
@ -1510,6 +1537,18 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
final String autoFlush = "--autoFlush=";
if (cmd.startsWith(autoFlush)) {
opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
continue;
}
final String onceCon = "--oneCon=";
if (cmd.startsWith(onceCon)) {
opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
continue;
}
final String latency = "--latency";
if (cmd.startsWith(latency)) {
opts.reportLatency = true;
@ -1534,6 +1573,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
final String replicas = "--replicas=";
if (cmd.startsWith(replicas)) {
opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
continue;
}
final String filterOutAll = "--filterAll";
if (cmd.startsWith(filterOutAll)) {
opts.filterAll = true;
@ -1546,6 +1591,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
final String splitPolicy = "--splitPolicy=";
if (cmd.startsWith(splitPolicy)) {
opts.splitPolicy = cmd.substring(splitPolicy.length());
continue;
}
final String bloomFilter = "--bloomFilter";
if (cmd.startsWith(bloomFilter)) {
opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
@ -1570,17 +1621,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
Class<? extends Test> cmdClass = determineCommandClass(cmd);
if (cmdClass != null) {
opts.numClientThreads = getNumClients(i + 1, args);
if (isCommandClass(cmd)) {
opts.cmdName = cmd;
opts.numClientThreads = Integer.parseInt(args.remove());
int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
if (opts.size != DEFAULT_OPTS.size &&
opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
throw new IllegalArgumentException(rows + " and " + size +
" are mutually exclusive arguments.");
throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments.");
}
// Calculate how many rows per gig. If random value size presume that that half the max
// is average row size.
int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
if (opts.size != DEFAULT_OPTS.size) {
// total size in GB specified
opts.totalRows = (int) opts.size * rowsPerGB;
@ -1590,14 +1638,44 @@ public class PerformanceEvaluation extends Configured implements Tool {
opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
opts.size = opts.totalRows / rowsPerGB;
}
runTest(cmdClass, opts);
errCode = 0;
break;
}
}
return opts;
}
@Override
public int run(String[] args) throws Exception {
// Process command-line args. TODO: Better cmd-line processing
// (but hopefully something not as painful as cli options).
int errCode = -1;
if (args.length < 1) {
printUsage();
break;
return errCode;
}
try {
LinkedList<String> argv = new LinkedList<String>();
argv.addAll(Arrays.asList(args));
TestOptions opts = parseOpts(argv);
// args remaining, print help and exit
if (!argv.isEmpty()) {
errCode = 0;
printUsage();
}
// must run at least 1 client
if (opts.numClientThreads <= 0) {
throw new IllegalArgumentException("Number of clients must be > 0");
}
Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName);
if (cmdClass != null) {
runTest(cmdClass, opts);
errCode = 0;
}
} catch (Exception e) {
e.printStackTrace();
}
@ -1605,8 +1683,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
return errCode;
}
private Class<? extends Test> determineCommandClass(String cmd) {
CmdDescriptor descriptor = commands.get(cmd);
private static boolean isCommandClass(String cmd) {
return COMMANDS.containsKey(cmd);
}
private static Class<? extends Test> determineCommandClass(String cmd) {
CmdDescriptor descriptor = COMMANDS.get(cmd);
return descriptor != null ? descriptor.getCmdClass() : null;
}