diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java index 4a5b3ae1498..ad3e7b91c08 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java @@ -22,21 +22,22 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.PrintStream; +import java.lang.reflect.Constructor; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Random; import java.util.TreeMap; -import java.util.Arrays; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.lang.reflect.Constructor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -45,21 +46,28 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.hbase.filter.WhileMatchFilter; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.rest.client.Client; import org.apache.hadoop.hbase.rest.client.Cluster; import org.apache.hadoop.hbase.rest.client.RemoteAdmin; -import org.apache.hadoop.hbase.rest.client.RemoteHTable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Hash; import org.apache.hadoop.hbase.util.MurmurHash; @@ -78,6 +86,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; import org.apache.hadoop.util.LineReader; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; /** * Script used evaluating Stargate performance and scalability. Runs a SG @@ -86,32 +96,30 @@ import org.apache.hadoop.util.LineReader; * command-line which test to run and how many clients are participating in * this experiment. Run java PerformanceEvaluation --help to * obtain usage. - * + * *

This class sets up and runs the evaluation programs described in * Section 7, Performance Evaluation, of the Bigtable * paper, pages 8-10. - * + * *

If number of clients > 1, we start up a MapReduce job. Each map task * runs an individual client. Each client does about 1GB of data. */ -public class PerformanceEvaluation { +public class PerformanceEvaluation extends Configured implements Tool { protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); - + + private static final int DEFAULT_ROW_PREFIX_LENGTH = 16; private static final int ROW_LENGTH = 1000; + private static final int TAG_LENGTH = 256; private static final int ONE_GB = 1024 * 1024 * 1000; private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH; - - public static final byte [] TABLE_NAME = Bytes.toBytes("TestTable"); + + public static final TableName TABLE_NAME = TableName.valueOf("TestTable"); public static final byte [] FAMILY_NAME = Bytes.toBytes("info"); public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data"); + private TableName tableName = TABLE_NAME; - protected static final HTableDescriptor TABLE_DESCRIPTOR; - static { - TABLE_DESCRIPTOR = new HTableDescriptor(TableName.valueOf(TABLE_NAME)); - TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME)); - } - + protected HTableDescriptor TABLE_DESCRIPTOR; protected Map commands = new TreeMap(); protected static Cluster cluster = new Cluster(); @@ -119,18 +127,30 @@ public class PerformanceEvaluation { private boolean nomapred = false; private int N = 1; private int R = ROWS_PER_GB; - private int B = 100; + private Compression.Algorithm compression = Compression.Algorithm.NONE; + private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; + private boolean flushCommits = true; + private boolean writeToWAL = true; + private boolean inMemoryCF = false; + private int presplitRegions = 0; + private boolean useTags = false; + private int noOfTags = 1; + private HConnection connection; private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); /** * Regex to parse lines in input file passed to mapreduce task. */ public static final Pattern LINE_PATTERN = - Pattern.compile("startRow=(\\d+),\\s+" + - "perClientRunRows=(\\d+),\\s+" + - "totalRows=(\\d+),\\s+" + - "clients=(\\d+),\\s+" + - "rowsPerPut=(\\d+)"); + Pattern.compile("tableName=(\\w+),\\s+" + + "startRow=(\\d+),\\s+" + + "perClientRunRows=(\\d+),\\s+" + + "totalRows=(\\d+),\\s+" + + "clients=(\\d+),\\s+" + + "flushCommits=(\\w+),\\s+" + + "writeToWAL=(\\w+),\\s+" + + "useTags=(\\w+),\\s+" + + "noOfTags=(\\d+)"); /** * Enum for map metrics. Keep it out here rather than inside in the Map @@ -173,13 +193,13 @@ public class PerformanceEvaluation { "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)"); } - protected void addCommandDescriptor(Class cmdClass, + protected void addCommandDescriptor(Class cmdClass, String name, String description) { - CmdDescriptor cmdDescriptor = + CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); commands.put(name, cmdDescriptor); } - + /** * Implementations can have their status set. */ @@ -191,7 +211,7 @@ public class PerformanceEvaluation { */ void setStatus(final String msg) throws IOException; } - + /** * This class works as the InputSplit of Performance Evaluation * MapReduce InputFormat, and the Record Value of RecordReader. @@ -199,81 +219,113 @@ public class PerformanceEvaluation { * the record value is the PeInputSplit itself. */ public static class PeInputSplit extends InputSplit implements Writable { + private TableName tableName = TABLE_NAME; private int startRow = 0; private int rows = 0; private int totalRows = 0; private int clients = 0; - private int rowsPerPut = 1; + private boolean flushCommits = false; + private boolean writeToWAL = true; + private boolean useTags = false; + private int noOfTags = 0; public PeInputSplit() { - this.startRow = 0; - this.rows = 0; - this.totalRows = 0; - this.clients = 0; - this.rowsPerPut = 1; } - public PeInputSplit(int startRow, int rows, int totalRows, int clients, - int rowsPerPut) { + public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, + boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) { + this.tableName = tableName; this.startRow = startRow; this.rows = rows; this.totalRows = totalRows; this.clients = clients; - this.rowsPerPut = 1; + this.flushCommits = flushCommits; + this.writeToWAL = writeToWAL; + this.useTags = useTags; + this.noOfTags = noOfTags; } - + @Override public void readFields(DataInput in) throws IOException { + int tableNameLen = in.readInt(); + byte[] name = new byte[tableNameLen]; + in.readFully(name); + this.tableName = TableName.valueOf(name); this.startRow = in.readInt(); this.rows = in.readInt(); this.totalRows = in.readInt(); this.clients = in.readInt(); - this.rowsPerPut = in.readInt(); + this.flushCommits = in.readBoolean(); + this.writeToWAL = in.readBoolean(); + this.useTags = in.readBoolean(); + this.noOfTags = in.readInt(); } @Override public void write(DataOutput out) throws IOException { + byte[] name = this.tableName.toBytes(); + out.writeInt(name.length); + out.write(name); out.writeInt(startRow); out.writeInt(rows); out.writeInt(totalRows); out.writeInt(clients); - out.writeInt(rowsPerPut); + out.writeBoolean(flushCommits); + out.writeBoolean(writeToWAL); + out.writeBoolean(useTags); + out.writeInt(noOfTags); } - + @Override public long getLength() throws IOException, InterruptedException { return 0; } - + @Override public String[] getLocations() throws IOException, InterruptedException { return new String[0]; } - + public int getStartRow() { return startRow; } - + + public TableName getTableName() { + return tableName; + } + public int getRows() { return rows; } - + public int getTotalRows() { return totalRows; } - + public int getClients() { return clients; } - public int getRowsPerPut() { - return rowsPerPut; + public boolean isFlushCommits() { + return flushCommits; + } + + public boolean isWriteToWAL() { + return writeToWAL; + } + + public boolean isUseTags() { + return useTags; + } + + public int getNoOfTags() { + return noOfTags; } } /** * InputFormat of Performance Evaluation MapReduce job. - * It extends from FileInputFormat, want to use it's methods such as setInputPaths(). + * It extends from FileInputFormat, want to use it's methods such as setInputPaths(). */ public static class PeInputFormat extends FileInputFormat { @@ -281,8 +333,11 @@ public class PerformanceEvaluation { public List getSplits(JobContext job) throws IOException { // generate splits List splitList = new ArrayList(); - + for (FileStatus file: listStatus(job)) { + if (file.isDir()) { + continue; + } Path path = file.getPath(); FileSystem fs = path.getFileSystem(job.getConfiguration()); FSDataInputStream fileIn = fs.open(path); @@ -296,73 +351,82 @@ public class PerformanceEvaluation { } Matcher m = LINE_PATTERN.matcher(lineText.toString()); if((m != null) && m.matches()) { - int startRow = Integer.parseInt(m.group(1)); - int rows = Integer.parseInt(m.group(2)); - int totalRows = Integer.parseInt(m.group(3)); - int clients = Integer.parseInt(m.group(4)); - int rowsPerPut = Integer.parseInt(m.group(5)); + TableName tableName = TableName.valueOf(m.group(1)); + int startRow = Integer.parseInt(m.group(2)); + int rows = Integer.parseInt(m.group(3)); + int totalRows = Integer.parseInt(m.group(4)); + int clients = Integer.parseInt(m.group(5)); + boolean flushCommits = Boolean.parseBoolean(m.group(6)); + boolean writeToWAL = Boolean.parseBoolean(m.group(7)); + boolean useTags = Boolean.parseBoolean(m.group(8)); + int noOfTags = Integer.parseInt(m.group(9)); - LOG.debug("split["+ splitList.size() + "] " + - " startRow=" + startRow + - " rows=" + rows + - " totalRows=" + totalRows + - " clients=" + clients + - " rowsPerPut=" + rowsPerPut); + LOG.debug("tableName=" + tableName + + " split["+ splitList.size() + "] " + + " startRow=" + startRow + + " rows=" + rows + + " totalRows=" + totalRows + + " clients=" + clients + + " flushCommits=" + flushCommits + + " writeToWAL=" + writeToWAL + + " useTags=" + useTags + + " noOfTags=" + noOfTags); PeInputSplit newSplit = - new PeInputSplit(startRow, rows, totalRows, clients, rowsPerPut); + new PeInputSplit(tableName, startRow, rows, totalRows, clients, + flushCommits, writeToWAL, useTags, noOfTags); splitList.add(newSplit); } } in.close(); } - + LOG.info("Total # of splits: " + splitList.size()); return splitList; } - + @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { return new PeRecordReader(); } - + public static class PeRecordReader extends RecordReader { private boolean readOver = false; private PeInputSplit split = null; private NullWritable key = null; private PeInputSplit value = null; - + @Override - public void initialize(InputSplit split, TaskAttemptContext context) + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.readOver = false; this.split = (PeInputSplit)split; } - + @Override public boolean nextKeyValue() throws IOException, InterruptedException { if(readOver) { return false; } - + key = NullWritable.get(); value = (PeInputSplit)split; - + readOver = true; return true; } - + @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return key; } - + @Override public PeInputSplit getCurrentValue() throws IOException, InterruptedException { return value; } - + @Override public float getProgress() throws IOException, InterruptedException { if(readOver) { @@ -371,18 +435,18 @@ public class PerformanceEvaluation { return 0.0f; } } - + @Override public void close() throws IOException { // do nothing } } } - + /** * MapReduce job that runs a performance evaluation client in each map task. */ - public static class EvaluationMapTask + public static class EvaluationMapTask extends Mapper { /** configuration parameter name that contains the command */ @@ -419,18 +483,22 @@ public class PerformanceEvaluation { return clazz; } - protected void map(NullWritable key, PeInputSplit value, final Context context) + protected void map(NullWritable key, PeInputSplit value, final Context context) throws IOException, InterruptedException { - + Status status = new Status() { public void setStatus(String msg) { - context.setStatus(msg); + context.setStatus(msg); } }; - + // Evaluation task + pe.tableName = value.getTableName(); long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), - value.getRows(), value.getTotalRows(), value.getRowsPerPut(), status); + value.getRows(), value.getTotalRows(), + value.isFlushCommits(), value.isWriteToWAL(), + value.isUseTags(), value.getNoOfTags(), + HConnectionManager.createConnection(context.getConfiguration()), status); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); @@ -439,27 +507,71 @@ public class PerformanceEvaluation { context.progress(); } } - + /* * If table does not already exist, create. * @param c Client to use checking. * @return True if we created the table. * @throws IOException */ - private boolean checkTable() throws IOException { + private boolean checkTable(RemoteAdmin admin) throws IOException { HTableDescriptor tableDescriptor = getTableDescriptor(); - RemoteAdmin admin = new RemoteAdmin(new Client(cluster), conf); - if (!admin.isTableAvailable(tableDescriptor.getTableName().getName())) { + if (this.presplitRegions > 0) { + // presplit requested + if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) { + admin.deleteTable(tableDescriptor.getTableName().getName()); + } + + byte[][] splits = getSplits(); + for (int i=0; i < splits.length; i++) { + LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); + } admin.createTable(tableDescriptor); - return true; + LOG.info ("Table created with " + this.presplitRegions + " splits"); + } else { + boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName()); + if (!tableExists) { + admin.createTable(tableDescriptor); + LOG.info("Table " + tableDescriptor + " created"); + } } - return false; + boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName()); + return tableExists; } protected HTableDescriptor getTableDescriptor() { + if (TABLE_DESCRIPTOR == null) { + TABLE_DESCRIPTOR = new HTableDescriptor(tableName); + HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); + family.setDataBlockEncoding(blockEncoding); + family.setCompressionType(compression); + if (inMemoryCF) { + family.setInMemory(true); + } + TABLE_DESCRIPTOR.addFamily(family); + } return TABLE_DESCRIPTOR; } + /** + * Generates splits based on total number of rows and specified split regions + * + * @return splits : array of byte [] + */ + protected byte[][] getSplits() { + if (this.presplitRegions == 0) + return new byte [0][]; + + int numSplitPoints = presplitRegions - 1; + byte[][] splits = new byte[numSplitPoints][]; + int jump = this.R / this.presplitRegions; + for (int i=0; i < numSplitPoints; i++) { + int rowkey = jump * (1 + i); + splits[i] = format(rowkey); + } + return splits; + } + /* * We're to run multiple clients concurrently. Setup a mapreduce job. Run * one map per client. Then run a single reduce to sum the elapsed times. @@ -468,36 +580,59 @@ public class PerformanceEvaluation { */ private void runNIsMoreThanOne(final Class cmd) throws IOException, InterruptedException, ClassNotFoundException { - checkTable(); + RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf()); + checkTable(remoteAdmin); if (nomapred) { doMultipleClients(cmd); } else { doMapReduce(cmd); } } - + /* * Run all clients in this vm each to its own thread. * @param cmd Command to run. * @throws IOException */ private void doMultipleClients(final Class cmd) throws IOException { - final List threads = new ArrayList(N); + final List threads = new ArrayList(this.N); + final long[] timings = new long[this.N]; final int perClientRows = R/N; - for (int i = 0; i < N; i++) { - Thread t = new Thread (Integer.toString(i)) { + final TableName tableName = this.tableName; + final DataBlockEncoding encoding = this.blockEncoding; + final boolean flushCommits = this.flushCommits; + final Compression.Algorithm compression = this.compression; + final boolean writeToWal = this.writeToWAL; + final int preSplitRegions = this.presplitRegions; + final boolean useTags = this.useTags; + final int numTags = this.noOfTags; + final HConnection connection = HConnectionManager.createConnection(getConf()); + for (int i = 0; i < this.N; i++) { + final int index = i; + Thread t = new Thread ("TestClient-" + i) { @Override public void run() { super.run(); - PerformanceEvaluation pe = new PerformanceEvaluation(conf); - int index = Integer.parseInt(getName()); + PerformanceEvaluation pe = new PerformanceEvaluation(getConf()); + pe.tableName = tableName; + pe.blockEncoding = encoding; + pe.flushCommits = flushCommits; + pe.compression = compression; + pe.writeToWAL = writeToWal; + pe.presplitRegions = preSplitRegions; + pe.N = N; + pe.connection = connection; + pe.useTags = useTags; + pe.noOfTags = numTags; try { long elapsedTime = pe.runOneClient(cmd, index * perClientRows, - perClientRows, R, B, new Status() { - public void setStatus(final String msg) throws IOException { - LOG.info("client-" + getName() + " " + msg); - } - }); + perClientRows, R, + flushCommits, writeToWAL, useTags, noOfTags, connection, new Status() { + public void setStatus(final String msg) throws IOException { + LOG.info("client-" + getName() + " " + msg); + } + }); + timings[index] = elapsedTime; LOG.info("Finished " + getName() + " in " + elapsedTime + "ms writing " + perClientRows + " rows"); } catch (IOException e) { @@ -519,8 +654,20 @@ public class PerformanceEvaluation { } } } + final String test = cmd.getSimpleName(); + LOG.info("[" + test + "] Summary of timings (ms): " + + Arrays.toString(timings)); + Arrays.sort(timings); + long total = 0; + for (int i = 0; i < this.N; i++) { + total += timings[i]; + } + LOG.info("[" + test + "]" + + "\tMin: " + timings[0] + "ms" + + "\tMax: " + timings[this.N - 1] + "ms" + + "\tAvg: " + (total / this.N) + "ms"); } - + /* * Run a mapreduce job. Run as many maps as asked-for clients. * Before we start up the job, write out an input file with instruction @@ -530,30 +677,31 @@ public class PerformanceEvaluation { */ private void doMapReduce(final Class cmd) throws IOException, InterruptedException, ClassNotFoundException { - Path inputDir = writeInputFile(this.conf); - this.conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); - this.conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); - Job job = new Job(this.conf); + Configuration conf = getConf(); + Path inputDir = writeInputFile(conf); + conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); + conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); + Job job = new Job(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation"); - + job.setInputFormatClass(PeInputFormat.class); PeInputFormat.setInputPaths(job, inputDir); - + job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(LongWritable.class); - + job.setMapperClass(EvaluationMapTask.class); job.setReducerClass(LongSumReducer.class); - job.setNumReduceTasks(1); - + job.setOutputFormatClass(TextOutputFormat.class); - TextOutputFormat.setOutputPath(job, new Path(inputDir,"outputs")); - + TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); job.waitForCompletion(true); } - + /* * Write input file of offsets-per-client for the mapreduce job. * @param c Configuration @@ -561,27 +709,30 @@ public class PerformanceEvaluation { * @throws IOException */ private Path writeInputFile(final Configuration c) throws IOException { - FileSystem fs = FileSystem.get(c); - if (!fs.exists(PERF_EVAL_DIR)) { - fs.mkdirs(PERF_EVAL_DIR); - } SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); - Path subdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); - fs.mkdirs(subdir); - Path inputFile = new Path(subdir, "input.txt"); + Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); + Path inputDir = new Path(jobdir, "inputs"); + + FileSystem fs = FileSystem.get(c); + fs.mkdirs(inputDir); + Path inputFile = new Path(inputDir, "input.txt"); PrintStream out = new PrintStream(fs.create(inputFile)); // Make input random. Map m = new TreeMap(); Hash h = MurmurHash.getInstance(); - int perClientRows = (R / N); + int perClientRows = (this.R / this.N); try { for (int i = 0; i < 10; i++) { for (int j = 0; j < N; j++) { - String s = "startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + + String s = "tableName=" + this.tableName + + ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + ", perClientRunRows=" + (perClientRows / 10) + - ", totalRows=" + R + - ", clients=" + N + - ", rowsPerPut=" + B; + ", totalRows=" + this.R + + ", clients=" + this.N + + ", flushCommits=" + this.flushCommits + + ", writeToWAL=" + this.writeToWAL + + ", useTags=" + this.useTags + + ", noOfTags=" + this.noOfTags; int hash = h.hash(Bytes.toBytes(s)); m.put(hash, s); } @@ -592,7 +743,7 @@ public class PerformanceEvaluation { } finally { out.close(); } - return subdir; + return inputDir; } /** @@ -630,18 +781,30 @@ public class PerformanceEvaluation { private int startRow; private int perClientRunRows; private int totalRows; - private byte[] tableName; - private int rowsPerPut; + private int numClientThreads; + private TableName tableName; + private boolean flushCommits; + private boolean writeToWAL = true; + private boolean useTags = false; + private int noOfTags = 0; + private HConnection connection; TestOptions() { } - TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, int rowsPerPut) { + TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads, + TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags, + int noOfTags, HConnection connection) { this.startRow = startRow; this.perClientRunRows = perClientRunRows; this.totalRows = totalRows; + this.numClientThreads = numClientThreads; this.tableName = tableName; - this.rowsPerPut = rowsPerPut; + this.flushCommits = flushCommits; + this.writeToWAL = writeToWAL; + this.useTags = useTags; + this.noOfTags = noOfTags; + this.connection = connection; } public int getStartRow() { @@ -656,12 +819,32 @@ public class PerformanceEvaluation { return totalRows; } - public byte[] getTableName() { + public int getNumClientThreads() { + return numClientThreads; + } + + public TableName getTableName() { return tableName; } - public int getRowsPerPut() { - return rowsPerPut; + public boolean isFlushCommits() { + return flushCommits; + } + + public boolean isWriteToWAL() { + return writeToWAL; + } + + public HConnection getConnection() { + return connection; + } + + public boolean isUseTags() { + return this.useTags; + } + + public int getNumTags() { + return this.noOfTags; } } @@ -671,7 +854,7 @@ public class PerformanceEvaluation { */ static abstract class Test { // Below is make it so when Tests are all running in the one - // jvm, that they each have a differently seeded Random. + // jvm, that they each have a differently seeded Random. private static final Random randomSeed = new Random(System.currentTimeMillis()); private static long nextRandomSeed() { @@ -682,10 +865,15 @@ public class PerformanceEvaluation { protected final int startRow; protected final int perClientRunRows; protected final int totalRows; - protected final Status status; - protected byte[] tableName; - protected RemoteHTable table; + private final Status status; + protected TableName tableName; + protected HTableInterface table; protected volatile Configuration conf; + protected boolean flushCommits; + protected boolean writeToWAL; + protected boolean useTags; + protected int noOfTags; + protected HConnection connection; /** * Note that all subclasses of this class must provide a public contructor @@ -700,41 +888,49 @@ public class PerformanceEvaluation { this.tableName = options.getTableName(); this.table = null; this.conf = conf; + this.flushCommits = options.isFlushCommits(); + this.writeToWAL = options.isWriteToWAL(); + this.useTags = options.isUseTags(); + this.noOfTags = options.getNumTags(); + this.connection = options.getConnection(); } - + protected String generateStatus(final int sr, final int i, final int lr) { return sr + "/" + i + "/" + lr; } - + protected int getReportingPeriod() { int period = this.perClientRunRows / 10; return period == 0? this.perClientRunRows: period; } - + void testSetup() throws IOException { - this.table = new RemoteHTable(new Client(cluster), conf, tableName); + this.table = connection.getTable(tableName); + this.table.setAutoFlush(false, true); } void testTakedown() throws IOException { - this.table.close(); + if (flushCommits) { + this.table.flushCommits(); + } + table.close(); } - + /* * Run test * @return Elapsed time. * @throws IOException */ long test() throws IOException { - long elapsedTime; testSetup(); - long startTime = System.currentTimeMillis(); + LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); + final long startTime = System.nanoTime(); try { testTimed(); - elapsedTime = System.currentTimeMillis() - startTime; } finally { testTakedown(); } - return elapsedTime; + return (System.nanoTime() - startTime) / 1000000; } /** @@ -755,8 +951,7 @@ public class PerformanceEvaluation { * Test for individual row. * @param i Row index. */ - void testRow(final int i) throws IOException { - } + abstract void testRow(final int i) throws IOException; } @SuppressWarnings("unused") @@ -771,13 +966,9 @@ public class PerformanceEvaluation { scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.setFilter(new WhileMatchFilter(new PageFilter(120))); ResultScanner s = this.table.getScanner(scan); - //int count = 0; - for (Result rr = null; (rr = s.next()) != null;) { - // LOG.info("" + count++ + " " + rr.toString()); - } s.close(); } - + @Override protected int getReportingPeriod() { int period = this.perClientRunRows / 100; @@ -890,48 +1081,40 @@ public class PerformanceEvaluation { } } - - static class RandomWriteTest extends Test { - int rowsPerPut; + static class RandomWriteTest extends Test { RandomWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); - rowsPerPut = options.getRowsPerPut(); } - + @Override - void testTimed() throws IOException { - int lastRow = this.startRow + this.perClientRunRows; - // Report on completion of 1/10th of total. - List puts = new ArrayList(); - for (int i = this.startRow; i < lastRow; i += rowsPerPut) { - for (int j = 0; j < rowsPerPut; j++) { - byte [] row = getRandomRow(this.rand, this.totalRows); - Put put = new Put(row); - byte[] value = generateValue(this.rand); - put.add(FAMILY_NAME, QUALIFIER_NAME, value); - puts.add(put); - if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { - status.setStatus(generateStatus(this.startRow, i, lastRow)); - } + void testRow(final int i) throws IOException { + byte[] row = getRandomRow(this.rand, this.totalRows); + Put put = new Put(row); + byte[] value = generateData(this.rand, ROW_LENGTH); + if (useTags) { + byte[] tag = generateData(this.rand, TAG_LENGTH); + Tag[] tags = new Tag[noOfTags]; + for (int n = 0; n < noOfTags; n++) { + Tag t = new Tag((byte) n, tag); + tags[n] = t; } - table.put(puts); + put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags); + } else { + put.add(FAMILY_NAME, QUALIFIER_NAME, value); } + put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + table.put(put); } } - + static class ScanTest extends Test { private ResultScanner testScanner; ScanTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } - - @Override - void testSetup() throws IOException { - super.testSetup(); - } - + @Override void testTakedown() throws IOException { if (this.testScanner != null) { @@ -939,8 +1122,8 @@ public class PerformanceEvaluation { } super.testTakedown(); } - - + + @Override void testRow(final int i) throws IOException { if (this.testScanner == null) { @@ -952,12 +1135,12 @@ public class PerformanceEvaluation { } } - + static class SequentialReadTest extends Test { SequentialReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } - + @Override void testRow(final int i) throws IOException { Get get = new Get(format(i)); @@ -966,32 +1149,30 @@ public class PerformanceEvaluation { } } - + static class SequentialWriteTest extends Test { - int rowsPerPut; SequentialWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); - rowsPerPut = options.getRowsPerPut(); } @Override - void testTimed() throws IOException { - int lastRow = this.startRow + this.perClientRunRows; - // Report on completion of 1/10th of total. - List puts = new ArrayList(); - for (int i = this.startRow; i < lastRow; i += rowsPerPut) { - for (int j = 0; j < rowsPerPut; j++) { - Put put = new Put(format(i + j)); - byte[] value = generateValue(this.rand); - put.add(FAMILY_NAME, QUALIFIER_NAME, value); - puts.add(put); - if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { - status.setStatus(generateStatus(this.startRow, i, lastRow)); - } + void testRow(final int i) throws IOException { + Put put = new Put(format(i)); + byte[] value = generateData(this.rand, ROW_LENGTH); + if (useTags) { + byte[] tag = generateData(this.rand, TAG_LENGTH); + Tag[] tags = new Tag[noOfTags]; + for (int n = 0; n < noOfTags; n++) { + Tag t = new Tag((byte) n, tag); + tags[n] = t; } - table.put(puts); + put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags); + } else { + put.add(FAMILY_NAME, QUALIFIER_NAME, value); } + put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + table.put(put); } } @@ -1027,7 +1208,7 @@ public class PerformanceEvaluation { return scan; } } - + /* * Format passed integer. * @param number @@ -1035,7 +1216,7 @@ public class PerformanceEvaluation { * number (Does absolute in case number is negative). */ public static byte [] format(final int number) { - byte [] b = new byte[10]; + byte [] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10]; int d = Math.abs(number); for (int i = b.length - 1; i >= 0; i--) { b[i] = (byte)((d % 10) + '0'); @@ -1043,34 +1224,51 @@ public class PerformanceEvaluation { } return b; } - - /* - * This method takes some time and is done inline uploading data. For - * example, doing the mapfile test, generation of the key and value - * consumes about 30% of CPU time. - * @return Generated random value to insert into a table cell. - */ + + public static byte[] generateData(final Random r, int length) { + byte [] b = new byte [length]; + int i = 0; + + for(i = 0; i < (length-8); i += 8) { + b[i] = (byte) (65 + r.nextInt(26)); + b[i+1] = b[i]; + b[i+2] = b[i]; + b[i+3] = b[i]; + b[i+4] = b[i]; + b[i+5] = b[i]; + b[i+6] = b[i]; + b[i+7] = b[i]; + } + + byte a = (byte) (65 + r.nextInt(26)); + for(; i < length; i++) { + b[i] = a; + } + return b; + } + public static byte[] generateValue(final Random r) { byte [] b = new byte [ROW_LENGTH]; r.nextBytes(b); return b; } - + static byte [] getRandomRow(final Random random, final int totalRows) { return format(random.nextInt(Integer.MAX_VALUE) % totalRows); } - + long runOneClient(final Class cmd, final int startRow, - final int perClientRunRows, final int totalRows, - final int rowsPerPut, final Status status) + final int perClientRunRows, final int totalRows, + boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, + HConnection connection, final Status status) throws IOException { status.setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows"); long totalElapsedTime = 0; - Test t = null; TestOptions options = new TestOptions(startRow, perClientRunRows, - totalRows, getTableDescriptor().getTableName().getName(), rowsPerPut); + totalRows, N, tableName, flushCommits, writeToWAL, useTags, noOfTags, connection); + final Test t; try { Constructor constructor = cmd.getDeclaredConstructor( Configuration.class, TestOptions.class, Status.class); @@ -1089,7 +1287,7 @@ public class PerformanceEvaluation { "ms at offset " + startRow + " for " + perClientRunRows + " rows"); return totalElapsedTime; } - + private void runNIsOne(final Class cmd) { Status status = new Status() { public void setStatus(String msg) throws IOException { @@ -1097,12 +1295,16 @@ public class PerformanceEvaluation { } }; + RemoteAdmin admin = null; try { - checkTable(); - runOneClient(cmd, 0, R, R, B, status); + Client client = new Client(cluster); + admin = new RemoteAdmin(client, getConf()); + checkTable(admin); + runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, + this.useTags, this.noOfTags, this.connection, status); } catch (Exception e) { LOG.error("Failed", e); - } + } } private void runTest(final Class cmd) throws IOException, @@ -1112,7 +1314,7 @@ public class PerformanceEvaluation { // has been set up at all. runNIsOne(cmd); } else { - // Else, run + // Else, run runNIsMoreThanOne(cmd); } } @@ -1120,20 +1322,35 @@ public class PerformanceEvaluation { protected void printUsage() { printUsage(null); } - + protected void printUsage(final String message) { if (message != null && message.length() > 0) { System.err.println(message); } System.err.println("Usage: java " + this.getClass().getName() + " \\"); - System.err.println(" [--option] [--option=value] "); + System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\"); + System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] [-D]* "); System.err.println(); System.err.println("Options:"); - System.err.println(" host String. Specify Stargate endpoint."); - System.err.println(" rows Integer. Rows each client runs. Default: One million"); - System.err.println(" rowsPerPut Integer. Rows each Stargate (multi)Put. Default: 100"); - System.err.println(" nomapred (Flag) Run multiple clients using threads " + + System.err.println(" nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); + System.err.println(" rows Rows each client runs. Default: One million"); + System.err.println(" table Alternate table name. Default: 'TestTable'"); + System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); + System.err.println(" flushCommits Used to determine if the test should flush the table. Default: false"); + System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); + System.err.println(" presplit Create presplit table. Recommended for accurate perf analysis (see guide). Default: disabled"); + System.err + .println(" inmemory Tries to keep the HFiles of the CF inmemory as far as possible. Not " + + "guaranteed that reads are always served from inmemory. Default: false"); + System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. Default : false"); + System.err + .println(" numoftags Specify the no of tags that would be needed. This works only if usetags is true."); + System.err.println(); + System.err.println(" Note: -D properties will be applied to the conf used. "); + System.err.println(" For example: "); + System.err.println(" -Dmapred.output.compress=true"); + System.err.println(" -Dmapreduce.task.timeout=60000"); System.err.println(); System.err.println("Command:"); for (CmdDescriptor command : commands.values()) { @@ -1161,10 +1378,11 @@ public class PerformanceEvaluation { // Set total number of rows to write. R = R * N; } - - public int doCommandLine(final String[] args) { + + @Override + public int run(String[] args) throws Exception { // Process command-line args. TODO: Better cmd-line processing - // (but hopefully something not as painful as cli options). + // (but hopefully something not as painful as cli options). int errCode = -1; if (args.length < 1) { printUsage(); @@ -1174,27 +1392,77 @@ public class PerformanceEvaluation { try { for (int i = 0; i < args.length; i++) { String cmd = args[i]; - if (cmd.equals("-h")) { + if (cmd.equals("-h") || cmd.startsWith("--h")) { printUsage(); errCode = 0; break; } - + final String nmr = "--nomapred"; if (cmd.startsWith(nmr)) { nomapred = true; continue; } - + final String rows = "--rows="; if (cmd.startsWith(rows)) { R = Integer.parseInt(cmd.substring(rows.length())); continue; } - final String rowsPerPut = "--rowsPerPut="; - if (cmd.startsWith(rowsPerPut)) { - this.B = Integer.parseInt(cmd.substring(rowsPerPut.length())); + final String table = "--table="; + if (cmd.startsWith(table)) { + this.tableName = TableName.valueOf(cmd.substring(table.length())); + continue; + } + + final String compress = "--compress="; + if (cmd.startsWith(compress)) { + this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); + continue; + } + + final String blockEncoding = "--blockEncoding="; + if (cmd.startsWith(blockEncoding)) { + this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); + continue; + } + + final String flushCommits = "--flushCommits="; + if (cmd.startsWith(flushCommits)) { + this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); + continue; + } + + final String writeToWAL = "--writeToWAL="; + if (cmd.startsWith(writeToWAL)) { + this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); + continue; + } + + final String presplit = "--presplit="; + if (cmd.startsWith(presplit)) { + this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); + continue; + } + + final String inMemory = "--inmemory="; + if (cmd.startsWith(inMemory)) { + this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); + continue; + } + + this.connection = HConnectionManager.createConnection(getConf()); + + final String useTags = "--usetags="; + if (cmd.startsWith(useTags)) { + this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); + continue; + } + + final String noOfTags = "--nooftags="; + if (cmd.startsWith(noOfTags)) { + this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); continue; } @@ -1219,14 +1487,14 @@ public class PerformanceEvaluation { errCode = 0; break; } - + printUsage(); break; } } catch (Exception e) { - e.printStackTrace(); + LOG.error("Failed", e); } - + return errCode; } @@ -1238,8 +1506,8 @@ public class PerformanceEvaluation { /** * @param args */ - public static void main(final String[] args) { - Configuration c = HBaseConfiguration.create(); - System.exit(new PerformanceEvaluation(c).doCommandLine(args)); + public static void main(final String[] args) throws Exception { + int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); + System.exit(res); } }