HBASE-11491 Add an option to sleep randomly during the tests with the PE tool

This commit is contained in:
Nicolas Liochon 2014-07-11 14:06:28 +02:00
parent 77554df881
commit 4f3d5bbab9
1 changed files with 30 additions and 12 deletions

View File

@ -254,9 +254,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
ObjectMapper mapper = new ObjectMapper();
TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
Configuration conf = HBaseConfiguration.create(context.getConfiguration());
final HConnection con = HConnectionManager.createConnection(conf);
// Evaluation task
long elapsedTime = runOneClient(this.cmd, conf, opts, status);
long elapsedTime = runOneClient(this.cmd, conf, con, opts, status);
// Collect how much time the thing took. Report as map output and
// to the ELAPSED_TIME counter.
context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
@ -377,6 +378,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
long[] timings = new long[opts.numClientThreads];
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
final HConnection con = HConnectionManager.createConnection(conf);
for (int i = 0; i < threads.length; i++) {
final int index = i;
threads[i] = pool.submit(new Callable<Long>() {
@ -384,7 +386,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
public Long call() throws Exception {
TestOptions threadOpts = new TestOptions(opts);
if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
long elapsedTime = runOneClient(cmd, conf, threadOpts, new Status() {
long elapsedTime = runOneClient(cmd, conf, con, threadOpts, new Status() {
@Override
public void setStatus(final String msg) throws IOException {
LOG.info(msg);
@ -397,6 +399,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
});
}
pool.shutdown();
for (int i = 0; i < threads.length; i++) {
try {
timings[i] = threads[i].get();
@ -416,6 +419,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
+ "\tMin: " + timings[0] + "ms"
+ "\tMax: " + timings[timings.length - 1] + "ms"
+ "\tAvg: " + (total / timings.length) + "ms");
con.close();
return total;
}
@ -556,6 +562,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
int noOfTags = 1;
boolean reportLatency = false;
int multiGet = 0;
int randomSleep = 0;
boolean inMemoryCF = false;
int presplitRegions = 0;
int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
@ -599,6 +606,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.valueRandom = that.valueRandom;
this.valueSize = that.valueSize;
this.period = that.period;
this.randomSleep = that.randomSleep;
}
public boolean isNomapred() {
@ -735,7 +743,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
private Histogram valueSize;
/**
* Note that all subclasses of this class must provide a public contructor
* Note that all subclasses of this class must provide a public constructor
* that has the exact same list of arguments.
*/
Test(final HConnection con, final TestOptions options, final Status status) {
@ -829,7 +837,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @return Elapsed time.
* @throws IOException
*/
long test() throws IOException {
long test() throws IOException, InterruptedException {
testSetup();
LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
final long startTime = System.nanoTime();
@ -844,7 +852,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
/**
* Provides an extension point for tests that don't want a per row invocation.
*/
void testTimed() throws IOException {
void testTimed() throws IOException, InterruptedException {
int lastRow = opts.startRow + opts.perClientRunRows;
// Report on completion of 1/10th of total.
for (int i = opts.startRow; i < lastRow; i++) {
@ -925,7 +933,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
* Test for individual row.
* @param i Row index.
*/
abstract void testRow(final int i) throws IOException;
abstract void testRow(final int i) throws IOException, InterruptedException;
}
@ -1051,6 +1059,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
static class RandomReadTest extends Test {
private final Consistency consistency;
private ArrayList<Get> gets;
private Random rd = new Random();
RandomReadTest(HConnection con, TestOptions options, Status status) {
super(con, options, status);
@ -1062,7 +1071,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
@Override
void testRow(final int i) throws IOException {
void testRow(final int i) throws IOException, InterruptedException {
if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep));
}
Get get = new Get(getRandomRow(this.rand, opts.totalRows));
get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
if (opts.filterAll) {
@ -1320,15 +1332,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
}
static long runOneClient(final Class<? extends Test> cmd, Configuration conf, TestOptions opts,
final Status status)
throws IOException {
static long runOneClient(final Class<? extends Test> cmd, Configuration conf, HConnection con,
TestOptions opts, final Status status)
throws IOException, InterruptedException {
status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
opts.perClientRunRows + " rows");
long totalElapsedTime;
final Test t;
HConnection con = HConnectionManager.createConnection(conf);
try {
Constructor<? extends Test> constructor =
cmd.getDeclaredConstructor(HConnection.class, TestOptions.class, Status.class);
@ -1347,7 +1358,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
"ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
getAverageValueLength(opts)) + ")");
con.close();
return totalElapsedTime;
}
@ -1427,6 +1438,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
"by randomRead. Default: disabled");
System.err.println(" replicas Enable region replica testing. Defaults: 1.");
System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0");
System.err.println();
System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" For example: ");
@ -1597,6 +1609,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
final String randomSleep = "--randomSleep=";
if (cmd.startsWith(randomSleep)) {
opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
continue;
}
final String bloomFilter = "--bloomFilter";
if (cmd.startsWith(bloomFilter)) {
opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));