diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java deleted file mode 100644 index aed3d0a8e2f..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.Timer; - - -/** - * Simple Increments Performance Test. Run this from main. It is to go against a cluster. - * Presumption is the table exists already. Defaults are a zk ensemble of localhost:2181, - * a tableName of 'tableName', a column famly name of 'columnFamilyName', with 80 threads by - * default and 10000 increments per thread. To change any of these configs, pass -DNAME=VALUE as - * in -DtableName="newTableName". It prints out configuration it is running with at the start and - * on the end it prints out percentiles. - */ -public class IncrementPerformanceTest implements Tool { - private static final Log LOG = LogFactory.getLog(IncrementPerformanceTest.class); - private static final byte [] QUALIFIER = new byte [] {'q'}; - private Configuration conf; - private final MetricRegistry metrics = new MetricRegistry(); - private static final String TABLENAME = "tableName"; - private static final String COLUMN_FAMILY = "columnFamilyName"; - private static final String THREAD_COUNT = "threadCount"; - private static final int DEFAULT_THREAD_COUNT = 80; - private static final String INCREMENT_COUNT = "incrementCount"; - private static final int DEFAULT_INCREMENT_COUNT = 10000; - - IncrementPerformanceTest() {} - - public int run(final String [] args) throws Exception { - Configuration conf = getConf(); - final TableName tableName = TableName.valueOf(conf.get(TABLENAME), TABLENAME); - final byte [] columnFamilyName = Bytes.toBytes(conf.get(COLUMN_FAMILY, COLUMN_FAMILY)); - int threadCount = conf.getInt(THREAD_COUNT, DEFAULT_THREAD_COUNT); - final int incrementCount = conf.getInt(INCREMENT_COUNT, DEFAULT_INCREMENT_COUNT); - LOG.info("Running test with " + HConstants.ZOOKEEPER_QUORUM + "=" + - getConf().get(HConstants.ZOOKEEPER_QUORUM) + ", tableName=" + tableName + - ", columnFamilyName=" + columnFamilyName + ", threadCount=" + threadCount + - ", incrementCount=" + incrementCount); - - ExecutorService service = Executors.newFixedThreadPool(threadCount); - Set> futures = new HashSet>(); - final AtomicInteger integer = new AtomicInteger(0); // needed a simple "final" counter - while (integer.incrementAndGet() <= threadCount) { - futures.add(service.submit(new Runnable() { - @Override - public void run() { - try { - try (Connection connection = ConnectionFactory.createConnection(getConf())) { - try (Table table = connection.getTable(tableName)) { - Timer timer = metrics.timer("increments"); - for (int i = 0; i < incrementCount; i++) { - byte[] row = Bytes.toBytes(i); - Timer.Context context = timer.time(); - try { - table.incrementColumnValue(row, columnFamilyName, QUALIFIER, 1l); - } catch (IOException e) { - // swallow..it's a test. - } finally { - context.stop(); - } - } - } - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - })); - } - - for(Future future : futures) future.get(); - service.shutdown(); - Snapshot s = metrics.timer("increments").getSnapshot(); - LOG.info(String.format("75th=%s, 95th=%s, 99th=%s", s.get75thPercentile(), - s.get95thPercentile(), s.get99thPercentile())); - return 0; - } - - @Override - public Configuration getConf() { - return this.conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - public static void main(String[] args) throws Exception { - System.exit(ToolRunner.run(HBaseConfiguration.create(), new IncrementPerformanceTest(), args)); - } -} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 821b995520c..651bc86908a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -49,19 +49,24 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterAllFilter; import org.apache.hadoop.hbase.filter.FilterList; @@ -165,7 +170,17 @@ public class PerformanceEvaluation extends Configured implements Tool { "Run scan test (read every row)"); addCommandDescriptor(FilteredScanTest.class, "filterScan", "Run scan test using a filter to find a specific row based on it's value " + - "(make sure to use --rows=20)"); + "(make sure to use --rows=20)"); + addCommandDescriptor(IncrementTest.class, "increment", + "Increment on each row; clients overlap on keyspace so some concurrent operations"); + addCommandDescriptor(AppendTest.class, "append", + "Append on each row; clients overlap on keyspace so some concurrent operations"); + addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", + "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); + addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", + "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); + addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", + "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); } /** @@ -1089,15 +1104,24 @@ public class PerformanceEvaluation extends Configured implements Tool { return (System.nanoTime() - startTime) / 1000000; } + int getStartRow() { + return opts.startRow; + } + + int getLastRow() { + return getStartRow() + opts.perClientRunRows; + } + /** * Provides an extension point for tests that don't want a per row invocation. */ void testTimed() throws IOException, InterruptedException { - int lastRow = opts.startRow + opts.perClientRunRows; + int startRow = getStartRow(); + int lastRow = getLastRow(); // Report on completion of 1/10th of total. for (int ii = 0; ii < opts.cycles; ii++) { if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); - for (int i = opts.startRow; i < lastRow; i++) { + for (int i = startRow; i < lastRow; i++) { if (i % everyN != 0) continue; long startTime = System.nanoTime(); TraceScope scope = Trace.startSpan("test row", traceSampler); @@ -1106,15 +1130,16 @@ public class PerformanceEvaluation extends Configured implements Tool { } finally { scope.close(); } - if ( (i - opts.startRow) > opts.measureAfter) { + if ( (i - startRow) > opts.measureAfter) { latency.update((System.nanoTime() - startTime) / 1000); if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { - status.setStatus(generateStatus(opts.startRow, i, lastRow)); + status.setStatus(generateStatus(startRow, i, lastRow)); } } } } } + /** * report percentiles of latency * @throws IOException @@ -1456,7 +1481,116 @@ public class PerformanceEvaluation extends Configured implements Tool { Result r = testScanner.next(); updateValueSize(r); } + } + /** + * Base class for operations that are CAS-like; that read a value and then set it based off what + * they read. In this category is increment, append, checkAndPut, etc. + * + *

These operations also want some concurrency going on. Usually when these tests run, they + * operate in their own part of the key range. In CASTest, we will have them all overlap on the + * same key space. We do this with our getStartRow and getLastRow overrides. + */ + static abstract class CASTableTest extends TableTest { + private final byte [] qualifier; + CASTableTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + qualifier = Bytes.toBytes(this.getClass().getSimpleName()); + } + + byte [] getQualifier() { + return this.qualifier; + } + + @Override + int getStartRow() { + return 0; + } + + @Override + int getLastRow() { + return opts.perClientRunRows; + } + } + + static class IncrementTest extends CASTableTest { + IncrementTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void testRow(final int i) throws IOException { + Increment increment = new Increment(format(i)); + increment.addColumn(FAMILY_NAME, getQualifier(), 1l); + updateValueSize(this.table.increment(increment)); + } + } + + static class AppendTest extends CASTableTest { + AppendTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void testRow(final int i) throws IOException { + byte [] bytes = format(i); + Append append = new Append(bytes); + append.add(FAMILY_NAME, getQualifier(), bytes); + updateValueSize(this.table.append(append)); + } + } + + static class CheckAndMutateTest extends CASTableTest { + CheckAndMutateTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void testRow(final int i) throws IOException { + byte [] bytes = format(i); + // Put a known value so when we go to check it, it is there. + Put put = new Put(bytes); + put.addColumn(FAMILY_NAME, getQualifier(), bytes); + this.table.put(put); + RowMutations mutations = new RowMutations(bytes); + mutations.add(put); + this.table.checkAndMutate(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, + mutations); + } + } + + static class CheckAndPutTest extends CASTableTest { + CheckAndPutTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void testRow(final int i) throws IOException { + byte [] bytes = format(i); + // Put a known value so when we go to check it, it is there. + Put put = new Put(bytes); + put.addColumn(FAMILY_NAME, getQualifier(), bytes); + this.table.put(put); + this.table.checkAndPut(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, put); + } + } + + static class CheckAndDeleteTest extends CASTableTest { + CheckAndDeleteTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void testRow(final int i) throws IOException { + byte [] bytes = format(i); + // Put a known value so when we go to check it, it is there. + Put put = new Put(bytes); + put.addColumn(FAMILY_NAME, getQualifier(), bytes); + this.table.put(put); + Delete delete = new Delete(put.getRow()); + delete.addColumn(FAMILY_NAME, getQualifier()); + this.table.checkAndDelete(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, delete); + } } static class SequentialReadTest extends TableTest { @@ -1760,8 +1894,10 @@ public class PerformanceEvaluation extends Configured implements Tool { "clients (and HRegionServers)"); System.err.println(" running: 1 <= value <= 500"); System.err.println("Examples:"); - System.err.println(" To run a single evaluation client:"); + System.err.println(" To run a single client doing the default 1M sequentialWrites:"); System.err.println(" $ bin/hbase " + className + " sequentialWrite 1"); + System.err.println(" To run 10 clients doing increments over ten rows:"); + System.err.println(" $ bin/hbase " + className + " --rows=10 --nomapred increment 10"); } /**