HBASE-11350 [PE] Allow random value size
This commit is contained in:
parent
58549428a6
commit
63f0dffdba
|
@ -86,6 +86,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|||
import com.yammer.metrics.core.Histogram;
|
||||
import com.yammer.metrics.stats.UniformSample;
|
||||
import com.yammer.metrics.stats.Snapshot;
|
||||
|
||||
import org.htrace.Sampler;
|
||||
import org.htrace.Trace;
|
||||
import org.htrace.TraceScope;
|
||||
|
@ -113,11 +114,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
public static final String TABLE_NAME = "TestTable";
|
||||
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
|
||||
public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
|
||||
public static final int VALUE_LENGTH = 1000;
|
||||
public static final int DEFAULT_VALUE_LENGTH = 1000;
|
||||
public static final int ROW_LENGTH = 26;
|
||||
|
||||
private static final int ONE_GB = 1024 * 1024 * 1000;
|
||||
private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH;
|
||||
private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
|
||||
// TODO : should we make this configurable
|
||||
private static final int TAG_LENGTH = 256;
|
||||
private static final DecimalFormat FMT = new DecimalFormat("0.##");
|
||||
|
@ -509,15 +510,18 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
this.compression = that.compression;
|
||||
this.blockEncoding = that.blockEncoding;
|
||||
this.filterAll = that.filterAll;
|
||||
this.valueRandom = that.valueRandom;
|
||||
this.valueSize = that.valueSize;
|
||||
this.period = that.period;
|
||||
}
|
||||
|
||||
public boolean nomapred = false;
|
||||
public boolean filterAll = false;
|
||||
public int startRow = 0;
|
||||
public float size = 1.0f;
|
||||
public int perClientRunRows = ROWS_PER_GB;
|
||||
public int perClientRunRows = DEFAULT_ROWS_PER_GB;
|
||||
public int numClientThreads = 1;
|
||||
public int totalRows = ROWS_PER_GB;
|
||||
public int totalRows = DEFAULT_ROWS_PER_GB;
|
||||
public float sampleRate = 1.0f;
|
||||
public double traceRate = 0.0;
|
||||
public String tableName = TABLE_NAME;
|
||||
|
@ -532,6 +536,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
public int presplitRegions = 0;
|
||||
public Compression.Algorithm compression = Compression.Algorithm.NONE;
|
||||
public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
|
||||
public boolean valueRandom = false;
|
||||
public int valueSize = DEFAULT_VALUE_LENGTH;
|
||||
public int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -560,6 +567,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
private String testName;
|
||||
private Histogram latency;
|
||||
private Histogram valueSize;
|
||||
|
||||
/**
|
||||
* Note that all subclasses of this class must provide a public constructor
|
||||
|
@ -582,13 +590,40 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
|
||||
}
|
||||
|
||||
private String generateStatus(final int sr, final int i, final int lr) {
|
||||
return sr + "/" + i + "/" + lr + " " + getShortLatencyReport();
|
||||
int getValueLength(final Random r) {
|
||||
return opts.valueRandom? Math.abs(r.nextInt() % opts.valueSize): opts.valueSize;
|
||||
}
|
||||
|
||||
void updateValueSize(final Result [] rs) throws IOException {
|
||||
if (rs == null || !isRandomValueSize()) return;
|
||||
for (Result r: rs) updateValueSize(r);
|
||||
}
|
||||
|
||||
void updateValueSize(final Result r) throws IOException {
|
||||
if (r == null || !isRandomValueSize()) return;
|
||||
int size = 0;
|
||||
for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
|
||||
size += scanner.current().getValueLength();
|
||||
}
|
||||
updateValueSize(size);
|
||||
}
|
||||
|
||||
void updateValueSize(final int valueSize) {
|
||||
if (!isRandomValueSize()) return;
|
||||
this.valueSize.update(valueSize);
|
||||
}
|
||||
|
||||
String generateStatus(final int sr, final int i, final int lr) {
|
||||
return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
|
||||
(!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
|
||||
}
|
||||
|
||||
boolean isRandomValueSize() {
|
||||
return opts.valueRandom;
|
||||
}
|
||||
|
||||
protected int getReportingPeriod() {
|
||||
int period = opts.perClientRunRows / 10;
|
||||
return period == 0 ? opts.perClientRunRows : period;
|
||||
return opts.period;
|
||||
}
|
||||
|
||||
void testSetup() throws IOException {
|
||||
|
@ -601,6 +636,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
Histogram.class.getDeclaredConstructor(com.yammer.metrics.stats.Sample.class);
|
||||
ctor.setAccessible(true);
|
||||
latency = (Histogram) ctor.newInstance(new UniformSample(1024 * 500));
|
||||
valueSize = (Histogram) ctor.newInstance(new UniformSample(1024 * 500));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -609,6 +645,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
void testTakedown() throws IOException {
|
||||
reportLatency();
|
||||
reportValueSize();
|
||||
if (opts.flushCommits) {
|
||||
this.table.flushCommits();
|
||||
}
|
||||
|
@ -662,17 +699,27 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
private void reportLatency() throws IOException {
|
||||
status.setStatus(testName + " latency log (microseconds), on " +
|
||||
latency.count() + " measures");
|
||||
Snapshot sn = latency.getSnapshot();
|
||||
status.setStatus(testName + " Min = " + latency.min());
|
||||
status.setStatus(testName + " Avg = " + latency.mean());
|
||||
status.setStatus(testName + " StdDev = " + latency.stdDev());
|
||||
reportHistogram(this.latency);
|
||||
}
|
||||
|
||||
private void reportValueSize() throws IOException {
|
||||
status.setStatus(testName + " valueSize after " +
|
||||
valueSize.count() + " measures");
|
||||
reportHistogram(this.valueSize);
|
||||
}
|
||||
|
||||
private void reportHistogram(final Histogram h) throws IOException {
|
||||
Snapshot sn = h.getSnapshot();
|
||||
status.setStatus(testName + " Min = " + h.min());
|
||||
status.setStatus(testName + " Avg = " + h.mean());
|
||||
status.setStatus(testName + " StdDev = " + h.stdDev());
|
||||
status.setStatus(testName + " 50th = " + sn.getMedian());
|
||||
status.setStatus(testName + " 95th = " + sn.get95thPercentile());
|
||||
status.setStatus(testName + " 99th = " + sn.get99thPercentile());
|
||||
status.setStatus(testName + " 99.9th = " + sn.get999thPercentile());
|
||||
status.setStatus(testName + " 99.99th = " + sn.getValue(0.9999));
|
||||
status.setStatus(testName + " 99.999th = " + sn.getValue(0.99999));
|
||||
status.setStatus(testName + " Max = " + latency.max());
|
||||
status.setStatus(testName + " Max = " + h.max());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -684,9 +731,22 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
* @return Subset of the histograms' calculation.
|
||||
*/
|
||||
private String getShortLatencyReport() {
|
||||
Snapshot sn = latency.getSnapshot();
|
||||
return "Mean=" + DOUBLE_FORMAT.format(latency.mean()) +
|
||||
", StdDev=" + DOUBLE_FORMAT.format(latency.stdDev()) +
|
||||
return getShortHistogramReport(this.latency);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Subset of the histograms' calculation.
|
||||
*/
|
||||
private String getShortValueSizeReport() {
|
||||
return getShortHistogramReport(this.valueSize);
|
||||
}
|
||||
|
||||
private String getShortHistogramReport(final Histogram h) {
|
||||
Snapshot sn = h.getSnapshot();
|
||||
return "mean=" + DOUBLE_FORMAT.format(h.mean()) +
|
||||
", min=" + DOUBLE_FORMAT.format(h.min()) +
|
||||
", max=" + DOUBLE_FORMAT.format(h.max()) +
|
||||
", stdDev=" + DOUBLE_FORMAT.format(h.stdDev()) +
|
||||
", 95th=" + DOUBLE_FORMAT.format(sn.get95thPercentile()) +
|
||||
", 99th=" + DOUBLE_FORMAT.format(sn.get99thPercentile());
|
||||
}
|
||||
|
@ -716,7 +776,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
list.addFilter(new WhileMatchFilter(new PageFilter(120)));
|
||||
scan.setFilter(list);
|
||||
ResultScanner s = this.table.getScanner(scan);
|
||||
for (Result rr; (rr = s.next()) != null;) ;
|
||||
for (Result rr; (rr = s.next()) != null;) {
|
||||
updateValueSize(rr);
|
||||
}
|
||||
s.close();
|
||||
}
|
||||
|
||||
|
@ -741,12 +803,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
scan.setFilter(new FilterAllFilter());
|
||||
}
|
||||
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
|
||||
ResultScanner s = this.table.getScanner(scan);
|
||||
Result r = null;
|
||||
int count = 0;
|
||||
while (s.next() != null) {
|
||||
ResultScanner s = this.table.getScanner(scan);
|
||||
for (; (r = s.next()) != null;) {
|
||||
updateValueSize(r);;
|
||||
count++;
|
||||
}
|
||||
|
||||
if (i % 100 == 0) {
|
||||
LOG.info(String.format("Scan for key range %s - %s returned %s rows",
|
||||
Bytes.toString(startAndStopRow.getFirst()),
|
||||
|
@ -837,17 +900,18 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
if (opts.multiGet > 0) {
|
||||
this.gets.add(get);
|
||||
if (this.gets.size() == opts.multiGet) {
|
||||
this.table.get(this.gets);
|
||||
Result [] rs = this.table.get(this.gets);
|
||||
updateValueSize(rs);
|
||||
this.gets.clear();
|
||||
}
|
||||
} else {
|
||||
this.table.get(get);
|
||||
updateValueSize(this.table.get(get));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getReportingPeriod() {
|
||||
int period = opts.perClientRunRows / 100;
|
||||
int period = opts.perClientRunRows / 10;
|
||||
return period == 0 ? opts.perClientRunRows : period;
|
||||
}
|
||||
|
||||
|
@ -870,7 +934,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
void testRow(final int i) throws IOException {
|
||||
byte[] row = getRandomRow(this.rand, opts.totalRows);
|
||||
Put put = new Put(row);
|
||||
byte[] value = generateData(this.rand, VALUE_LENGTH);
|
||||
byte[] value = generateData(this.rand, getValueLength(this.rand));
|
||||
if (opts.useTags) {
|
||||
byte[] tag = generateData(this.rand, TAG_LENGTH);
|
||||
Tag[] tags = new Tag[opts.noOfTags];
|
||||
|
@ -881,15 +945,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
|
||||
value, tags);
|
||||
put.add(kv);
|
||||
updateValueSize(kv.getValueLength());
|
||||
} else {
|
||||
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
|
||||
updateValueSize(value.length);
|
||||
}
|
||||
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
||||
table.put(put);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static class ScanTest extends Test {
|
||||
private ResultScanner testScanner;
|
||||
|
||||
|
@ -917,7 +982,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
}
|
||||
this.testScanner = table.getScanner(scan);
|
||||
}
|
||||
testScanner.next();
|
||||
Result r = testScanner.next();
|
||||
updateValueSize(r);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -934,7 +1000,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
if (opts.filterAll) {
|
||||
get.setFilter(new FilterAllFilter());
|
||||
}
|
||||
table.get(get);
|
||||
updateValueSize(table.get(get));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -947,7 +1013,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
void testRow(final int i) throws IOException {
|
||||
byte[] row = format(i);
|
||||
Put put = new Put(row);
|
||||
byte[] value = generateData(this.rand, VALUE_LENGTH);
|
||||
byte[] value = generateData(this.rand, getValueLength(this.rand));
|
||||
if (opts.useTags) {
|
||||
byte[] tag = generateData(this.rand, TAG_LENGTH);
|
||||
Tag[] tags = new Tag[opts.noOfTags];
|
||||
|
@ -958,8 +1024,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
|
||||
value, tags);
|
||||
put.add(kv);
|
||||
updateValueSize(kv.getValueLength());
|
||||
} else {
|
||||
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
|
||||
updateValueSize(value.length);
|
||||
}
|
||||
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
||||
table.put(put);
|
||||
|
@ -975,12 +1043,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
@Override
|
||||
void testRow(int i) throws IOException {
|
||||
byte[] value = generateData(this.rand, VALUE_LENGTH);
|
||||
byte[] value = generateData(this.rand, getValueLength(this.rand));
|
||||
Scan scan = constructScan(value);
|
||||
ResultScanner scanner = null;
|
||||
try {
|
||||
scanner = this.table.getScanner(scan);
|
||||
while (scanner.next() != null) {
|
||||
for (Result r = null; (r = scanner.next()) != null;) {
|
||||
updateValueSize(r);
|
||||
}
|
||||
} finally {
|
||||
if (scanner != null) scanner.close();
|
||||
|
@ -1010,11 +1079,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
* @param timeMs Time taken in milliseconds.
|
||||
* @return String value with label, ie '123.76 MB/s'
|
||||
*/
|
||||
private static String calculateMbps(int rows, long timeMs) {
|
||||
private static String calculateMbps(int rows, long timeMs, final int valueSize) {
|
||||
// MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS)
|
||||
// * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB
|
||||
BigDecimal rowSize =
|
||||
BigDecimal.valueOf(ROW_LENGTH + VALUE_LENGTH + FAMILY_NAME.length + QUALIFIER_NAME.length);
|
||||
BigDecimal.valueOf(ROW_LENGTH + valueSize + FAMILY_NAME.length + QUALIFIER_NAME.length);
|
||||
BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
|
||||
.divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
|
||||
.divide(BYTES_PER_MB, CXT);
|
||||
|
@ -1071,7 +1140,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
*/
|
||||
@Deprecated
|
||||
public static byte[] generateValue(final Random r) {
|
||||
return generateData(r, VALUE_LENGTH);
|
||||
return generateData(r, DEFAULT_VALUE_LENGTH);
|
||||
}
|
||||
|
||||
static byte [] getRandomRow(final Random random, final int totalRows) {
|
||||
|
@ -1102,10 +1171,15 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
|
||||
status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
|
||||
"ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
|
||||
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime) + ")");
|
||||
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
|
||||
getAverageValueLength(opts)) + ")");
|
||||
return totalElapsedTime;
|
||||
}
|
||||
|
||||
private static int getAverageValueLength(final TestOptions opts) {
|
||||
return opts.valueRandom? opts.valueSize/2: opts.valueSize;
|
||||
}
|
||||
|
||||
private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
|
||||
InterruptedException, ClassNotFoundException {
|
||||
HBaseAdmin admin = null;
|
||||
|
@ -1131,9 +1205,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
System.err.println(message);
|
||||
}
|
||||
System.err.println("Usage: java " + this.getClass().getName() + " \\");
|
||||
System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\");
|
||||
System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] " +
|
||||
"[-D<property=value>]* <command> <nclients>");
|
||||
System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
|
||||
System.err.println();
|
||||
System.err.println("Options:");
|
||||
System.err.println(" nomapred Run multiple clients using threads " +
|
||||
|
@ -1166,6 +1238,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
+ " there by not returning any thing back to the client. Helps to check the server side"
|
||||
+ " performance. Uses FilterAllFilter internally. ");
|
||||
System.err.println(" latency Set to report operation latencies. Default: False");
|
||||
// Print the real default rather than a hard-coded number that can drift from the constant.
System.err.println(" valueSize Pass value size to use: Default: " + DEFAULT_VALUE_LENGTH);
|
||||
System.err.println(" valueRandom Set if we should vary value size between 0 and " +
|
||||
"'valueSize': Default: Not set.");
|
||||
System.err.println(" period Report every 'period' rows: " +
|
||||
"Default: opts.perClientRunRows / 10");
|
||||
System.err.println();
|
||||
System.err.println(" Note: -D properties will be applied to the conf used. ");
|
||||
System.err.println(" For example: ");
|
||||
|
@ -1332,21 +1409,43 @@ public class PerformanceEvaluation extends Configured implements Tool {
|
|||
continue;
|
||||
}
|
||||
|
||||
final String valueSize = "--valueSize=";
|
||||
if (cmd.startsWith(valueSize)) {
|
||||
opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
final String valueRandom = "--valueRandom";
|
||||
if (cmd.startsWith(valueRandom)) {
|
||||
opts.valueRandom = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
final String period = "--period=";
|
||||
if (cmd.startsWith(period)) {
|
||||
opts.period = Integer.parseInt(cmd.substring(period.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
Class<? extends Test> cmdClass = determineCommandClass(cmd);
|
||||
if (cmdClass != null) {
|
||||
opts.numClientThreads = getNumClients(i + 1, args);
|
||||
if (opts.size != DEFAULT_OPTS.size &&
|
||||
opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
|
||||
throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments.");
|
||||
throw new IllegalArgumentException(rows + " and " + size +
|
||||
" are mutually exclusive arguments.");
|
||||
}
|
||||
// Calculate how many rows per gig. If random value size, presume that half the max
|
||||
// is average row size.
|
||||
int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
|
||||
if (opts.size != DEFAULT_OPTS.size) {
|
||||
// total size in GB specified
|
||||
opts.totalRows = (int) opts.size * ROWS_PER_GB;
|
||||
opts.totalRows = (int) opts.size * rowsPerGB;
|
||||
opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
|
||||
} else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
|
||||
// number of rows specified
|
||||
opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
|
||||
opts.size = opts.totalRows / ROWS_PER_GB;
|
||||
opts.size = opts.totalRows / rowsPerGB;
|
||||
}
|
||||
runTest(cmdClass, opts);
|
||||
errCode = 0;
|
||||
|
|
|
@ -345,7 +345,8 @@ public class TestHFileOutputFormat {
|
|||
// first region start key is always empty
|
||||
ret[0] = HConstants.EMPTY_BYTE_ARRAY;
|
||||
for (int i = 1; i < numKeys; i++) {
|
||||
ret[i] = PerformanceEvaluation.generateData(random, PerformanceEvaluation.VALUE_LENGTH);
|
||||
ret[i] =
|
||||
PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -344,7 +344,8 @@ public class TestHFileOutputFormat2 {
|
|||
// first region start key is always empty
|
||||
ret[0] = HConstants.EMPTY_BYTE_ARRAY;
|
||||
for (int i = 1; i < numKeys; i++) {
|
||||
ret[i] = PerformanceEvaluation.generateData(random, PerformanceEvaluation.VALUE_LENGTH);
|
||||
ret[i] =
|
||||
PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue