HBASE-15157 Add *PerformanceTest for Append, CheckAnd*

This commit is contained in:
stack 2016-02-05 11:18:42 -08:00
parent 7239056c78
commit 81d81c9839
2 changed files with 142 additions and 134 deletions

View File

@ -1,128 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
/**
* Simple Increments Performance Test. Run this from main. It is to go against a cluster.
* Presumption is the table exists already. Defaults are a zk ensemble of localhost:2181,
* a tableName of 'tableName', a column famly name of 'columnFamilyName', with 80 threads by
* default and 10000 increments per thread. To change any of these configs, pass -DNAME=VALUE as
* in -DtableName="newTableName". It prints out configuration it is running with at the start and
* on the end it prints out percentiles.
*/
public class IncrementPerformanceTest implements Tool {
private static final Log LOG = LogFactory.getLog(IncrementPerformanceTest.class);
private static final byte [] QUALIFIER = new byte [] {'q'};
private Configuration conf;
private final MetricRegistry metrics = new MetricRegistry();
private static final String TABLENAME = "tableName";
private static final String COLUMN_FAMILY = "columnFamilyName";
private static final String THREAD_COUNT = "threadCount";
private static final int DEFAULT_THREAD_COUNT = 80;
private static final String INCREMENT_COUNT = "incrementCount";
private static final int DEFAULT_INCREMENT_COUNT = 10000;
IncrementPerformanceTest() {}
public int run(final String [] args) throws Exception {
Configuration conf = getConf();
final TableName tableName = TableName.valueOf(conf.get(TABLENAME), TABLENAME);
final byte [] columnFamilyName = Bytes.toBytes(conf.get(COLUMN_FAMILY, COLUMN_FAMILY));
int threadCount = conf.getInt(THREAD_COUNT, DEFAULT_THREAD_COUNT);
final int incrementCount = conf.getInt(INCREMENT_COUNT, DEFAULT_INCREMENT_COUNT);
LOG.info("Running test with " + HConstants.ZOOKEEPER_QUORUM + "=" +
getConf().get(HConstants.ZOOKEEPER_QUORUM) + ", tableName=" + tableName +
", columnFamilyName=" + columnFamilyName + ", threadCount=" + threadCount +
", incrementCount=" + incrementCount);
ExecutorService service = Executors.newFixedThreadPool(threadCount);
Set<Future<?>> futures = new HashSet<Future<?>>();
final AtomicInteger integer = new AtomicInteger(0); // needed a simple "final" counter
while (integer.incrementAndGet() <= threadCount) {
futures.add(service.submit(new Runnable() {
@Override
public void run() {
try {
try (Connection connection = ConnectionFactory.createConnection(getConf())) {
try (Table table = connection.getTable(tableName)) {
Timer timer = metrics.timer("increments");
for (int i = 0; i < incrementCount; i++) {
byte[] row = Bytes.toBytes(i);
Timer.Context context = timer.time();
try {
table.incrementColumnValue(row, columnFamilyName, QUALIFIER, 1l);
} catch (IOException e) {
// swallow..it's a test.
} finally {
context.stop();
}
}
}
}
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}));
}
for(Future<?> future : futures) future.get();
service.shutdown();
Snapshot s = metrics.timer("increments").getSnapshot();
LOG.info(String.format("75th=%s, 95th=%s, 99th=%s", s.get75thPercentile(),
s.get95thPercentile(), s.get99thPercentile()));
return 0;
}
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(HBaseConfiguration.create(), new IncrementPerformanceTest(), args));
}
}

View File

@ -49,19 +49,24 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterAllFilter; import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList;
@ -165,7 +170,17 @@ public class PerformanceEvaluation extends Configured implements Tool {
"Run scan test (read every row)"); "Run scan test (read every row)");
addCommandDescriptor(FilteredScanTest.class, "filterScan", addCommandDescriptor(FilteredScanTest.class, "filterScan",
"Run scan test using a filter to find a specific row based on it's value " + "Run scan test using a filter to find a specific row based on it's value " +
"(make sure to use --rows=20)"); "(make sure to use --rows=20)");
addCommandDescriptor(IncrementTest.class, "increment",
"Increment on each row; clients overlap on keyspace so some concurrent operations");
addCommandDescriptor(AppendTest.class, "append",
"Append on each row; clients overlap on keyspace so some concurrent operations");
addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
"CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
"CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
"CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
} }
/** /**
@ -1089,15 +1104,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
return (System.nanoTime() - startTime) / 1000000; return (System.nanoTime() - startTime) / 1000000;
} }
int getStartRow() {
return opts.startRow;
}
int getLastRow() {
return getStartRow() + opts.perClientRunRows;
}
/** /**
* Provides an extension point for tests that don't want a per row invocation. * Provides an extension point for tests that don't want a per row invocation.
*/ */
void testTimed() throws IOException, InterruptedException { void testTimed() throws IOException, InterruptedException {
int lastRow = opts.startRow + opts.perClientRunRows; int startRow = getStartRow();
int lastRow = getLastRow();
// Report on completion of 1/10th of total. // Report on completion of 1/10th of total.
for (int ii = 0; ii < opts.cycles; ii++) { for (int ii = 0; ii < opts.cycles; ii++) {
if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
for (int i = opts.startRow; i < lastRow; i++) { for (int i = startRow; i < lastRow; i++) {
if (i % everyN != 0) continue; if (i % everyN != 0) continue;
long startTime = System.nanoTime(); long startTime = System.nanoTime();
TraceScope scope = Trace.startSpan("test row", traceSampler); TraceScope scope = Trace.startSpan("test row", traceSampler);
@ -1106,15 +1130,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
} finally { } finally {
scope.close(); scope.close();
} }
if ( (i - opts.startRow) > opts.measureAfter) { if ( (i - startRow) > opts.measureAfter) {
latency.update((System.nanoTime() - startTime) / 1000); latency.update((System.nanoTime() - startTime) / 1000);
if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
status.setStatus(generateStatus(opts.startRow, i, lastRow)); status.setStatus(generateStatus(startRow, i, lastRow));
} }
} }
} }
} }
} }
/** /**
* report percentiles of latency * report percentiles of latency
* @throws IOException * @throws IOException
@ -1456,7 +1481,116 @@ public class PerformanceEvaluation extends Configured implements Tool {
Result r = testScanner.next(); Result r = testScanner.next();
updateValueSize(r); updateValueSize(r);
} }
}
/**
* Base class for operations that are CAS-like; that read a value and then set it based off what
* they read. In this category is increment, append, checkAndPut, etc.
*
* <p>These operations also want some concurrency going on. Usually when these tests run, they
* operate in their own part of the key range. In CASTest, we will have them all overlap on the
* same key space. We do this with our getStartRow and getLastRow overrides.
*/
static abstract class CASTableTest extends TableTest {
private final byte [] qualifier;
CASTableTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
qualifier = Bytes.toBytes(this.getClass().getSimpleName());
}
byte [] getQualifier() {
return this.qualifier;
}
@Override
int getStartRow() {
return 0;
}
@Override
int getLastRow() {
return opts.perClientRunRows;
}
}
static class IncrementTest extends CASTableTest {
IncrementTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@Override
void testRow(final int i) throws IOException {
Increment increment = new Increment(format(i));
increment.addColumn(FAMILY_NAME, getQualifier(), 1l);
updateValueSize(this.table.increment(increment));
}
}
static class AppendTest extends CASTableTest {
AppendTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@Override
void testRow(final int i) throws IOException {
byte [] bytes = format(i);
Append append = new Append(bytes);
append.add(FAMILY_NAME, getQualifier(), bytes);
updateValueSize(this.table.append(append));
}
}
static class CheckAndMutateTest extends CASTableTest {
CheckAndMutateTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@Override
void testRow(final int i) throws IOException {
byte [] bytes = format(i);
// Put a known value so when we go to check it, it is there.
Put put = new Put(bytes);
put.addColumn(FAMILY_NAME, getQualifier(), bytes);
this.table.put(put);
RowMutations mutations = new RowMutations(bytes);
mutations.add(put);
this.table.checkAndMutate(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes,
mutations);
}
}
static class CheckAndPutTest extends CASTableTest {
CheckAndPutTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@Override
void testRow(final int i) throws IOException {
byte [] bytes = format(i);
// Put a known value so when we go to check it, it is there.
Put put = new Put(bytes);
put.addColumn(FAMILY_NAME, getQualifier(), bytes);
this.table.put(put);
this.table.checkAndPut(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, put);
}
}
static class CheckAndDeleteTest extends CASTableTest {
CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}
@Override
void testRow(final int i) throws IOException {
byte [] bytes = format(i);
// Put a known value so when we go to check it, it is there.
Put put = new Put(bytes);
put.addColumn(FAMILY_NAME, getQualifier(), bytes);
this.table.put(put);
Delete delete = new Delete(put.getRow());
delete.addColumn(FAMILY_NAME, getQualifier());
this.table.checkAndDelete(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, delete);
}
} }
static class SequentialReadTest extends TableTest { static class SequentialReadTest extends TableTest {
@ -1760,8 +1894,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
"clients (and HRegionServers)"); "clients (and HRegionServers)");
System.err.println(" running: 1 <= value <= 500"); System.err.println(" running: 1 <= value <= 500");
System.err.println("Examples:"); System.err.println("Examples:");
System.err.println(" To run a single evaluation client:"); System.err.println(" To run a single client doing the default 1M sequentialWrites:");
System.err.println(" $ bin/hbase " + className + " sequentialWrite 1"); System.err.println(" $ bin/hbase " + className + " sequentialWrite 1");
System.err.println(" To run 10 clients doing increments over ten rows:");
System.err.println(" $ bin/hbase " + className + " --rows=10 --nomapred increment 10");
} }
/** /**