HBASE-13118 [PE] Add being able to write many columns

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
This commit is contained in:
stack 2015-04-09 16:49:16 -07:00
parent ed703762ae
commit 66f7bf4615
3 changed files with 115 additions and 55 deletions

View File

@ -124,7 +124,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
public static final String TABLE_NAME = "TestTable"; public static final String TABLE_NAME = "TestTable";
public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data"); public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0);
public static final byte [] QUALIFIER_NAME = COLUMN_ZERO;
public static final int DEFAULT_VALUE_LENGTH = 1000; public static final int DEFAULT_VALUE_LENGTH = 1000;
public static final int ROW_LENGTH = 26; public static final int ROW_LENGTH = 26;
@ -610,6 +611,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
int valueSize = DEFAULT_VALUE_LENGTH; int valueSize = DEFAULT_VALUE_LENGTH;
int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10; int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
int cycles = 1; int cycles = 1;
int columns = 1;
int caching = 30;
boolean addColumns = true; boolean addColumns = true;
public TestOptions() {} public TestOptions() {}
@ -653,6 +656,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.randomSleep = that.randomSleep; this.randomSleep = that.randomSleep;
this.measureAfter = that.measureAfter; this.measureAfter = that.measureAfter;
this.addColumns = that.addColumns; this.addColumns = that.addColumns;
this.columns = that.columns;
this.caching = that.caching;
}
public int getCaching() {
return this.caching;
}
public void setCaching(final int caching) {
this.caching = caching;
}
public int getColumns() {
return this.columns;
}
public void setColumns(final int columns) {
this.columns = columns;
} }
public int getCycles() { public int getCycles() {
@ -1157,7 +1178,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
void onStartup() throws IOException { void onStartup() throws IOException {
this.table = connection.getTable(TableName.valueOf(opts.tableName)); this.table = connection.getTable(TableName.valueOf(opts.tableName));
} }
@Override @Override
void onTakedown() throws IOException { void onTakedown() throws IOException {
table.close(); table.close();
@ -1175,7 +1196,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
void onStartup() throws IOException { void onStartup() throws IOException {
this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName)); this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName));
} }
@Override @Override
void onTakedown() throws IOException { void onTakedown() throws IOException {
mutator.close(); mutator.close();
@ -1190,9 +1211,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override @Override
void testRow(final int i) throws IOException { void testRow(final int i) throws IOException {
Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows)); Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
scan.setCaching(opts.caching);
FilterList list = new FilterList(); FilterList list = new FilterList();
if (opts.addColumns) { if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
} else {
scan.addFamily(FAMILY_NAME);
} }
if (opts.filterAll) { if (opts.filterAll) {
list.addFilter(new FilterAllFilter()); list.addFilter(new FilterAllFilter());
@ -1223,11 +1247,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testRow(final int i) throws IOException { void testRow(final int i) throws IOException {
Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
scan.setCaching(opts.caching);
if (opts.filterAll) { if (opts.filterAll) {
scan.setFilter(new FilterAllFilter()); scan.setFilter(new FilterAllFilter());
} }
if (opts.addColumns) { if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
} else {
scan.addFamily(FAMILY_NAME);
} }
Result r = null; Result r = null;
int count = 0; int count = 0;
@ -1326,6 +1353,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
Get get = new Get(getRandomRow(this.rand, opts.totalRows)); Get get = new Get(getRandomRow(this.rand, opts.totalRows));
if (opts.addColumns) { if (opts.addColumns) {
get.addColumn(FAMILY_NAME, QUALIFIER_NAME); get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
} else {
get.addFamily(FAMILY_NAME);
} }
if (opts.filterAll) { if (opts.filterAll) {
get.setFilter(new FilterAllFilter()); get.setFilter(new FilterAllFilter());
@ -1369,21 +1398,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testRow(final int i) throws IOException { void testRow(final int i) throws IOException {
byte[] row = getRandomRow(this.rand, opts.totalRows); byte[] row = getRandomRow(this.rand, opts.totalRows);
Put put = new Put(row); Put put = new Put(row);
byte[] value = generateData(this.rand, getValueLength(this.rand)); for (int column = 0; column < opts.columns; column++) {
if (opts.useTags) { byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
byte[] tag = generateData(this.rand, TAG_LENGTH); byte[] value = generateData(this.rand, getValueLength(this.rand));
Tag[] tags = new Tag[opts.noOfTags]; if (opts.useTags) {
for (int n = 0; n < opts.noOfTags; n++) { byte[] tag = generateData(this.rand, TAG_LENGTH);
Tag t = new Tag((byte) n, tag); Tag[] tags = new Tag[opts.noOfTags];
tags[n] = t; for (int n = 0; n < opts.noOfTags; n++) {
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
value, tags);
put.add(kv);
updateValueSize(kv.getValueLength());
} else {
put.add(FAMILY_NAME, qualifier, value);
updateValueSize(value.length);
} }
KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
value, tags);
put.add(kv);
updateValueSize(kv.getValueLength());
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
updateValueSize(value.length);
} }
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
mutator.mutate(put); mutator.mutate(put);
@ -1410,9 +1442,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testRow(final int i) throws IOException { void testRow(final int i) throws IOException {
if (this.testScanner == null) { if (this.testScanner == null) {
Scan scan = new Scan(format(opts.startRow)); Scan scan = new Scan(format(opts.startRow));
scan.setCaching(30); scan.setCaching(opts.caching);
if (opts.addColumns) { if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
} else {
scan.addFamily(FAMILY_NAME);
} }
if (opts.filterAll) { if (opts.filterAll) {
scan.setFilter(new FilterAllFilter()); scan.setFilter(new FilterAllFilter());
@ -1452,21 +1486,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testRow(final int i) throws IOException { void testRow(final int i) throws IOException {
byte[] row = format(i); byte[] row = format(i);
Put put = new Put(row); Put put = new Put(row);
byte[] value = generateData(this.rand, getValueLength(this.rand)); for (int column = 0; column < opts.columns; column++) {
if (opts.useTags) { byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
byte[] tag = generateData(this.rand, TAG_LENGTH); byte[] value = generateData(this.rand, getValueLength(this.rand));
Tag[] tags = new Tag[opts.noOfTags]; if (opts.useTags) {
for (int n = 0; n < opts.noOfTags; n++) { byte[] tag = generateData(this.rand, TAG_LENGTH);
Tag t = new Tag((byte) n, tag); Tag[] tags = new Tag[opts.noOfTags];
tags[n] = t; for (int n = 0; n < opts.noOfTags; n++) {
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
value, tags);
put.add(kv);
updateValueSize(kv.getValueLength());
} else {
put.add(FAMILY_NAME, qualifier, value);
updateValueSize(value.length);
} }
KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
value, tags);
put.add(kv);
updateValueSize(kv.getValueLength());
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
updateValueSize(value.length);
} }
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
mutator.mutate(put); mutator.mutate(put);
@ -1498,7 +1535,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected Scan constructScan(byte[] valuePrefix) throws IOException { protected Scan constructScan(byte[] valuePrefix) throws IOException {
FilterList list = new FilterList(); FilterList list = new FilterList();
Filter filter = new SingleColumnValueFilter( Filter filter = new SingleColumnValueFilter(
FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL, FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL,
new BinaryComparator(valuePrefix) new BinaryComparator(valuePrefix)
); );
list.addFilter(filter); list.addFilter(filter);
@ -1506,8 +1543,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
list.addFilter(new FilterAllFilter()); list.addFilter(new FilterAllFilter());
} }
Scan scan = new Scan(); Scan scan = new Scan();
scan.setCaching(opts.caching);
if (opts.addColumns) { if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
} else {
scan.addFamily(FAMILY_NAME);
} }
scan.setFilter(list); scan.setFilter(list);
return scan; return scan;
@ -1520,11 +1560,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @param timeMs Time taken in milliseconds. * @param timeMs Time taken in milliseconds.
* @return String value with label, ie '123.76 MB/s' * @return String value with label, ie '123.76 MB/s'
*/ */
private static String calculateMbps(int rows, long timeMs, final int valueSize) { private static String calculateMbps(int rows, long timeMs, final int valueSize, int columns) {
// MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS) BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
// * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns));
BigDecimal rowSize =
BigDecimal.valueOf(ROW_LENGTH + valueSize + FAMILY_NAME.length + QUALIFIER_NAME.length);
BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
.divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT) .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
.divide(BYTES_PER_MB, CXT); .divide(BYTES_PER_MB, CXT);
@ -1613,7 +1651,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
status.setStatus("Finished " + cmd + " in " + totalElapsedTime + status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
"ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" + "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime, " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
getAverageValueLength(opts)) + ")"); getAverageValueLength(opts), opts.columns) + ")");
return new RunResult(totalElapsedTime, t.getLatency()); return new RunResult(totalElapsedTime, t.getLatency());
} }
@ -1645,14 +1683,23 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
protected void printUsage() { protected void printUsage() {
printUsage(null); printUsage(this.getClass().getName(), null);
} }
protected void printUsage(final String message) { protected static void printUsage(final String message) {
printUsage(PerformanceEvaluation.class.getName(), message);
}
protected static void printUsageAndExit(final String message, final int exitCode) {
printUsage(message);
System.exit(exitCode);
}
protected static void printUsage(final String className, final String message) {
if (message != null && message.length() > 0) { if (message != null && message.length() > 0) {
System.err.println(message); System.err.println(message);
} }
System.err.println("Usage: java " + this.getClass().getName() + " \\"); System.err.println("Usage: java " + className + " \\");
System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>"); System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
System.err.println(); System.err.println();
System.err.println("Options:"); System.err.println("Options:");
@ -1704,6 +1751,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" cycles How many times to cycle the test. Defaults: 1."); System.err.println(" cycles How many times to cycle the test. Defaults: 1.");
System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0");
System.err.println(" columns Columns to write per row. Default: 1");
System.err.println(" caching Scan caching to use. Default: 30");
System.err.println(); System.err.println();
System.err.println(" Note: -D properties will be applied to the conf used. "); System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" For example: "); System.err.println(" For example: ");
@ -1721,8 +1770,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" running: 1 <= value <= 500"); System.err.println(" running: 1 <= value <= 500");
System.err.println("Examples:"); System.err.println("Examples:");
System.err.println(" To run a single evaluation client:"); System.err.println(" To run a single evaluation client:");
System.err.println(" $ bin/hbase " + this.getClass().getName() System.err.println(" $ bin/hbase " + className + " sequentialWrite 1");
+ " sequentialWrite 1");
} }
/** /**
@ -1935,6 +1983,18 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue; continue;
} }
final String columns = "--columns=";
if (cmd.startsWith(columns)) {
opts.columns = Integer.parseInt(cmd.substring(columns.length()));
continue;
}
final String caching = "--caching=";
if (cmd.startsWith(caching)) {
opts.caching = Integer.parseInt(cmd.substring(caching.length()));
continue;
}
if (isCommandClass(cmd)) { if (isCommandClass(cmd)) {
opts.cmdName = cmd; opts.cmdName = cmd;
opts.numClientThreads = Integer.parseInt(args.remove()); opts.numClientThreads = Integer.parseInt(args.remove());
@ -1945,6 +2005,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
opts = calculateRowsAndSize(opts); opts = calculateRowsAndSize(opts);
break; break;
} else {
printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
} }
// Not matching any option or command. // Not matching any option or command.
@ -1970,7 +2032,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
} }
static int getRowsPerGB(final TestOptions opts) { static int getRowsPerGB(final TestOptions opts) {
return ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize); return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getColumns());
} }
@Override @Override

View File

@ -125,6 +125,7 @@ public class TestHFileOutputFormat {
private int valLength; private int valLength;
private static final int VALLEN_DEFAULT=10; private static final int VALLEN_DEFAULT=10;
private static final String VALLEN_CONF="randomkv.val.length"; private static final String VALLEN_CONF="randomkv.val.length";
private static final byte [] QUALIFIER = Bytes.toBytes("data");
@Override @Override
protected void setup(Context context) throws IOException, protected void setup(Context context) throws IOException,
@ -159,8 +160,7 @@ public class TestHFileOutputFormat {
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : TestHFileOutputFormat.FAMILIES) { for (byte[] family : TestHFileOutputFormat.FAMILIES) {
KeyValue kv = new KeyValue(keyBytes, family, KeyValue kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
PerformanceEvaluation.QUALIFIER_NAME, valBytes);
context.write(key, kv); context.write(key, kv);
} }
} }
@ -878,7 +878,7 @@ public class TestHFileOutputFormat {
int taskId = context.getTaskAttemptID().getTaskID().getId(); int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
final byte [] qualifier = Bytes.toBytes("data");
Random random = new Random(); Random random = new Random();
for (int i = 0; i < numRows; i++) { for (int i = 0; i < numRows; i++) {
@ -887,8 +887,7 @@ public class TestHFileOutputFormat {
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : families) { for (byte[] family : families) {
KeyValue kv = new KeyValue(keyBytes, family, KeyValue kv = new KeyValue(keyBytes, family, qualifier, valBytes);
PerformanceEvaluation.QUALIFIER_NAME, valBytes);
writer.write(key, kv); writer.write(key, kv);
} }
} }

View File

@ -124,6 +124,7 @@ public class TestHFileOutputFormat2 {
private int valLength; private int valLength;
private static final int VALLEN_DEFAULT=10; private static final int VALLEN_DEFAULT=10;
private static final String VALLEN_CONF="randomkv.val.length"; private static final String VALLEN_CONF="randomkv.val.length";
private static final byte [] QUALIFIER = Bytes.toBytes("data");
@Override @Override
protected void setup(Context context) throws IOException, protected void setup(Context context) throws IOException,
@ -159,8 +160,7 @@ public class TestHFileOutputFormat2 {
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : TestHFileOutputFormat2.FAMILIES) { for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
Cell kv = new KeyValue(keyBytes, family, Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
PerformanceEvaluation.QUALIFIER_NAME, valBytes);
context.write(key, kv); context.write(key, kv);
} }
} }
@ -879,7 +879,7 @@ public class TestHFileOutputFormat2 {
int taskId = context.getTaskAttemptID().getTaskID().getId(); int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
final byte [] qualifier = Bytes.toBytes("data");
Random random = new Random(); Random random = new Random();
for (int i = 0; i < numRows; i++) { for (int i = 0; i < numRows; i++) {
@ -888,8 +888,7 @@ public class TestHFileOutputFormat2 {
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : families) { for (byte[] family : families) {
Cell kv = new KeyValue(keyBytes, family, Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
PerformanceEvaluation.QUALIFIER_NAME, valBytes);
writer.write(key, kv); writer.write(key, kv);
} }
} }