HBASE-8407 Remove Async HBase
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1477251 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b33878f6fc
commit
741bbb5839
|
@ -459,34 +459,6 @@
|
|||
<groupId>stax</groupId>
|
||||
<artifactId>stax-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hbase</groupId>
|
||||
<artifactId>asynchbase</artifactId>
|
||||
<version>[1.3.1,)</version>
|
||||
<!--
|
||||
This is needed otherwise Maven complains because asynchbase depends on SLF4J 1.6:
|
||||
"The requested version 1.5.8 by your slf4j binding is not compatible with [1.6]"
|
||||
See http://stackoverflow.com/questions/5477942/slf4j-version-problem-while-building-through-the-maven
|
||||
Note that we can't do what Ceki suggests here, because v1.6 removed some interface
|
||||
that the Hadoop jar calls into, so we have to stick to the 1.5 version they pull.
|
||||
-->
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</exclusion>
|
||||
<!-- We also have to exclude the other slf4j libraries pulled by asynchbase. -->
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>jcl-over-slf4j</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>log4j-over-slf4j</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- Test Dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jettison</groupId>
|
||||
|
|
|
@ -87,13 +87,6 @@ import org.apache.hadoop.util.Tool;
|
|||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.util.LineReader;
|
||||
|
||||
import com.stumbleupon.async.Callback;
|
||||
import com.stumbleupon.async.Deferred;
|
||||
import org.hbase.async.GetRequest;
|
||||
import org.hbase.async.HBaseClient;
|
||||
import org.hbase.async.PleaseThrottleException;
|
||||
import org.hbase.async.PutRequest;
|
||||
import org.hbase.async.Scanner;
|
||||
|
||||
/**
|
||||
* Script used evaluating HBase performance and scalability. Runs a HBase
|
||||
|
@ -172,8 +165,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
addCommandDescriptor(RandomReadTest.class, "randomRead",
|
||||
"Run random read test");
|
||||
addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
|
||||
"Run random read test with asynchbase");
|
||||
addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
|
||||
"Run random seek and scan 100 test");
|
||||
addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
|
||||
|
@ -186,20 +177,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
"Run random seek scan with both start and stop row (max 10000 rows)");
|
||||
addCommandDescriptor(RandomWriteTest.class, "randomWrite",
|
||||
"Run random write test");
|
||||
addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
|
||||
"Run random write test with asynchbase");
|
||||
addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
|
||||
"Run sequential read test");
|
||||
addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
|
||||
"Run sequential read test with asynchbase");
|
||||
addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
|
||||
"Run sequential write test");
|
||||
addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
|
||||
"Run sequential write test with asynchbase");
|
||||
addCommandDescriptor(ScanTest.class, "scan",
|
||||
"Run scan test (read every row)");
|
||||
addCommandDescriptor(AsyncScanTest.class, "asyncScan",
|
||||
"Run scan test with asynchbase (read every row)");
|
||||
addCommandDescriptor(FilteredScanTest.class, "filterScan",
|
||||
"Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
|
||||
}
|
||||
|
@ -908,177 +891,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
abstract void testRow(final int i) throws IOException;
|
||||
}
|
||||
|
||||
static abstract class AsyncTest extends Test {
|
||||
/** Maximum number of RPCs we're allowed in flight at a time. */
|
||||
private static final int MAX_OUTSTANDING_RPCS = 200000; // Sized for 2G heap.
|
||||
|
||||
private static HBaseClient client; // Only one client regardless of number of threads.
|
||||
|
||||
AsyncTest(final Configuration conf, final TestOptions options, final Status status) {
|
||||
super(null, options, status);
|
||||
final String zkquorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
|
||||
final String znode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
|
||||
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
|
||||
synchronized (AsyncTest.class) {
|
||||
if (client == null) {
|
||||
client = new HBaseClient(zkquorum, znode);
|
||||
// Sanity check.
|
||||
try {
|
||||
client.ensureTableFamilyExists(tableName, FAMILY_NAME).joinUninterruptibly();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Missing test table/family?", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
latch = new CountDownLatch(super.perClientRunRows);
|
||||
final int maxrpcs = MAX_OUTSTANDING_RPCS / options.getNumClientThreads();
|
||||
sem = new Semaphore(Math.max(100, maxrpcs));
|
||||
}
|
||||
|
||||
/**
|
||||
* If true, make sure that every read returns a valid-looking KeyValue.
|
||||
*/
|
||||
private static final boolean CHECK_READS = false;
|
||||
|
||||
/** Checks that the row retrieved from HBase looks valid. */
|
||||
protected static void check(final ArrayList<org.hbase.async.KeyValue> row) throws IOException {
|
||||
if (!CHECK_READS) {
|
||||
return;
|
||||
}
|
||||
if (row.size() != 1) {
|
||||
throw new IOException((row.isEmpty() ? "No" : "Multiple (" + row.size() + ')')
|
||||
+ " KeyValue found in row");
|
||||
} else if (row.get(0).value().length != VALUE_LENGTH) {
|
||||
throw new IOException("Invalid value length (found: " + row.get(0).value().length
|
||||
+ ", expected: " + VALUE_LENGTH + ") in row \""
|
||||
+ new String(row.get(0).key()) + '"');
|
||||
}
|
||||
}
|
||||
|
||||
private Exception error = null; // Last exception caught asynchronously.
|
||||
private volatile boolean failed = false; // True if we caught an exception asynchronously.
|
||||
/** Used by sub-classes to handle asynchronous exceptions. */
|
||||
protected final Callback<Exception, Exception> errback = new Callback<Exception, Exception>() {
|
||||
public Exception call(final Exception e) throws Exception {
|
||||
rpcCompleted();
|
||||
if (e instanceof PleaseThrottleException) {
|
||||
LOG.warn("Throttling thread " + Thread.currentThread().getName()
|
||||
+ ", HBase isn't keeping up", e);
|
||||
final int permits = sem.drainPermits(); // Prevent creation of further RPCs.
|
||||
((PleaseThrottleException) e).getDeferred().addBoth(new Callback<Object, Object>() {
|
||||
public Object call(final Object arg) {
|
||||
sem.release(permits);
|
||||
LOG.warn("Done throttling thread " + Thread.currentThread().getName());
|
||||
return arg;
|
||||
}
|
||||
public String toString() {
|
||||
return "error recovery after " + e;
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}
|
||||
error = e;
|
||||
failed = true; // Volatile-write.
|
||||
LOG.error(this + " caught an exception", e);
|
||||
return e;
|
||||
}
|
||||
|
||||
private final String toString = "errback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
|
||||
public String toString() {
|
||||
return toString;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Latch to guarantee we have gotten a response for every single RPC sent.
|
||||
* This latch is initialized up with the number of RPCs we intend to send.
|
||||
* Every time an RPC completes successfully, we decrement its count down
|
||||
* by one. This way we guarantee that all RPCs have completed and their
|
||||
* responses have been handled within the section of the code we're
|
||||
* timing.
|
||||
*/
|
||||
private final CountDownLatch latch;
|
||||
|
||||
/**
|
||||
* Semaphore to control the number of outstanding RPCs.
|
||||
* Because the producer code is synchronous and asynchbase is
|
||||
* non-blocking, the tests will try to create and send all RPCs at once,
|
||||
* thus running the app out of memory. In order to limit the number of
|
||||
* RPCs in flight at the same time, we acquire a permit from this
|
||||
* semaphore each time we access the client to send an RPC, and we release
|
||||
* the permit each time the RPC completes.
|
||||
*/
|
||||
private final Semaphore sem;
|
||||
|
||||
/** Records the completion of an RPC. */
|
||||
protected final void rpcCompleted() {
|
||||
sem.release();
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
/** Callback used on successful read RPCs. */
|
||||
protected final Callback<Object, ArrayList<org.hbase.async.KeyValue>> readCallback = new Callback<Object, ArrayList<org.hbase.async.KeyValue>>() {
|
||||
public Object call(final ArrayList<org.hbase.async.KeyValue> row) throws IOException {
|
||||
rpcCompleted();
|
||||
check(row);
|
||||
return row;
|
||||
}
|
||||
|
||||
private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
|
||||
public String toString() {
|
||||
return toString;
|
||||
}
|
||||
};
|
||||
|
||||
/** Callback used on other successful RPCs. */
|
||||
protected final Callback<Object, Object> callback = new Callback<Object, Object>() {
|
||||
public Object call(final Object arg) {
|
||||
rpcCompleted();
|
||||
return arg;
|
||||
}
|
||||
|
||||
private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
|
||||
public String toString() {
|
||||
return toString;
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
final void testSetup() {
|
||||
// Nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
final void testTakedown() throws IOException {
|
||||
try {
|
||||
// For tests with few writes, asking for a flush before waiting on the
|
||||
// latch tells asynchbase to start flushing writes instead of waiting
|
||||
// until the timer flushes them.
|
||||
client.flush().join();
|
||||
latch.await(); // Make sure the last RPC completed.
|
||||
if (failed) { // Volatile-read
|
||||
throw error;
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Uncaught exception from flush()", e);
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns the client to use to send an RPC. Call once per RPC. */
|
||||
protected final HBaseClient client() {
|
||||
try {
|
||||
sem.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Shouldn't happen!", e);
|
||||
return null;
|
||||
}
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
static class RandomSeekScanTest extends Test {
|
||||
|
@ -1212,27 +1024,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
}
|
||||
|
||||
static class AsyncRandomReadTest extends AsyncTest {
|
||||
AsyncRandomReadTest(Configuration conf, TestOptions options, Status status) {
|
||||
super(conf, options, status);
|
||||
}
|
||||
|
||||
@Override
|
||||
void testRow(final int i) throws IOException {
|
||||
final GetRequest get = new GetRequest(tableName, getRandomRow(this.rand, this.totalRows));
|
||||
get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME);
|
||||
|
||||
client().get(get).addCallback(readCallback).addErrback(errback);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getReportingPeriod() {
|
||||
int period = this.perClientRunRows / 100;
|
||||
return period == 0 ? this.perClientRunRows : period;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class RandomWriteTest extends Test {
|
||||
RandomWriteTest(Configuration conf, TestOptions options, Status status) {
|
||||
super(conf, options, status);
|
||||
|
@ -1249,21 +1040,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
static class AsyncRandomWriteTest extends AsyncTest {
|
||||
AsyncRandomWriteTest(Configuration conf, TestOptions options, Status status) {
|
||||
super(conf, options, status);
|
||||
}
|
||||
|
||||
@Override
|
||||
void testRow(final int i) {
|
||||
final PutRequest put = new PutRequest(tableName, getRandomRow(this.rand, this.totalRows),
|
||||
FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand));
|
||||
put.setDurable(writeToWAL);
|
||||
put.setBufferable(flushCommits);
|
||||
client().put(put).addCallbacks(callback, errback);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class ScanTest extends Test {
|
||||
private ResultScanner testScanner;
|
||||
|
@ -1293,50 +1069,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
}
|
||||
|
||||
static class AsyncScanTest extends AsyncTest {
|
||||
private final Scanner scanner;
|
||||
private final Callback continueScan = new Callback<Object, ArrayList<ArrayList<org.hbase.async.KeyValue>>>() {
|
||||
public Object call(final ArrayList<ArrayList<org.hbase.async.KeyValue>> rows) throws Exception {
|
||||
if (rows != null) {
|
||||
testTimed();
|
||||
for (final ArrayList<org.hbase.async.KeyValue> row : rows) {
|
||||
int n = row.size();
|
||||
while (n-- >= 0) {
|
||||
rpcCompleted();
|
||||
}
|
||||
}
|
||||
for (final ArrayList<org.hbase.async.KeyValue> row : rows) {
|
||||
check(row); // Do this separate as it might throw.
|
||||
}
|
||||
} // else arg is null, we're done scanning.
|
||||
return rows;
|
||||
}
|
||||
public String toString() {
|
||||
return "continueScan on " + scanner;
|
||||
}
|
||||
};
|
||||
|
||||
AsyncScanTest(Configuration conf, TestOptions options, Status status) {
|
||||
super(conf, options, status);
|
||||
scanner = client().newScanner(tableName);
|
||||
scanner.setStartKey(format(this.startRow));
|
||||
scanner.setFamily(FAMILY_NAME);
|
||||
scanner.setQualifier(QUALIFIER_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
void testTimed() {
|
||||
scanner.nextRows()
|
||||
.addCallback(continueScan)
|
||||
.addCallbacks(callback, errback);
|
||||
}
|
||||
|
||||
@Override
|
||||
void testRow(final int i) {
|
||||
// Unused because we completely redefined testTimed().
|
||||
}
|
||||
}
|
||||
|
||||
static class SequentialReadTest extends Test {
|
||||
SequentialReadTest(Configuration conf, TestOptions options, Status status) {
|
||||
super(conf, options, status);
|
||||
|
@ -1351,20 +1083,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
}
|
||||
|
||||
static class AsyncSequentialReadTest extends AsyncTest {
|
||||
AsyncSequentialReadTest(Configuration conf, TestOptions options, Status status) {
|
||||
super(conf, options, status);
|
||||
}
|
||||
|
||||
@Override
|
||||
void testRow(final int i) throws IOException {
|
||||
final GetRequest get = new GetRequest(tableName, format(i));
|
||||
get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME);
|
||||
client().get(get).addCallback(readCallback).addErrback(errback);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class SequentialWriteTest extends Test {
|
||||
SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
|
||||
super(conf, options, status);
|
||||
|
@ -1381,22 +1099,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
}
|
||||
|
||||
static class AsyncSequentialWriteTest extends AsyncTest {
|
||||
AsyncSequentialWriteTest(Configuration conf, TestOptions options, Status status) {
|
||||
super(conf, options, status);
|
||||
}
|
||||
|
||||
@Override
|
||||
void testRow(final int i) {
|
||||
final PutRequest put = new PutRequest(tableName, format(i),
|
||||
FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand));
|
||||
put.setDurable(writeToWAL);
|
||||
put.setBufferable(flushCommits);
|
||||
client().put(put).addCallbacks(callback, errback);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class FilteredScanTest extends Test {
|
||||
protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
|
||||
|
||||
|
|
Loading…
Reference in New Issue