hbase-8816: Add support of loading multiple tables into LoadTestTool

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1510056 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jeffreyz 2013-08-03 18:39:56 +00:00
parent 0108ff5323
commit 6b26bb0a8e
2 changed files with 142 additions and 10 deletions

View File

@ -41,8 +41,8 @@ import org.apache.hadoop.util.ToolRunner;
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class AbstractHBaseTool implements Tool { public abstract class AbstractHBaseTool implements Tool {
private static final int EXIT_SUCCESS = 0; protected static final int EXIT_SUCCESS = 0;
private static final int EXIT_FAILURE = 1; protected static final int EXIT_FAILURE = 1;
private static final String SHORT_HELP_OPTION = "h"; private static final String SHORT_HELP_OPTION = "h";
private static final String LONG_HELP_OPTION = "help"; private static final String LONG_HELP_OPTION = "help";
@ -54,6 +54,8 @@ public abstract class AbstractHBaseTool implements Tool {
protected Configuration conf = null; protected Configuration conf = null;
private static final Set<String> requiredOptions = new TreeSet<String>(); private static final Set<String> requiredOptions = new TreeSet<String>();
protected String[] cmdLineArgs = null;
/** /**
* Override this to add command-line options using {@link #addOptWithArg} * Override this to add command-line options using {@link #addOptWithArg}
@ -90,6 +92,7 @@ public abstract class AbstractHBaseTool implements Tool {
try { try {
// parse the command line arguments // parse the command line arguments
cmd = parseArgs(args); cmd = parseArgs(args);
cmdLineArgs = args;
} catch (ParseException e) { } catch (ParseException e) {
LOG.error("Error when parsing command-line arguemnts", e); LOG.error("Error when parsing command-line arguemnts", e);
printUsage(); printUsage();
@ -125,14 +128,14 @@ public abstract class AbstractHBaseTool implements Tool {
return success; return success;
} }
private CommandLine parseArgs(String[] args) throws ParseException { protected CommandLine parseArgs(String[] args) throws ParseException {
options.addOption(SHORT_HELP_OPTION, LONG_HELP_OPTION, false, "Show usage"); options.addOption(SHORT_HELP_OPTION, LONG_HELP_OPTION, false, "Show usage");
addOptions(); addOptions();
CommandLineParser parser = new BasicParser(); CommandLineParser parser = new BasicParser();
return parser.parse(options, args); return parser.parse(options, args);
} }
private void printUsage() { protected void printUsage() {
HelpFormatter helpFormatter = new HelpFormatter(); HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.setWidth(80); helpFormatter.setWidth(80);
String usageHeader = "Options:"; String usageHeader = "Options:";

View File

@ -17,11 +17,17 @@
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -32,6 +38,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.ToolRunner;
/** /**
* A command-line utility that reads, writes, and verifies data. Unlike * A command-line utility that reads, writes, and verifies data. Unlike
@ -102,6 +109,7 @@ public class LoadTestTool extends AbstractHBaseTool {
protected static final String OPT_ZK_QUORUM = "zk"; protected static final String OPT_ZK_QUORUM = "zk";
protected static final String OPT_SKIP_INIT = "skip_init"; protected static final String OPT_SKIP_INIT = "skip_init";
protected static final String OPT_INIT_ONLY = "init_only"; protected static final String OPT_INIT_ONLY = "init_only";
private static final String NUM_TABLES = "num_tables";
protected static final long DEFAULT_START_KEY = 0; protected static final long DEFAULT_START_KEY = 0;
@ -128,10 +136,12 @@ public class LoadTestTool extends AbstractHBaseTool {
protected boolean isMultiPut; protected boolean isMultiPut;
// Reader options // Reader options
protected int numReaderThreads = DEFAULT_NUM_THREADS; private int numReaderThreads = DEFAULT_NUM_THREADS;
protected int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
protected int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS; private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
protected int verifyPercent; private int verifyPercent;
private int numTables = 1;
// TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad, // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
// console tool itself should only be used from console. // console tool itself should only be used from console.
@ -223,6 +233,11 @@ public class LoadTestTool extends AbstractHBaseTool {
DEFAULT_START_KEY + "."); DEFAULT_START_KEY + ".");
addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
+ "already exists"); + "already exists");
addOptWithArg(NUM_TABLES,
"A positive integer number. When a number n is speicfied, load test "
+ "tool will load n table parallely. -tn parameter value becomes "
+ "table name prefix. Each table name is in format <tn>_1...<tn>_n");
} }
@Override @Override
@ -308,6 +323,11 @@ public class LoadTestTool extends AbstractHBaseTool {
System.out.println("Percent of keys to verify: " + verifyPercent); System.out.println("Percent of keys to verify: " + verifyPercent);
System.out.println("Reader threads: " + numReaderThreads); System.out.println("Reader threads: " + numReaderThreads);
} }
numTables = 1;
if(cmd.hasOption(NUM_TABLES)) {
numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
}
} }
private void parseColumnFamilyOptions(CommandLine cmd) { private void parseColumnFamilyOptions(CommandLine cmd) {
@ -339,6 +359,14 @@ public class LoadTestTool extends AbstractHBaseTool {
@Override @Override
protected int doWork() throws IOException { protected int doWork() throws IOException {
if (numTables > 1) {
return parallelLoadTables();
} else {
return loadTable();
}
}
protected int loadTable() throws IOException {
if (cmd.hasOption(OPT_ZK_QUORUM)) { if (cmd.hasOption(OPT_ZK_QUORUM)) {
conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM)); conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
} }
@ -399,11 +427,112 @@ public class LoadTestTool extends AbstractHBaseTool {
success = success && readerThreads.getNumReadErrors() == 0 success = success && readerThreads.getNumReadErrors() == 0
&& readerThreads.getNumReadFailures() == 0; && readerThreads.getNumReadFailures() == 0;
} }
return success ? 0 : 1; return success ? EXIT_SUCCESS : this.EXIT_FAILURE;
} }
public static void main(String[] args) { public static void main(String[] args) {
new LoadTestTool().doStaticMain(args); new LoadTestTool().doStaticMain(args);
} }
/**
* When NUM_TABLES is specified, the function starts multiple worker threads
* which individually start a LoadTestTool instance to load a table. Each
* table name is in format <tn>_<index>. For example, "-tn test -num_tables 2"
* , table names will be "test_1", "test_2"
*
* @throws IOException
*/
private int parallelLoadTables()
throws IOException {
// create new command args
String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
String[] newArgs = null;
if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
newArgs = new String[cmdLineArgs.length + 2];
newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
for (int i = 0; i < cmdLineArgs.length; i++) {
newArgs[i + 2] = cmdLineArgs[i];
}
} else {
newArgs = cmdLineArgs;
}
int tableNameValueIndex = -1;
for (int j = 0; j < newArgs.length; j++) {
if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
tableNameValueIndex = j + 1;
} else if (newArgs[j].endsWith(NUM_TABLES)) {
// change NUM_TABLES to 1 so that each worker loads one table
newArgs[j + 1] = "1";
}
}
// starting to load multiple tables
List<WorkerThread> workers = new ArrayList<WorkerThread>();
for (int i = 0; i < numTables; i++) {
String[] workerArgs = newArgs.clone();
workerArgs[tableNameValueIndex] = tableName + "_" + (i+1);
WorkerThread worker = new WorkerThread(i, workerArgs);
workers.add(worker);
LOG.info(worker + " starting");
worker.start();
}
// wait for all workers finish
LOG.info("Waiting for worker threads to finish");
for (WorkerThread t : workers) {
try {
t.join();
} catch (InterruptedException ie) {
IOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
}
checkForErrors();
}
return EXIT_SUCCESS;
}
// If an exception is thrown by one of worker threads, it will be
// stored here.
protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
private void workerThreadError(Throwable t) {
thrown.compareAndSet(null, t);
}
/**
* Check for errors in the writer threads. If any is found, rethrow it.
*/
private void checkForErrors() throws IOException {
Throwable thrown = this.thrown.get();
if (thrown == null) return;
if (thrown instanceof IOException) {
throw (IOException) thrown;
} else {
throw new RuntimeException(thrown);
}
}
class WorkerThread extends Thread {
private String[] workerArgs;
WorkerThread(int i, String[] args) {
super("WorkerThread-" + i);
workerArgs = args;
}
public void run() {
try {
int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
if (ret != 0) {
throw new RuntimeException("LoadTestTool exit with non-zero return code.");
}
} catch (Exception ex) {
LOG.error("Error in worker thread", ex);
workerThreadError(ex);
}
}
}
} }