From 66f7bf461532423d92be4e57a26edc2e13a7dbad Mon Sep 17 00:00:00 2001 From: stack Date: Thu, 9 Apr 2015 16:49:16 -0700 Subject: [PATCH] HBASE-13118 [PE] Add being able to write many columns Conflicts: hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java --- .../hadoop/hbase/PerformanceEvaluation.java | 152 ++++++++++++------ .../mapreduce/TestHFileOutputFormat.java | 9 +- .../mapreduce/TestHFileOutputFormat2.java | 9 +- 3 files changed, 115 insertions(+), 55 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index d51f6987df8..eb2f46885b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -124,7 +124,8 @@ public class PerformanceEvaluation extends Configured implements Tool { public static final String TABLE_NAME = "TestTable"; public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); - public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data"); + public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0); + public static final byte [] QUALIFIER_NAME = COLUMN_ZERO; public static final int DEFAULT_VALUE_LENGTH = 1000; public static final int ROW_LENGTH = 26; @@ -610,6 +611,8 @@ public class PerformanceEvaluation extends Configured implements Tool { int valueSize = DEFAULT_VALUE_LENGTH; int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10; int cycles = 1; + int columns = 1; + int caching = 30; boolean addColumns = true; public TestOptions() {} @@ -653,6 +656,24 @@ public class PerformanceEvaluation extends Configured implements Tool { this.randomSleep = that.randomSleep; this.measureAfter = that.measureAfter; this.addColumns = that.addColumns; + this.columns = that.columns; + this.caching = that.caching; + } + + public int getCaching() { + return this.caching; + } + + public void setCaching(final int caching) { + this.caching = caching; + } + + public int getColumns() { + return this.columns; + } + + public void setColumns(final int columns) { + this.columns = columns; } public int getCycles() { @@ -1157,7 +1178,7 @@ public class PerformanceEvaluation extends Configured implements Tool { void onStartup() throws IOException { this.table = connection.getTable(TableName.valueOf(opts.tableName)); } - + @Override void onTakedown() throws IOException { table.close(); @@ -1175,7 +1196,7 @@ public class PerformanceEvaluation extends Configured implements Tool { void onStartup() throws IOException { this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName)); } - + @Override void onTakedown() throws IOException { mutator.close(); @@ -1190,9 +1211,12 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(final int i) throws IOException { Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows)); + scan.setCaching(opts.caching); FilterList list = new FilterList(); if (opts.addColumns) { scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); + } else { + scan.addFamily(FAMILY_NAME); } if (opts.filterAll) { list.addFilter(new FilterAllFilter()); @@ -1223,11 +1247,14 @@ public class PerformanceEvaluation extends Configured implements Tool { void testRow(final int i) throws IOException { Pair startAndStopRow = getStartAndStopRow(); Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); + scan.setCaching(opts.caching); if (opts.filterAll) { scan.setFilter(new FilterAllFilter()); } if (opts.addColumns) { scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); + } else { + scan.addFamily(FAMILY_NAME); } Result r = null; int count = 0; @@ -1326,6 +1353,8 @@ public class PerformanceEvaluation extends Configured implements Tool { Get get = new Get(getRandomRow(this.rand, opts.totalRows)); if (opts.addColumns) { get.addColumn(FAMILY_NAME, QUALIFIER_NAME); + } else { + get.addFamily(FAMILY_NAME); } if (opts.filterAll) { get.setFilter(new FilterAllFilter()); @@ -1369,21 +1398,24 @@ public class PerformanceEvaluation extends Configured implements Tool { void testRow(final int i) throws IOException { byte[] row = getRandomRow(this.rand, opts.totalRows); Put put = new Put(row); - byte[] value = generateData(this.rand, getValueLength(this.rand)); - if (opts.useTags) { - byte[] tag = generateData(this.rand, TAG_LENGTH); - Tag[] tags = new Tag[opts.noOfTags]; - for (int n = 0; n < opts.noOfTags; n++) { - Tag t = new Tag((byte) n, tag); - tags[n] = t; + for (int column = 0; column < opts.columns; column++) { + byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); + byte[] value = generateData(this.rand, getValueLength(this.rand)); + if (opts.useTags) { + byte[] tag = generateData(this.rand, TAG_LENGTH); + Tag[] tags = new Tag[opts.noOfTags]; + for (int n = 0; n < opts.noOfTags; n++) { + Tag t = new Tag((byte) n, tag); + tags[n] = t; + } + KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP, + value, tags); + put.add(kv); + updateValueSize(kv.getValueLength()); + } else { + put.add(FAMILY_NAME, qualifier, value); + updateValueSize(value.length); } - KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, - value, tags); - put.add(kv); - updateValueSize(kv.getValueLength()); - } else { - put.add(FAMILY_NAME, QUALIFIER_NAME, value); - updateValueSize(value.length); } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); mutator.mutate(put); @@ -1410,9 +1442,11 @@ public class PerformanceEvaluation extends Configured implements Tool { void testRow(final int i) throws IOException { if (this.testScanner == null) { Scan scan = new Scan(format(opts.startRow)); - scan.setCaching(30); + scan.setCaching(opts.caching); if (opts.addColumns) { scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); + } else { + scan.addFamily(FAMILY_NAME); } if (opts.filterAll) { scan.setFilter(new FilterAllFilter()); @@ -1452,21 +1486,24 @@ public class PerformanceEvaluation extends Configured implements Tool { void testRow(final int i) throws IOException { byte[] row = format(i); Put put = new Put(row); - byte[] value = generateData(this.rand, getValueLength(this.rand)); - if (opts.useTags) { - byte[] tag = generateData(this.rand, TAG_LENGTH); - Tag[] tags = new Tag[opts.noOfTags]; - for (int n = 0; n < opts.noOfTags; n++) { - Tag t = new Tag((byte) n, tag); - tags[n] = t; + for (int column = 0; column < opts.columns; column++) { + byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); + byte[] value = generateData(this.rand, getValueLength(this.rand)); + if (opts.useTags) { + byte[] tag = generateData(this.rand, TAG_LENGTH); + Tag[] tags = new Tag[opts.noOfTags]; + for (int n = 0; n < opts.noOfTags; n++) { + Tag t = new Tag((byte) n, tag); + tags[n] = t; + } + KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP, + value, tags); + put.add(kv); + updateValueSize(kv.getValueLength()); + } else { + put.add(FAMILY_NAME, qualifier, value); + updateValueSize(value.length); } - KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, - value, tags); - put.add(kv); - updateValueSize(kv.getValueLength()); - } else { - put.add(FAMILY_NAME, QUALIFIER_NAME, value); - updateValueSize(value.length); } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); mutator.mutate(put); @@ -1498,7 +1535,7 @@ public class PerformanceEvaluation extends Configured implements Tool { protected Scan constructScan(byte[] valuePrefix) throws IOException { FilterList list = new FilterList(); Filter filter = new SingleColumnValueFilter( - FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL, + FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL, new BinaryComparator(valuePrefix) ); list.addFilter(filter); @@ -1506,8 +1543,11 @@ public class PerformanceEvaluation extends Configured implements Tool { list.addFilter(new FilterAllFilter()); } Scan scan = new Scan(); + scan.setCaching(opts.caching); if (opts.addColumns) { scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); + } else { + scan.addFamily(FAMILY_NAME); } scan.setFilter(list); return scan; @@ -1520,11 +1560,9 @@ public class PerformanceEvaluation extends Configured implements Tool { * @param timeMs Time taken in milliseconds. * @return String value with label, ie '123.76 MB/s' */ - private static String calculateMbps(int rows, long timeMs, final int valueSize) { - // MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS) - // * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB - BigDecimal rowSize = - BigDecimal.valueOf(ROW_LENGTH + valueSize + FAMILY_NAME.length + QUALIFIER_NAME.length); + private static String calculateMbps(int rows, long timeMs, final int valueSize, int columns) { + BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH + + ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns)); BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT) .divide(BYTES_PER_MB, CXT); @@ -1613,7 +1651,7 @@ public class PerformanceEvaluation extends Configured implements Tool { status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" + " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime, - getAverageValueLength(opts)) + ")"); + getAverageValueLength(opts), opts.columns) + ")"); return new RunResult(totalElapsedTime, t.getLatency()); } @@ -1645,14 +1683,23 @@ public class PerformanceEvaluation extends Configured implements Tool { } protected void printUsage() { - printUsage(null); + printUsage(this.getClass().getName(), null); } - protected void printUsage(final String message) { + protected static void printUsage(final String message) { + printUsage(PerformanceEvaluation.class.getName(), message); + } + + protected static void printUsageAndExit(final String message, final int exitCode) { + printUsage(message); + System.exit(exitCode); + } + + protected static void printUsage(final String className, final String message) { if (message != null && message.length() > 0) { System.err.println(message); } - System.err.println("Usage: java " + this.getClass().getName() + " \\"); + System.err.println("Usage: java " + className + " \\"); System.err.println(" [-D]* "); System.err.println(); System.err.println("Options:"); @@ -1704,6 +1751,8 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" cycles How many times to cycle the test. Defaults: 1."); System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); + System.err.println(" columns Columns to write per row. Default: 1"); + System.err.println(" caching Scan caching to use. Default: 30"); System.err.println(); System.err.println(" Note: -D properties will be applied to the conf used. "); System.err.println(" For example: "); @@ -1721,8 +1770,7 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" running: 1 <= value <= 500"); System.err.println("Examples:"); System.err.println(" To run a single evaluation client:"); - System.err.println(" $ bin/hbase " + this.getClass().getName() - + " sequentialWrite 1"); + System.err.println(" $ bin/hbase " + className + " sequentialWrite 1"); } /** @@ -1935,6 +1983,18 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } + final String columns = "--columns="; + if (cmd.startsWith(columns)) { + opts.columns = Integer.parseInt(cmd.substring(columns.length())); + continue; + } + + final String caching = "--caching="; + if (cmd.startsWith(caching)) { + opts.caching = Integer.parseInt(cmd.substring(caching.length())); + continue; + } + if (isCommandClass(cmd)) { opts.cmdName = cmd; opts.numClientThreads = Integer.parseInt(args.remove()); @@ -1945,6 +2005,8 @@ public class PerformanceEvaluation extends Configured implements Tool { } opts = calculateRowsAndSize(opts); break; + } else { + printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); } // Not matching any option or command. @@ -1970,7 +2032,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } static int getRowsPerGB(final TestOptions opts) { - return ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize); + return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getColumns()); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java index ecea98e4bb4..438266e2261 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java @@ -125,6 +125,7 @@ public class TestHFileOutputFormat { private int valLength; private static final int VALLEN_DEFAULT=10; private static final String VALLEN_CONF="randomkv.val.length"; + private static final byte [] QUALIFIER = Bytes.toBytes("data"); @Override protected void setup(Context context) throws IOException, @@ -159,8 +160,7 @@ public class TestHFileOutputFormat { ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); for (byte[] family : TestHFileOutputFormat.FAMILIES) { - KeyValue kv = new KeyValue(keyBytes, family, - PerformanceEvaluation.QUALIFIER_NAME, valBytes); + KeyValue kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); context.write(key, kv); } } @@ -878,7 +878,7 @@ public class TestHFileOutputFormat { int taskId = context.getTaskAttemptID().getTaskID().getId(); assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; - + final byte [] qualifier = Bytes.toBytes("data"); Random random = new Random(); for (int i = 0; i < numRows; i++) { @@ -887,8 +887,7 @@ public class TestHFileOutputFormat { ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); for (byte[] family : families) { - KeyValue kv = new KeyValue(keyBytes, family, - PerformanceEvaluation.QUALIFIER_NAME, valBytes); + KeyValue kv = new KeyValue(keyBytes, family, qualifier, valBytes); writer.write(key, kv); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 0f60f3b61d2..67a6c0a6b85 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -124,6 +124,7 @@ public class TestHFileOutputFormat2 { private int valLength; private static final int VALLEN_DEFAULT=10; private static final String VALLEN_CONF="randomkv.val.length"; + private static final byte [] QUALIFIER = Bytes.toBytes("data"); @Override protected void setup(Context context) throws IOException, @@ -159,8 +160,7 @@ public class TestHFileOutputFormat2 { ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); for (byte[] family : TestHFileOutputFormat2.FAMILIES) { - Cell kv = new KeyValue(keyBytes, family, - PerformanceEvaluation.QUALIFIER_NAME, valBytes); + Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); context.write(key, kv); } } @@ -879,7 +879,7 @@ public class TestHFileOutputFormat2 { int taskId = context.getTaskAttemptID().getTaskID().getId(); assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; - + final byte [] qualifier = Bytes.toBytes("data"); Random random = new Random(); for (int i = 0; i < numRows; i++) { @@ -888,8 +888,7 @@ public class TestHFileOutputFormat2 { ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); for (byte[] family : families) { - Cell kv = new KeyValue(keyBytes, family, - PerformanceEvaluation.QUALIFIER_NAME, valBytes); + Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); writer.write(key, kv); } }