diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java new file mode 100644 index 00000000000..ca3a8f0e36e --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java @@ -0,0 +1,338 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import com.google.common.base.Objects; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction; +import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingTableAction; +import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; +import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; +import org.apache.hadoop.hbase.chaos.policies.Policy; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.ToolRunner; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Callable; + +import static java.lang.String.format; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test for comparing the performance impact of region replicas. Uses + * components of {@link PerformanceEvaluation}. Does not run from + * {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible + * with the JUnit runner. Hence no @Test annotations either. See {@code -help} + * for full list of options. + */ +@Category(IntegrationTests.class) +public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase { + + private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class); + + private static final String SLEEP_TIME_KEY = "sleeptime"; + // short default interval because tests don't run very long. + private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l); + private static final String TABLE_NAME_KEY = "tableName"; + private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf"; + private static final String NOMAPRED_KEY = "nomapred"; + private static final boolean NOMAPRED_DEFAULT = false; + private static final String REPLICA_COUNT_KEY = "replicas"; + private static final String REPLICA_COUNT_DEFAULT = "" + 3; + private static final String PRIMARY_TIMEOUT_KEY = "timeout"; + private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms + private static final String NUM_RS_KEY = "numRs"; + private static final String NUM_RS_DEFAULT = "" + 3; + + private TableName tableName; + private long sleepTime; + private boolean nomapred = NOMAPRED_DEFAULT; + private int replicaCount; + private int primaryTimeout; + private int clusterSize; + + /** + * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}. + */ + static class PerfEvalCallable implements Callable { + private final Queue argv = new LinkedList(); + private final HBaseAdmin admin; + + public PerfEvalCallable(HBaseAdmin admin, String argv) { + // TODO: this API is awkward, should take HConnection, not HBaseAdmin + this.admin = admin; + this.argv.addAll(Arrays.asList(argv.split(" "))); + LOG.debug("Created PerformanceEvaluationCallable with args: " + argv); + } + + @Override + public TimingResult call() throws Exception { + PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv); + PerformanceEvaluation.checkTable(admin, opts); + long numRows = opts.totalRows; + long elapsedTime; + if (opts.nomapred) { + elapsedTime = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration()); + } else { + Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration()); + Counters counters = job.getCounters(); + numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue(); + elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue(); + } + return new TimingResult(numRows, elapsedTime); + } + } + + /** + * Record the results from a single {@link PerformanceEvaluation} job run. + */ + static class TimingResult { + public long numRows; + public long elapsedTime; + + public TimingResult(long numRows, long elapsedTime) { + this.numRows = numRows; + this.elapsedTime = elapsedTime; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("numRows", numRows) + .add("elapsedTime", elapsedTime) + .toString(); + } + } + + @Override + public void setUp() throws Exception { + super.setUp(); + Configuration conf = util.getConfiguration(); + + // sanity check cluster + // TODO: this should reach out to master and verify online state instead + assertEquals("Master must be configured with StochasticLoadBalancer", + "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer", + conf.get("hbase.master.loadbalancer.class")); + // TODO: this should reach out to master and verify online state instead + assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.", + conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0); + + // enable client-side settings + conf.setBoolean(RpcClient.ALLOWS_INTERRUPTS, true); + // TODO: expose these settings to CLI override + conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout); + conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout); + } + + @Override + public void setUpCluster() throws Exception { + util = getTestingUtil(getConf()); + util.initializeCluster(clusterSize); + } + + @Override + public void setUpMonkey() throws Exception { + Policy p = new PeriodicRandomActionPolicy(sleepTime, + new RestartRsHoldingTableAction(sleepTime, tableName.getNameAsString()), + new MoveRandomRegionOfTableAction(tableName.getNameAsString())); + this.monkey = new PolicyBasedChaosMonkey(util, p); + // don't start monkey right away + } + + @Override + protected void addOptions() { + addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '" + + TABLE_NAME_DEFAULT + "'"); + addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: " + + SLEEP_TIME_DEFAULT); + addOptNoArg(NOMAPRED_KEY, + "Run multiple clients using threads (rather than use mapreduce)"); + addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: " + + REPLICA_COUNT_DEFAULT); + addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: " + + PRIMARY_TIMEOUT_DEFAULT + " (10ms)"); + addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: " + + NUM_RS_DEFAULT); + } + + @Override + protected void processOptions(CommandLine cmd) { + tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT)); + sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT)); + nomapred = cmd.hasOption(NOMAPRED_KEY); + replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT)); + primaryTimeout = + Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT)); + clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT)); + LOG.debug(Objects.toStringHelper("Parsed Options") + .add(TABLE_NAME_KEY, tableName) + .add(SLEEP_TIME_KEY, sleepTime) + .add(NOMAPRED_KEY, nomapred) + .add(REPLICA_COUNT_KEY, replicaCount) + .add(PRIMARY_TIMEOUT_KEY, primaryTimeout) + .add(NUM_RS_KEY, clusterSize) + .toString()); + } + + @Override + public int runTestFromCommandLine() throws Exception { + test(); + return 0; + } + + @Override + public String getTablename() { + return tableName.getNameAsString(); + } + + @Override + protected Set getColumnFamilies() { + return null; + } + + /** + * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}. + */ + private static void modifyTableSync(HBaseAdmin admin, HTableDescriptor desc) throws Exception { + admin.modifyTable(desc.getTableName(), desc); + Pair status = new Pair() {{ + setFirst(0); + setSecond(0); + }}; + for (int i = 0; status.getFirst() != 0 && i < 500; i++) { // wait up to 500 seconds + status = admin.getAlterStatus(desc.getTableName()); + if (status.getSecond() != 0) { + LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond() + + " regions updated."); + Thread.sleep(1 * 1000l); + } else { + LOG.debug("All regions updated."); + } + } + if (status.getSecond() != 0) { + throw new Exception("Failed to update replica count after 500 seconds."); + } + } + + /** + * Set the number of Region replicas. + */ + private static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount) + throws Exception { + admin.disableTable(table); + HTableDescriptor desc = admin.getTableDescriptor(table); + desc.setRegionReplication(replicaCount); + modifyTableSync(admin, desc); + admin.enableTable(table); + } + + public void test() throws Exception { + int maxIters = 3; + String mr = nomapred ? "--nomapred" : ""; + String replicas = "--replicas=" + replicaCount; + // TODO: splits disabled until "phase 2" is complete. + String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName(); + String writeOpts = format("%s %s --table=%s --presplit=16 sequentialWrite 4", + mr, splitPolicy, tableName); + String readOpts = + format("%s --table=%s --latency --sampleRate=0.1 randomRead 4", mr, tableName); + String replicaReadOpts = format("%s %s", replicas, readOpts); + + ArrayList resultsWithoutReplica = new ArrayList(maxIters); + ArrayList resultsWithReplica = new ArrayList(maxIters); + + // create/populate the table, replicas disabled + LOG.debug("Populating table."); + new PerfEvalCallable(util.getHBaseAdmin(), writeOpts).call(); + + // one last sanity check, then send in the clowns! + assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.", + DisabledRegionSplitPolicy.class.getName(), + util.getHBaseAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName()); + startMonkey(); + + // collect a baseline without region replicas. + for (int i = 0; i < maxIters; i++) { + LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters); + resultsWithoutReplica.add(new PerfEvalCallable(util.getHBaseAdmin(), readOpts).call()); + // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary? + Thread.sleep(5000l); + } + + // disable monkey, enable region replicas, enable monkey + cleanUpMonkey("Altering table."); + LOG.debug("Altering " + tableName + " replica count to " + replicaCount); + setReplicas(util.getHBaseAdmin(), tableName, replicaCount); + setUpMonkey(); + startMonkey(); + + // run test with region replicas. + for (int i = 0; i < maxIters; i++) { + LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters); + resultsWithReplica.add(new PerfEvalCallable(util.getHBaseAdmin(), replicaReadOpts).call()); + // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary? + Thread.sleep(5000l); + } + + DescriptiveStatistics withoutReplicaStats = new DescriptiveStatistics(); + for (TimingResult tr : resultsWithoutReplica) { + withoutReplicaStats.addValue(tr.elapsedTime); + } + DescriptiveStatistics withReplicaStats = new DescriptiveStatistics(); + for (TimingResult tr : resultsWithReplica) { + withReplicaStats.addValue(tr.elapsedTime); + } + + LOG.info(Objects.toStringHelper("testName") + .add("withoutReplicas", resultsWithoutReplica) + .add("withReplicas", resultsWithReplica) + .add("withoutReplicasMean", withoutReplicaStats.getMean()) + .add("withReplicasMean", withReplicaStats.getMean()) + .toString()); + + assertTrue( + "Running with region replicas under chaos should be as fast or faster than without. " + + "withReplicas.mean: " + withReplicaStats.getMean() + "ms " + + "withoutReplicas.mean: " + withoutReplicaStats.getMean() + "ms.", + withReplicaStats.getMean() <= withoutReplicaStats.getMean()); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args); + System.exit(status); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 227fc1a37ae..d9fa0b76478 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -30,7 +30,9 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; +import java.util.LinkedList; import java.util.Map; +import java.util.Queue; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.Callable; @@ -39,12 +41,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -84,7 +89,6 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.codehaus.jackson.map.ObjectMapper; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.yammer.metrics.core.Histogram; import com.yammer.metrics.stats.UniformSample; import com.yammer.metrics.stats.Snapshot; @@ -99,16 +103,17 @@ import org.htrace.impl.ProbabilitySampler; * client that steps through one of a set of hardcoded tests or 'experiments' * (e.g. a random reads test, a random writes test, etc.). Pass on the * command-line which test to run and how many clients are participating in - * this experiment. Run java PerformanceEvaluation --help to - * obtain usage. + * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage. * *

This class sets up and runs the evaluation programs described in * Section 7, Performance Evaluation, of the Bigtable * paper, pages 8-10. * - *

If number of clients > 1, we start up a MapReduce job. Each map task - * runs an individual client. Each client does about 1GB of data. + *

By default, runs as a mapreduce job where each mapper runs a single test + * client. Can also run as a non-mapreduce, multithreaded application by + * specifying {@code --nomapred}. Each client does about 1GB of data, unless + * specified otherwise. */ public class PerformanceEvaluation extends Configured implements Tool { protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); @@ -133,10 +138,35 @@ public class PerformanceEvaluation extends Configured implements Tool { private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); private static final TestOptions DEFAULT_OPTS = new TestOptions(); - protected Map commands = new TreeMap(); - + private static Map COMMANDS = new TreeMap(); private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); + static { + addCommandDescriptor(RandomReadTest.class, "randomRead", + "Run random read test"); + addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", + "Run random seek and scan 100 test"); + addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", + "Run random seek scan with both start and stop row (max 10 rows)"); + addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", + "Run random seek scan with both start and stop row (max 100 rows)"); + addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", + "Run random seek scan with both start and stop row (max 1000 rows)"); + addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", + "Run random seek scan with both start and stop row (max 10000 rows)"); + addCommandDescriptor(RandomWriteTest.class, "randomWrite", + "Run random write test"); + addCommandDescriptor(SequentialReadTest.class, "sequentialRead", + "Run sequential read test"); + addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", + "Run sequential write test"); + addCommandDescriptor(ScanTest.class, "scan", + "Run scan test (read every row)"); + addCommandDescriptor(FilteredScanTest.class, "filterScan", + "Run scan test using a filter to find a specific row based on it's value " + + "(make sure to use --rows=20)"); + } + /** * Enum for map metrics. Keep it out here rather than inside in the Map * inner-class so we can find associated properties. @@ -154,37 +184,12 @@ public class PerformanceEvaluation extends Configured implements Tool { */ public PerformanceEvaluation(final Configuration conf) { super(conf); - - addCommandDescriptor(RandomReadTest.class, "randomRead", - "Run random read test"); - addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", - "Run random seek and scan 100 test"); - addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", - "Run random seek scan with both start and stop row (max 10 rows)"); - addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", - "Run random seek scan with both start and stop row (max 100 rows)"); - addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", - "Run random seek scan with both start and stop row (max 1000 rows)"); - addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", - "Run random seek scan with both start and stop row (max 10000 rows)"); - addCommandDescriptor(RandomWriteTest.class, "randomWrite", - "Run random write test"); - addCommandDescriptor(SequentialReadTest.class, "sequentialRead", - "Run sequential read test"); - addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", - "Run sequential write test"); - addCommandDescriptor(ScanTest.class, "scan", - "Run scan test (read every row)"); - addCommandDescriptor(FilteredScanTest.class, "filterScan", - "Run scan test using a filter to find a specific row based on it's value " + - "(make sure to use --rows=20)"); } - protected void addCommandDescriptor(Class cmdClass, + protected static void addCommandDescriptor(Class cmdClass, String name, String description) { - CmdDescriptor cmdDescriptor = - new CmdDescriptor(cmdClass, name, description); - commands.put(name, cmdDescriptor); + CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); + COMMANDS.put(name, cmdDescriptor); } /** @@ -235,10 +240,12 @@ public class PerformanceEvaluation extends Configured implements Tool { } } + @Override protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException { Status status = new Status() { + @Override public void setStatus(String msg) { context.setStatus(msg); } @@ -260,35 +267,62 @@ public class PerformanceEvaluation extends Configured implements Tool { } /* - * If table does not already exist, create. - * @param c Client to use checking. - * @return True if we created the table. - * @throws IOException + * If table does not already exist, create. Also create a table when + * {@code opts.presplitRegions} is specified or when the existing table's + * region replica count doesn't match {@code opts.replicas}. */ - private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException { - HTableDescriptor tableDescriptor = getTableDescriptor(opts); - if (opts.presplitRegions > 0) { - // presplit requested - if (admin.tableExists(tableDescriptor.getTableName())) { - admin.disableTable(tableDescriptor.getTableName()); - admin.deleteTable(tableDescriptor.getTableName()); - } + static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException { + TableName tableName = TableName.valueOf(opts.tableName); + boolean needsDelete = false, exists = admin.tableExists(tableName); + boolean isReadCmd = opts.cmdName.toLowerCase().contains("read") + || opts.cmdName.toLowerCase().contains("scan"); + if (!exists && isReadCmd) { + throw new IllegalStateException( + "Must specify an existing table for read commands. Run a write command first."); + } + HTableDescriptor desc = + exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null; + byte[][] splits = getSplits(opts); - byte[][] splits = getSplits(opts); - for (int i=0; i < splits.length; i++) { - LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); - } - admin.createTable(tableDescriptor, splits); - LOG.info ("Table created with " + opts.presplitRegions + " splits"); + // recreate the table when user has requested presplit or when existing + // {RegionSplitPolicy,replica count} does not match requested. + if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) + || (!isReadCmd && desc != null && desc.getRegionSplitPolicyClassName() != opts.splitPolicy) + || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)) { + needsDelete = true; + // wait, why did it delete my table?!? + LOG.debug(Objects.toStringHelper("needsDelete") + .add("needsDelete", needsDelete) + .add("isReadCmd", isReadCmd) + .add("exists", exists) + .add("desc", desc) + .add("presplit", opts.presplitRegions) + .add("splitPolicy", opts.splitPolicy) + .add("replicas", opts.replicas)); } - else { - boolean tableExists = admin.tableExists(tableDescriptor.getTableName()); - if (!tableExists) { - admin.createTable(tableDescriptor); - LOG.info("Table " + tableDescriptor + " created"); + + // remove an existing table + if (needsDelete) { + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName); } + admin.deleteTable(tableName); } - return admin.tableExists(tableDescriptor.getTableName()); + + // table creation is necessary + if (!exists || needsDelete) { + desc = getTableDescriptor(opts); + if (splits != null) { + if (LOG.isDebugEnabled()) { + for (int i = 0; i < splits.length; i++) { + LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); + } + } + } + admin.createTable(desc, splits); + LOG.info("Table " + desc + " created"); + } + return admin.tableExists(tableName); } /** @@ -304,6 +338,12 @@ public class PerformanceEvaluation extends Configured implements Tool { family.setInMemory(true); } desc.addFamily(family); + if (opts.replicas != DEFAULT_OPTS.replicas) { + desc.setRegionReplication(opts.replicas); + } + if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) { + desc.setRegionSplitPolicyClassName(opts.splitPolicy); + } return desc; } @@ -311,8 +351,8 @@ public class PerformanceEvaluation extends Configured implements Tool { * generates splits based on total number of rows and specified split regions */ protected static byte[][] getSplits(TestOptions opts) { - if (opts.presplitRegions == 0) - return new byte [0][]; + if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) + return null; int numSplitPoints = opts.presplitRegions - 1; byte[][] splits = new byte[numSplitPoints][]; @@ -329,8 +369,10 @@ public class PerformanceEvaluation extends Configured implements Tool { * @param cmd Command to run. * @throws IOException */ - private void doLocalClients(final Class cmd, final TestOptions opts) + static long doLocalClients(final TestOptions opts, final Configuration conf) throws IOException, InterruptedException { + final Class cmd = determineCommandClass(opts.cmdName); + assert cmd != null; Future[] threads = new Future[opts.numClientThreads]; long[] timings = new long[opts.numClientThreads]; ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, @@ -342,7 +384,8 @@ public class PerformanceEvaluation extends Configured implements Tool { public Long call() throws Exception { TestOptions threadOpts = new TestOptions(opts); if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; - long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() { + long elapsedTime = runOneClient(cmd, conf, threadOpts, new Status() { + @Override public void setStatus(final String msg) throws IOException { LOG.info(msg); } @@ -370,9 +413,10 @@ public class PerformanceEvaluation extends Configured implements Tool { total += timing; } LOG.info("[" + test + "]" - + "\tMin: " + timings[0] + "ms" - + "\tMax: " + timings[timings.length - 1] + "ms" - + "\tAvg: " + (total / timings.length) + "ms"); + + "\tMin: " + timings[0] + "ms" + + "\tMax: " + timings[timings.length - 1] + "ms" + + "\tAvg: " + (total / timings.length) + "ms"); + return total; } /* @@ -382,15 +426,16 @@ public class PerformanceEvaluation extends Configured implements Tool { * @param cmd Command to run. * @throws IOException */ - private void doMapReduce(final Class cmd, TestOptions opts) throws IOException, - InterruptedException, ClassNotFoundException { - Configuration conf = getConf(); + static Job doMapReduce(TestOptions opts, final Configuration conf) + throws IOException, InterruptedException, ClassNotFoundException { + final Class cmd = determineCommandClass(opts.cmdName); + assert cmd != null; Path inputDir = writeInputFile(conf, opts); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); - conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); + conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); Job job = new Job(conf); job.setJarByClass(PerformanceEvaluation.class); - job.setJobName("HBase Performance Evaluation"); + job.setJobName("HBase Performance Evaluation - " + opts.cmdName); job.setInputFormatClass(NLineInputFormat.class); NLineInputFormat.setInputPaths(job, inputDir); @@ -410,12 +455,13 @@ public class PerformanceEvaluation extends Configured implements Tool { TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - Histogram.class, // yammer metrics + Histogram.class, // yammer metrics ObjectMapper.class); // jackson-mapper-asl TableMapReduceUtil.initCredentials(job); job.waitForCompletion(true); + return job; } /* @@ -424,7 +470,7 @@ public class PerformanceEvaluation extends Configured implements Tool { * @return Directory that contains file written. * @throws IOException */ - private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { + private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); Path inputDir = new Path(jobdir, "inputs"); @@ -491,6 +537,7 @@ public class PerformanceEvaluation extends Configured implements Tool { * This makes tracking all these arguments a little easier. */ static class TestOptions { + String cmdName = null; boolean nomapred = false; boolean filterAll = false; int startRow = 0; @@ -511,6 +558,8 @@ public class PerformanceEvaluation extends Configured implements Tool { int multiGet = 0; boolean inMemoryCF = false; int presplitRegions = 0; + int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION; + String splitPolicy = null; Compression.Algorithm compression = Compression.Algorithm.NONE; BloomType bloomType = BloomType.ROW; DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; @@ -521,6 +570,7 @@ public class PerformanceEvaluation extends Configured implements Tool { public TestOptions() {} public TestOptions(TestOptions that) { + this.cmdName = that.cmdName; this.nomapred = that.nomapred; this.startRow = that.startRow; this.size = that.size; @@ -540,6 +590,8 @@ public class PerformanceEvaluation extends Configured implements Tool { this.multiGet = that.multiGet; this.inMemoryCF = that.inMemoryCF; this.presplitRegions = that.presplitRegions; + this.replicas = that.replicas; + this.splitPolicy = that.splitPolicy; this.compression = that.compression; this.blockEncoding = that.blockEncoding; this.filterAll = that.filterAll; @@ -712,7 +764,7 @@ public class PerformanceEvaluation extends Configured implements Tool { if (rs == null || !isRandomValueSize()) return; for (Result r: rs) updateValueSize(r); } - + void updateValueSize(final Result r) throws IOException { if (r == null || !isRandomValueSize()) return; int size = 0; @@ -731,7 +783,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() + (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport()); } - + boolean isRandomValueSize() { return opts.valueRandom; } @@ -825,7 +877,7 @@ public class PerformanceEvaluation extends Configured implements Tool { valueSize.count() + " measures"); reportHistogram(this.valueSize); } - + private void reportHistogram(final Histogram h) throws IOException { Snapshot sn = h.getSnapshot(); status.setStatus(testName + " Min = " + h.min()); @@ -997,10 +1049,12 @@ public class PerformanceEvaluation extends Configured implements Tool { } static class RandomReadTest extends Test { + private final Consistency consistency; private ArrayList gets; RandomReadTest(HConnection con, TestOptions options, Status status) { super(con, options, status); + consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; if (opts.multiGet > 0) { LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); this.gets = new ArrayList(opts.multiGet); @@ -1014,6 +1068,7 @@ public class PerformanceEvaluation extends Configured implements Tool { if (opts.filterAll) { get.setFilter(new FilterAllFilter()); } + get.setConsistency(consistency); if (LOG.isTraceEnabled()) LOG.trace(get.toString()); if (opts.multiGet > 0) { this.gets.add(get); @@ -1313,9 +1368,9 @@ public class PerformanceEvaluation extends Configured implements Tool { if (admin != null) admin.close(); } if (opts.nomapred) { - doLocalClients(cmd, opts); + doLocalClients(opts, getConf()); } else { - doMapReduce(cmd, opts); + doMapReduce(opts, getConf()); } } @@ -1368,6 +1423,10 @@ public class PerformanceEvaluation extends Configured implements Tool { "'valueSize'; set on read for stats on size: Default: Not set."); System.err.println(" period Report every 'period' rows: " + "Default: opts.perClientRunRows / 10"); + System.err.println(" multiGet Batch gets together into groups of N. Only supported " + + "by randomRead. Default: disabled"); + System.err.println(" replicas Enable region replica testing. Defaults: 1."); + System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); System.err.println(); System.err.println(" Note: -D properties will be applied to the conf used. "); System.err.println(" For example: "); @@ -1375,7 +1434,7 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" -Dmapreduce.task.timeout=60000"); System.err.println(); System.err.println("Command:"); - for (CmdDescriptor command : commands.values()) { + for (CmdDescriptor command : COMMANDS.values()) { System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription())); } System.err.println(); @@ -1389,17 +1448,203 @@ public class PerformanceEvaluation extends Configured implements Tool { + " sequentialWrite 1"); } - private static int getNumClients(final int start, final String[] args) { - if(start + 1 > args.length) { - throw new IllegalArgumentException("must supply the number of clients"); + /** + * Parse options passed in via an arguments array. Assumes that array has been split + * on white-space and placed into a {@code Queue}. Any unknown arguments will remain + * in the queue at the conclusion of this method call. It's up to the caller to deal + * with these unrecognized arguments. + */ + static TestOptions parseOpts(Queue args) { + TestOptions opts = new TestOptions(); + + String cmd = null; + while ((cmd = args.poll()) != null) { + if (cmd.equals("-h") || cmd.startsWith("--h")) { + // place item back onto queue so that caller knows parsing was incomplete + args.add(cmd); + break; + } + + final String nmr = "--nomapred"; + if (cmd.startsWith(nmr)) { + opts.nomapred = true; + continue; + } + + final String rows = "--rows="; + if (cmd.startsWith(rows)) { + opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); + continue; + } + + final String sampleRate = "--sampleRate="; + if (cmd.startsWith(sampleRate)) { + opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); + continue; + } + + final String table = "--table="; + if (cmd.startsWith(table)) { + opts.tableName = cmd.substring(table.length()); + continue; + } + + final String startRow = "--startRow="; + if (cmd.startsWith(startRow)) { + opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); + continue; + } + + final String compress = "--compress="; + if (cmd.startsWith(compress)) { + opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); + continue; + } + + final String traceRate = "--traceRate="; + if (cmd.startsWith(traceRate)) { + opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); + continue; + } + + final String blockEncoding = "--blockEncoding="; + if (cmd.startsWith(blockEncoding)) { + opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); + continue; + } + + final String flushCommits = "--flushCommits="; + if (cmd.startsWith(flushCommits)) { + opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); + continue; + } + + final String writeToWAL = "--writeToWAL="; + if (cmd.startsWith(writeToWAL)) { + opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); + continue; + } + + final String presplit = "--presplit="; + if (cmd.startsWith(presplit)) { + opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); + continue; + } + + final String inMemory = "--inmemory="; + if (cmd.startsWith(inMemory)) { + opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); + continue; + } + + final String autoFlush = "--autoFlush="; + if (cmd.startsWith(autoFlush)) { + opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); + continue; + } + + final String onceCon = "--oneCon="; + if (cmd.startsWith(onceCon)) { + opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); + continue; + } + + final String latency = "--latency"; + if (cmd.startsWith(latency)) { + opts.reportLatency = true; + continue; + } + + final String multiGet = "--multiGet="; + if (cmd.startsWith(multiGet)) { + opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); + continue; + } + + final String useTags = "--usetags="; + if (cmd.startsWith(useTags)) { + opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); + continue; + } + + final String noOfTags = "--nooftags="; + if (cmd.startsWith(noOfTags)) { + opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); + continue; + } + + final String replicas = "--replicas="; + if (cmd.startsWith(replicas)) { + opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); + continue; + } + + final String filterOutAll = "--filterAll"; + if (cmd.startsWith(filterOutAll)) { + opts.filterAll = true; + continue; + } + + final String size = "--size="; + if (cmd.startsWith(size)) { + opts.size = Float.parseFloat(cmd.substring(size.length())); + continue; + } + + final String splitPolicy = "--splitPolicy="; + if (cmd.startsWith(splitPolicy)) { + opts.splitPolicy = cmd.substring(splitPolicy.length()); + continue; + } + + final String bloomFilter = "--bloomFilter"; + if (cmd.startsWith(bloomFilter)) { + opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); + continue; + } + + final String valueSize = "--valueSize="; + if (cmd.startsWith(valueSize)) { + opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); + continue; + } + + final String valueRandom = "--valueRandom"; + if (cmd.startsWith(valueRandom)) { + opts.valueRandom = true; + continue; + } + + final String period = "--period="; + if (cmd.startsWith(period)) { + opts.period = Integer.parseInt(cmd.substring(period.length())); + continue; + } + + if (isCommandClass(cmd)) { + opts.cmdName = cmd; + opts.numClientThreads = Integer.parseInt(args.remove()); + int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize); + if (opts.size != DEFAULT_OPTS.size && + opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { + throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments."); + } + if (opts.size != DEFAULT_OPTS.size) { + // total size in GB specified + opts.totalRows = (int) opts.size * rowsPerGB; + opts.perClientRunRows = opts.totalRows / opts.numClientThreads; + } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { + // number of rows specified + opts.totalRows = opts.perClientRunRows * opts.numClientThreads; + opts.size = opts.totalRows / rowsPerGB; + } + break; + } } - int N = Integer.parseInt(args[start]); - if (N < 1) { - throw new IllegalArgumentException("Number of clients must be > 1"); - } - return N; + return opts; } + @Override public int run(String[] args) throws Exception { // Process command-line args. TODO: Better cmd-line processing // (but hopefully something not as painful as cli options). @@ -1410,194 +1655,27 @@ public class PerformanceEvaluation extends Configured implements Tool { } try { - // MR-NOTE: if you are adding a property that is used to control an operation - // like put(), get(), scan(), ... you must also add it as part of the MR - // input, take a look at writeInputFile(). - // Then you must adapt the LINE_PATTERN input regex, - // and parse the argument, take a look at PEInputFormat.getSplits(). - - TestOptions opts = new TestOptions(); - - for (int i = 0; i < args.length; i++) { - String cmd = args[i]; - if (cmd.equals("-h") || cmd.startsWith("--h")) { - printUsage(); - errCode = 0; - break; - } - - final String nmr = "--nomapred"; - if (cmd.startsWith(nmr)) { - opts.nomapred = true; - continue; - } - - final String rows = "--rows="; - if (cmd.startsWith(rows)) { - opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); - continue; - } - - final String startRow = "--startRow="; - if (cmd.startsWith(startRow)) { - opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); - continue; - } - - final String sampleRate = "--sampleRate="; - if (cmd.startsWith(sampleRate)) { - opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); - continue; - } - - final String traceRate = "--traceRate="; - if (cmd.startsWith(traceRate)) { - opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); - continue; - } - - final String table = "--table="; - if (cmd.startsWith(table)) { - opts.tableName = cmd.substring(table.length()); - continue; - } - - final String compress = "--compress="; - if (cmd.startsWith(compress)) { - opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); - continue; - } - - final String blockEncoding = "--blockEncoding="; - if (cmd.startsWith(blockEncoding)) { - opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); - continue; - } - - final String flushCommits = "--flushCommits="; - if (cmd.startsWith(flushCommits)) { - opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); - continue; - } - - final String writeToWAL = "--writeToWAL="; - if (cmd.startsWith(writeToWAL)) { - opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); - continue; - } - - final String autoFlush = "--autoFlush="; - if (cmd.startsWith(autoFlush)) { - opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); - continue; - } - - final String onceCon = "--oneCon="; - if (cmd.startsWith(onceCon)) { - opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); - continue; - } - - final String presplit = "--presplit="; - if (cmd.startsWith(presplit)) { - opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); - continue; - } - - final String inMemory = "--inmemory="; - if (cmd.startsWith(inMemory)) { - opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); - continue; - } - - final String latency = "--latency"; - if (cmd.startsWith(latency)) { - opts.reportLatency = true; - continue; - } - - final String multiGet = "--multiGet="; - if (cmd.startsWith(multiGet)) { - opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); - continue; - } - - final String useTags = "--usetags="; - if (cmd.startsWith(useTags)) { - opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); - continue; - } - - final String noOfTags = "--nooftags="; - if (cmd.startsWith(noOfTags)) { - opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); - continue; - } - - final String filterOutAll = "--filterAll"; - if (cmd.startsWith(filterOutAll)) { - opts.filterAll = true; - continue; - } - - final String size = "--size="; - if (cmd.startsWith(size)) { - opts.size = Float.parseFloat(cmd.substring(size.length())); - continue; - } - - final String bloomFilter = "--bloomFilter"; - if (cmd.startsWith(bloomFilter)) { - opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); - continue; - } - - final String valueSize = "--valueSize="; - if (cmd.startsWith(valueSize)) { - opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); - continue; - } - - final String valueRandom = "--valueRandom"; - if (cmd.startsWith(valueRandom)) { - opts.valueRandom = true; - continue; - } - - final String period = "--period="; - if (cmd.startsWith(period)) { - opts.period = Integer.parseInt(cmd.substring(period.length())); - continue; - } - - Class cmdClass = determineCommandClass(cmd); - if (cmdClass != null) { - opts.numClientThreads = getNumClients(i + 1, args); - if (opts.size != DEFAULT_OPTS.size && - opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { - throw new IllegalArgumentException(rows + " and " + size + - " are mutually exclusive arguments."); - } - // Calculate how many rows per gig. If random value size presume that that half the max - // is average row size. - int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize); - if (opts.size != DEFAULT_OPTS.size) { - // total size in GB specified - opts.totalRows = (int) opts.size * rowsPerGB; - opts.perClientRunRows = opts.totalRows / opts.numClientThreads; - } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { - // number of rows specified - opts.totalRows = opts.perClientRunRows * opts.numClientThreads; - opts.size = opts.totalRows / rowsPerGB; - } - runTest(cmdClass, opts); - errCode = 0; - break; - } + LinkedList argv = new LinkedList(); + argv.addAll(Arrays.asList(args)); + TestOptions opts = parseOpts(argv); + // args remainting, print help and exit + if (!argv.isEmpty()) { + errCode = 0; printUsage(); - break; } + + // must run at least 1 client + if (opts.numClientThreads <= 0) { + throw new IllegalArgumentException("Number of clients must be > 0"); + } + + Class cmdClass = determineCommandClass(opts.cmdName); + if (cmdClass != null) { + runTest(cmdClass, opts); + errCode = 0; + } + } catch (Exception e) { e.printStackTrace(); } @@ -1605,8 +1683,12 @@ public class PerformanceEvaluation extends Configured implements Tool { return errCode; } - private Class determineCommandClass(String cmd) { - CmdDescriptor descriptor = commands.get(cmd); + private static boolean isCommandClass(String cmd) { + return COMMANDS.containsKey(cmd); + } + + private static Class determineCommandClass(String cmd) { + CmdDescriptor descriptor = COMMANDS.get(cmd); return descriptor != null ? descriptor.getCmdClass() : null; }