diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 965fb187c90..b4dab24b7e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.io.PrintStream; import java.lang.reflect.Constructor; @@ -30,20 +28,21 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.TreeMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Durability; @@ -70,21 +69,17 @@ import org.apache.hadoop.hbase.util.Hash; import org.apache.hadoop.hbase.util.MurmurHash; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; -import org.apache.hadoop.util.LineReader; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.codehaus.jackson.map.ObjectMapper; + +import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY; /** * Script used evaluating HBase performance and scalability. Runs a HBase @@ -105,7 +100,7 @@ import org.apache.hadoop.util.ToolRunner; public class PerformanceEvaluation extends Configured implements Tool { protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); - public static final TableName TABLE_NAME = TableName.valueOf("TestTable"); + public static final String TABLE_NAME = "TestTable"; public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data"); public static final int VALUE_LENGTH = 1000; @@ -119,44 +114,12 @@ public class PerformanceEvaluation extends Configured implements Tool { private static final MathContext CXT = MathContext.DECIMAL64; private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); + private static final TestOptions DEFAULT_OPTS = new TestOptions(); - protected HTableDescriptor TABLE_DESCRIPTOR; protected Map commands = new TreeMap(); - private boolean nomapred = false; - private int N = 1; - private int R = ROWS_PER_GB; - private float sampleRate = 1.0f; - private TableName tableName = TABLE_NAME; - private Compression.Algorithm compression = Compression.Algorithm.NONE; - private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; - private boolean flushCommits = true; - private boolean writeToWAL = true; - private boolean inMemoryCF = false; - private boolean reportLatency = false; - private int presplitRegions = 0; - private boolean useTags = false; - private int noOfTags = 1; - private int multiGet = 0; - private HConnection connection; - private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); - /** Regex to parse lines in input file passed to mapreduce task. */ - public static final Pattern LINE_PATTERN = - Pattern.compile("tableName=(\\w+),\\s+" + - "startRow=(\\d+),\\s+" + - "perClientRunRows=(\\d+),\\s+" + - "totalRows=(\\d+),\\s+" + - "sampleRate=([-+]?[0-9]*\\.?[0-9]+),\\s+" + - "clients=(\\d+),\\s+" + - "flushCommits=(\\w+),\\s+" + - "writeToWAL=(\\w+),\\s+" + - "useTags=(\\w+),\\s+" + - "noOfTags=(\\d+),\\s+" + - "reportLatency=(\\w+),\\s+" + - "multiGet=(\\d+)"); - /** * Enum for map metrics. Keep it out here rather than inside in the Map * inner-class so we can find associated properties. @@ -219,273 +182,11 @@ public class PerformanceEvaluation extends Configured implements Tool { void setStatus(final String msg) throws IOException; } - /** - * This class works as the InputSplit of Performance Evaluation - * MapReduce InputFormat, and the Record Value of RecordReader. - * Each map task will only read one record from a PeInputSplit, - * the record value is the PeInputSplit itself. - */ - public static class PeInputSplit extends InputSplit implements Writable { - private TableName tableName = TABLE_NAME; - private int startRow = 0; - private int rows = 0; - private int totalRows = 0; - private float sampleRate = 1.0f; - private int clients = 0; - private boolean flushCommits = false; - private boolean writeToWAL = true; - private boolean useTags = false; - private int noOfTags = 0; - private boolean reportLatency = false; - private int multiGet = 0; - - public PeInputSplit() {} - - public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, - float sampleRate, int clients, boolean flushCommits, boolean writeToWAL, - boolean useTags, int noOfTags, boolean reportLatency, int multiGet) { - this.tableName = tableName; - this.startRow = startRow; - this.rows = rows; - this.totalRows = totalRows; - this.sampleRate = sampleRate; - this.clients = clients; - this.flushCommits = flushCommits; - this.writeToWAL = writeToWAL; - this.useTags = useTags; - this.noOfTags = noOfTags; - this.reportLatency = reportLatency; - this.multiGet = multiGet; - } - - @Override - public void readFields(DataInput in) throws IOException { - int tableNameLen = in.readInt(); - byte[] name = new byte[tableNameLen]; - in.readFully(name); - this.tableName = TableName.valueOf(name); - - this.startRow = in.readInt(); - this.rows = in.readInt(); - this.totalRows = in.readInt(); - this.sampleRate = in.readFloat(); - this.clients = in.readInt(); - this.flushCommits = in.readBoolean(); - this.writeToWAL = in.readBoolean(); - this.useTags = in.readBoolean(); - this.noOfTags = in.readInt(); - this.reportLatency = in.readBoolean(); - this.multiGet = in.readInt(); - } - - @Override - public void write(DataOutput out) throws IOException { - byte[] name = this.tableName.toBytes(); - out.writeInt(name.length); - out.write(name); - out.writeInt(startRow); - out.writeInt(rows); - out.writeInt(totalRows); - out.writeFloat(sampleRate); - out.writeInt(clients); - out.writeBoolean(flushCommits); - out.writeBoolean(writeToWAL); - out.writeBoolean(useTags); - out.writeInt(noOfTags); - out.writeBoolean(reportLatency); - out.writeInt(multiGet); - } - - @Override - public long getLength() throws IOException, InterruptedException { - return 0; - } - - @Override - public String[] getLocations() throws IOException, InterruptedException { - return new String[0]; - } - - public TableName getTableName() { - return tableName; - } - - public int getStartRow() { - return startRow; - } - - public int getRows() { - return rows; - } - - public int getTotalRows() { - return totalRows; - } - - public float getSampleRate() { - return sampleRate; - } - - public int getClients() { - return clients; - } - - public boolean isFlushCommits() { - return flushCommits; - } - - public boolean isWriteToWAL() { - return writeToWAL; - } - - public boolean isUseTags() { - return useTags; - } - - public int getNoOfTags() { - return noOfTags; - } - - public boolean isReportLatency() { - return reportLatency; - } - - public int getMultiGet() { - return multiGet; - } - } - - /** - * InputFormat of Performance Evaluation MapReduce job. - * It extends from FileInputFormat, want to use it's methods such as setInputPaths(). - */ - public static class PeInputFormat extends FileInputFormat { - - @Override - public List getSplits(JobContext job) throws IOException { - // generate splits - List splitList = new ArrayList(); - - for (FileStatus file: listStatus(job)) { - if (file.isDir()) { - continue; - } - Path path = file.getPath(); - FileSystem fs = path.getFileSystem(job.getConfiguration()); - FSDataInputStream fileIn = fs.open(path); - LineReader in = new LineReader(fileIn, job.getConfiguration()); - int lineLen = 0; - while(true) { - Text lineText = new Text(); - lineLen = in.readLine(lineText); - if(lineLen <= 0) { - break; - } - Matcher m = LINE_PATTERN.matcher(lineText.toString()); - if((m != null) && m.matches()) { - TableName tableName = TableName.valueOf(m.group(1)); - int startRow = Integer.parseInt(m.group(2)); - int rows = Integer.parseInt(m.group(3)); - int totalRows = Integer.parseInt(m.group(4)); - float sampleRate = Float.parseFloat(m.group(5)); - int clients = Integer.parseInt(m.group(6)); - boolean flushCommits = Boolean.parseBoolean(m.group(7)); - boolean writeToWAL = Boolean.parseBoolean(m.group(8)); - boolean useTags = Boolean.parseBoolean(m.group(9)); - int noOfTags = Integer.parseInt(m.group(10)); - boolean reportLatency = Boolean.parseBoolean(m.group(11)); - int multiGet = Integer.parseInt(m.group(12)); - - LOG.debug("tableName=" + tableName + - " split["+ splitList.size() + "] " + - " startRow=" + startRow + - " rows=" + rows + - " totalRows=" + totalRows + - " sampleRate=" + sampleRate + - " clients=" + clients + - " flushCommits=" + flushCommits + - " writeToWAL=" + writeToWAL + - " useTags=" + useTags + - " noOfTags=" + noOfTags + - " reportLatency=" + reportLatency + - " multiGet=" + multiGet); - - PeInputSplit newSplit = - new PeInputSplit(tableName, startRow, rows, totalRows, sampleRate, clients, - flushCommits, writeToWAL, useTags, noOfTags, reportLatency, multiGet); - splitList.add(newSplit); - } - } - in.close(); - } - - LOG.info("Total # of splits: " + splitList.size()); - return splitList; - } - - @Override - public RecordReader createRecordReader(InputSplit split, - TaskAttemptContext context) { - return new PeRecordReader(); - } - - public static class PeRecordReader extends RecordReader { - private boolean readOver = false; - private PeInputSplit split = null; - private NullWritable key = null; - private PeInputSplit value = null; - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - this.readOver = false; - this.split = (PeInputSplit)split; - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if(readOver) { - return false; - } - - key = NullWritable.get(); - value = split; - - readOver = true; - return true; - } - - @Override - public NullWritable getCurrentKey() throws IOException, InterruptedException { - return key; - } - - @Override - public PeInputSplit getCurrentValue() throws IOException, InterruptedException { - return value; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - if(readOver) { - return 1.0f; - } else { - return 0.0f; - } - } - - @Override - public void close() throws IOException { - // do nothing - } - } - } - /** * MapReduce job that runs a performance evaluation client in each map task. */ public static class EvaluationMapTask - extends Mapper { + extends Mapper { /** configuration parameter name that contains the command */ public final static String CMD_KEY = "EvaluationMapTask.command"; @@ -512,16 +213,14 @@ public class PerformanceEvaluation extends Configured implements Tool { } private Class forName(String className, Class type) { - Class clazz = null; try { - clazz = Class.forName(className).asSubclass(type); + return Class.forName(className).asSubclass(type); } catch (ClassNotFoundException e) { throw new IllegalStateException("Could not find class for name: " + className, e); } - return clazz; } - protected void map(NullWritable key, PeInputSplit value, final Context context) + protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException { Status status = new Status() { @@ -530,18 +229,17 @@ public class PerformanceEvaluation extends Configured implements Tool { } }; + ObjectMapper mapper = new ObjectMapper(); + TestOptions opts = mapper.readValue(value.toString(), TestOptions.class); + Configuration conf = HBaseConfiguration.create(context.getConfiguration()); + // Evaluation task - pe.tableName = value.getTableName(); - long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), - value.getRows(), value.getTotalRows(), value.getSampleRate(), - value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(), - value.getNoOfTags(), value.isReportLatency(), value.getMultiGet(), - HConnectionManager.createConnection(context.getConfiguration()), status); + long elapsedTime = this.pe.runOneClient(this.cmd, conf, opts, status); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); - context.getCounter(Counter.ROWS).increment(value.rows); - context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime)); + context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); + context.write(new LongWritable(opts.startRow), new LongWritable(elapsedTime)); context.progress(); } } @@ -552,21 +250,21 @@ public class PerformanceEvaluation extends Configured implements Tool { * @return True if we created the table. * @throws IOException */ - private boolean checkTable(HBaseAdmin admin) throws IOException { - HTableDescriptor tableDescriptor = getTableDescriptor(); - if (this.presplitRegions > 0) { + private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException { + HTableDescriptor tableDescriptor = getTableDescriptor(opts); + if (opts.presplitRegions > 0) { // presplit requested if (admin.tableExists(tableDescriptor.getTableName())) { admin.disableTable(tableDescriptor.getTableName()); admin.deleteTable(tableDescriptor.getTableName()); } - byte[][] splits = getSplits(); + byte[][] splits = getSplits(opts); for (int i=0; i < splits.length; i++) { LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); } admin.createTable(tableDescriptor, splits); - LOG.info ("Table created with " + this.presplitRegions + " splits"); + LOG.info ("Table created with " + opts.presplitRegions + " splits"); } else { boolean tableExists = admin.tableExists(tableDescriptor.getTableName()); @@ -578,124 +276,73 @@ public class PerformanceEvaluation extends Configured implements Tool { return admin.tableExists(tableDescriptor.getTableName()); } - protected HTableDescriptor getTableDescriptor() { - if (TABLE_DESCRIPTOR == null) { - TABLE_DESCRIPTOR = new HTableDescriptor(tableName); - HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); - family.setDataBlockEncoding(blockEncoding); - family.setCompressionType(compression); - if (inMemoryCF) { - family.setInMemory(true); - } - TABLE_DESCRIPTOR.addFamily(family); + /** + * Create an HTableDescriptor from provided TestOptions. + */ + protected static HTableDescriptor getTableDescriptor(TestOptions opts) { + HTableDescriptor desc = new HTableDescriptor(opts.tableName); + HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); + family.setDataBlockEncoding(opts.blockEncoding); + family.setCompressionType(opts.compression); + if (opts.inMemoryCF) { + family.setInMemory(true); } - return TABLE_DESCRIPTOR; + desc.addFamily(family); + return desc; } /** * generates splits based on total number of rows and specified split regions - * - * @return splits : array of byte [] */ - protected byte[][] getSplits() { - if (this.presplitRegions == 0) + protected static byte[][] getSplits(TestOptions opts) { + if (opts.presplitRegions == 0) return new byte [0][]; - int numSplitPoints = presplitRegions - 1; + int numSplitPoints = opts.presplitRegions - 1; byte[][] splits = new byte[numSplitPoints][]; - int jump = this.R / this.presplitRegions; - for (int i=0; i < numSplitPoints; i++) { + int jump = opts.totalRows / opts.presplitRegions; + for (int i = 0; i < numSplitPoints; i++) { int rowkey = jump * (1 + i); splits[i] = format(rowkey); } return splits; } - /* - * We're to run multiple clients concurrently. Setup a mapreduce job. Run - * one map per client. Then run a single reduce to sum the elapsed times. - * @param cmd Command to run. - * @throws IOException - */ - private void runNIsMoreThanOne(final Class cmd) - throws IOException, InterruptedException, ClassNotFoundException { - checkTable(new HBaseAdmin(getConf())); - if (this.nomapred) { - doMultipleClients(cmd); - } else { - doMapReduce(cmd); - } - } - /* * Run all clients in this vm each to its own thread. * @param cmd Command to run. * @throws IOException */ - private void doMultipleClients(final Class cmd) throws IOException { - final List threads = new ArrayList(this.N); - final long[] timings = new long[this.N]; - final int perClientRows = R/N; - final float sampleRate = this.sampleRate; - final TableName tableName = this.tableName; - final DataBlockEncoding encoding = this.blockEncoding; - final boolean flushCommits = this.flushCommits; - final Compression.Algorithm compression = this.compression; - final boolean writeToWal = this.writeToWAL; - final boolean reportLatency = this.reportLatency; - final int preSplitRegions = this.presplitRegions; - final boolean useTags = this.useTags; - final int numTags = this.noOfTags; - final int multiGet = this.multiGet; - final HConnection connection = HConnectionManager.createConnection(getConf()); - for (int i = 0; i < this.N; i++) { + private void doLocalClients(final Class cmd, final TestOptions opts) + throws IOException, InterruptedException { + Future[] threads = new Future[opts.numClientThreads]; + long[] timings = new long[opts.numClientThreads]; + ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, + new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); + for (int i = 0; i < threads.length; i++) { final int index = i; - Thread t = new Thread ("TestClient-" + i) { + threads[i] = pool.submit(new Callable() { @Override - public void run() { - super.run(); - PerformanceEvaluation pe = new PerformanceEvaluation(getConf()); - pe.tableName = tableName; - pe.blockEncoding = encoding; - pe.flushCommits = flushCommits; - pe.compression = compression; - pe.writeToWAL = writeToWal; - pe.presplitRegions = preSplitRegions; - pe.N = N; - pe.sampleRate = sampleRate; - pe.reportLatency = reportLatency; - pe.connection = connection; - pe.useTags = useTags; - pe.noOfTags = numTags; - pe.multiGet = multiGet; - try { - long elapsedTime = pe.runOneClient(cmd, index * perClientRows, - perClientRows, R, sampleRate, flushCommits, writeToWal, useTags, - noOfTags, reportLatency, multiGet, connection, new Status() { - public void setStatus(final String msg) throws IOException { - LOG.info("client-" + getName() + " " + msg); - } - }); - timings[index] = elapsedTime; - LOG.info("Finished " + getName() + " in " + elapsedTime + - "ms writing " + perClientRows + " rows"); - } catch (IOException e) { - throw new RuntimeException(e); - } + public Long call() throws Exception { + TestOptions threadOpts = new TestOptions(opts); + threadOpts.startRow = index * threadOpts.perClientRunRows; + long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() { + public void setStatus(final String msg) throws IOException { + LOG.info("client-" + Thread.currentThread().getName() + " " + msg); + } + }); + LOG.info("Finished " + Thread.currentThread().getName() + " in " + elapsedTime + + "ms over " + threadOpts.perClientRunRows + " rows"); + return elapsedTime; } - }; - threads.add(t); + }); } - for (Thread t: threads) { - t.start(); - } - for (Thread t: threads) { - while(t.isAlive()) { - try { - t.join(); - } catch (InterruptedException e) { - LOG.debug("Interrupted, continuing" + e.toString()); - } + pool.shutdown(); + for (int i = 0; i < threads.length; i++) { + try { + timings[i] = threads[i].get(); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); } } final String test = cmd.getSimpleName(); @@ -703,13 +350,13 @@ public class PerformanceEvaluation extends Configured implements Tool { + Arrays.toString(timings)); Arrays.sort(timings); long total = 0; - for (int i = 0; i < this.N; i++) { + for (int i = 0; i < timings.length; i++) { total += timings[i]; } LOG.info("[" + test + "]" + "\tMin: " + timings[0] + "ms" - + "\tMax: " + timings[this.N - 1] + "ms" - + "\tAvg: " + (total / this.N) + "ms"); + + "\tMax: " + timings[timings.length - 1] + "ms" + + "\tAvg: " + (total / timings.length) + "ms"); } /* @@ -719,18 +366,20 @@ public class PerformanceEvaluation extends Configured implements Tool { * @param cmd Command to run. * @throws IOException */ - private void doMapReduce(final Class cmd) throws IOException, + private void doMapReduce(final Class cmd, TestOptions opts) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = getConf(); - Path inputDir = writeInputFile(conf); + Path inputDir = writeInputFile(conf, opts); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); Job job = new Job(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation"); - job.setInputFormatClass(PeInputFormat.class); - PeInputFormat.setInputPaths(job, inputDir); + job.setInputFormatClass(NLineInputFormat.class); + NLineInputFormat.setInputPaths(job, inputDir); + // this is default, but be explicit about it just in case. + NLineInputFormat.setNumLinesPerSplit(job, 1); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(LongWritable.class); @@ -744,9 +393,9 @@ public class PerformanceEvaluation extends Configured implements Tool { TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); TableMapReduceUtil.addDependencyJars(job); - // Add a Class from the hbase.jar so it gets registered too. TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - org.apache.hadoop.hbase.util.Bytes.class); + DescriptiveStatistics.class, // commons-math + ObjectMapper.class); // jackson-mapper-asl TableMapReduceUtil.initCredentials(job); @@ -759,7 +408,7 @@ public class PerformanceEvaluation extends Configured implements Tool { * @return Directory that contains file written. * @throws IOException */ - private Path writeInputFile(final Configuration c) throws IOException { + private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); Path inputDir = new Path(jobdir, "inputs"); @@ -772,22 +421,16 @@ public class PerformanceEvaluation extends Configured implements Tool { // Make input random. Map m = new TreeMap(); Hash h = MurmurHash.getInstance(); - int perClientRows = (this.R / this.N); + int perClientRows = (opts.totalRows / opts.numClientThreads); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SORT_PROPERTIES_ALPHABETICALLY, true); try { for (int i = 0; i < 10; i++) { - for (int j = 0; j < N; j++) { - String s = "tableName=" + this.tableName + - ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + - ", perClientRunRows=" + (perClientRows / 10) + - ", totalRows=" + this.R + - ", sampleRate=" + this.sampleRate + - ", clients=" + this.N + - ", flushCommits=" + this.flushCommits + - ", writeToWAL=" + this.writeToWAL + - ", useTags=" + this.useTags + - ", noOfTags=" + this.noOfTags + - ", reportLatency=" + this.reportLatency + - ", multiGet=" + this.multiGet; + for (int j = 0; j < opts.numClientThreads; j++) { + TestOptions next = new TestOptions(opts); + next.startRow = (j * perClientRows) + (i * (perClientRows/10)); + next.perClientRunRows = perClientRows / 10; + String s = mapper.writeValueAsString(next); int hash = h.hash(Bytes.toBytes(s)); m.put(hash, s); } @@ -829,95 +472,50 @@ public class PerformanceEvaluation extends Configured implements Tool { } /** - * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation.Test - * tests}. This makes the reflection logic a little easier to understand... + * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. + * This makes tracking all these arguments a little easier. */ static class TestOptions { - private int startRow; - private int perClientRunRows; - private int totalRows; - private float sampleRate; - private int numClientThreads; - private TableName tableName; - private boolean flushCommits; - private boolean writeToWAL = true; - private boolean useTags = false; - private int noOfTags = 0; - private boolean reportLatency; - private int multiGet = 0; - private HConnection connection; - TestOptions() {} + public TestOptions() {} - TestOptions(int startRow, int perClientRunRows, int totalRows, float sampleRate, - int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL, - boolean useTags, int noOfTags, boolean reportLatency, int multiGet, - HConnection connection) { - this.startRow = startRow; - this.perClientRunRows = perClientRunRows; - this.totalRows = totalRows; - this.sampleRate = sampleRate; - this.numClientThreads = numClientThreads; - this.tableName = tableName; - this.flushCommits = flushCommits; - this.writeToWAL = writeToWAL; - this.useTags = useTags; - this.noOfTags = noOfTags; - this.reportLatency = reportLatency; - this.multiGet = multiGet; - this.connection = connection; + public TestOptions(TestOptions that) { + this.nomapred = that.nomapred; + this.startRow = that.startRow; + this.perClientRunRows = that.perClientRunRows; + this.numClientThreads = that.numClientThreads; + this.totalRows = that.totalRows; + this.sampleRate = that.sampleRate; + this.tableName = that.tableName; + this.flushCommits = that.flushCommits; + this.writeToWAL = that.writeToWAL; + this.useTags = that.useTags; + this.noOfTags = that.noOfTags; + this.reportLatency = that.reportLatency; + this.multiGet = that.multiGet; + this.inMemoryCF = that.inMemoryCF; + this.presplitRegions = that.presplitRegions; + this.compression = that.compression; + this.blockEncoding = that.blockEncoding; } - public int getStartRow() { - return startRow; - } - - public int getPerClientRunRows() { - return perClientRunRows; - } - - public int getTotalRows() { - return totalRows; - } - - public float getSampleRate() { - return sampleRate; - } - - public int getNumClientThreads() { - return numClientThreads; - } - - public TableName getTableName() { - return tableName; - } - - public boolean isFlushCommits() { - return flushCommits; - } - - public boolean isWriteToWAL() { - return writeToWAL; - } - - public boolean isReportLatency() { - return reportLatency; - } - - public int getMultiGet() { - return multiGet; - } - - public HConnection getConnection() { - return connection; - } - - public boolean isUseTags() { - return this.useTags; - } - public int getNumTags() { - return this.noOfTags; - } + public boolean nomapred = false; + public int startRow = 0; + public int perClientRunRows = ROWS_PER_GB; + public int numClientThreads = 1; + public int totalRows = ROWS_PER_GB; + public float sampleRate = 1.0f; + public String tableName = TABLE_NAME; + public boolean flushCommits = true; + public boolean writeToWAL = true; + public boolean useTags = false; + public int noOfTags = 1; + public boolean reportLatency = false; + public int multiGet = 0; + boolean inMemoryCF = false; + int presplitRegions = 0; + public Compression.Algorithm compression = Compression.Algorithm.NONE; + public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; } /* @@ -927,48 +525,26 @@ public class PerformanceEvaluation extends Configured implements Tool { static abstract class Test { // Below is make it so when Tests are all running in the one // jvm, that they each have a differently seeded Random. - private static final Random randomSeed = - new Random(System.currentTimeMillis()); + private static final Random randomSeed = new Random(System.currentTimeMillis()); private static long nextRandomSeed() { return randomSeed.nextLong(); } protected final Random rand = new Random(nextRandomSeed()); + protected final Configuration conf; + protected final TestOptions opts; - protected final int startRow; - protected final int perClientRunRows; - protected final int totalRows; - protected final float sampleRate; private final Status status; - protected TableName tableName; - protected HTableInterface table; - protected volatile Configuration conf; - protected boolean flushCommits; - protected boolean writeToWAL; - protected boolean useTags; - protected int noOfTags; - protected boolean reportLatency; protected HConnection connection; + protected HTableInterface table; /** * Note that all subclasses of this class must provide a public contructor * that has the exact same list of arguments. */ Test(final Configuration conf, final TestOptions options, final Status status) { - super(); - this.startRow = options.getStartRow(); - this.perClientRunRows = options.getPerClientRunRows(); - this.totalRows = options.getTotalRows(); - this.sampleRate = options.getSampleRate(); - this.status = status; - this.tableName = options.getTableName(); - this.table = null; this.conf = conf; - this.flushCommits = options.isFlushCommits(); - this.writeToWAL = options.isWriteToWAL(); - this.useTags = options.isUseTags(); - this.noOfTags = options.getNumTags(); - this.reportLatency = options.isReportLatency(); - this.connection = options.getConnection(); + this.opts = options; + this.status = status; } private String generateStatus(final int sr, final int i, final int lr) { @@ -976,20 +552,22 @@ public class PerformanceEvaluation extends Configured implements Tool { } protected int getReportingPeriod() { - int period = this.perClientRunRows / 10; - return period == 0 ? this.perClientRunRows : period; + int period = opts.perClientRunRows / 10; + return period == 0 ? opts.perClientRunRows : period; } void testSetup() throws IOException { - this.table = connection.getTable(tableName); + this.connection = HConnectionManager.createConnection(conf); + this.table = connection.getTable(opts.tableName); this.table.setAutoFlush(false, true); } void testTakedown() throws IOException { - if (flushCommits) { + if (opts.flushCommits) { this.table.flushCommits(); } table.close(); + connection.close(); } /* @@ -1013,12 +591,12 @@ public class PerformanceEvaluation extends Configured implements Tool { * Provides an extension point for tests that don't want a per row invocation. */ void testTimed() throws IOException { - int lastRow = this.startRow + this.perClientRunRows; + int lastRow = opts.startRow + opts.perClientRunRows; // Report on completion of 1/10th of total. - for (int i = this.startRow; i < lastRow; i++) { + for (int i = opts.startRow; i < lastRow; i++) { testRow(i); if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { - status.setStatus(generateStatus(this.startRow, i, lastRow)); + status.setStatus(generateStatus(opts.startRow, i, lastRow)); } } } @@ -1039,7 +617,7 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(final int i) throws IOException { - Scan scan = new Scan(getRandomRow(this.rand, this.totalRows)); + Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows)); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.setFilter(new WhileMatchFilter(new PageFilter(120))); ResultScanner s = this.table.getScanner(scan); @@ -1049,8 +627,8 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override protected int getReportingPeriod() { - int period = this.perClientRunRows / 100; - return period == 0 ? this.perClientRunRows : period; + int period = opts.perClientRunRows / 100; + return period == 0 ? opts.perClientRunRows : period; } } @@ -1084,15 +662,15 @@ public class PerformanceEvaluation extends Configured implements Tool { protected abstract Pair getStartAndStopRow(); protected Pair generateStartAndStopRows(int maxRange) { - int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows; + int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; int stop = start + maxRange; return new Pair(format(start), format(stop)); } @Override protected int getReportingPeriod() { - int period = this.perClientRunRows / 100; - return period == 0? this.perClientRunRows: period; + int period = opts.perClientRunRows / 100; + return period == 0? opts.perClientRunRows: period; } } @@ -1142,24 +720,20 @@ public class PerformanceEvaluation extends Configured implements Tool { static class RandomReadTest extends Test { private final int everyN; - private final boolean reportLatency; private final double[] times; - private final int multiGet; private ArrayList gets; int idx = 0; RandomReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); - everyN = (int) (this.totalRows / (this.totalRows * this.sampleRate)); - LOG.info("Sampling 1 every " + everyN + " out of " + perClientRunRows + " total rows."); - this.reportLatency = options.isReportLatency(); - this.multiGet = options.getMultiGet(); - if (this.multiGet > 0) { - LOG.info("MultiGet enabled. Sending GETs in batches of " + this.multiGet + "."); - this.gets = new ArrayList(this.multiGet); + everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); + LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); + if (opts.multiGet > 0) { + LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); + this.gets = new ArrayList(opts.multiGet); } - if (this.reportLatency) { - this.times = new double[(int) Math.ceil(this.perClientRunRows * this.sampleRate / Math.max(1, this.multiGet))]; + if (opts.reportLatency) { + this.times = new double[(int) Math.ceil(opts.perClientRunRows * opts.sampleRate / Math.max(1, opts.multiGet))]; } else { this.times = null; } @@ -1168,14 +742,14 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(final int i) throws IOException { if (i % everyN == 0) { - Get get = new Get(getRandomRow(this.rand, this.totalRows)); + Get get = new Get(getRandomRow(this.rand, opts.totalRows)); get.addColumn(FAMILY_NAME, QUALIFIER_NAME); - if (this.multiGet > 0) { + if (opts.multiGet > 0) { this.gets.add(get); - if (this.gets.size() == this.multiGet) { + if (this.gets.size() == opts.multiGet) { long start = System.nanoTime(); this.table.get(this.gets); - if (this.reportLatency) { + if (opts.reportLatency) { times[idx++] = (System.nanoTime() - start) / 1e6; } this.gets.clear(); @@ -1183,7 +757,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } else { long start = System.nanoTime(); this.table.get(get); - if (this.reportLatency) { + if (opts.reportLatency) { times[idx++] = (System.nanoTime() - start) / 1e6; } } @@ -1192,8 +766,8 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override protected int getReportingPeriod() { - int period = this.perClientRunRows / 100; - return period == 0 ? this.perClientRunRows : period; + int period = opts.perClientRunRows / 100; + return period == 0 ? opts.perClientRunRows : period; } @Override @@ -1203,7 +777,7 @@ public class PerformanceEvaluation extends Configured implements Tool { this.gets.clear(); } super.testTakedown(); - if (this.reportLatency) { + if (opts.reportLatency) { Arrays.sort(times); DescriptiveStatistics ds = new DescriptiveStatistics(); for (double t : times) { @@ -1231,13 +805,13 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(final int i) throws IOException { - byte[] row = getRandomRow(this.rand, this.totalRows); + byte[] row = getRandomRow(this.rand, opts.totalRows); Put put = new Put(row); byte[] value = generateData(this.rand, VALUE_LENGTH); - if (useTags) { + if (opts.useTags) { byte[] tag = generateData(this.rand, TAG_LENGTH); - Tag[] tags = new Tag[noOfTags]; - for (int n = 0; n < noOfTags; n++) { + Tag[] tags = new Tag[opts.noOfTags]; + for (int n = 0; n < opts.noOfTags; n++) { Tag t = new Tag((byte) n, tag); tags[n] = t; } @@ -1247,7 +821,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } else { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } - put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); table.put(put); } } @@ -1272,7 +846,7 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(final int i) throws IOException { if (this.testScanner == null) { - Scan scan = new Scan(format(this.startRow)); + Scan scan = new Scan(format(opts.startRow)); scan.setCaching(30); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); this.testScanner = table.getScanner(scan); @@ -1305,10 +879,10 @@ public class PerformanceEvaluation extends Configured implements Tool { byte[] row = format(i); Put put = new Put(row); byte[] value = generateData(this.rand, VALUE_LENGTH); - if (useTags) { + if (opts.useTags) { byte[] tag = generateData(this.rand, TAG_LENGTH); - Tag[] tags = new Tag[noOfTags]; - for (int n = 0; n < noOfTags; n++) { + Tag[] tags = new Tag[opts.noOfTags]; + for (int n = 0; n < opts.noOfTags; n++) { Tag t = new Tag((byte) n, tag); tags[n] = t; } @@ -1318,7 +892,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } else { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } - put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); table.put(put); } } @@ -1421,26 +995,21 @@ public class PerformanceEvaluation extends Configured implements Tool { return format(random.nextInt(Integer.MAX_VALUE) % totalRows); } - long runOneClient(final Class cmd, final int startRow, - final int perClientRunRows, final int totalRows, final float sampleRate, - boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, - boolean reportLatency, int multiGet, HConnection connection, final Status status) - throws IOException { - status.setStatus("Start " + cmd + " at offset " + startRow + " for " + - perClientRunRows + " rows"); + static long runOneClient(final Class cmd, Configuration conf, TestOptions opts, + final Status status) + throws IOException { + status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " + + opts.perClientRunRows + " rows"); long totalElapsedTime = 0; - TestOptions options = new TestOptions(startRow, perClientRunRows, - totalRows, sampleRate, N, tableName, flushCommits, writeToWAL, useTags, noOfTags, - reportLatency, multiGet, connection); final Test t; try { - Constructor constructor = cmd.getDeclaredConstructor( - Configuration.class, TestOptions.class, Status.class); - t = constructor.newInstance(getConf(), options, status); + Constructor constructor = + cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class); + t = constructor.newInstance(conf, opts, status); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Invalid command class: " + - cmd.getName() + ". It does not provide a constructor as described by" + + cmd.getName() + ". It does not provide a constructor as described by " + "the javadoc comment. Available constructors are: " + Arrays.toString(cmd.getConstructors())); } catch (Exception e) { @@ -1449,41 +1018,24 @@ public class PerformanceEvaluation extends Configured implements Tool { totalElapsedTime = t.test(); status.setStatus("Finished " + cmd + " in " + totalElapsedTime + - "ms at offset " + startRow + " for " + perClientRunRows + " rows" + - " (" + calculateMbps((int)(perClientRunRows * sampleRate), totalElapsedTime) + ")"); + "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" + + " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime) + ")"); return totalElapsedTime; } - private void runNIsOne(final Class cmd) throws IOException { - Status status = new Status() { - public void setStatus(String msg) throws IOException { - LOG.info(msg); - } - }; - + private void runTest(final Class cmd, TestOptions opts) throws IOException, + InterruptedException, ClassNotFoundException { HBaseAdmin admin = null; try { admin = new HBaseAdmin(getConf()); - checkTable(admin); - runOneClient(cmd, 0, this.R, this.R, this.sampleRate, this.flushCommits, - this.writeToWAL, this.useTags, this.noOfTags, this.reportLatency, this.multiGet, - this.connection, status); - } catch (Exception e) { - LOG.error("Failed", e); + checkTable(admin, opts); } finally { if (admin != null) admin.close(); } - } - - private void runTest(final Class cmd) throws IOException, - InterruptedException, ClassNotFoundException { - if (N == 1) { - // If there is only one client and one HRegionServer, we assume nothing - // has been set up at all. - runNIsOne(cmd); + if (opts.nomapred) { + doLocalClients(cmd, opts); } else { - // Else, run - runNIsMoreThanOne(cmd); + doMapReduce(cmd, opts); } } @@ -1543,16 +1095,15 @@ public class PerformanceEvaluation extends Configured implements Tool { + " sequentialWrite 1"); } - private void getArgs(final int start, final String[] args) { + private static int getNumClients(final int start, final String[] args) { if(start + 1 > args.length) { throw new IllegalArgumentException("must supply the number of clients"); } - N = Integer.parseInt(args[start]); + int N = Integer.parseInt(args[start]); if (N < 1) { throw new IllegalArgumentException("Number of clients must be > 1"); } - // Set total number of rows to write. - this.R = this.R * N; + return N; } public int run(String[] args) throws Exception { @@ -1570,7 +1121,9 @@ public class PerformanceEvaluation extends Configured implements Tool { // input, take a look at writeInputFile(). // Then you must adapt the LINE_PATTERN input regex, // and parse the argument, take a look at PEInputFormat.getSplits(). - + + TestOptions opts = new TestOptions(); + for (int i = 0; i < args.length; i++) { String cmd = args[i]; if (cmd.equals("-h") || cmd.startsWith("--h")) { @@ -1581,94 +1134,94 @@ public class PerformanceEvaluation extends Configured implements Tool { final String nmr = "--nomapred"; if (cmd.startsWith(nmr)) { - this.nomapred = true; + opts.nomapred = true; continue; } final String rows = "--rows="; if (cmd.startsWith(rows)) { - this.R = Integer.parseInt(cmd.substring(rows.length())); + opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); continue; } final String sampleRate = "--sampleRate="; if (cmd.startsWith(sampleRate)) { - this.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); + opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); continue; } final String table = "--table="; if (cmd.startsWith(table)) { - this.tableName = TableName.valueOf(cmd.substring(table.length())); + opts.tableName = cmd.substring(table.length()); continue; } final String compress = "--compress="; if (cmd.startsWith(compress)) { - this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); + opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); continue; } final String blockEncoding = "--blockEncoding="; if (cmd.startsWith(blockEncoding)) { - this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); + opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); continue; } final String flushCommits = "--flushCommits="; if (cmd.startsWith(flushCommits)) { - this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); + opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); continue; } final String writeToWAL = "--writeToWAL="; if (cmd.startsWith(writeToWAL)) { - this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); + opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); continue; } final String presplit = "--presplit="; if (cmd.startsWith(presplit)) { - this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); + opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); continue; } final String inMemory = "--inmemory="; if (cmd.startsWith(inMemory)) { - this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); + opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); continue; } final String latency = "--latency"; if (cmd.startsWith(latency)) { - this.reportLatency = true; + opts.reportLatency = true; continue; } final String multiGet = "--multiGet="; if (cmd.startsWith(multiGet)) { - this.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); + opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); continue; } - this.connection = HConnectionManager.createConnection(getConf()); - final String useTags = "--usetags="; if (cmd.startsWith(useTags)) { - this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); + opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); continue; } final String noOfTags = "--nooftags="; if (cmd.startsWith(noOfTags)) { - this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); + opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); continue; } Class cmdClass = determineCommandClass(cmd); if (cmdClass != null) { - getArgs(i + 1, args); - runTest(cmdClass); + opts.numClientThreads = getNumClients(i + 1, args); + // number of rows specified + opts.totalRows = opts.perClientRunRows * opts.numClientThreads; + runTest(cmdClass, opts); errCode = 0; break; }