HBASE-10007 PerformanceEvaluation: Add sampling and latency collection to randomRead test (Nick Dimiduk)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1544681 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2013-11-22 21:14:00 +00:00
parent 1349b26d3a
commit 5824f11f55
1 changed files with 185 additions and 81 deletions

View File

@ -23,6 +23,9 @@ import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.math.MathContext;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -82,7 +85,6 @@ import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
/** /**
* Script used evaluating HBase performance and scalability. Runs a HBase * Script used evaluating HBase performance and scalability. Runs a HBase
* client that steps through one of a set of hardcoded tests or 'experiments' * client that steps through one of a set of hardcoded tests or 'experiments'
@ -102,52 +104,55 @@ import org.apache.hadoop.util.ToolRunner;
public class PerformanceEvaluation extends Configured implements Tool { public class PerformanceEvaluation extends Configured implements Tool {
protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
private static final int DEFAULT_ROW_PREFIX_LENGTH = 16; public static final TableName TABLE_NAME = TableName.valueOf("TestTable");
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
public static final int VALUE_LENGTH = 1000; public static final int VALUE_LENGTH = 1000;
public static final int ROW_LENGTH = 26;
private static final int ONE_GB = 1024 * 1024 * 1000; private static final int ONE_GB = 1024 * 1024 * 1000;
private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH; private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH;
// TODO : should we make this configurable // TODO : should we make this configurable
private static final int TAG_LENGTH = 256; private static final int TAG_LENGTH = 256;
private static final DecimalFormat FMT = new DecimalFormat("0.##");
public static final byte[] COMPRESSION = Bytes.toBytes("NONE"); private static final MathContext CXT = MathContext.DECIMAL64;
public static final TableName TABLE_NAME = private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
TableName.valueOf("TestTable"); private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
protected HTableDescriptor TABLE_DESCRIPTOR; protected HTableDescriptor TABLE_DESCRIPTOR;
protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>(); protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
private boolean nomapred = false; private boolean nomapred = false;
private int rowPrefixLength = DEFAULT_ROW_PREFIX_LENGTH;
private int N = 1; private int N = 1;
private int R = ROWS_PER_GB; private int R = ROWS_PER_GB;
private float sampleRate = 1.0f;
private TableName tableName = TABLE_NAME; private TableName tableName = TABLE_NAME;
private Compression.Algorithm compression = Compression.Algorithm.NONE; private Compression.Algorithm compression = Compression.Algorithm.NONE;
private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
private boolean flushCommits = true; private boolean flushCommits = true;
private boolean writeToWAL = true; private boolean writeToWAL = true;
private boolean inMemoryCF = false; private boolean inMemoryCF = false;
private boolean reportLatency = false;
private int presplitRegions = 0; private int presplitRegions = 0;
private boolean useTags = false; private boolean useTags = false;
private int noOfTags = 1; private int noOfTags = 1;
private HConnection connection; private HConnection connection;
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
/**
* Regex to parse lines in input file passed to mapreduce task. /** Regex to parse lines in input file passed to mapreduce task. */
*/
public static final Pattern LINE_PATTERN = public static final Pattern LINE_PATTERN =
Pattern.compile("tableName=(\\w+),\\s+" + Pattern.compile("tableName=(\\w+),\\s+" +
"startRow=(\\d+),\\s+" + "startRow=(\\d+),\\s+" +
"perClientRunRows=(\\d+),\\s+" + "perClientRunRows=(\\d+),\\s+" +
"totalRows=(\\d+),\\s+" + "totalRows=(\\d+),\\s+" +
"sampleRate=([-+]?[0-9]*\\.?[0-9]+),\\s+" +
"clients=(\\d+),\\s+" + "clients=(\\d+),\\s+" +
"flushCommits=(\\w+),\\s+" + "flushCommits=(\\w+),\\s+" +
"writeToWAL=(\\w+),\\s+" + "writeToWAL=(\\w+),\\s+" +
"useTags=(\\w+),\\s+" + "useTags=(\\w+),\\s+" +
"noOfTags=(\\d+)"); "noOfTags=(\\d+),\\s+" +
"reportLatency=(\\w+)");
/** /**
* Enum for map metrics. Keep it out here rather than inside in the Map * Enum for map metrics. Keep it out here rather than inside in the Map
@ -157,8 +162,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
/** elapsed time */ /** elapsed time */
ELAPSED_TIME, ELAPSED_TIME,
/** number of rows */ /** number of rows */
ROWS} ROWS
}
/** /**
* Constructor * Constructor
@ -221,26 +226,30 @@ public class PerformanceEvaluation extends Configured implements Tool {
private int startRow = 0; private int startRow = 0;
private int rows = 0; private int rows = 0;
private int totalRows = 0; private int totalRows = 0;
private float sampleRate = 1.0f;
private int clients = 0; private int clients = 0;
private boolean flushCommits = false; private boolean flushCommits = false;
private boolean writeToWAL = true; private boolean writeToWAL = true;
private boolean useTags = false; private boolean useTags = false;
private int noOfTags = 0; private int noOfTags = 0;
private boolean reportLatency = false;
public PeInputSplit() { public PeInputSplit() {}
}
public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows,
boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) { float sampleRate, int clients, boolean flushCommits, boolean writeToWAL,
boolean useTags, int noOfTags, boolean reportLatency) {
this.tableName = tableName; this.tableName = tableName;
this.startRow = startRow; this.startRow = startRow;
this.rows = rows; this.rows = rows;
this.totalRows = totalRows; this.totalRows = totalRows;
this.sampleRate = sampleRate;
this.clients = clients; this.clients = clients;
this.flushCommits = flushCommits; this.flushCommits = flushCommits;
this.writeToWAL = writeToWAL; this.writeToWAL = writeToWAL;
this.useTags = useTags; this.useTags = useTags;
this.noOfTags = noOfTags; this.noOfTags = noOfTags;
this.reportLatency = reportLatency;
} }
@Override @Override
@ -253,11 +262,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.startRow = in.readInt(); this.startRow = in.readInt();
this.rows = in.readInt(); this.rows = in.readInt();
this.totalRows = in.readInt(); this.totalRows = in.readInt();
this.sampleRate = in.readFloat();
this.clients = in.readInt(); this.clients = in.readInt();
this.flushCommits = in.readBoolean(); this.flushCommits = in.readBoolean();
this.writeToWAL = in.readBoolean(); this.writeToWAL = in.readBoolean();
this.useTags = in.readBoolean(); this.useTags = in.readBoolean();
this.noOfTags = in.readInt(); this.noOfTags = in.readInt();
this.reportLatency = in.readBoolean();
} }
@Override @Override
@ -268,11 +279,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
out.writeInt(startRow); out.writeInt(startRow);
out.writeInt(rows); out.writeInt(rows);
out.writeInt(totalRows); out.writeInt(totalRows);
out.writeFloat(sampleRate);
out.writeInt(clients); out.writeInt(clients);
out.writeBoolean(flushCommits); out.writeBoolean(flushCommits);
out.writeBoolean(writeToWAL); out.writeBoolean(writeToWAL);
out.writeBoolean(useTags); out.writeBoolean(useTags);
out.writeInt(noOfTags); out.writeInt(noOfTags);
out.writeBoolean(reportLatency);
} }
@Override @Override
@ -301,6 +314,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
return totalRows; return totalRows;
} }
public float getSampleRate() {
return sampleRate;
}
public int getClients() { public int getClients() {
return clients; return clients;
} }
@ -320,6 +337,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
public int getNoOfTags() { public int getNoOfTags() {
return noOfTags; return noOfTags;
} }
public boolean isReportLatency() {
return reportLatency;
}
} }
/** /**
@ -354,26 +375,30 @@ public class PerformanceEvaluation extends Configured implements Tool {
int startRow = Integer.parseInt(m.group(2)); int startRow = Integer.parseInt(m.group(2));
int rows = Integer.parseInt(m.group(3)); int rows = Integer.parseInt(m.group(3));
int totalRows = Integer.parseInt(m.group(4)); int totalRows = Integer.parseInt(m.group(4));
int clients = Integer.parseInt(m.group(5)); float sampleRate = Float.parseFloat(m.group(5));
boolean flushCommits = Boolean.parseBoolean(m.group(6)); int clients = Integer.parseInt(m.group(6));
boolean writeToWAL = Boolean.parseBoolean(m.group(7)); boolean flushCommits = Boolean.parseBoolean(m.group(7));
boolean useTags = Boolean.parseBoolean(m.group(8)); boolean writeToWAL = Boolean.parseBoolean(m.group(8));
int noOfTags = Integer.parseInt(m.group(9)); boolean useTags = Boolean.parseBoolean(m.group(9));
int noOfTags = Integer.parseInt(m.group(10));
boolean reportLatency = Boolean.parseBoolean(m.group(11));
LOG.debug("tableName=" + tableName + LOG.debug("tableName=" + tableName +
" split["+ splitList.size() + "] " + " split["+ splitList.size() + "] " +
" startRow=" + startRow + " startRow=" + startRow +
" rows=" + rows + " rows=" + rows +
" totalRows=" + totalRows + " totalRows=" + totalRows +
" sampleRate=" + sampleRate +
" clients=" + clients + " clients=" + clients +
" flushCommits=" + flushCommits + " flushCommits=" + flushCommits +
" writeToWAL=" + writeToWAL + " writeToWAL=" + writeToWAL +
" useTags=" + useTags + " useTags=" + useTags +
" noOfTags=" + noOfTags); " noOfTags=" + noOfTags +
" reportLatency=" + reportLatency);
PeInputSplit newSplit = PeInputSplit newSplit =
new PeInputSplit(tableName, startRow, rows, totalRows, clients, new PeInputSplit(tableName, startRow, rows, totalRows, sampleRate, clients,
flushCommits, writeToWAL, useTags, noOfTags); flushCommits, writeToWAL, useTags, noOfTags, reportLatency);
splitList.add(newSplit); splitList.add(newSplit);
} }
} }
@ -410,7 +435,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
key = NullWritable.get(); key = NullWritable.get();
value = (PeInputSplit)split; value = split;
readOver = true; readOver = true;
return true; return true;
@ -494,9 +519,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
// Evaluation task // Evaluation task
pe.tableName = value.getTableName(); pe.tableName = value.getTableName();
long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
value.getRows(), value.getTotalRows(), value.getRows(), value.getTotalRows(), value.getSampleRate(),
value.isFlushCommits(), value.isWriteToWAL(), value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(),
value.isUseTags(), value.getNoOfTags(), value.getNoOfTags(), value.isReportLatency(),
HConnectionManager.createConnection(context.getConfiguration()), status); HConnectionManager.createConnection(context.getConfiguration()), status);
// Collect how much time the thing took. Report as map output and // Collect how much time the thing took. Report as map output and
// to the ELAPSED_TIME counter. // to the ELAPSED_TIME counter.
@ -536,8 +561,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
LOG.info("Table " + tableDescriptor + " created"); LOG.info("Table " + tableDescriptor + " created");
} }
} }
boolean tableExists = admin.tableExists(tableDescriptor.getTableName()); return admin.tableExists(tableDescriptor.getTableName());
return tableExists;
} }
protected HTableDescriptor getTableDescriptor() { protected HTableDescriptor getTableDescriptor() {
@ -598,11 +622,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
final List<Thread> threads = new ArrayList<Thread>(this.N); final List<Thread> threads = new ArrayList<Thread>(this.N);
final long[] timings = new long[this.N]; final long[] timings = new long[this.N];
final int perClientRows = R/N; final int perClientRows = R/N;
final float sampleRate = this.sampleRate;
final TableName tableName = this.tableName; final TableName tableName = this.tableName;
final DataBlockEncoding encoding = this.blockEncoding; final DataBlockEncoding encoding = this.blockEncoding;
final boolean flushCommits = this.flushCommits; final boolean flushCommits = this.flushCommits;
final Compression.Algorithm compression = this.compression; final Compression.Algorithm compression = this.compression;
final boolean writeToWal = this.writeToWAL; final boolean writeToWal = this.writeToWAL;
final boolean reportLatency = this.reportLatency;
final int preSplitRegions = this.presplitRegions; final int preSplitRegions = this.presplitRegions;
final boolean useTags = this.useTags; final boolean useTags = this.useTags;
final int numTags = this.noOfTags; final int numTags = this.noOfTags;
@ -621,17 +647,19 @@ public class PerformanceEvaluation extends Configured implements Tool {
pe.writeToWAL = writeToWal; pe.writeToWAL = writeToWal;
pe.presplitRegions = preSplitRegions; pe.presplitRegions = preSplitRegions;
pe.N = N; pe.N = N;
pe.sampleRate = sampleRate;
pe.reportLatency = reportLatency;
pe.connection = connection; pe.connection = connection;
pe.useTags = useTags; pe.useTags = useTags;
pe.noOfTags = numTags; pe.noOfTags = numTags;
try { try {
long elapsedTime = pe.runOneClient(cmd, index * perClientRows, long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
perClientRows, R, perClientRows, R, sampleRate, flushCommits, writeToWal, useTags,
flushCommits, writeToWAL, useTags, noOfTags, connection, new Status() { noOfTags, reportLatency, connection, new Status() {
public void setStatus(final String msg) throws IOException { public void setStatus(final String msg) throws IOException {
LOG.info("client-" + getName() + " " + msg); LOG.info("client-" + getName() + " " + msg);
} }
}); });
timings[index] = elapsedTime; timings[index] = elapsedTime;
LOG.info("Finished " + getName() + " in " + elapsedTime + LOG.info("Finished " + getName() + " in " + elapsedTime +
"ms writing " + perClientRows + " rows"); "ms writing " + perClientRows + " rows");
@ -736,11 +764,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
", perClientRunRows=" + (perClientRows / 10) + ", perClientRunRows=" + (perClientRows / 10) +
", totalRows=" + this.R + ", totalRows=" + this.R +
", sampleRate=" + this.sampleRate +
", clients=" + this.N + ", clients=" + this.N +
", flushCommits=" + this.flushCommits + ", flushCommits=" + this.flushCommits +
", writeToWAL=" + this.writeToWAL + ", writeToWAL=" + this.writeToWAL +
", useTags=" + this.useTags + ", useTags=" + this.useTags +
", noOfTags=" + this.noOfTags; ", noOfTags=" + this.noOfTags +
", reportLatency=" + this.reportLatency;
int hash = h.hash(Bytes.toBytes(s)); int hash = h.hash(Bytes.toBytes(s));
m.put(hash, s); m.put(hash, s);
} }
@ -789,29 +819,32 @@ public class PerformanceEvaluation extends Configured implements Tool {
private int startRow; private int startRow;
private int perClientRunRows; private int perClientRunRows;
private int totalRows; private int totalRows;
private float sampleRate;
private int numClientThreads; private int numClientThreads;
private TableName tableName; private TableName tableName;
private boolean flushCommits; private boolean flushCommits;
private boolean writeToWAL = true; private boolean writeToWAL = true;
private boolean useTags = false; private boolean useTags = false;
private int noOfTags = 0; private int noOfTags = 0;
private boolean reportLatency;
private HConnection connection; private HConnection connection;
TestOptions() { TestOptions() {}
}
TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads, TestOptions(int startRow, int perClientRunRows, int totalRows, float sampleRate,
TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags, int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL,
int noOfTags, HConnection connection) { boolean useTags, int noOfTags, boolean reportLatency, HConnection connection) {
this.startRow = startRow; this.startRow = startRow;
this.perClientRunRows = perClientRunRows; this.perClientRunRows = perClientRunRows;
this.totalRows = totalRows; this.totalRows = totalRows;
this.sampleRate = sampleRate;
this.numClientThreads = numClientThreads; this.numClientThreads = numClientThreads;
this.tableName = tableName; this.tableName = tableName;
this.flushCommits = flushCommits; this.flushCommits = flushCommits;
this.writeToWAL = writeToWAL; this.writeToWAL = writeToWAL;
this.useTags = useTags; this.useTags = useTags;
this.noOfTags = noOfTags; this.noOfTags = noOfTags;
this.reportLatency = reportLatency;
this.connection = connection; this.connection = connection;
} }
@ -827,6 +860,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
return totalRows; return totalRows;
} }
public float getSampleRate() {
return sampleRate;
}
public int getNumClientThreads() { public int getNumClientThreads() {
return numClientThreads; return numClientThreads;
} }
@ -843,6 +880,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
return writeToWAL; return writeToWAL;
} }
public boolean isReportLatency() {
return reportLatency;
}
public HConnection getConnection() { public HConnection getConnection() {
return connection; return connection;
} }
@ -872,6 +913,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected final int startRow; protected final int startRow;
protected final int perClientRunRows; protected final int perClientRunRows;
protected final int totalRows; protected final int totalRows;
protected final float sampleRate;
private final Status status; private final Status status;
protected TableName tableName; protected TableName tableName;
protected HTableInterface table; protected HTableInterface table;
@ -880,6 +922,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected boolean writeToWAL; protected boolean writeToWAL;
protected boolean useTags; protected boolean useTags;
protected int noOfTags; protected int noOfTags;
protected boolean reportLatency;
protected HConnection connection; protected HConnection connection;
/** /**
@ -891,6 +934,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.startRow = options.getStartRow(); this.startRow = options.getStartRow();
this.perClientRunRows = options.getPerClientRunRows(); this.perClientRunRows = options.getPerClientRunRows();
this.totalRows = options.getTotalRows(); this.totalRows = options.getTotalRows();
this.sampleRate = options.getSampleRate();
this.status = status; this.status = status;
this.tableName = options.getTableName(); this.tableName = options.getTableName();
this.table = null; this.table = null;
@ -899,6 +943,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.writeToWAL = options.isWriteToWAL(); this.writeToWAL = options.isWriteToWAL();
this.useTags = options.isUseTags(); this.useTags = options.isUseTags();
this.noOfTags = options.getNumTags(); this.noOfTags = options.getNumTags();
this.reportLatency = options.isReportLatency();
this.connection = options.getConnection(); this.connection = options.getConnection();
} }
@ -908,7 +953,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected int getReportingPeriod() { protected int getReportingPeriod() {
int period = this.perClientRunRows / 10; int period = this.perClientRunRows / 10;
return period == 0? this.perClientRunRows: period; return period == 0 ? this.perClientRunRows : period;
} }
void testSetup() throws IOException { void testSetup() throws IOException {
@ -974,17 +1019,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
scan.setFilter(new WhileMatchFilter(new PageFilter(120))); scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
ResultScanner s = this.table.getScanner(scan); ResultScanner s = this.table.getScanner(scan);
//int count = 0; for (Result rr; (rr = s.next()) != null;) ;
for (Result rr = null; (rr = s.next()) != null;) {
// LOG.info("" + count++ + " " + rr.toString());
}
s.close(); s.close();
} }
@Override @Override
protected int getReportingPeriod() { protected int getReportingPeriod() {
int period = this.perClientRunRows / 100; int period = this.perClientRunRows / 100;
return period == 0? this.perClientRunRows: period; return period == 0 ? this.perClientRunRows : period;
} }
} }
@ -1002,7 +1044,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
ResultScanner s = this.table.getScanner(scan); ResultScanner s = this.table.getScanner(scan);
int count = 0; int count = 0;
for (Result rr = null; (rr = s.next()) != null;) { for (Result rr; (rr = s.next()) != null;) {
count++; count++;
} }
@ -1075,23 +1117,49 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
static class RandomReadTest extends Test { static class RandomReadTest extends Test {
private final int everyN;
private final boolean reportLatency;
private final float[] times;
int idx = 0;
RandomReadTest(Configuration conf, TestOptions options, Status status) { RandomReadTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status); super(conf, options, status);
everyN = (int) (this.totalRows / (this.totalRows * this.sampleRate));
LOG.info("Sampling 1 every " + everyN + " out of " + perClientRunRows + " total rows.");
this.reportLatency = options.isReportLatency();
if (this.reportLatency) {
times = new float[(int) Math.ceil(this.perClientRunRows * this.sampleRate)];
} else {
times = null;
}
} }
@Override @Override
void testRow(final int i) throws IOException { void testRow(final int i) throws IOException {
Get get = new Get(getRandomRow(this.rand, this.totalRows)); if (i % everyN == 0) {
get.addColumn(FAMILY_NAME, QUALIFIER_NAME); Get get = new Get(getRandomRow(this.rand, this.totalRows));
this.table.get(get); get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
long start = System.nanoTime();
this.table.get(get);
if (this.reportLatency) {
times[idx++] = (float) ((System.nanoTime() - start) / 1000000.0);
}
}
} }
@Override @Override
protected int getReportingPeriod() { protected int getReportingPeriod() {
int period = this.perClientRunRows / 100; int period = this.perClientRunRows / 100;
return period == 0? this.perClientRunRows: period; return period == 0 ? this.perClientRunRows : period;
} }
@Override
protected void testTakedown() throws IOException {
super.testTakedown();
if (this.reportLatency) {
LOG.info("randomRead latency log (ms): " + Arrays.toString(times));
}
}
} }
static class RandomWriteTest extends Test { static class RandomWriteTest extends Test {
@ -1161,7 +1229,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
get.addColumn(FAMILY_NAME, QUALIFIER_NAME); get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
table.get(get); table.get(get);
} }
} }
static class SequentialWriteTest extends Test { static class SequentialWriteTest extends Test {
@ -1187,7 +1254,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
table.put(put); table.put(put);
} }
} }
static class FilteredScanTest extends Test { static class FilteredScanTest extends Test {
@ -1223,14 +1289,31 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
} }
/**
* Compute a throughput rate in MB/s.
* @param rows Number of records consumed.
* @param timeMs Time taken in milliseconds.
* @return String value with label, ie '123.76 MB/s'
*/
private static String calculateMbps(int rows, long timeMs) {
// MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS)
// * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB
BigDecimal rowSize =
BigDecimal.valueOf(ROW_LENGTH + VALUE_LENGTH + FAMILY_NAME.length + QUALIFIER_NAME.length);
BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
.divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
.divide(BYTES_PER_MB, CXT);
return FMT.format(mbps) + " MB/s";
}
/* /*
* Format passed integer. * Format passed integer.
* @param number * @param number
* @return Returns zero-prefixed 10-byte wide decimal version of passed * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
* number (Does absolute in case number is negative). * number (Does absolute in case number is negative).
*/ */
public static byte [] format(final int number) { public static byte [] format(final int number) {
byte [] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10]; byte [] b = new byte[ROW_LENGTH];
int d = Math.abs(number); int d = Math.abs(number);
for (int i = b.length - 1; i >= 0; i--) { for (int i = b.length - 1; i >= 0; i--) {
b[i] = (byte)((d % 10) + '0'); b[i] = (byte)((d % 10) + '0');
@ -1272,16 +1355,17 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
long runOneClient(final Class<? extends Test> cmd, final int startRow, long runOneClient(final Class<? extends Test> cmd, final int startRow,
final int perClientRunRows, final int totalRows, final int perClientRunRows, final int totalRows, final float sampleRate,
boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
HConnection connection, final Status status) boolean reportLatency, HConnection connection, final Status status)
throws IOException { throws IOException {
status.setStatus("Start " + cmd + " at offset " + startRow + " for " + status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
perClientRunRows + " rows"); perClientRunRows + " rows");
long totalElapsedTime = 0; long totalElapsedTime = 0;
TestOptions options = new TestOptions(startRow, perClientRunRows, TestOptions options = new TestOptions(startRow, perClientRunRows,
totalRows, N, tableName, flushCommits, writeToWAL, useTags, noOfTags, connection); totalRows, sampleRate, N, tableName, flushCommits, writeToWAL, useTags, noOfTags,
reportLatency, connection);
final Test t; final Test t;
try { try {
Constructor<? extends Test> constructor = cmd.getDeclaredConstructor( Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
@ -1298,11 +1382,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
totalElapsedTime = t.test(); totalElapsedTime = t.test();
status.setStatus("Finished " + cmd + " in " + totalElapsedTime + status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
"ms at offset " + startRow + " for " + perClientRunRows + " rows"); "ms at offset " + startRow + " for " + perClientRunRows + " rows" +
" (" + calculateMbps((int)(perClientRunRows * sampleRate), totalElapsedTime) + ")");
return totalElapsedTime; return totalElapsedTime;
} }
private void runNIsOne(final Class<? extends Test> cmd) { private void runNIsOne(final Class<? extends Test> cmd) throws IOException {
Status status = new Status() { Status status = new Status() {
public void setStatus(String msg) throws IOException { public void setStatus(String msg) throws IOException {
LOG.info(msg); LOG.info(msg);
@ -1313,10 +1398,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
try { try {
admin = new HBaseAdmin(getConf()); admin = new HBaseAdmin(getConf());
checkTable(admin); checkTable(admin);
runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, runOneClient(cmd, 0, this.R, this.R, this.sampleRate, this.flushCommits,
this.useTags, this.noOfTags, this.connection, status); this.writeToWAL, this.useTags, this.noOfTags, this.reportLatency, this.connection, status);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed", e); LOG.error("Failed", e);
} finally {
if (admin != null) admin.close();
} }
} }
@ -1342,23 +1429,31 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
System.err.println("Usage: java " + this.getClass().getName() + " \\"); System.err.println("Usage: java " + this.getClass().getName() + " \\");
System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\"); System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\");
System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] [-D<property=value>]* <command> <nclients>"); System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] " +
"[-D<property=value>]* <command> <nclients>");
System.err.println(); System.err.println();
System.err.println("Options:"); System.err.println("Options:");
System.err.println(" nomapred Run multiple clients using threads " + System.err.println(" nomapred Run multiple clients using threads " +
"(rather than use mapreduce)"); "(rather than use mapreduce)");
System.err.println(" rows Rows each client runs. Default: One million"); System.err.println(" rows Rows each client runs. Default: One million");
System.err.println(" sampleRate Execute test on a sample of total " +
"rows. Only supported by randomRead. Default: 1.0");
System.err.println(" table Alternate table name. Default: 'TestTable'"); System.err.println(" table Alternate table name. Default: 'TestTable'");
System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'");
System.err.println(" flushCommits Used to determine if the test should flush the table. Default: false"); System.err.println(" flushCommits Used to determine if the test should flush the table. " +
"Default: false");
System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); System.err.println(" writeToWAL Set writeToWAL on puts. Default: True");
System.err.println(" presplit Create presplit table. Recommended for accurate perf analysis (see guide). Default: disabled"); System.err.println(" presplit Create presplit table. Recommended for accurate perf " +
System.err "analysis (see guide). Default: disabled");
.println(" inmemory Tries to keep the HFiles of the CF inmemory as far as possible. Not " + System.err.println(" inmemory Tries to keep the HFiles of the CF " +
"guaranteed that reads are always served from inmemory. Default: false"); "inmemory as far as possible. Not guaranteed that reads are always served " +
System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. Default : false"); "from memory. Default: false");
System.err System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " +
.println(" numoftags Specify the no of tags that would be needed. This works only if usetags is true."); "Default: false");
System.err.println(" numoftags Specify the no of tags that would be needed. " +
"This works only if usetags is true.");
System.err.println(" latency Set to report operation latencies. " +
"Currently only supported by randomRead test. Default: False");
System.err.println(); System.err.println();
System.err.println(" Note: -D properties will be applied to the conf used. "); System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" For example: "); System.err.println(" For example: ");
@ -1428,6 +1523,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue; continue;
} }
final String sampleRate = "--sampleRate=";
if (cmd.startsWith(sampleRate)) {
this.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
continue;
}
final String table = "--table="; final String table = "--table=";
if (cmd.startsWith(table)) { if (cmd.startsWith(table)) {
this.tableName = TableName.valueOf(cmd.substring(table.length())); this.tableName = TableName.valueOf(cmd.substring(table.length()));
@ -1470,6 +1571,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue; continue;
} }
final String latency = "--latency";
if (cmd.startsWith(latency)) {
this.reportLatency = true;
continue;
}
this.connection = HConnectionManager.createConnection(getConf()); this.connection = HConnectionManager.createConnection(getConf());
final String useTags = "--usetags="; final String useTags = "--usetags=";
@ -1507,9 +1614,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
return descriptor != null ? descriptor.getCmdClass() : null; return descriptor != null ? descriptor.getCmdClass() : null;
} }
/**
* @param args
*/
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
System.exit(res); System.exit(res);