HBASE-17994 Add async client test to Performance Evaluation tool
This commit is contained in:
parent
9da4e6906e
commit
2a9cdd5e75
|
@ -53,6 +53,8 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.Append;
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
|
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||||
|
import org.apache.hadoop.hbase.client.AsyncTable;
|
||||||
import org.apache.hadoop.hbase.client.BufferedMutator;
|
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.client.Durability;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Increment;
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.RawAsyncTable;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.RowMutations;
|
import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
|
@ -99,9 +102,9 @@ import org.apache.htrace.Sampler;
|
||||||
import org.apache.htrace.Trace;
|
import org.apache.htrace.Trace;
|
||||||
import org.apache.htrace.TraceScope;
|
import org.apache.htrace.TraceScope;
|
||||||
import org.apache.htrace.impl.ProbabilitySampler;
|
import org.apache.htrace.impl.ProbabilitySampler;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.MoreObjects;
|
import org.apache.hadoop.hbase.shaded.com.google.common.base.MoreObjects;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
import com.codahale.metrics.Histogram;
|
import com.codahale.metrics.Histogram;
|
||||||
import com.codahale.metrics.UniformReservoir;
|
import com.codahale.metrics.UniformReservoir;
|
||||||
|
|
||||||
|
@ -153,6 +156,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
|
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
|
||||||
|
"Run async random read test");
|
||||||
|
addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
|
||||||
|
"Run async random write test");
|
||||||
|
addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
|
||||||
|
"Run async sequential read test");
|
||||||
|
addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
|
||||||
|
"Run async sequential write test");
|
||||||
|
addCommandDescriptor(AsyncScanTest.class, "asyncScan",
|
||||||
|
"Run async scan test (read every row)");
|
||||||
addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
|
addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
|
||||||
"Run random read test");
|
"Run random read test");
|
||||||
addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
|
addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
|
||||||
|
@ -226,7 +239,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
super(conf);
|
super(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static void addCommandDescriptor(Class<? extends Test> cmdClass,
|
protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
|
||||||
String name, String description) {
|
String name, String description) {
|
||||||
CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
|
CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
|
||||||
COMMANDS.put(name, cmdDescriptor);
|
COMMANDS.put(name, cmdDescriptor);
|
||||||
|
@ -295,9 +308,15 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
|
TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
|
||||||
Configuration conf = HBaseConfiguration.create(context.getConfiguration());
|
Configuration conf = HBaseConfiguration.create(context.getConfiguration());
|
||||||
final Connection con = ConnectionFactory.createConnection(conf);
|
final Connection con = ConnectionFactory.createConnection(conf);
|
||||||
|
AsyncConnection asyncCon = null;
|
||||||
|
try {
|
||||||
|
asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
|
||||||
// Evaluation task
|
// Evaluation task
|
||||||
RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, opts, status);
|
RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
|
||||||
// Collect how much time the thing took. Report as map output and
|
// Collect how much time the thing took. Report as map output and
|
||||||
// to the ELAPSED_TIME counter.
|
// to the ELAPSED_TIME counter.
|
||||||
context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
|
context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
|
||||||
|
@ -412,8 +431,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
* Run all clients in this vm each to its own thread.
|
* Run all clients in this vm each to its own thread.
|
||||||
*/
|
*/
|
||||||
static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
|
static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException, ExecutionException {
|
||||||
final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
|
final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
|
||||||
assert cmd != null;
|
assert cmd != null;
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Future<RunResult>[] threads = new Future[opts.numClientThreads];
|
Future<RunResult>[] threads = new Future[opts.numClientThreads];
|
||||||
|
@ -421,6 +440,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
|
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
|
||||||
new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
|
new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
|
||||||
final Connection con = ConnectionFactory.createConnection(conf);
|
final Connection con = ConnectionFactory.createConnection(conf);
|
||||||
|
final AsyncConnection asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
|
||||||
for (int i = 0; i < threads.length; i++) {
|
for (int i = 0; i < threads.length; i++) {
|
||||||
final int index = i;
|
final int index = i;
|
||||||
threads[i] = pool.submit(new Callable<RunResult>() {
|
threads[i] = pool.submit(new Callable<RunResult>() {
|
||||||
|
@ -428,7 +448,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
public RunResult call() throws Exception {
|
public RunResult call() throws Exception {
|
||||||
TestOptions threadOpts = new TestOptions(opts);
|
TestOptions threadOpts = new TestOptions(opts);
|
||||||
if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
|
if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
|
||||||
RunResult run = runOneClient(cmd, conf, con, threadOpts, new Status() {
|
RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
|
||||||
@Override
|
@Override
|
||||||
public void setStatus(final String msg) throws IOException {
|
public void setStatus(final String msg) throws IOException {
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
|
@ -463,6 +483,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
+ "\tAvg: " + (total / results.length) + "ms");
|
+ "\tAvg: " + (total / results.length) + "ms");
|
||||||
|
|
||||||
con.close();
|
con.close();
|
||||||
|
asyncCon.close();
|
||||||
|
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
@ -476,7 +497,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
*/
|
*/
|
||||||
static Job doMapReduce(TestOptions opts, final Configuration conf)
|
static Job doMapReduce(TestOptions opts, final Configuration conf)
|
||||||
throws IOException, InterruptedException, ClassNotFoundException {
|
throws IOException, InterruptedException, ClassNotFoundException {
|
||||||
final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
|
final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
|
||||||
assert cmd != null;
|
assert cmd != null;
|
||||||
Path inputDir = writeInputFile(conf, opts);
|
Path inputDir = writeInputFile(conf, opts);
|
||||||
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
|
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
|
||||||
|
@ -567,17 +588,17 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
* Describes a command.
|
* Describes a command.
|
||||||
*/
|
*/
|
||||||
static class CmdDescriptor {
|
static class CmdDescriptor {
|
||||||
private Class<? extends Test> cmdClass;
|
private Class<? extends TestBase> cmdClass;
|
||||||
private String name;
|
private String name;
|
||||||
private String description;
|
private String description;
|
||||||
|
|
||||||
CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
|
CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
|
||||||
this.cmdClass = cmdClass;
|
this.cmdClass = cmdClass;
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.description = description;
|
this.description = description;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Class<? extends Test> getCmdClass() {
|
public Class<? extends TestBase> getCmdClass() {
|
||||||
return cmdClass;
|
return cmdClass;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1001,7 +1022,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
* A test.
|
* A test.
|
||||||
* Subclass to particularize what happens per row.
|
* Subclass to particularize what happens per row.
|
||||||
*/
|
*/
|
||||||
static abstract class Test {
|
static abstract class TestBase {
|
||||||
// Below is make it so when Tests are all running in the one
|
// Below is make it so when Tests are all running in the one
|
||||||
// jvm, that they each have a differently seeded Random.
|
// jvm, that they each have a differently seeded Random.
|
||||||
private static final Random randomSeed = new Random(System.currentTimeMillis());
|
private static final Random randomSeed = new Random(System.currentTimeMillis());
|
||||||
|
@ -1018,8 +1039,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
private final Status status;
|
private final Status status;
|
||||||
private final Sampler<?> traceSampler;
|
private final Sampler<?> traceSampler;
|
||||||
private final SpanReceiverHost receiverHost;
|
private final SpanReceiverHost receiverHost;
|
||||||
protected Connection connection;
|
|
||||||
// protected Table table;
|
|
||||||
|
|
||||||
private String testName;
|
private String testName;
|
||||||
private Histogram latencyHistogram;
|
private Histogram latencyHistogram;
|
||||||
|
@ -1030,9 +1049,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
* Note that all subclasses of this class must provide a public constructor
|
* Note that all subclasses of this class must provide a public constructor
|
||||||
* that has the exact same list of arguments.
|
* that has the exact same list of arguments.
|
||||||
*/
|
*/
|
||||||
Test(final Connection con, final TestOptions options, final Status status) {
|
TestBase(final Configuration conf, final TestOptions options, final Status status) {
|
||||||
this.connection = con;
|
this.conf = conf;
|
||||||
this.conf = con == null ? HBaseConfiguration.create() : this.connection.getConfiguration();
|
|
||||||
this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
|
this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
|
||||||
this.opts = options;
|
this.opts = options;
|
||||||
this.status = status;
|
this.status = status;
|
||||||
|
@ -1098,14 +1116,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
void testSetup() throws IOException {
|
void testSetup() throws IOException {
|
||||||
if (!opts.oneCon) {
|
createConnection();
|
||||||
this.connection = ConnectionFactory.createConnection(conf);
|
|
||||||
}
|
|
||||||
onStartup();
|
onStartup();
|
||||||
latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
||||||
valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
abstract void createConnection() throws IOException;
|
||||||
|
|
||||||
abstract void onStartup() throws IOException;
|
abstract void onStartup() throws IOException;
|
||||||
|
|
||||||
void testTakedown() throws IOException {
|
void testTakedown() throws IOException {
|
||||||
|
@ -1124,14 +1142,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
|
status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
|
||||||
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
|
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
|
||||||
}
|
}
|
||||||
if (!opts.oneCon) {
|
closeConnection();
|
||||||
connection.close();
|
|
||||||
}
|
|
||||||
receiverHost.closeReceivers();
|
receiverHost.closeReceivers();
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract void onTakedown() throws IOException;
|
abstract void onTakedown() throws IOException;
|
||||||
|
|
||||||
|
abstract void closeConnection() throws IOException;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Run test
|
* Run test
|
||||||
* @return Elapsed time.
|
* @return Elapsed time.
|
||||||
|
@ -1211,6 +1229,56 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
abstract void testRow(final int i) throws IOException, InterruptedException;
|
abstract void testRow(final int i) throws IOException, InterruptedException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static abstract class Test extends TestBase {
|
||||||
|
protected Connection connection;
|
||||||
|
|
||||||
|
Test(final Connection con, final TestOptions options, final Status status) {
|
||||||
|
super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
|
||||||
|
this.connection = con;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void createConnection() throws IOException {
|
||||||
|
if (!opts.isOneCon()) {
|
||||||
|
this.connection = ConnectionFactory.createConnection(conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void closeConnection() throws IOException {
|
||||||
|
if (!opts.isOneCon()) {
|
||||||
|
this.connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static abstract class AsyncTest extends TestBase {
|
||||||
|
protected AsyncConnection connection;
|
||||||
|
|
||||||
|
AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
|
||||||
|
super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
|
||||||
|
this.connection = con;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void createConnection() {
|
||||||
|
if (!opts.isOneCon()) {
|
||||||
|
try {
|
||||||
|
this.connection = ConnectionFactory.createAsyncConnection(conf).get();
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
LOG.error("Failed to create async connection", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void closeConnection() throws IOException {
|
||||||
|
if (!opts.isOneCon()) {
|
||||||
|
this.connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static abstract class TableTest extends Test {
|
static abstract class TableTest extends Test {
|
||||||
protected Table table;
|
protected Table table;
|
||||||
|
|
||||||
|
@ -1229,6 +1297,242 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static abstract class AsyncTableTest extends AsyncTest {
|
||||||
|
protected RawAsyncTable table;
|
||||||
|
|
||||||
|
AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
|
||||||
|
super(con, options, status);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void onStartup() throws IOException {
|
||||||
|
this.table = connection.getRawTable(TableName.valueOf(opts.tableName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void onTakedown() throws IOException {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class AsyncRandomReadTest extends AsyncTableTest {
|
||||||
|
private final Consistency consistency;
|
||||||
|
private ArrayList<Get> gets;
|
||||||
|
private Random rd = new Random();
|
||||||
|
|
||||||
|
AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
|
||||||
|
super(con, options, status);
|
||||||
|
consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
|
||||||
|
if (opts.multiGet > 0) {
|
||||||
|
LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
|
||||||
|
this.gets = new ArrayList<>(opts.multiGet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void testRow(final int i) throws IOException, InterruptedException {
|
||||||
|
if (opts.randomSleep > 0) {
|
||||||
|
Thread.sleep(rd.nextInt(opts.randomSleep));
|
||||||
|
}
|
||||||
|
Get get = new Get(getRandomRow(this.rand, opts.totalRows));
|
||||||
|
if (opts.addColumns) {
|
||||||
|
get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
|
||||||
|
} else {
|
||||||
|
get.addFamily(FAMILY_NAME);
|
||||||
|
}
|
||||||
|
if (opts.filterAll) {
|
||||||
|
get.setFilter(new FilterAllFilter());
|
||||||
|
}
|
||||||
|
get.setConsistency(consistency);
|
||||||
|
if (LOG.isTraceEnabled()) LOG.trace(get.toString());
|
||||||
|
try {
|
||||||
|
if (opts.multiGet > 0) {
|
||||||
|
this.gets.add(get);
|
||||||
|
if (this.gets.size() == opts.multiGet) {
|
||||||
|
Result[] rs =
|
||||||
|
this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
|
||||||
|
updateValueSize(rs);
|
||||||
|
this.gets.clear();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
updateValueSize(this.table.get(get).get());
|
||||||
|
}
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RuntimeException runtime(Throwable e) {
|
||||||
|
if (e instanceof RuntimeException) {
|
||||||
|
return (RuntimeException) e;
|
||||||
|
}
|
||||||
|
return new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <V> V propagate(Callable<V> callable) {
|
||||||
|
try {
|
||||||
|
return callable.call();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw runtime(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getReportingPeriod() {
|
||||||
|
int period = opts.perClientRunRows / 10;
|
||||||
|
return period == 0 ? opts.perClientRunRows : period;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void testTakedown() throws IOException {
|
||||||
|
if (this.gets != null && this.gets.size() > 0) {
|
||||||
|
this.table.get(gets);
|
||||||
|
this.gets.clear();
|
||||||
|
}
|
||||||
|
super.testTakedown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class AsyncRandomWriteTest extends AsyncTableTest {
|
||||||
|
AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
|
||||||
|
super(con, options, status);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void testRow(final int i) throws IOException, InterruptedException {
|
||||||
|
byte[] row = getRandomRow(this.rand, opts.totalRows);
|
||||||
|
Put put = new Put(row);
|
||||||
|
for (int column = 0; column < opts.columns; column++) {
|
||||||
|
byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
|
||||||
|
byte[] value = generateData(this.rand, getValueLength(this.rand));
|
||||||
|
if (opts.useTags) {
|
||||||
|
byte[] tag = generateData(this.rand, TAG_LENGTH);
|
||||||
|
Tag[] tags = new Tag[opts.noOfTags];
|
||||||
|
for (int n = 0; n < opts.noOfTags; n++) {
|
||||||
|
Tag t = new ArrayBackedTag((byte) n, tag);
|
||||||
|
tags[n] = t;
|
||||||
|
}
|
||||||
|
KeyValue kv =
|
||||||
|
new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
|
||||||
|
put.add(kv);
|
||||||
|
updateValueSize(kv.getValueLength());
|
||||||
|
} else {
|
||||||
|
put.addColumn(FAMILY_NAME, qualifier, value);
|
||||||
|
updateValueSize(value.length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
||||||
|
try {
|
||||||
|
table.put(put).get();
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class AsyncScanTest extends AsyncTableTest {
|
||||||
|
private ResultScanner testScanner;
|
||||||
|
private AsyncTable asyncTable;
|
||||||
|
|
||||||
|
AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
|
||||||
|
super(con, options, status);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void onStartup() throws IOException {
|
||||||
|
this.asyncTable =
|
||||||
|
connection.getTable(TableName.valueOf(opts.tableName),
|
||||||
|
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void testTakedown() throws IOException {
|
||||||
|
if (this.testScanner != null) {
|
||||||
|
this.testScanner.close();
|
||||||
|
}
|
||||||
|
super.testTakedown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void testRow(final int i) throws IOException {
|
||||||
|
if (this.testScanner == null) {
|
||||||
|
Scan scan =
|
||||||
|
new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
|
||||||
|
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
|
||||||
|
.setReadType(opts.scanReadType);
|
||||||
|
if (opts.addColumns) {
|
||||||
|
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
|
||||||
|
} else {
|
||||||
|
scan.addFamily(FAMILY_NAME);
|
||||||
|
}
|
||||||
|
if (opts.filterAll) {
|
||||||
|
scan.setFilter(new FilterAllFilter());
|
||||||
|
}
|
||||||
|
this.testScanner = asyncTable.getScanner(scan);
|
||||||
|
}
|
||||||
|
Result r = testScanner.next();
|
||||||
|
updateValueSize(r);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class AsyncSequentialReadTest extends AsyncTableTest {
|
||||||
|
AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
|
||||||
|
super(con, options, status);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void testRow(final int i) throws IOException, InterruptedException {
|
||||||
|
Get get = new Get(format(i));
|
||||||
|
if (opts.addColumns) {
|
||||||
|
get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
|
||||||
|
}
|
||||||
|
if (opts.filterAll) {
|
||||||
|
get.setFilter(new FilterAllFilter());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
updateValueSize(table.get(get).get());
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class AsyncSequentialWriteTest extends AsyncTableTest {
|
||||||
|
AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
|
||||||
|
super(con, options, status);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void testRow(final int i) throws IOException, InterruptedException {
|
||||||
|
byte[] row = format(i);
|
||||||
|
Put put = new Put(row);
|
||||||
|
for (int column = 0; column < opts.columns; column++) {
|
||||||
|
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
|
||||||
|
byte[] value = generateData(this.rand, getValueLength(this.rand));
|
||||||
|
if (opts.useTags) {
|
||||||
|
byte[] tag = generateData(this.rand, TAG_LENGTH);
|
||||||
|
Tag[] tags = new Tag[opts.noOfTags];
|
||||||
|
for (int n = 0; n < opts.noOfTags; n++) {
|
||||||
|
Tag t = new ArrayBackedTag((byte) n, tag);
|
||||||
|
tags[n] = t;
|
||||||
|
}
|
||||||
|
KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
|
||||||
|
value, tags);
|
||||||
|
put.add(kv);
|
||||||
|
updateValueSize(kv.getValueLength());
|
||||||
|
} else {
|
||||||
|
put.addColumn(FAMILY_NAME, qualifier, value);
|
||||||
|
updateValueSize(value.length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
||||||
|
try {
|
||||||
|
table.put(put).get();
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static abstract class BufferedMutatorTest extends Test {
|
static abstract class BufferedMutatorTest extends Test {
|
||||||
protected BufferedMutator mutator;
|
protected BufferedMutator mutator;
|
||||||
protected Table table;
|
protected Table table;
|
||||||
|
@ -1789,23 +2093,31 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
return random.nextInt(Integer.MAX_VALUE) % totalRows;
|
return random.nextInt(Integer.MAX_VALUE) % totalRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
static RunResult runOneClient(final Class<? extends Test> cmd, Configuration conf, Connection con,
|
static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
|
||||||
TestOptions opts, final Status status)
|
Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
|
status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
|
||||||
opts.perClientRunRows + " rows");
|
+ opts.perClientRunRows + " rows");
|
||||||
long totalElapsedTime;
|
long totalElapsedTime;
|
||||||
|
|
||||||
final Test t;
|
final TestBase t;
|
||||||
try {
|
try {
|
||||||
|
if (AsyncTest.class.isAssignableFrom(cmd)) {
|
||||||
|
Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
|
||||||
|
Constructor<? extends AsyncTest> constructor =
|
||||||
|
newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
|
||||||
|
t = constructor.newInstance(asyncCon, opts, status);
|
||||||
|
} else {
|
||||||
|
Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
|
||||||
Constructor<? extends Test> constructor =
|
Constructor<? extends Test> constructor =
|
||||||
cmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
|
newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
|
||||||
t = constructor.newInstance(con, opts, status);
|
t = constructor.newInstance(con, opts, status);
|
||||||
|
}
|
||||||
} catch (NoSuchMethodException e) {
|
} catch (NoSuchMethodException e) {
|
||||||
throw new IllegalArgumentException("Invalid command class: " +
|
throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
|
||||||
cmd.getName() + ". It does not provide a constructor as described by " +
|
+ ". It does not provide a constructor as described by "
|
||||||
"the javadoc comment. Available constructors are: " +
|
+ "the javadoc comment. Available constructors are: "
|
||||||
Arrays.toString(cmd.getConstructors()));
|
+ Arrays.toString(cmd.getConstructors()));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IllegalStateException("Failed to construct command class", e);
|
throw new IllegalStateException("Failed to construct command class", e);
|
||||||
}
|
}
|
||||||
|
@ -1823,8 +2135,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
return opts.valueRandom? opts.valueSize/2: opts.valueSize;
|
return opts.valueRandom? opts.valueSize/2: opts.valueSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
|
private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
|
||||||
InterruptedException, ClassNotFoundException {
|
InterruptedException, ClassNotFoundException, ExecutionException {
|
||||||
// Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
|
// Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
|
||||||
// the TestOptions introspection for us and dump the output in a readable format.
|
// the TestOptions introspection for us and dump the output in a readable format.
|
||||||
LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts));
|
LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts));
|
||||||
|
@ -1944,7 +2256,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println("Command:");
|
System.err.println("Command:");
|
||||||
for (CmdDescriptor command : COMMANDS.values()) {
|
for (CmdDescriptor command : COMMANDS.values()) {
|
||||||
System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
|
System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
|
||||||
}
|
}
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println("Args:");
|
System.err.println("Args:");
|
||||||
|
@ -2285,7 +2597,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
return errCode;
|
return errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName);
|
Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
|
||||||
if (cmdClass != null) {
|
if (cmdClass != null) {
|
||||||
runTest(cmdClass, opts);
|
runTest(cmdClass, opts);
|
||||||
errCode = 0;
|
errCode = 0;
|
||||||
|
@ -2302,7 +2614,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
||||||
return COMMANDS.containsKey(cmd);
|
return COMMANDS.containsKey(cmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Class<? extends Test> determineCommandClass(String cmd) {
|
private static Class<? extends TestBase> determineCommandClass(String cmd) {
|
||||||
CmdDescriptor descriptor = COMMANDS.get(cmd);
|
CmdDescriptor descriptor = COMMANDS.get(cmd);
|
||||||
return descriptor != null ? descriptor.getCmdClass() : null;
|
return descriptor != null ? descriptor.getCmdClass() : null;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue