HBASE-5539 asynchbase PerformanceEvaluation (Benoit)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1352764 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-06-22 03:09:37 +00:00
parent 72aa146d01
commit 877030a5b9
3 changed files with 372 additions and 18 deletions

View File

@ -289,6 +289,11 @@
<artifactId>hbase-common</artifactId>
</dependency>
<!-- General dependencies -->
<dependency>
<!-- NOTE(review): this pins a "-SNAPSHOT" version, which is presumably resolved from a snapshot repository; confirm, and switch to a released version once one is available. -->
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.5.0.Final-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
@ -450,6 +455,34 @@
<groupId>stax</groupId>
<artifactId>stax-api</artifactId>
</dependency>
<dependency>
<groupId>org.hbase</groupId>
<artifactId>asynchbase</artifactId>
<!-- NOTE(review): "[1.3.1,)" is an open-ended version range (any version at or above 1.3.1), so the resolved version may differ between builds; confirm this is intended. -->
<version>[1.3.1,)</version>
<!--
These exclusions are needed; otherwise Maven complains, because asynchbase depends on SLF4J 1.6:
"The requested version 1.5.8 by your slf4j binding is not compatible with [1.6]"
See http://stackoverflow.com/questions/5477942/slf4j-version-problem-while-building-through-the-maven
Note that we can't do what Ceki suggests there, because v1.6 removed some interface
that the Hadoop jar calls into, so we have to stick to the 1.5 version they pull.
-->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<!-- We also have to exclude the other slf4j libraries pulled by asynchbase. -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
<!-- Test scope: the dependency is only on the test classpath. -->
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<!-- Skip the tests in this module -->

View File

@ -32,6 +32,8 @@ import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.lang.reflect.Constructor;
@ -79,6 +81,14 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.LineReader;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.PleaseThrottleException;
import org.hbase.async.PutRequest;
import org.hbase.async.Scanner;
/**
* Script used for evaluating HBase performance and scalability. Runs an HBase
* client that steps through one of a set of hardcoded tests or 'experiments'
@ -155,6 +165,8 @@ public class PerformanceEvaluation {
addCommandDescriptor(RandomReadTest.class, "randomRead",
"Run random read test");
addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
"Run random read test with asynchbase");
addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
"Run random seek and scan 100 test");
addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
@ -167,12 +179,20 @@ public class PerformanceEvaluation {
"Run random seek scan with both start and stop row (max 10000 rows)");
addCommandDescriptor(RandomWriteTest.class, "randomWrite",
"Run random write test");
addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
"Run random write test with asynchbase");
addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
"Run sequential read test");
addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
"Run sequential read test with asynchbase");
addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
"Run sequential write test");
addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
"Run sequential write test with asynchbase");
addCommandDescriptor(ScanTest.class, "scan",
"Run scan test (read every row)");
addCommandDescriptor(AsyncScanTest.class, "asyncScan",
"Run scan test with asynchbase (read every row)");
addCommandDescriptor(FilteredScanTest.class, "filterScan",
"Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
}
@ -536,14 +556,16 @@ public class PerformanceEvaluation {
*/
private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
final List<Thread> threads = new ArrayList<Thread>(this.N);
final long[] timings = new long[this.N];
final int perClientRows = R/N;
for (int i = 0; i < this.N; i++) {
Thread t = new Thread (Integer.toString(i)) {
final int index = i;
Thread t = new Thread ("TestClient-" + i) {
@Override
public void run() {
super.run();
PerformanceEvaluation pe = new PerformanceEvaluation(conf);
int index = Integer.parseInt(getName());
pe.N = N;
try {
long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
perClientRows, R,
@ -552,6 +574,7 @@ public class PerformanceEvaluation {
LOG.info("client-" + getName() + " " + msg);
}
});
timings[index] = elapsedTime;
LOG.info("Finished " + getName() + " in " + elapsedTime +
"ms writing " + perClientRows + " rows");
} catch (IOException e) {
@ -573,6 +596,18 @@ public class PerformanceEvaluation {
}
}
}
final String test = cmd.getSimpleName();
LOG.info("[" + test + "] Summary of timings (ms): "
+ Arrays.toString(timings));
Arrays.sort(timings);
long total = 0;
for (int i = 0; i < this.N; i++) {
total += timings[i];
}
LOG.info("[" + test + "]"
+ "\tMin: " + timings[0] + "ms"
+ "\tMax: " + timings[this.N - 1] + "ms"
+ "\tAvg: " + (total / this.N) + "ms");
}
/*
@ -692,6 +727,7 @@ public class PerformanceEvaluation {
private int startRow;
private int perClientRunRows;
private int totalRows;
private int numClientThreads;
private byte[] tableName;
private boolean flushCommits;
private boolean writeToWAL = true;
@ -699,10 +735,13 @@ public class PerformanceEvaluation {
TestOptions() {
}
TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, boolean flushCommits, boolean writeToWAL) {
TestOptions(int startRow, int perClientRunRows, int totalRows,
int numClientThreads, byte[] tableName,
boolean flushCommits, boolean writeToWAL) {
this.startRow = startRow;
this.perClientRunRows = perClientRunRows;
this.totalRows = totalRows;
this.numClientThreads = numClientThreads;
this.tableName = tableName;
this.flushCommits = flushCommits;
this.writeToWAL = writeToWAL;
@ -720,6 +759,10 @@ public class PerformanceEvaluation {
return totalRows;
}
/**
 * @return the number of client threads stored in these options.
 */
public int getNumClientThreads() {
  return this.numClientThreads;
}
public byte[] getTableName() {
return tableName;
}
@ -752,7 +795,6 @@ public class PerformanceEvaluation {
protected final int totalRows;
private final Status status;
protected byte[] tableName;
protected HBaseAdmin admin;
protected HTable table;
protected volatile Configuration conf;
protected boolean flushCommits;
@ -785,13 +827,12 @@ public class PerformanceEvaluation {
}
void testSetup() throws IOException {
this.admin = new HBaseAdmin(conf);
this.table = new HTable(conf, tableName);
this.table.setAutoFlush(false);
this.table.setScannerCaching(30);
}
void testTakedown() throws IOException {
void testTakedown() throws IOException {
if (flushCommits) {
this.table.flushCommits();
}
@ -804,16 +845,15 @@ public class PerformanceEvaluation {
* @throws IOException
*/
long test() throws IOException {
long elapsedTime;
testSetup();
long startTime = System.currentTimeMillis();
LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
final long startTime = System.nanoTime();
try {
testTimed();
elapsedTime = System.currentTimeMillis() - startTime;
} finally {
testTakedown();
}
return elapsedTime;
return (System.nanoTime() - startTime) / 1000000;
}
/**
@ -834,7 +874,178 @@ public class PerformanceEvaluation {
* Test for individual row.
* @param i Row index.
*/
void testRow(final int i) throws IOException {
abstract void testRow(final int i) throws IOException;
}
/**
 * Base class for the tests that send their RPCs through asynchbase.
 *
 * <p>All instances share a single static {@link HBaseClient}. Each instance
 * owns a latch sized to the number of rows it is going to process and a
 * semaphore bounding the number of RPCs it may have in flight. Sub-classes
 * obtain the client through {@link #client()} (once per RPC) and attach
 * {@link #callback} / {@link #readCallback} and {@link #errback} to the
 * resulting Deferred so that every completion is accounted for.
 */
static abstract class AsyncTest extends Test {
  /** Maximum number of RPCs we're allowed in flight at a time. */
  private static final int MAX_OUTSTANDING_RPCS = 200000;  // Sized for 2G heap.

  private static HBaseClient client;  // Only one client regardless of number of threads.

  /**
   * Creates the shared client on first use and sizes this instance's latch
   * and semaphore.
   * @param conf Configuration from which the ZooKeeper quorum and parent
   * znode are read.
   * @param options Test options; the number of client threads is used to
   * split {@link #MAX_OUTSTANDING_RPCS} between threads.
   * @param status Passed through to the super-class.
   * @throws RuntimeException if the sanity check on the test table/family fails.
   */
  AsyncTest(final Configuration conf, final TestOptions options, final Status status) {
    super(null, options, status);
    final String zkquorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
    final String znode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
                                  HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
    synchronized (AsyncTest.class) {
      if (client == null) {
        client = new HBaseClient(zkquorum, znode);
        // Sanity check.
        try {
          client.ensureTableFamilyExists(TABLE_NAME, FAMILY_NAME).joinUninterruptibly();
        } catch (Exception e) {
          throw new RuntimeException("Missing test table/family?", e);
        }
      }
    }
    latch = new CountDownLatch(super.perClientRunRows);
    // Options built without a thread count would otherwise cause a division
    // by zero here; treat anything below 1 as a single client thread.
    final int numThreads = Math.max(1, options.getNumClientThreads());
    final int maxrpcs = MAX_OUTSTANDING_RPCS / numThreads;
    sem = new Semaphore(Math.max(100, maxrpcs));
  }

  /**
   * If true, make sure that every read returns a valid-looking KeyValue.
   */
  private static final boolean CHECK_READS = false;

  /**
   * Checks that the row retrieved from HBase looks valid.
   * Does nothing unless {@link #CHECK_READS} is enabled.
   * @param row The KeyValues returned for one row.
   * @throws IOException if the row doesn't contain exactly one KeyValue or
   * if the value of that KeyValue isn't {@code ROW_LENGTH} bytes long.
   */
  protected static void check(final ArrayList<org.hbase.async.KeyValue> row) throws IOException {
    if (!CHECK_READS) {
      return;
    }
    if (row.size() != 1) {
      throw new IOException((row.isEmpty() ? "No" : "Multiple (" + row.size() + ')')
                            + " KeyValue found in row");
    } else if (row.get(0).value().length != ROW_LENGTH) {
      throw new IOException("Invalid value length (found: " + row.get(0).value().length
                            + ", expected: " + ROW_LENGTH + ") in row \""
                            + new String(row.get(0).key()) + '"');
    }
  }

  private Exception error = null;  // Last exception caught asynchronously.
  private volatile boolean failed = false;  // True if we caught an exception asynchronously.

  /**
   * Used by sub-classes to handle asynchronous exceptions.
   * A {@link PleaseThrottleException} drains the semaphore until the
   * Deferred attached to the exception completes; any other exception is
   * recorded so that {@link #testTakedown} can re-throw it.
   */
  protected final Callback<Exception, Exception> errback = new Callback<Exception, Exception>() {
    public Exception call(final Exception e) throws Exception {
      rpcCompleted();
      if (e instanceof PleaseThrottleException) {
        LOG.warn("Throttling thread " + Thread.currentThread().getName()
                 + ", HBase isn't keeping up", e);
        final int permits = sem.drainPermits();  // Prevent creation of further RPCs.
        ((PleaseThrottleException) e).getDeferred().addBoth(new Callback<Object, Object>() {
          public Object call(final Object arg) {
            // Give back exactly what was drained so RPC creation can resume.
            sem.release(permits);
            LOG.warn("Done throttling thread " + Thread.currentThread().getName());
            return arg;
          }
          public String toString() {
            return "error recovery after " + e;
          }
        });
        return null;
      }
      // `error' must be written before the volatile `failed' so that a
      // reader that sees failed == true also sees the exception.
      error = e;
      failed = true;  // Volatile-write.
      LOG.error(this + " caught an exception", e);
      return e;
    }

    private final String toString = "errback for " + AsyncTest.this + " in " + Thread.currentThread().getName();

    public String toString() {
      return toString;
    }
  };

  /**
   * Latch to guarantee we have gotten a response for every single RPC sent.
   * This latch is initialized up with the number of RPCs we intend to send.
   * Every time an RPC completes successfully, we decrement its count down
   * by one.  This way we guarantee that all RPCs have completed and their
   * responses have been handled within the section of the code we're
   * timing.
   */
  private final CountDownLatch latch;

  /**
   * Semaphore to control the number of outstanding RPCs.
   * Because the producer code is synchronous and asynchbase is
   * non-blocking, the tests will try to create and send all RPCs at once,
   * thus running the app out of memory.  In order to limit the number of
   * RPCs in flight at the same time, we acquire a permit from this
   * semaphore each time we access the client to send an RPC, and we release
   * the permit each time the RPC completes.
   */
  private final Semaphore sem;

  /** Records the completion of an RPC. */
  protected final void rpcCompleted() {
    sem.release();
    latch.countDown();
  }

  /** Callback used on successful read RPCs. */
  protected final Callback<Object, ArrayList<org.hbase.async.KeyValue>> readCallback = new Callback<Object, ArrayList<org.hbase.async.KeyValue>>() {
    public Object call(final ArrayList<org.hbase.async.KeyValue> row) throws IOException {
      rpcCompleted();
      check(row);
      return row;
    }

    private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName();

    public String toString() {
      return toString;
    }
  };

  /** Callback used on other successful RPCs. */
  protected final Callback<Object, Object> callback = new Callback<Object, Object>() {
    public Object call(final Object arg) {
      rpcCompleted();
      return arg;
    }

    private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName();

    public String toString() {
      return toString;
    }
  };

  @Override
  final void testSetup() {
    // Nothing.
  }

  /**
   * Flushes the shared client, waits for every RPC of this test to be
   * accounted for, then re-throws the last exception recorded by
   * {@link #errback}, if any.
   * @throws IOException if flushing or waiting fails, if the thread is
   * interrupted while waiting, or if an RPC failed asynchronously with a
   * checked exception.
   */
  @Override
  final void testTakedown() throws IOException {
    try {
      // For tests with few writes, asking for a flush before waiting on the
      // latch tells asynchbase to start flushing writes instead of waiting
      // until the timer flushes them.
      client.flush().join();
      latch.await();  // Make sure the last RPC completed.
    } catch (InterruptedException e) {
      // Restore the interrupt status so code further up the stack can
      // still observe that this thread was asked to stop.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for outstanding RPCs", e);
    } catch (RuntimeException e) {
      throw e;
    } catch (IOException e) {
      throw e;
    } catch (Exception e) {
      throw new IOException("Uncaught exception from flush()", e);
    }
    if (failed) {  // Volatile-read
      // Handled outside the try block so that an asynchronous failure is
      // never mistaken for an interruption of this thread.
      final Exception cause = error;
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      } else if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      throw new IOException("Uncaught exception from flush()", cause);
    }
  }

  /**
   * Returns the client to use to send an RPC.  Call once per RPC.
   * Blocks until a permit is available from the semaphore.
   * @throws IllegalStateException if the thread is interrupted while
   * waiting for a permit (the interrupt status is restored first).
   */
  protected final HBaseClient client() {
    try {
      sem.acquire();
    } catch (InterruptedException e) {
      LOG.error("Shouldn't happen!", e);
      // Returning null here would only surface as a NullPointerException at
      // the call site and would lose the interrupt; fail explicitly instead.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for an RPC permit", e);
    }
    return client;
  }
}
@ -970,6 +1181,27 @@ public class PerformanceEvaluation {
}
/**
 * Random read test using asynchbase: every call to {@code testRow} issues
 * one get on a randomly chosen row.
 */
static class AsyncRandomReadTest extends AsyncTest {
  AsyncRandomReadTest(Configuration conf, TestOptions options, Status status) {
    super(conf, options, status);
  }

  /**
   * Sends one asynchronous get for a random row.
   * @param i Row index (unused, the row is picked at random).
   */
  @Override
  void testRow(final int i) throws IOException {
    final byte[] rowKey = getRandomRow(this.rand, this.totalRows);
    final GetRequest request = new GetRequest(TABLE_NAME, rowKey);
    request.family(FAMILY_NAME);
    request.qualifier(QUALIFIER_NAME);
    client().get(request).addCallback(readCallback).addErrback(errback);
  }

  /**
   * @return one hundredth of the rows of this client, or all of its rows
   * when it has fewer than a hundred.
   */
  @Override
  protected int getReportingPeriod() {
    final int onePercent = this.perClientRunRows / 100;
    if (onePercent == 0) {
      return this.perClientRunRows;
    }
    return onePercent;
  }
}
static class RandomWriteTest extends Test {
RandomWriteTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
@ -986,6 +1218,22 @@ public class PerformanceEvaluation {
}
}
/**
 * Random write test using asynchbase: every call to {@code testRow} issues
 * one put on a randomly chosen row.
 */
static class AsyncRandomWriteTest extends AsyncTest {
  AsyncRandomWriteTest(Configuration conf, TestOptions options, Status status) {
    super(conf, options, status);
  }

  /**
   * Sends one asynchronous put with a generated value to a random row.
   * @param i Row index (unused, the row is picked at random).
   */
  @Override
  void testRow(final int i) {
    // The row key is drawn before the value, both from the same Random.
    final byte[] rowKey = getRandomRow(this.rand, this.totalRows);
    final byte[] cellValue = generateValue(this.rand);
    final PutRequest request =
        new PutRequest(TABLE_NAME, rowKey, FAMILY_NAME, QUALIFIER_NAME, cellValue);
    request.setDurable(writeToWAL);
    request.setBufferable(flushCommits);
    client().put(request).addCallbacks(callback, errback);
  }
}
static class ScanTest extends Test {
private ResultScanner testScanner;
@ -993,11 +1241,6 @@ public class PerformanceEvaluation {
super(conf, options, status);
}
@Override
void testSetup() throws IOException {
super.testSetup();
}
@Override
void testTakedown() throws IOException {
if (this.testScanner != null) {
@ -1019,6 +1262,50 @@ public class PerformanceEvaluation {
}
/**
* Scan test using an asynchbase {@code Scanner}.
* {@code testTimed()} is overridden to fetch batches of rows with
* {@code scanner.nextRows()}, so {@code testRow} is never used.
*/
static class AsyncScanTest extends AsyncTest {
/** Scanner created and configured in the constructor. */
private final Scanner scanner;
/**
* Callback run on each batch returned by {@code scanner.nextRows()}.
* For a non-null batch it requests the next batch through
* {@code testTimed()}, records completions with {@code rpcCompleted()}
* and then runs {@code check} on every row. A null batch is left alone.
* The argument is returned unchanged in both cases.
* Declared with the raw {@code Callback} type.
*/
private final Callback continueScan = new Callback<Object, ArrayList<ArrayList<org.hbase.async.KeyValue>>>() {
public Object call(final ArrayList<ArrayList<org.hbase.async.KeyValue>> rows) throws Exception {
if (rows != null) {
// Ask for the next batch before processing this one.
testTimed();
for (final ArrayList<org.hbase.async.KeyValue> row : rows) {
int n = row.size();
// NOTE(review): `n-- >= 0' iterates row.size() + 1 times, so each row
// is recorded once more than it has KeyValues, while the latch in
// AsyncTest is sized with perClientRunRows. Confirm this is intended.
while (n-- >= 0) {
rpcCompleted();
}
}
for (final ArrayList<org.hbase.async.KeyValue> row : rows) {
check(row); // Do this separate as it might throw.
}
} // else arg is null, we're done scanning.
return rows;
}
public String toString() {
return "continueScan on " + scanner;
}
};
/**
* Creates the scanner on {@code TABLE_NAME}, starting at the key built
* from {@code startRow} and restricted to {@code FAMILY_NAME} and
* {@code QUALIFIER_NAME}. Obtaining the client through {@code client()}
* acquires one semaphore permit.
*/
AsyncScanTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
scanner = client().newScanner(TABLE_NAME);
scanner.setStartKey(format(this.startRow));
scanner.setFamily(FAMILY_NAME);
scanner.setQualifier(QUALIFIER_NAME);
}
/**
* Requests the next batch of rows and chains {@code continueScan}
* followed by {@code callback} / {@code errback}, both of which call
* {@code rpcCompleted()} once more per batch.
*/
@Override
void testTimed() {
scanner.nextRows()
.addCallback(continueScan)
.addCallbacks(callback, errback);
}
@Override
void testRow(final int i) {
// Unused because we completely redefined testTimed().
}
}
static class SequentialReadTest extends Test {
SequentialReadTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
@ -1033,6 +1320,20 @@ public class PerformanceEvaluation {
}
/**
 * Sequential read test using asynchbase: every call to {@code testRow}
 * issues one get on the row built from the given index.
 */
static class AsyncSequentialReadTest extends AsyncTest {
  AsyncSequentialReadTest(Configuration conf, TestOptions options, Status status) {
    super(conf, options, status);
  }

  /**
   * Sends one asynchronous get for the row at the given index.
   * @param i Row index.
   */
  @Override
  void testRow(final int i) throws IOException {
    final byte[] rowKey = format(i);
    final GetRequest request = new GetRequest(TABLE_NAME, rowKey);
    request.family(FAMILY_NAME);
    request.qualifier(QUALIFIER_NAME);
    client().get(request).addCallback(readCallback).addErrback(errback);
  }
}
static class SequentialWriteTest extends Test {
SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
@ -1049,6 +1350,22 @@ public class PerformanceEvaluation {
}
/**
 * Sequential write test using asynchbase: every call to {@code testRow}
 * issues one put on the row built from the given index.
 */
static class AsyncSequentialWriteTest extends AsyncTest {
  AsyncSequentialWriteTest(Configuration conf, TestOptions options, Status status) {
    super(conf, options, status);
  }

  /**
   * Sends one asynchronous put with a generated value to the row at the
   * given index.
   * @param i Row index.
   */
  @Override
  void testRow(final int i) {
    final byte[] rowKey = format(i);
    final byte[] cellValue = generateValue(this.rand);
    final PutRequest request =
        new PutRequest(TABLE_NAME, rowKey, FAMILY_NAME, QUALIFIER_NAME, cellValue);
    request.setDurable(writeToWAL);
    request.setBufferable(flushCommits);
    client().put(request).addCallbacks(callback, errback);
  }
}
static class FilteredScanTest extends Test {
protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
@ -1123,9 +1440,9 @@ public class PerformanceEvaluation {
perClientRunRows + " rows");
long totalElapsedTime = 0;
Test t = null;
TestOptions options = new TestOptions(startRow, perClientRunRows,
totalRows, getTableDescriptor().getName(), flushCommits, writeToWAL);
totalRows, N, TABLE_NAME, flushCommits, writeToWAL);
final Test t;
try {
Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
Configuration.class, TestOptions.class, Status.class);

View File

@ -255,6 +255,10 @@
</developer>
</developers>
<repositories>
<repository>
<!-- NOTE(review): snapshot repository served over plain http; presumably only needed while a netty SNAPSHOT dependency is in use. Confirm it is still required. -->
<id>cloudbees netty</id>
<url>http://repository-netty.forge.cloudbees.com/snapshot/</url>
</repository>
<repository>
<id>apache release</id>
<url>https://repository.apache.org/content/repositories/releases/</url>