HBASE-17994 Add async client test to Performance Evaluation tool
This commit is contained in:
@ -53,6 +53,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
@ -99,9 +102,9 @@ import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.htrace.impl.ProbabilitySampler;
import org.apache.hadoop.hbase.shaded.com.google.common.base.MoreObjects;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.UniformReservoir;
@ -153,6 +156,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
static {
addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
"Run async random read test");
addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
"Run async random write test");
addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
"Run async sequential read test");
addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
"Run async sequential write test");
addCommandDescriptor(AsyncScanTest.class, "asyncScan",
"Run async scan test (read every row)");
addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
"Run random read test");
addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
@ -226,7 +239,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected static void addCommandDescriptor(Class<? extends Test> cmdClass,
protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
String name, String description) {
CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
COMMANDS.put(name, cmdDescriptor);
@ -295,9 +308,15 @@ public class PerformanceEvaluation extends Configured implements Tool {
TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
Configuration conf = HBaseConfiguration.create(context.getConfiguration());
final Connection con = ConnectionFactory.createConnection(conf);
AsyncConnection asyncCon = null;
try {
asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
} catch (ExecutionException e) {
throw new IOException(e);
// Evaluation task
RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, opts, status);
RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
// Collect how much time the thing took. Report as map output and
// to the ELAPSED_TIME counter.
@ -412,8 +431,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
* Run all clients in this vm each to its own thread.
static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
throws IOException, InterruptedException {
final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
throws IOException, InterruptedException, ExecutionException {
final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
assert cmd != null;
Future<RunResult>[] threads = new Future[opts.numClientThreads];
@ -421,6 +440,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
final Connection con = ConnectionFactory.createConnection(conf);
final AsyncConnection asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
for (int i = 0; i < threads.length; i++) {
final int index = i;
threads[i] = pool.submit(new Callable<RunResult>() {
@ -428,7 +448,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
public RunResult call() throws Exception {
TestOptions threadOpts = new TestOptions(opts);
if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
RunResult run = runOneClient(cmd, conf, con, threadOpts, new Status() {
RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
public void setStatus(final String msg) throws IOException {
@ -463,6 +483,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
+ "\tAvg: " + (total / results.length) + "ms");
return results;
@ -476,7 +497,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
static Job doMapReduce(TestOptions opts, final Configuration conf)
throws IOException, InterruptedException, ClassNotFoundException {
final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
assert cmd != null;
Path inputDir = writeInputFile(conf, opts);
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
@ -567,17 +588,17 @@ public class PerformanceEvaluation extends Configured implements Tool {
* Describes a command.
static class CmdDescriptor {
private Class<? extends Test> cmdClass;
private Class<? extends TestBase> cmdClass;
private String name;
private String description;
CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
this.cmdClass = cmdClass;
this.name = name;
this.description = description;
public Class<? extends Test> getCmdClass() {
public Class<? extends TestBase> getCmdClass() {
return cmdClass;
@ -1001,7 +1022,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
* A test.
* Subclass to particularize what happens per row.
static abstract class Test {
static abstract class TestBase {
// Below is make it so when Tests are all running in the one
// jvm, that they each have a differently seeded Random.
private static final Random randomSeed = new Random(System.currentTimeMillis());
@ -1018,8 +1039,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
private final Status status;
private final Sampler<?> traceSampler;
private final SpanReceiverHost receiverHost;
protected Connection connection;
// protected Table table;
private String testName;
private Histogram latencyHistogram;
@ -1030,9 +1049,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
* Note that all subclasses of this class must provide a public constructor
* that has the exact same list of arguments.
Test(final Connection con, final TestOptions options, final Status status) {
this.connection = con;
this.conf = con == null ? HBaseConfiguration.create() : this.connection.getConfiguration();
TestBase(final Configuration conf, final TestOptions options, final Status status) {
this.conf = conf;
this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
this.opts = options;
this.status = status;
@ -1098,14 +1116,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testSetup() throws IOException {
if (!opts.oneCon) {
this.connection = ConnectionFactory.createConnection(conf);
latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
abstract void createConnection() throws IOException;
abstract void onStartup() throws IOException;
void testTakedown() throws IOException {
@ -1124,14 +1142,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
if (!opts.oneCon) {
abstract void onTakedown() throws IOException;
abstract void closeConnection() throws IOException;
* Run test
* @return Elapsed time.
@ -1211,6 +1229,56 @@ public class PerformanceEvaluation extends Configured implements Tool {
abstract void testRow(final int i) throws IOException, InterruptedException;
static abstract class Test extends TestBase {
protected Connection connection;
Test(final Connection con, final TestOptions options, final Status status) {
super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
this.connection = con;
void createConnection() throws IOException {
if (!opts.isOneCon()) {
this.connection = ConnectionFactory.createConnection(conf);
void closeConnection() throws IOException {
if (!opts.isOneCon()) {
static abstract class AsyncTest extends TestBase {
protected AsyncConnection connection;
AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
this.connection = con;
void createConnection() {
if (!opts.isOneCon()) {
try {
this.connection = ConnectionFactory.createAsyncConnection(conf).get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Failed to create async connection", e);
void closeConnection() throws IOException {
if (!opts.isOneCon()) {
static abstract class TableTest extends Test {
protected Table table;
@ -1229,6 +1297,242 @@ public class PerformanceEvaluation extends Configured implements Tool {
static abstract class AsyncTableTest extends AsyncTest {
protected RawAsyncTable table;
AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
super(con, options, status);
void onStartup() throws IOException {
this.table = connection.getRawTable(TableName.valueOf(opts.tableName));
void onTakedown() throws IOException {
static class AsyncRandomReadTest extends AsyncTableTest {
private final Consistency consistency;
private ArrayList<Get> gets;
private Random rd = new Random();
AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
super(con, options, status);
consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
if (opts.multiGet > 0) {
LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
this.gets = new ArrayList<>(opts.multiGet);
void testRow(final int i) throws IOException, InterruptedException {
if (opts.randomSleep > 0) {
Get get = new Get(getRandomRow(this.rand, opts.totalRows));
if (opts.addColumns) {
} else {
if (opts.filterAll) {
get.setFilter(new FilterAllFilter());
if (LOG.isTraceEnabled()) LOG.trace(get.toString());
try {
if (opts.multiGet > 0) {
if (this.gets.size() == opts.multiGet) {
Result[] rs =
this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
} else {
} catch (ExecutionException e) {
throw new IOException(e);
public static RuntimeException runtime(Throwable e) {
if (e instanceof RuntimeException) {
return (RuntimeException) e;
return new RuntimeException(e);
public static <V> V propagate(Callable<V> callable) {
try {
return callable.call();
} catch (Exception e) {
throw runtime(e);
protected int getReportingPeriod() {
int period = opts.perClientRunRows / 10;
return period == 0 ? opts.perClientRunRows : period;
protected void testTakedown() throws IOException {
if (this.gets != null && this.gets.size() > 0) {
static class AsyncRandomWriteTest extends AsyncTableTest {
AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
super(con, options, status);
void testRow(final int i) throws IOException, InterruptedException {
byte[] row = getRandomRow(this.rand, opts.totalRows);
Put put = new Put(row);
for (int column = 0; column < opts.columns; column++) {
byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
byte[] value = generateData(this.rand, getValueLength(this.rand));
if (opts.useTags) {
byte[] tag = generateData(this.rand, TAG_LENGTH);
Tag[] tags = new Tag[opts.noOfTags];
for (int n = 0; n < opts.noOfTags; n++) {
Tag t = new ArrayBackedTag((byte) n, tag);
tags[n] = t;
KeyValue kv =
new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
} else {
put.addColumn(FAMILY_NAME, qualifier, value);
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
try {
} catch (ExecutionException e) {
throw new IOException(e);
static class AsyncScanTest extends AsyncTableTest {
private ResultScanner testScanner;
private AsyncTable asyncTable;
AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
super(con, options, status);
void onStartup() throws IOException {
this.asyncTable =
void testTakedown() throws IOException {
if (this.testScanner != null) {
void testRow(final int i) throws IOException {
if (this.testScanner == null) {
Scan scan =
new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
if (opts.addColumns) {
} else {
if (opts.filterAll) {
scan.setFilter(new FilterAllFilter());
this.testScanner = asyncTable.getScanner(scan);
Result r = testScanner.next();
static class AsyncSequentialReadTest extends AsyncTableTest {
AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
super(con, options, status);
void testRow(final int i) throws IOException, InterruptedException {
Get get = new Get(format(i));
if (opts.addColumns) {
if (opts.filterAll) {
get.setFilter(new FilterAllFilter());
try {
} catch (ExecutionException e) {
throw new IOException(e);
static class AsyncSequentialWriteTest extends AsyncTableTest {
AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
super(con, options, status);
void testRow(final int i) throws IOException, InterruptedException {
byte[] row = format(i);
Put put = new Put(row);
for (int column = 0; column < opts.columns; column++) {
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
byte[] value = generateData(this.rand, getValueLength(this.rand));
if (opts.useTags) {
byte[] tag = generateData(this.rand, TAG_LENGTH);
Tag[] tags = new Tag[opts.noOfTags];
for (int n = 0; n < opts.noOfTags; n++) {
Tag t = new ArrayBackedTag((byte) n, tag);
tags[n] = t;
KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
value, tags);
} else {
put.addColumn(FAMILY_NAME, qualifier, value);
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
try {
} catch (ExecutionException e) {
throw new IOException(e);
static abstract class BufferedMutatorTest extends Test {
protected BufferedMutator mutator;
protected Table table;
@ -1789,23 +2093,31 @@ public class PerformanceEvaluation extends Configured implements Tool {
return random.nextInt(Integer.MAX_VALUE) % totalRows;
static RunResult runOneClient(final Class<? extends Test> cmd, Configuration conf, Connection con,
TestOptions opts, final Status status)
static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
throws IOException, InterruptedException {
status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
opts.perClientRunRows + " rows");
status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
+ opts.perClientRunRows + " rows");
long totalElapsedTime;
final Test t;
final TestBase t;
try {
Constructor<? extends Test> constructor =
cmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
t = constructor.newInstance(con, opts, status);
if (AsyncTest.class.isAssignableFrom(cmd)) {
Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
Constructor<? extends AsyncTest> constructor =
newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
t = constructor.newInstance(asyncCon, opts, status);
} else {
Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
Constructor<? extends Test> constructor =
newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
t = constructor.newInstance(con, opts, status);
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Invalid command class: " +
cmd.getName() + ". It does not provide a constructor as described by " +
"the javadoc comment. Available constructors are: " +
throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
+ ". It does not provide a constructor as described by "
+ "the javadoc comment. Available constructors are: "
+ Arrays.toString(cmd.getConstructors()));
} catch (Exception e) {
throw new IllegalStateException("Failed to construct command class", e);
@ -1823,8 +2135,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
return opts.valueRandom? opts.valueSize/2: opts.valueSize;
private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
InterruptedException, ClassNotFoundException {
private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
InterruptedException, ClassNotFoundException, ExecutionException {
// Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
// the TestOptions introspection for us and dump the output in a readable format.
LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts));
@ -1944,7 +2256,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
for (CmdDescriptor command : COMMANDS.values()) {
System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
@ -2285,7 +2597,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
return errCode;
Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName);
Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
if (cmdClass != null) {
runTest(cmdClass, opts);
errCode = 0;
@ -2302,7 +2614,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
return COMMANDS.containsKey(cmd);
private static Class<? extends Test> determineCommandClass(String cmd) {
private static Class<? extends TestBase> determineCommandClass(String cmd) {
CmdDescriptor descriptor = COMMANDS.get(cmd);
return descriptor != null ? descriptor.getCmdClass() : null;
Reference in New Issue
Block a user