From 2a9cdd5e75ab6e51e9ca0539caba29cc4350b98e Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 12 Jul 2017 15:37:55 +0800 Subject: [PATCH] HBASE-17994 Add async client test to Performance Evaluation tool --- .../hadoop/hbase/PerformanceEvaluation.java | 390 ++++++++++++++++-- 1 file changed, 351 insertions(+), 39 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 162e76133f6..eebb0f3967a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -53,6 +53,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RawAsyncTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.RowMutations; @@ -99,9 +102,9 @@ import org.apache.htrace.Sampler; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; import org.apache.htrace.impl.ProbabilitySampler; - import org.apache.hadoop.hbase.shaded.com.google.common.base.MoreObjects; import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; + import com.codahale.metrics.Histogram; import com.codahale.metrics.UniformReservoir; @@ -153,6 +156,16 @@ public class PerformanceEvaluation extends Configured implements Tool { private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); static { + addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", + "Run async random read test"); + addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", + "Run async random write test"); + addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", + "Run async sequential read test"); + addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", + "Run async sequential write test"); + addCommandDescriptor(AsyncScanTest.class, "asyncScan", + "Run async scan test (read every row)"); addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test"); addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN, @@ -226,7 +239,7 @@ public class PerformanceEvaluation extends Configured implements Tool { super(conf); } - protected static void addCommandDescriptor(Class cmdClass, + protected static void addCommandDescriptor(Class cmdClass, String name, String description) { CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); COMMANDS.put(name, cmdDescriptor); @@ -295,9 +308,15 @@ public class PerformanceEvaluation extends Configured implements Tool { TestOptions opts = mapper.readValue(value.toString(), TestOptions.class); Configuration conf = HBaseConfiguration.create(context.getConfiguration()); final Connection con = ConnectionFactory.createConnection(conf); + AsyncConnection asyncCon = null; + try { + asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); + } catch (ExecutionException e) { + throw new IOException(e); + } // Evaluation task - RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, opts, status); + RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); @@ -412,8 +431,8 @@ public class PerformanceEvaluation extends Configured implements Tool { * Run all clients in this vm each to its own thread. */ static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) - throws IOException, InterruptedException { - final Class cmd = determineCommandClass(opts.cmdName); + throws IOException, InterruptedException, ExecutionException { + final Class cmd = determineCommandClass(opts.cmdName); assert cmd != null; @SuppressWarnings("unchecked") Future[] threads = new Future[opts.numClientThreads]; @@ -421,6 +440,7 @@ public class PerformanceEvaluation extends Configured implements Tool { ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); final Connection con = ConnectionFactory.createConnection(conf); + final AsyncConnection asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); for (int i = 0; i < threads.length; i++) { final int index = i; threads[i] = pool.submit(new Callable() { @@ -428,7 +448,7 @@ public class PerformanceEvaluation extends Configured implements Tool { public RunResult call() throws Exception { TestOptions threadOpts = new TestOptions(opts); if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; - RunResult run = runOneClient(cmd, conf, con, threadOpts, new Status() { + RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { @Override public void setStatus(final String msg) throws IOException { LOG.info(msg); @@ -463,6 +483,7 @@ public class PerformanceEvaluation extends Configured implements Tool { + "\tAvg: " + (total / results.length) + "ms"); con.close(); + asyncCon.close(); return results; } @@ -476,7 +497,7 @@ public class PerformanceEvaluation extends Configured implements Tool { */ static Job doMapReduce(TestOptions opts, final Configuration conf) throws IOException, InterruptedException, ClassNotFoundException { - final Class cmd = determineCommandClass(opts.cmdName); + final Class cmd = determineCommandClass(opts.cmdName); assert cmd != null; Path inputDir = writeInputFile(conf, opts); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); @@ -567,17 +588,17 @@ public class PerformanceEvaluation extends Configured implements Tool { * Describes a command. */ static class CmdDescriptor { - private Class cmdClass; + private Class cmdClass; private String name; private String description; - CmdDescriptor(Class cmdClass, String name, String description) { + CmdDescriptor(Class cmdClass, String name, String description) { this.cmdClass = cmdClass; this.name = name; this.description = description; } - public Class getCmdClass() { + public Class getCmdClass() { return cmdClass; } @@ -1001,7 +1022,7 @@ public class PerformanceEvaluation extends Configured implements Tool { * A test. * Subclass to particularize what happens per row. */ - static abstract class Test { + static abstract class TestBase { // Below is make it so when Tests are all running in the one // jvm, that they each have a differently seeded Random. private static final Random randomSeed = new Random(System.currentTimeMillis()); @@ -1018,8 +1039,6 @@ public class PerformanceEvaluation extends Configured implements Tool { private final Status status; private final Sampler traceSampler; private final SpanReceiverHost receiverHost; - protected Connection connection; -// protected Table table; private String testName; private Histogram latencyHistogram; @@ -1030,9 +1049,8 @@ public class PerformanceEvaluation extends Configured implements Tool { * Note that all subclasses of this class must provide a public constructor * that has the exact same list of arguments. */ - Test(final Connection con, final TestOptions options, final Status status) { - this.connection = con; - this.conf = con == null ? HBaseConfiguration.create() : this.connection.getConfiguration(); + TestBase(final Configuration conf, final TestOptions options, final Status status) { + this.conf = conf; this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf); this.opts = options; this.status = status; @@ -1098,14 +1116,14 @@ public class PerformanceEvaluation extends Configured implements Tool { } void testSetup() throws IOException { - if (!opts.oneCon) { - this.connection = ConnectionFactory.createConnection(conf); - } + createConnection(); onStartup(); latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); } + abstract void createConnection() throws IOException; + abstract void onStartup() throws IOException; void testTakedown() throws IOException { @@ -1124,14 +1142,14 @@ public class PerformanceEvaluation extends Configured implements Tool { status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); } - if (!opts.oneCon) { - connection.close(); - } + closeConnection(); receiverHost.closeReceivers(); } abstract void onTakedown() throws IOException; + abstract void closeConnection() throws IOException; + /* * Run test * @return Elapsed time. @@ -1211,6 +1229,56 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException, InterruptedException; } + static abstract class Test extends TestBase { + protected Connection connection; + + Test(final Connection con, final TestOptions options, final Status status) { + super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); + this.connection = con; + } + + @Override + void createConnection() throws IOException { + if (!opts.isOneCon()) { + this.connection = ConnectionFactory.createConnection(conf); + } + } + + @Override + void closeConnection() throws IOException { + if (!opts.isOneCon()) { + this.connection.close(); + } + } + } + + static abstract class AsyncTest extends TestBase { + protected AsyncConnection connection; + + AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { + super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); + this.connection = con; + } + + @Override + void createConnection() { + if (!opts.isOneCon()) { + try { + this.connection = ConnectionFactory.createAsyncConnection(conf).get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Failed to create async connection", e); + } + } + } + + @Override + void closeConnection() throws IOException { + if (!opts.isOneCon()) { + this.connection.close(); + } + } + } + static abstract class TableTest extends Test { protected Table table; @@ -1229,6 +1297,242 @@ public class PerformanceEvaluation extends Configured implements Tool { } } + static abstract class AsyncTableTest extends AsyncTest { + protected RawAsyncTable table; + + AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void onStartup() throws IOException { + this.table = connection.getRawTable(TableName.valueOf(opts.tableName)); + } + + @Override + void onTakedown() throws IOException { + } + } + + static class AsyncRandomReadTest extends AsyncTableTest { + private final Consistency consistency; + private ArrayList gets; + private Random rd = new Random(); + + AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { + super(con, options, status); + consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; + if (opts.multiGet > 0) { + LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); + this.gets = new ArrayList<>(opts.multiGet); + } + } + + @Override + void testRow(final int i) throws IOException, InterruptedException { + if (opts.randomSleep > 0) { + Thread.sleep(rd.nextInt(opts.randomSleep)); + } + Get get = new Get(getRandomRow(this.rand, opts.totalRows)); + if (opts.addColumns) { + get.addColumn(FAMILY_NAME, QUALIFIER_NAME); + } else { + get.addFamily(FAMILY_NAME); + } + if (opts.filterAll) { + get.setFilter(new FilterAllFilter()); + } + get.setConsistency(consistency); + if (LOG.isTraceEnabled()) LOG.trace(get.toString()); + try { + if (opts.multiGet > 0) { + this.gets.add(get); + if (this.gets.size() == opts.multiGet) { + Result[] rs = + this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); + updateValueSize(rs); + this.gets.clear(); + } + } else { + updateValueSize(this.table.get(get).get()); + } + } catch (ExecutionException e) { + throw new IOException(e); + } + } + + public static RuntimeException runtime(Throwable e) { + if (e instanceof RuntimeException) { + return (RuntimeException) e; + } + return new RuntimeException(e); + } + + public static V propagate(Callable callable) { + try { + return callable.call(); + } catch (Exception e) { + throw runtime(e); + } + } + + @Override + protected int getReportingPeriod() { + int period = opts.perClientRunRows / 10; + return period == 0 ? opts.perClientRunRows : period; + } + + @Override + protected void testTakedown() throws IOException { + if (this.gets != null && this.gets.size() > 0) { + this.table.get(gets); + this.gets.clear(); + } + super.testTakedown(); + } + } + + static class AsyncRandomWriteTest extends AsyncTableTest { + AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void testRow(final int i) throws IOException, InterruptedException { + byte[] row = getRandomRow(this.rand, opts.totalRows); + Put put = new Put(row); + for (int column = 0; column < opts.columns; column++) { + byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); + byte[] value = generateData(this.rand, getValueLength(this.rand)); + if (opts.useTags) { + byte[] tag = generateData(this.rand, TAG_LENGTH); + Tag[] tags = new Tag[opts.noOfTags]; + for (int n = 0; n < opts.noOfTags; n++) { + Tag t = new ArrayBackedTag((byte) n, tag); + tags[n] = t; + } + KeyValue kv = + new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); + put.add(kv); + updateValueSize(kv.getValueLength()); + } else { + put.addColumn(FAMILY_NAME, qualifier, value); + updateValueSize(value.length); + } + } + put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + try { + table.put(put).get(); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + } + + static class AsyncScanTest extends AsyncTableTest { + private ResultScanner testScanner; + private AsyncTable asyncTable; + + AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void onStartup() throws IOException { + this.asyncTable = + connection.getTable(TableName.valueOf(opts.tableName), + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); + } + + @Override + void testTakedown() throws IOException { + if (this.testScanner != null) { + this.testScanner.close(); + } + super.testTakedown(); + } + + @Override + void testRow(final int i) throws IOException { + if (this.testScanner == null) { + Scan scan = + new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) + .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) + .setReadType(opts.scanReadType); + if (opts.addColumns) { + scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); + } else { + scan.addFamily(FAMILY_NAME); + } + if (opts.filterAll) { + scan.setFilter(new FilterAllFilter()); + } + this.testScanner = asyncTable.getScanner(scan); + } + Result r = testScanner.next(); + updateValueSize(r); + } + } + + static class AsyncSequentialReadTest extends AsyncTableTest { + AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void testRow(final int i) throws IOException, InterruptedException { + Get get = new Get(format(i)); + if (opts.addColumns) { + get.addColumn(FAMILY_NAME, QUALIFIER_NAME); + } + if (opts.filterAll) { + get.setFilter(new FilterAllFilter()); + } + try { + updateValueSize(table.get(get).get()); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + } + + static class AsyncSequentialWriteTest extends AsyncTableTest { + AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void testRow(final int i) throws IOException, InterruptedException { + byte[] row = format(i); + Put put = new Put(row); + for (int column = 0; column < opts.columns; column++) { + byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); + byte[] value = generateData(this.rand, getValueLength(this.rand)); + if (opts.useTags) { + byte[] tag = generateData(this.rand, TAG_LENGTH); + Tag[] tags = new Tag[opts.noOfTags]; + for (int n = 0; n < opts.noOfTags; n++) { + Tag t = new ArrayBackedTag((byte) n, tag); + tags[n] = t; + } + KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP, + value, tags); + put.add(kv); + updateValueSize(kv.getValueLength()); + } else { + put.addColumn(FAMILY_NAME, qualifier, value); + updateValueSize(value.length); + } + } + put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + try { + table.put(put).get(); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + } + static abstract class BufferedMutatorTest extends Test { protected BufferedMutator mutator; protected Table table; @@ -1789,23 +2093,31 @@ public class PerformanceEvaluation extends Configured implements Tool { return random.nextInt(Integer.MAX_VALUE) % totalRows; } - static RunResult runOneClient(final Class cmd, Configuration conf, Connection con, - TestOptions opts, final Status status) + static RunResult runOneClient(final Class cmd, Configuration conf, + Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) throws IOException, InterruptedException { - status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " + - opts.perClientRunRows + " rows"); + status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " + + opts.perClientRunRows + " rows"); long totalElapsedTime; - final Test t; + final TestBase t; try { - Constructor constructor = - cmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); - t = constructor.newInstance(con, opts, status); + if (AsyncTest.class.isAssignableFrom(cmd)) { + Class newCmd = (Class) cmd; + Constructor constructor = + newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); + t = constructor.newInstance(asyncCon, opts, status); + } else { + Class newCmd = (Class) cmd; + Constructor constructor = + newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); + t = constructor.newInstance(con, opts, status); + } } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("Invalid command class: " + - cmd.getName() + ". It does not provide a constructor as described by " + - "the javadoc comment. Available constructors are: " + - Arrays.toString(cmd.getConstructors())); + throw new IllegalArgumentException("Invalid command class: " + cmd.getName() + + ". It does not provide a constructor as described by " + + "the javadoc comment. Available constructors are: " + + Arrays.toString(cmd.getConstructors())); } catch (Exception e) { throw new IllegalStateException("Failed to construct command class", e); } @@ -1823,8 +2135,8 @@ public class PerformanceEvaluation extends Configured implements Tool { return opts.valueRandom? opts.valueSize/2: opts.valueSize; } - private void runTest(final Class cmd, TestOptions opts) throws IOException, - InterruptedException, ClassNotFoundException { + private void runTest(final Class cmd, TestOptions opts) throws IOException, + InterruptedException, ClassNotFoundException, ExecutionException { // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do // the TestOptions introspection for us and dump the output in a readable format. LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts)); @@ -1944,7 +2256,7 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(); System.err.println("Command:"); for (CmdDescriptor command : COMMANDS.values()) { - System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription())); + System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); } System.err.println(); System.err.println("Args:"); @@ -2285,7 +2597,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return errCode; } - Class cmdClass = determineCommandClass(opts.cmdName); + Class cmdClass = determineCommandClass(opts.cmdName); if (cmdClass != null) { runTest(cmdClass, opts); errCode = 0; @@ -2302,7 +2614,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return COMMANDS.containsKey(cmd); } - private static Class determineCommandClass(String cmd) { + private static Class determineCommandClass(String cmd) { CmdDescriptor descriptor = COMMANDS.get(cmd); return descriptor != null ? descriptor.getCmdClass() : null; }