HBASE-20601 Add multiPut support and other miscellaneous to PE

This commit is contained in:
Allan Yang 2018-05-24 19:52:40 +08:00 committed by Allan Yang
parent 4005a0c4d3
commit f3d1c021de
2 changed files with 250 additions and 140 deletions

View File

@ -438,6 +438,17 @@ public class PerformanceEvaluation extends Configured implements Tool {
return splits;
}
static void setupConnectionCount(final TestOptions opts) {
if (opts.oneCon) {
opts.connCount = 1;
} else {
if (opts.connCount == -1) {
// set to thread number if connCount is not set
opts.connCount = opts.numClientThreads;
}
}
}
/*
* Run all clients in this vm each to its own thread.
*/
@ -450,14 +461,23 @@ public class PerformanceEvaluation extends Configured implements Tool {
RunResult[] results = new RunResult[opts.numClientThreads];
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
final Connection con = ConnectionFactory.createConnection(conf);
final AsyncConnection asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
setupConnectionCount(opts);
final Connection[] cons = new Connection[opts.connCount];
final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
for (int i = 0; i < opts.connCount; i++) {
cons[i] = ConnectionFactory.createConnection(conf);
asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
}
LOG.info("Created " + opts.connCount + " connections for " +
opts.numClientThreads + " threads");
for (int i = 0; i < threads.length; i++) {
final int index = i;
threads[i] = pool.submit(new Callable<RunResult>() {
@Override
public RunResult call() throws Exception {
TestOptions threadOpts = new TestOptions(opts);
final Connection con = cons[index % cons.length];
final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
@Override
@ -485,16 +505,26 @@ public class PerformanceEvaluation extends Configured implements Tool {
+ Arrays.toString(results));
Arrays.sort(results);
long total = 0;
float avgLatency = 0 ;
float avgTPS = 0;
for (RunResult result : results) {
total += result.duration;
avgLatency += result.hist.getSnapshot().getMean();
avgTPS += opts.perClientRunRows * 1.0f / result.duration;
}
LOG.info("[" + test + "]"
avgTPS *= 1000; // ms to second
avgLatency = avgLatency / results.length;
LOG.info("[" + test + " duration ]"
+ "\tMin: " + results[0] + "ms"
+ "\tMax: " + results[results.length - 1] + "ms"
+ "\tAvg: " + (total / results.length) + "ms");
LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
for (int i = 0; i < opts.connCount; i++) {
cons[i].close();
asyncCons[i].close();
}
con.close();
asyncCon.close();
return results;
}
@ -648,10 +678,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
boolean writeToWAL = true;
boolean autoFlush = false;
boolean oneCon = false;
int connCount = -1; //wil decide the actual num later
boolean useTags = false;
int noOfTags = 1;
boolean reportLatency = false;
int multiGet = 0;
int multiPut = 0;
int randomSleep = 0;
boolean inMemoryCF = false;
int presplitRegions = 0;
@ -700,10 +732,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.writeToWAL = that.writeToWAL;
this.autoFlush = that.autoFlush;
this.oneCon = that.oneCon;
this.connCount = that.connCount;
this.useTags = that.useTags;
this.noOfTags = that.noOfTags;
this.reportLatency = that.reportLatency;
this.multiGet = that.multiGet;
this.multiPut = that.multiPut;
this.inMemoryCF = that.inMemoryCF;
this.presplitRegions = that.presplitRegions;
this.replicas = that.replicas;
@ -858,6 +892,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.oneCon = oneCon;
}
public int getConnCount() {
return connCount;
}
public void setConnCount(int connCount) {
this.connCount = connCount;
}
public void setUseTags(boolean useTags) {
this.useTags = useTags;
}
@ -874,6 +916,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.multiGet = multiGet;
}
public void setMultiPut(int multiPut) {
this.multiPut = multiPut;
}
public void setInMemoryCF(boolean inMemoryCF) {
this.inMemoryCF = inMemoryCF;
}
@ -982,6 +1028,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
return multiGet;
}
public int getMultiPut() {
return multiPut;
}
public boolean isInMemoryCF() {
return inMemoryCF;
}
@ -1198,12 +1248,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
createConnection();
onStartup();
}
abstract void createConnection() throws IOException;
abstract void onStartup() throws IOException;
void testTakedown() throws IOException {
@ -1217,10 +1264,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
latencyHistogram));
status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
if (valueSizeHistogram.getCount() > 0) {
status.setStatus("ValueSize (bytes) : "
+ YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
} else {
status.setStatus("No valueSize statistics available");
}
if (rpcCallsHistogram.getCount() > 0) {
status.setStatus("rpcCalls (count): " +
YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
@ -1246,13 +1297,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
}
}
closeConnection();
receiverHost.closeReceivers();
}
abstract void onTakedown() throws IOException;
abstract void closeConnection() throws IOException;
/*
* Run test
@ -1292,14 +1341,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
for (int i = startRow; i < lastRow; i++) {
if (i % everyN != 0) continue;
long startTime = System.nanoTime();
try (TraceScope scope = TraceUtil.createTrace("test row")){
testRow(i);
boolean requestSent = false;
try (TraceScope scope = TraceUtil.createTrace("test row");){
requestSent = testRow(i);
}
if ( (i - startRow) > opts.measureAfter) {
// If multiget is enabled, say set to 10, testRow() returns immediately first 9 times
// and sends the actual get request in the 10th iteration. We should only set latency
// when actual request is sent because otherwise it turns out to be 0.
if (opts.multiGet == 0 || (i - startRow + 1) % opts.multiGet == 0) {
// If multiget or multiput is enabled, say set to 10, testRow() returns immediately
// first 9 times and sends the actual get request in the 10th iteration.
// We should only set latency when actual request is sent because otherwise
// it turns out to be 0.
if (requestSent) {
latencyHistogram.update((System.nanoTime() - startTime) / 1000);
}
if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
@ -1324,11 +1375,15 @@ public class PerformanceEvaluation extends Configured implements Tool {
return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
}
/*
/**
* Test for individual row.
* @param i Row index.
* @return true if the row was sent to server and need to record metrics.
* False if not, multiGet and multiPut e.g., the rows are sent
* to server only if enough gets/puts are gathered.
*/
abstract void testRow(final int i) throws IOException, InterruptedException;
abstract boolean testRow(final int i) throws IOException, InterruptedException;
}
static abstract class Test extends TestBase {
@ -1338,20 +1393,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
this.connection = con;
}
@Override
void createConnection() throws IOException {
if (!opts.isOneCon()) {
this.connection = ConnectionFactory.createConnection(conf);
}
}
@Override
void closeConnection() throws IOException {
if (!opts.isOneCon()) {
this.connection.close();
}
}
}
static abstract class AsyncTest extends TestBase {
@ -1361,24 +1402,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
this.connection = con;
}
@Override
void createConnection() {
if (!opts.isOneCon()) {
try {
this.connection = ConnectionFactory.createAsyncConnection(conf).get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Failed to create async connection", e);
}
}
}
@Override
void closeConnection() throws IOException {
if (!opts.isOneCon()) {
this.connection.close();
}
}
}
static abstract class TableTest extends Test {
@ -1431,7 +1454,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i) throws IOException, InterruptedException {
if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep));
}
@ -1460,6 +1483,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
updateValueSize(rs);
this.gets.clear();
} else {
return false;
}
} else {
updateValueSize(this.table.get(get).get());
@ -1467,6 +1492,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} catch (ExecutionException e) {
throw new IOException(e);
}
return true;
}
public static RuntimeException runtime(Throwable e) {
@ -1500,43 +1526,15 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
static class AsyncRandomWriteTest extends AsyncTableTest {
static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
super(con, options, status);
}
@Override
void testRow(final int i) throws IOException, InterruptedException {
byte[] row = getRandomRow(this.rand, opts.totalRows);
Put put = new Put(row);
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
for (int column = 0; column < opts.columns; column++) {
byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
byte[] value = generateData(this.rand, getValueLength(this.rand));
if (opts.useTags) {
byte[] tag = generateData(this.rand, TAG_LENGTH);
Tag[] tags = new Tag[opts.noOfTags];
for (int n = 0; n < opts.noOfTags; n++) {
Tag t = new ArrayBackedTag((byte) n, tag);
tags[n] = t;
}
KeyValue kv =
new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
put.add(kv);
updateValueSize(kv.getValueLength());
} else {
put.addColumn(familyName, qualifier, value);
updateValueSize(value.length);
}
}
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
try {
table.put(put).get();
} catch (ExecutionException e) {
throw new IOException(e);
}
protected byte[] generateRow(final int i) {
return getRandomRow(this.rand, opts.totalRows);
}
}
@ -1565,7 +1563,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException {
boolean testRow(final int i) throws IOException {
if (this.testScanner == null) {
Scan scan =
new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
@ -1589,6 +1587,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
Result r = testScanner.next();
updateValueSize(r);
return true;
}
}
@ -1598,7 +1597,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i) throws IOException, InterruptedException {
Get get = new Get(format(i));
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
@ -1619,17 +1618,29 @@ public class PerformanceEvaluation extends Configured implements Tool {
} catch (ExecutionException e) {
throw new IOException(e);
}
return true;
}
}
static class AsyncSequentialWriteTest extends AsyncTableTest {
private ArrayList<Put> puts;
AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
super(con, options, status);
if (opts.multiPut > 0) {
LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
this.puts = new ArrayList<>(opts.multiPut);
}
}
protected byte[] generateRow(final int i) {
return format(i);
}
@Override
void testRow(final int i) throws IOException, InterruptedException {
byte[] row = format(i);
@SuppressWarnings("ReturnValueIgnored")
boolean testRow(final int i) throws IOException, InterruptedException {
byte[] row = generateRow(i);
Put put = new Put(row);
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
@ -1656,9 +1667,21 @@ public class PerformanceEvaluation extends Configured implements Tool {
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
try {
table.put(put).get();
if (opts.multiPut > 0) {
this.puts.add(put);
if (this.puts.size() == opts.multiPut) {
this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
this.puts.clear();
} else {
return false;
}
} else {
table.put(put).get();
}
} catch (ExecutionException e) {
throw new IOException(e);
}
return true;
}
}
@ -1691,7 +1714,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException {
boolean testRow(final int i) throws IOException {
Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
.setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
.setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
@ -1722,6 +1745,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
updateScanMetrics(s.getScanMetrics());
s.close();
}
return true;
}
@Override
@ -1738,7 +1762,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException {
boolean testRow(final int i) throws IOException {
Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
.withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
@ -1775,6 +1799,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
updateScanMetrics(s.getScanMetrics());
s.close();
}
return true;
}
protected abstract Pair<byte[],byte[]> getStartAndStopRow();
@ -1851,7 +1876,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i) throws IOException, InterruptedException {
if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep));
}
@ -1878,10 +1903,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
Result [] rs = this.table.get(this.gets);
updateValueSize(rs);
this.gets.clear();
} else {
return false;
}
} else {
updateValueSize(this.table.get(get));
}
return true;
}
@Override
@ -1900,44 +1928,17 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
static class RandomWriteTest extends BufferedMutatorTest {
static class RandomWriteTest extends SequentialWriteTest {
RandomWriteTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@Override
void testRow(final int i) throws IOException {
byte[] row = getRandomRow(this.rand, opts.totalRows);
Put put = new Put(row);
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
for (int column = 0; column < opts.columns; column++) {
byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
byte[] value = generateData(this.rand, getValueLength(this.rand));
if (opts.useTags) {
byte[] tag = generateData(this.rand, TAG_LENGTH);
Tag[] tags = new Tag[opts.noOfTags];
for (int n = 0; n < opts.noOfTags; n++) {
Tag t = new ArrayBackedTag((byte) n, tag);
tags[n] = t;
}
KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
value, tags);
put.add(kv);
updateValueSize(kv.getValueLength());
} else {
put.addColumn(familyName, qualifier, value);
updateValueSize(value.length);
}
}
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
if (opts.autoFlush) {
table.put(put);
} else {
mutator.mutate(put);
}
protected byte[] generateRow(final int i) {
return getRandomRow(this.rand, opts.totalRows);
}
}
static class ScanTest extends TableTest {
@ -1957,7 +1958,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override
void testRow(final int i) throws IOException {
boolean testRow(final int i) throws IOException {
if (this.testScanner == null) {
Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
@ -1980,6 +1981,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
Result r = testScanner.next();
updateValueSize(r);
return true;
}
}
@ -2019,7 +2021,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException {
boolean testRow(final int i) throws IOException {
Increment increment = new Increment(format(i));
// unlike checkAndXXX tests, which make most sense to do on a single value,
// if multiple families are specified for an increment test we assume it is
@ -2029,6 +2031,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
increment.addColumn(familyName, getQualifier(), 1l);
}
updateValueSize(this.table.increment(increment));
return true;
}
}
@ -2038,7 +2041,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException {
boolean testRow(final int i) throws IOException {
byte [] bytes = format(i);
Append append = new Append(bytes);
// unlike checkAndXXX tests, which make most sense to do on a single value,
@ -2049,6 +2052,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
append.addColumn(familyName, getQualifier(), bytes);
}
updateValueSize(this.table.append(append));
return true;
}
}
@ -2058,7 +2062,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException {
boolean testRow(final int i) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
@ -2069,6 +2073,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
mutations.add(put);
this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
.ifEquals(bytes).thenMutate(mutations);
return true;
}
}
@ -2078,7 +2083,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException {
boolean testRow(final int i) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
@ -2087,6 +2092,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.table.put(put);
this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
.ifEquals(bytes).thenPut(put);
return true;
}
}
@ -2096,7 +2102,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException {
boolean testRow(final int i) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
@ -2107,6 +2113,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
delete.addColumn(FAMILY_ZERO, getQualifier());
this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
.ifEquals(bytes).thenDelete(delete);
return true;
}
}
@ -2116,7 +2123,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException {
boolean testRow(final int i) throws IOException {
Get get = new Get(format(i));
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
@ -2133,17 +2140,29 @@ public class PerformanceEvaluation extends Configured implements Tool {
get.setFilter(new FilterAllFilter());
}
updateValueSize(table.get(get));
return true;
}
}
static class SequentialWriteTest extends BufferedMutatorTest {
private ArrayList<Put> puts;
SequentialWriteTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
if (opts.multiPut > 0) {
LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
this.puts = new ArrayList<>(opts.multiPut);
}
}
protected byte[] generateRow(final int i) {
return format(i);
}
@Override
void testRow(final int i) throws IOException {
byte[] row = format(i);
boolean testRow(final int i) throws IOException {
byte[] row = generateRow(i);
Put put = new Put(row);
for (int family = 0; family < opts.families; family++) {
byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
@ -2169,10 +2188,21 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
if (opts.autoFlush) {
if (opts.multiPut > 0) {
this.puts.add(put);
if (this.puts.size() == opts.multiPut) {
table.put(this.puts);
this.puts.clear();
} else {
return false;
}
} else {
table.put(put);
}
} else {
mutator.mutate(put);
}
return true;
}
}
@ -2184,7 +2214,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(int i) throws IOException {
boolean testRow(int i) throws IOException {
byte[] value = generateData(this.rand, getValueLength(this.rand));
Scan scan = constructScan(value);
ResultScanner scanner = null;
@ -2199,6 +2229,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
scanner.close();
}
}
return true;
}
protected Scan constructScan(byte[] valuePrefix) throws IOException {
@ -2380,6 +2411,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" nomapred Run multiple clients using threads " +
"(rather than use mapreduce)");
System.err.println(" oneCon all the threads share the same connection. Default: False");
System.err.println(" connCount connections all threads share. "
+ "For example, if set to 2, then all thread share 2 connection. "
+ "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
+ "if not, connCount=thread number");
System.err.println(" sampleRate Execute test on a sample of total " +
"rows. Only supported by randomRead. Default: 1.0");
System.err.println(" period Report every 'period' rows: " +
@ -2416,6 +2452,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
"'valueSize' in zipf form: Default: Not set.");
System.err.println(" writeToWAL Set writeToWAL on puts. Default: True");
System.err.println(" autoFlush Set autoFlush on htable. Default: False");
System.err.println(" multiPut Batch puts together into groups of N. Only supported " +
"by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
System.err.println(" presplit Create presplit table. If a table with same name exists,"
+ " it'll be deleted and recreated (instead of verifying count of its existing regions). "
+ "Recommended for accurate perf analysis (see guide). Default: disabled");
@ -2568,12 +2606,29 @@ public class PerformanceEvaluation extends Configured implements Tool {
final String autoFlush = "--autoFlush=";
if (cmd.startsWith(autoFlush)) {
opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
if (!opts.autoFlush && opts.multiPut > 0) {
throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
}
continue;
}
final String onceCon = "--oneCon=";
if (cmd.startsWith(onceCon)) {
opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
if (opts.oneCon && opts.connCount > 1) {
throw new IllegalArgumentException("oneCon is set to true, "
+ "connCount should not bigger than 1");
}
continue;
}
final String connCount = "--connCount=";
if (cmd.startsWith(connCount)) {
opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
if (opts.oneCon && opts.connCount > 1) {
throw new IllegalArgumentException("oneCon is set to true, "
+ "connCount should not bigger than 1");
}
continue;
}
@ -2589,6 +2644,15 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
final String multiPut = "--multiPut=";
if (cmd.startsWith(multiPut)) {
opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
if (!opts.autoFlush && opts.multiPut > 0) {
throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
}
continue;
}
final String useTags = "--usetags=";
if (cmd.startsWith(useTags)) {
opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));

View File

@ -243,4 +243,50 @@ public class TestPerformanceEvaluation {
assertTrue(e.getCause() instanceof NoSuchElementException);
}
}
@Test
public void testParseOptsMultiPuts() {
Queue<String> opts = new LinkedList<>();
String cmdName = "sequentialWrite";
opts.offer("--multiPut=10");
opts.offer(cmdName);
opts.offer("64");
PerformanceEvaluation.TestOptions options = null;
try {
options = PerformanceEvaluation.parseOpts(opts);
fail("should fail");
} catch (IllegalArgumentException e) {
System.out.println(e.getMessage());
}
((LinkedList<String>) opts).offerFirst("--multiPut=10");
((LinkedList<String>) opts).offerFirst("--autoFlush=true");
options = PerformanceEvaluation.parseOpts(opts);
assertNotNull(options);
assertNotNull(options.getCmdName());
assertEquals(cmdName, options.getCmdName());
assertTrue(options.getMultiPut() == 10);
}
@Test
public void testParseOptsConnCount() {
Queue<String> opts = new LinkedList<>();
String cmdName = "sequentialWrite";
opts.offer("--oneCon=true");
opts.offer("--connCount=10");
opts.offer(cmdName);
opts.offer("64");
PerformanceEvaluation.TestOptions options = null;
try {
options = PerformanceEvaluation.parseOpts(opts);
fail("should fail");
} catch (IllegalArgumentException e) {
System.out.println(e.getMessage());
}
((LinkedList<String>) opts).offerFirst("--connCount=10");
options = PerformanceEvaluation.parseOpts(opts);
assertNotNull(options);
assertNotNull(options.getCmdName());
assertEquals(cmdName, options.getCmdName());
assertTrue(options.getConnCount() == 10);
}
}