diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index 39cb66f03ac..b7c0ee1f600 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -459,34 +459,6 @@ stax stax-api - - org.hbase - asynchbase - [1.3.1,) - - - - org.slf4j - slf4j-api - - - - org.slf4j - jcl-over-slf4j - - - org.slf4j - log4j-over-slf4j - - - test - org.codehaus.jettison diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 701b30c7338..8b8bac30f30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -87,13 +87,6 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.LineReader; -import com.stumbleupon.async.Callback; -import com.stumbleupon.async.Deferred; -import org.hbase.async.GetRequest; -import org.hbase.async.HBaseClient; -import org.hbase.async.PleaseThrottleException; -import org.hbase.async.PutRequest; -import org.hbase.async.Scanner; /** * Script used evaluating HBase performance and scalability. Runs a HBase @@ -172,8 +165,6 @@ public class PerformanceEvaluation extends Configured implements Tool { addCommandDescriptor(RandomReadTest.class, "randomRead", "Run random read test"); - addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", - "Run random read test with asynchbase"); addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", "Run random seek and scan 100 test"); addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", @@ -186,20 +177,12 @@ public class PerformanceEvaluation extends Configured implements Tool { "Run random seek scan with both start and stop row (max 10000 rows)"); addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); - addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", - "Run random write test with asynchbase"); addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); - addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", - "Run sequential read test with asynchbase"); addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); - addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", - "Run sequential write test with asynchbase"); addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); - addCommandDescriptor(AsyncScanTest.class, "asyncScan", - "Run scan test with asynchbase (read every row)"); addCommandDescriptor(FilteredScanTest.class, "filterScan", "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)"); } @@ -908,177 +891,6 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException; } - static abstract class AsyncTest extends Test { - /** Maximum number of RPCs we're allowed in flight at a time. */ - private static final int MAX_OUTSTANDING_RPCS = 200000; // Sized for 2G heap. - - private static HBaseClient client; // Only one client regardless of number of threads. - - AsyncTest(final Configuration conf, final TestOptions options, final Status status) { - super(null, options, status); - final String zkquorum = conf.get(HConstants.ZOOKEEPER_QUORUM); - final String znode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); - synchronized (AsyncTest.class) { - if (client == null) { - client = new HBaseClient(zkquorum, znode); - // Sanity check. - try { - client.ensureTableFamilyExists(tableName, FAMILY_NAME).joinUninterruptibly(); - } catch (Exception e) { - throw new RuntimeException("Missing test table/family?", e); - } - } - } - latch = new CountDownLatch(super.perClientRunRows); - final int maxrpcs = MAX_OUTSTANDING_RPCS / options.getNumClientThreads(); - sem = new Semaphore(Math.max(100, maxrpcs)); - } - - /** - * If true, make sure that every read returns a valid-looking KeyValue. - */ - private static final boolean CHECK_READS = false; - - /** Checks that the row retrieved from HBase looks valid. */ - protected static void check(final ArrayList row) throws IOException { - if (!CHECK_READS) { - return; - } - if (row.size() != 1) { - throw new IOException((row.isEmpty() ? "No" : "Multiple (" + row.size() + ')') - + " KeyValue found in row"); - } else if (row.get(0).value().length != VALUE_LENGTH) { - throw new IOException("Invalid value length (found: " + row.get(0).value().length - + ", expected: " + VALUE_LENGTH + ") in row \"" - + new String(row.get(0).key()) + '"'); - } - } - - private Exception error = null; // Last exception caught asynchronously. - private volatile boolean failed = false; // True if we caught an exception asynchronously. - /** Used by sub-classes to handle asynchronous exceptions. */ - protected final Callback errback = new Callback() { - public Exception call(final Exception e) throws Exception { - rpcCompleted(); - if (e instanceof PleaseThrottleException) { - LOG.warn("Throttling thread " + Thread.currentThread().getName() - + ", HBase isn't keeping up", e); - final int permits = sem.drainPermits(); // Prevent creation of further RPCs. - ((PleaseThrottleException) e).getDeferred().addBoth(new Callback() { - public Object call(final Object arg) { - sem.release(permits); - LOG.warn("Done throttling thread " + Thread.currentThread().getName()); - return arg; - } - public String toString() { - return "error recovery after " + e; - } - }); - return null; - } - error = e; - failed = true; // Volatile-write. - LOG.error(this + " caught an exception", e); - return e; - } - - private final String toString = "errback for " + AsyncTest.this + " in " + Thread.currentThread().getName(); - public String toString() { - return toString; - } - }; - - /** - * Latch to guarantee we have gotten a response for every single RPC sent. - * This latch is initialized up with the number of RPCs we intend to send. - * Every time an RPC completes successfully, we decrement its count down - * by one. This way we guarantee that all RPCs have completed and their - * responses have been handled within the section of the code we're - * timing. - */ - private final CountDownLatch latch; - - /** - * Semaphore to control the number of outstanding RPCs. - * Because the producer code is synchronous and asynchbase is - * non-blocking, the tests will try to create and send all RPCs at once, - * thus running the app out of memory. In order to limit the number of - * RPCs in flight at the same time, we acquire a permit from this - * semaphore each time we access the client to send an RPC, and we release - * the permit each time the RPC completes. - */ - private final Semaphore sem; - - /** Records the completion of an RPC. */ - protected final void rpcCompleted() { - sem.release(); - latch.countDown(); - } - - /** Callback used on successful read RPCs. */ - protected final Callback> readCallback = new Callback>() { - public Object call(final ArrayList row) throws IOException { - rpcCompleted(); - check(row); - return row; - } - - private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName(); - public String toString() { - return toString; - } - }; - - /** Callback used on other successful RPCs. */ - protected final Callback callback = new Callback() { - public Object call(final Object arg) { - rpcCompleted(); - return arg; - } - - private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName(); - public String toString() { - return toString; - } - }; - - @Override - final void testSetup() { - // Nothing. - } - - @Override - final void testTakedown() throws IOException { - try { - // For tests with few writes, asking for a flush before waiting on the - // latch tells asynchbase to start flushing writes instead of waiting - // until the timer flushes them. - client.flush().join(); - latch.await(); // Make sure the last RPC completed. - if (failed) { // Volatile-read - throw error; - } - } catch (RuntimeException e) { - throw e; - } catch (IOException e) { - throw e; - } catch (Exception e) { - throw new IOException("Uncaught exception from flush()", e); - } - } - - /** Returns the client to use to send an RPC. Call once per RPC. */ - protected final HBaseClient client() { - try { - sem.acquire(); - } catch (InterruptedException e) { - LOG.error("Shouldn't happen!", e); - return null; - } - return client; - } - } @SuppressWarnings("unused") static class RandomSeekScanTest extends Test { @@ -1212,27 +1024,6 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class AsyncRandomReadTest extends AsyncTest { - AsyncRandomReadTest(Configuration conf, TestOptions options, Status status) { - super(conf, options, status); - } - - @Override - void testRow(final int i) throws IOException { - final GetRequest get = new GetRequest(tableName, getRandomRow(this.rand, this.totalRows)); - get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME); - - client().get(get).addCallback(readCallback).addErrback(errback); - } - - @Override - protected int getReportingPeriod() { - int period = this.perClientRunRows / 100; - return period == 0 ? this.perClientRunRows : period; - } - - } - static class RandomWriteTest extends Test { RandomWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -1249,21 +1040,6 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class AsyncRandomWriteTest extends AsyncTest { - AsyncRandomWriteTest(Configuration conf, TestOptions options, Status status) { - super(conf, options, status); - } - - @Override - void testRow(final int i) { - final PutRequest put = new PutRequest(tableName, getRandomRow(this.rand, this.totalRows), - FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand)); - put.setDurable(writeToWAL); - put.setBufferable(flushCommits); - client().put(put).addCallbacks(callback, errback); - } - - } static class ScanTest extends Test { private ResultScanner testScanner; @@ -1293,50 +1069,6 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class AsyncScanTest extends AsyncTest { - private final Scanner scanner; - private final Callback continueScan = new Callback>>() { - public Object call(final ArrayList> rows) throws Exception { - if (rows != null) { - testTimed(); - for (final ArrayList row : rows) { - int n = row.size(); - while (n-- >= 0) { - rpcCompleted(); - } - } - for (final ArrayList row : rows) { - check(row); // Do this separate as it might throw. - } - } // else arg is null, we're done scanning. - return rows; - } - public String toString() { - return "continueScan on " + scanner; - } - }; - - AsyncScanTest(Configuration conf, TestOptions options, Status status) { - super(conf, options, status); - scanner = client().newScanner(tableName); - scanner.setStartKey(format(this.startRow)); - scanner.setFamily(FAMILY_NAME); - scanner.setQualifier(QUALIFIER_NAME); - } - - @Override - void testTimed() { - scanner.nextRows() - .addCallback(continueScan) - .addCallbacks(callback, errback); - } - - @Override - void testRow(final int i) { - // Unused because we completely redefined testTimed(). - } - } - static class SequentialReadTest extends Test { SequentialReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -1351,20 +1083,6 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class AsyncSequentialReadTest extends AsyncTest { - AsyncSequentialReadTest(Configuration conf, TestOptions options, Status status) { - super(conf, options, status); - } - - @Override - void testRow(final int i) throws IOException { - final GetRequest get = new GetRequest(tableName, format(i)); - get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME); - client().get(get).addCallback(readCallback).addErrback(errback); - } - - } - static class SequentialWriteTest extends Test { SequentialWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -1381,22 +1099,6 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class AsyncSequentialWriteTest extends AsyncTest { - AsyncSequentialWriteTest(Configuration conf, TestOptions options, Status status) { - super(conf, options, status); - } - - @Override - void testRow(final int i) { - final PutRequest put = new PutRequest(tableName, format(i), - FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand)); - put.setDurable(writeToWAL); - put.setBufferable(flushCommits); - client().put(put).addCallbacks(callback, errback); - } - - } - static class FilteredScanTest extends Test { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());