diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java index 57d2fb57861..28b42505bde 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java @@ -41,8 +41,8 @@ import org.apache.hadoop.util.ToolRunner; @InterfaceAudience.Private public abstract class AbstractHBaseTool implements Tool { - private static final int EXIT_SUCCESS = 0; - private static final int EXIT_FAILURE = 1; + protected static final int EXIT_SUCCESS = 0; + protected static final int EXIT_FAILURE = 1; private static final String SHORT_HELP_OPTION = "h"; private static final String LONG_HELP_OPTION = "help"; @@ -54,6 +54,8 @@ public abstract class AbstractHBaseTool implements Tool { protected Configuration conf = null; private static final Set requiredOptions = new TreeSet(); + + protected String[] cmdLineArgs = null; /** * Override this to add command-line options using {@link #addOptWithArg} @@ -90,6 +92,7 @@ public abstract class AbstractHBaseTool implements Tool { try { // parse the command line arguments cmd = parseArgs(args); + cmdLineArgs = args; } catch (ParseException e) { LOG.error("Error when parsing command-line arguemnts", e); printUsage(); @@ -125,14 +128,14 @@ public abstract class AbstractHBaseTool implements Tool { return success; } - private CommandLine parseArgs(String[] args) throws ParseException { + protected CommandLine parseArgs(String[] args) throws ParseException { options.addOption(SHORT_HELP_OPTION, LONG_HELP_OPTION, false, "Show usage"); addOptions(); CommandLineParser parser = new BasicParser(); return parser.parse(options, args); } - private void printUsage() { + protected void printUsage() { HelpFormatter helpFormatter = new HelpFormatter(); helpFormatter.setWidth(80); String usageHeader = "Options:"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 37f431b2383..eb7fb46894c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -17,11 +17,17 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -32,6 +38,7 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.ToolRunner; /** * A command-line utility that reads, writes, and verifies data. Unlike @@ -102,6 +109,7 @@ public class LoadTestTool extends AbstractHBaseTool { protected static final String OPT_ZK_QUORUM = "zk"; protected static final String OPT_SKIP_INIT = "skip_init"; protected static final String OPT_INIT_ONLY = "init_only"; + private static final String NUM_TABLES = "num_tables"; protected static final long DEFAULT_START_KEY = 0; @@ -128,10 +136,12 @@ public class LoadTestTool extends AbstractHBaseTool { protected boolean isMultiPut; // Reader options - protected int numReaderThreads = DEFAULT_NUM_THREADS; - protected int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; - protected int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS; - protected int verifyPercent; + private int numReaderThreads = DEFAULT_NUM_THREADS; + private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; + private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS; + private int verifyPercent; + + private int numTables = 1; // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad, // console tool itself should only be used from console. @@ -223,6 +233,11 @@ public class LoadTestTool extends AbstractHBaseTool { DEFAULT_START_KEY + "."); addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " + "already exists"); + + addOptWithArg(NUM_TABLES, + "A positive integer number. When a number n is speicfied, load test " + + "tool will load n table parallely. -tn parameter value becomes " + + "table name prefix. Each table name is in format _1..._n"); } @Override @@ -308,6 +323,11 @@ public class LoadTestTool extends AbstractHBaseTool { System.out.println("Percent of keys to verify: " + verifyPercent); System.out.println("Reader threads: " + numReaderThreads); } + + numTables = 1; + if(cmd.hasOption(NUM_TABLES)) { + numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE); + } } private void parseColumnFamilyOptions(CommandLine cmd) { @@ -339,6 +359,14 @@ public class LoadTestTool extends AbstractHBaseTool { @Override protected int doWork() throws IOException { + if (numTables > 1) { + return parallelLoadTables(); + } else { + return loadTable(); + } + } + + protected int loadTable() throws IOException { if (cmd.hasOption(OPT_ZK_QUORUM)) { conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM)); } @@ -399,11 +427,112 @@ public class LoadTestTool extends AbstractHBaseTool { success = success && readerThreads.getNumReadErrors() == 0 && readerThreads.getNumReadFailures() == 0; } - return success ? 0 : 1; + return success ? EXIT_SUCCESS : this.EXIT_FAILURE; } - + public static void main(String[] args) { new LoadTestTool().doStaticMain(args); } + /** + * When NUM_TABLES is specified, the function starts multiple worker threads + * which individually start a LoadTestTool instance to load a table. Each + * table name is in format _. For example, "-tn test -num_tables 2" + * , table names will be "test_1", "test_2" + * + * @throws IOException + */ + private int parallelLoadTables() + throws IOException { + // create new command args + String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME); + String[] newArgs = null; + if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) { + newArgs = new String[cmdLineArgs.length + 2]; + newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME; + for (int i = 0; i < cmdLineArgs.length; i++) { + newArgs[i + 2] = cmdLineArgs[i]; + } + } else { + newArgs = cmdLineArgs; + } + + int tableNameValueIndex = -1; + for (int j = 0; j < newArgs.length; j++) { + if (newArgs[j].endsWith(OPT_TABLE_NAME)) { + tableNameValueIndex = j + 1; + } else if (newArgs[j].endsWith(NUM_TABLES)) { + // change NUM_TABLES to 1 so that each worker loads one table + newArgs[j + 1] = "1"; + } + } + + // starting to load multiple tables + List workers = new ArrayList(); + for (int i = 0; i < numTables; i++) { + String[] workerArgs = newArgs.clone(); + workerArgs[tableNameValueIndex] = tableName + "_" + (i+1); + WorkerThread worker = new WorkerThread(i, workerArgs); + workers.add(worker); + LOG.info(worker + " starting"); + worker.start(); + } + + // wait for all workers finish + LOG.info("Waiting for worker threads to finish"); + for (WorkerThread t : workers) { + try { + t.join(); + } catch (InterruptedException ie) { + IOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + checkForErrors(); + } + + return EXIT_SUCCESS; + } + + // If an exception is thrown by one of worker threads, it will be + // stored here. + protected AtomicReference thrown = new AtomicReference(); + + private void workerThreadError(Throwable t) { + thrown.compareAndSet(null, t); + } + + /** + * Check for errors in the writer threads. If any is found, rethrow it. + */ + private void checkForErrors() throws IOException { + Throwable thrown = this.thrown.get(); + if (thrown == null) return; + if (thrown instanceof IOException) { + throw (IOException) thrown; + } else { + throw new RuntimeException(thrown); + } + } + + class WorkerThread extends Thread { + private String[] workerArgs; + + WorkerThread(int i, String[] args) { + super("WorkerThread-" + i); + workerArgs = args; + } + + public void run() { + try { + int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs); + if (ret != 0) { + throw new RuntimeException("LoadTestTool exit with non-zero return code."); + } + } catch (Exception ex) { + LOG.error("Error in worker thread", ex); + workerThreadError(ex); + } + } + } }