From 2a4532385784ad19cc2e3c4edb57f61ebdc46e05 Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Fri, 4 Sep 2009 21:38:25 +0000
Subject: [PATCH] HBASE-1778 Improve PerformanceEvaluation

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@811559 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                    |   1 +
 .../hadoop/hbase/PerformanceEvaluation.java    | 291 ++++++++++++++----
 2 files changed, 238 insertions(+), 54 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9d02c0f6c68..6762055141a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@ Release 0.21.0 - Unreleased
    HBASE-1798 [Regression] Unable to delete a row in the future
    HBASE-1790 filters are not working correctly (HBASE-1710 HBASE-1807 too)
    HBASE-1779 ThriftServer logged error if getVer() result is empty
+   HBASE-1778 Improve PerformanceEvaluation (Schubert Zhang via Stack)
 
   IMPROVEMENTS
    HBASE-1760 Cleanup TODOs in HTable

diff --git a/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java b/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
index 964b2660f09..270567d62b4 100644
--- a/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.text.SimpleDateFormat;
@@ -35,6 +37,8 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Get;
@@ -42,8 +46,8 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -52,17 +56,19 @@ import org.apache.hadoop.hbase.util.Hash;
 import org.apache.hadoop.hbase.util.MurmurHash;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
+import org.apache.hadoop.util.LineReader;
 
 /**
@@ -94,7 +100,7 @@ public class PerformanceEvaluation implements HConstants {
   protected static final HTableDescriptor TABLE_DESCRIPTOR;
   static {
     TABLE_DESCRIPTOR = new HTableDescriptor("TestTable");
-    TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(CATALOG_FAMILY));
+    TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME));
   }
 
   private static final String RANDOM_READ = "randomRead";
@@ -159,47 +165,215 @@ public class PerformanceEvaluation implements HConstants {
     void setStatus(final String msg) throws IOException;
   }
 
+  /**
+   * This class works as the InputSplit of the Performance Evaluation
+   * MapReduce InputFormat, and as the record value of its RecordReader.
+   * Each map task reads only one record from a PeInputSplit, and the
+   * record value is the PeInputSplit itself.
+   */
+  public static class PeInputSplit extends InputSplit implements Writable {
+    private int startRow = 0;
+    private int rows = 0;
+    private int totalRows = 0;
+    private int clients = 0;
+
+    public PeInputSplit() {
+      this.startRow = 0;
+      this.rows = 0;
+      this.totalRows = 0;
+      this.clients = 0;
+    }
+
+    public PeInputSplit(int startRow, int rows, int totalRows, int clients) {
+      this.startRow = startRow;
+      this.rows = rows;
+      this.totalRows = totalRows;
+      this.clients = clients;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.startRow = in.readInt();
+      this.rows = in.readInt();
+      this.totalRows = in.readInt();
+      this.clients = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(startRow);
+      out.writeInt(rows);
+      out.writeInt(totalRows);
+      out.writeInt(clients);
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return new String[0];
+    }
+
+    public int getStartRow() {
+      return startRow;
+    }
+
+    public int getRows() {
+      return rows;
+    }
+
+    public int getTotalRows() {
+      return totalRows;
+    }
+
+    public int getClients() {
+      return clients;
+    }
+  }
+
+  /**
+   * InputFormat of the Performance Evaluation MapReduce job.
+   * It extends FileInputFormat so that it can reuse methods such as setInputPaths().
+   */
+  public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext job) throws IOException {
+      // generate splits
+      List<InputSplit> splitList = new ArrayList<InputSplit>();
+
+      for (FileStatus file: listStatus(job)) {
+        Path path = file.getPath();
+        FileSystem fs = path.getFileSystem(job.getConfiguration());
+        FSDataInputStream fileIn = fs.open(path);
+        LineReader in = new LineReader(fileIn, job.getConfiguration());
+        int lineLen = 0;
+        while(true) {
+          Text lineText = new Text();
+          lineLen = in.readLine(lineText);
+          if(lineLen <= 0) {
+            break;
+          }
+          Matcher m = LINE_PATTERN.matcher(lineText.toString());
+          if((m != null) && m.matches()) {
+            int startRow = Integer.parseInt(m.group(1));
+            int rows = Integer.parseInt(m.group(2));
+            int totalRows = Integer.parseInt(m.group(3));
+            int clients = Integer.parseInt(m.group(4));
+
+            LOG.debug("split["+ splitList.size() + "] " +
+                      " startRow=" + startRow +
+                      " rows=" + rows +
+                      " totalRows=" + totalRows +
+                      " clients=" + clients);
+
+            PeInputSplit newSplit = new PeInputSplit(startRow, rows, totalRows, clients);
+            splitList.add(newSplit);
+          }
+        }
+        in.close();
+      }
+
+      LOG.info("Total # of splits: " + splitList.size());
+      return splitList;
+    }
+
+    @Override
+    public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
+        TaskAttemptContext context) {
+      return new PeRecordReader();
+    }
+
+    public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
+      private boolean readOver = false;
+      private PeInputSplit split = null;
+      private NullWritable key = null;
+      private PeInputSplit value = null;
+
+      @Override
+      public void initialize(InputSplit split, TaskAttemptContext context)
+          throws IOException, InterruptedException {
+        this.readOver = false;
+        this.split = (PeInputSplit)split;
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        if(readOver) {
+          return false;
+        }
+
+        key = NullWritable.get();
+        value = (PeInputSplit)split;
+
+        readOver = true;
+        return true;
+      }
+
+      @Override
+      public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return key;
+      }
+
+      @Override
+      public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
+        return value;
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        if(readOver) {
+          return 1.0f;
+        } else {
+          return 0.0f;
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        // do nothing
+      }
+    }
+  }
+
   /**
    * MapReduce job that runs a performance evaluation client in each map task.
    */
-  @SuppressWarnings("unchecked")
-  public static class EvaluationMapTask extends MapReduceBase
-  implements Mapper {
+  public static class EvaluationMapTask
+      extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
+
     /** configuration parameter name that contains the command */
     public final static String CMD_KEY = "EvaluationMapTask.command";
     private String cmd;
     private PerformanceEvaluation pe;
 
     @Override
-    public void configure(JobConf j) {
-      this.cmd = j.get(CMD_KEY);
-
-      this.pe = new PerformanceEvaluation(new HBaseConfiguration(j));
+    protected void setup(Context context) throws IOException, InterruptedException {
+      this.cmd = context.getConfiguration().get(CMD_KEY);
+      this.pe = new PerformanceEvaluation(new HBaseConfiguration(context.getConfiguration()));
     }
 
-    public void map(final Object key,
-      final Object value, final OutputCollector output,
-      final Reporter reporter)
-    throws IOException {
-      Matcher m = LINE_PATTERN.matcher(((Text)value).toString());
-      if (m != null && m.matches()) {
-        int startRow = Integer.parseInt(m.group(1));
-        int perClientRunRows = Integer.parseInt(m.group(2));
-        int totalRows = Integer.parseInt(m.group(3));
-        Status status = new Status() {
-          public void setStatus(String msg) {
-            reporter.setStatus(msg);
-          }
-        };
-        long elapsedTime = this.pe.runOneClient(this.cmd, startRow,
-          perClientRunRows, totalRows, status);
-        // Collect how much time the thing took. Report as map output and
-        // to the ELAPSED_TIME counter.
-        reporter.incrCounter(Counter.ELAPSED_TIME, elapsedTime);
-        reporter.incrCounter(Counter.ROWS, perClientRunRows);
-        output.collect(new LongWritable(startRow),
-          new Text(Long.toString(elapsedTime)));
-      }
+    protected void map(NullWritable key, PeInputSplit value, final Context context)
+        throws IOException, InterruptedException {
+
+      Status status = new Status() {
+        public void setStatus(String msg) {
+          context.setStatus(msg);
+        }
+      };
+
+      // Evaluation task
+      long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
+        value.getRows(), value.getTotalRows(), status);
+      // Collect how much time the thing took. Report as map output and
+      // to the ELAPSED_TIME counter.
+      context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
+      context.getCounter(Counter.ROWS).increment(value.rows);
+      context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime));
+      context.progress();
     }
   }
 
@@ -225,7 +399,7 @@ public class PerformanceEvaluation implements HConstants {
    * @throws IOException
    */
   private void runNIsMoreThanOne(final String cmd)
-  throws IOException {
+  throws IOException, InterruptedException, ClassNotFoundException {
     checkTable(new HBaseAdmin(conf));
     if (this.nomapred) {
       doMultipleClients(cmd);
@@ -288,21 +462,29 @@ public class PerformanceEvaluation implements HConstants {
    * @param cmd Command to run.
    * @throws IOException
    */
-  private void doMapReduce(final String cmd) throws IOException {
+  private void doMapReduce(final String cmd) throws IOException,
+        InterruptedException, ClassNotFoundException {
     Path inputDir = writeInputFile(this.conf);
     this.conf.set(EvaluationMapTask.CMD_KEY, cmd);
-    JobConf job = new JobConf(this.conf, this.getClass());
-    FileInputFormat.setInputPaths(job, inputDir);
-    job.setInputFormat(TextInputFormat.class);
+    Job job = new Job(this.conf);
+    job.setJarByClass(PerformanceEvaluation.class);
     job.setJobName("HBase Performance Evaluation");
+
+    job.setInputFormatClass(PeInputFormat.class);
+    PeInputFormat.setInputPaths(job, inputDir);
+
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(LongWritable.class);
+
     job.setMapperClass(EvaluationMapTask.class);
-    job.setMaxMapAttempts(1);
-    job.setMaxReduceAttempts(1);
-    job.setNumMapTasks(this.N * 10); // Ten maps per client.
+    job.setReducerClass(LongSumReducer.class);
+    job.setNumReduceTasks(1);
-    job.setOutputFormat(TextOutputFormat.class);
-    FileOutputFormat.setOutputPath(job, new Path(inputDir, "outputs"));
-    JobClient.runJob(job);
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+    TextOutputFormat.setOutputPath(job, new Path(inputDir,"outputs"));
+
+    job.waitForCompletion(true);
   }
 
   /*
@@ -666,7 +848,8 @@ public class PerformanceEvaluation implements HConstants {
     }
   }
 
-  private void runTest(final String cmd) throws IOException {
+  private void runTest(final String cmd) throws IOException,
+        InterruptedException, ClassNotFoundException {
     if (cmd.equals(RANDOM_READ_MEM)) {
      // For this one test, so all fits in memory, make R smaller (See
      // pg. 9 of BigTable paper).
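
The new PeInputSplit travels from the job client to each map task through the Writable
methods added above. A minimal sketch of that round trip (not part of the patch; the class
name and the row/client numbers below are made-up example values):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;

  import org.apache.hadoop.hbase.PerformanceEvaluation.PeInputSplit;

  public class PeInputSplitRoundTrip {
    public static void main(String[] args) throws Exception {
      // Example numbers only: this client covers 1M of 10M total rows, 10 clients overall.
      PeInputSplit split = new PeInputSplit(0, 1048576, 10485760, 10);

      // Serialize the split the same way the framework ships it to a task.
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      split.write(new DataOutputStream(bytes));

      // Read it back into a fresh instance and confirm the fields survive the trip.
      PeInputSplit copy = new PeInputSplit();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

      System.out.println(copy.getStartRow() == split.getStartRow()
          && copy.getRows() == split.getRows()
          && copy.getTotalRows() == split.getTotalRows()
          && copy.getClients() == split.getClients());   // expected: true
    }
  }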
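
PeRecordReader enforces the one-record-per-split contract described in the PeInputSplit
javadoc: each map task sees exactly one key/value pair, and the value is the split itself.
A short sketch of that behavior (again not part of the patch; the class name and numbers are
illustrative, and passing null for the TaskAttemptContext relies on the reader shown above
never touching it):

  import org.apache.hadoop.hbase.PerformanceEvaluation.PeInputFormat.PeRecordReader;
  import org.apache.hadoop.hbase.PerformanceEvaluation.PeInputSplit;

  public class OneRecordPerSplit {
    public static void main(String[] args) throws Exception {
      PeInputSplit split = new PeInputSplit(0, 1048576, 10485760, 10);

      PeRecordReader reader = new PeRecordReader();
      reader.initialize(split, null);   // the reader ignores the TaskAttemptContext

      while (reader.nextKeyValue()) {   // returns true exactly once, then false
        PeInputSplit value = reader.getCurrentValue();
        System.out.println("startRow=" + value.getStartRow() + ", rows=" + value.getRows());
      }
      reader.close();
    }
  }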