From a345aa8707e86405751fda7caa08990aa0842e23 Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 1 Jul 2016 09:32:43 -0700 Subject: [PATCH] HBASE-16108 RowCounter should support multiple key ranges (Konstantin Ryakhovskiy) --- .../hadoop/hbase/mapreduce/RowCounter.java | 75 +++++++-- .../hbase/mapreduce/TestRowCounter.java | 156 +++++++++++++++--- 2 files changed, 191 insertions(+), 40 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index f278a698dba..c39b1431ed1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.util.List; +import java.util.ArrayList; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.HConstants; @@ -28,7 +30,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.MultiRowRangeFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; @@ -85,8 +89,7 @@ public class RowCounter { public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException { String tableName = args[0]; - String startKey = null; - String endKey = null; + List rowRangeList = null; long startTime = 0; long endTime = 0; @@ -99,14 +102,12 @@ public class RowCounter { // First argument is table name, starting from second for (int i = 1; i < args.length; i++) { if (args[i].startsWith(rangeSwitch)) { - String[] startEnd = 
args[i].substring(rangeSwitch.length()).split(",", 2); - if (startEnd.length != 2 || startEnd[1].contains(",")) { - printUsage("Please specify range in such format as \"--range=a,b\" " + - "or, with only one boundary, \"--range=,b\" or \"--range=a,\""); + try { + rowRangeList = parseRowRangeParameter(args[i], rangeSwitch); + } catch (IllegalArgumentException e) { return null; } - startKey = startEnd[0]; - endKey = startEnd[1]; + continue; } if (startTime < endTime) { printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); @@ -131,12 +132,7 @@ public class RowCounter { job.setJarByClass(RowCounter.class); Scan scan = new Scan(); scan.setCacheBlocks(false); - if (startKey != null && !startKey.equals("")) { - scan.setStartRow(Bytes.toBytesBinary(startKey)); - } - if (endKey != null && !endKey.equals("")) { - scan.setStopRow(Bytes.toBytesBinary(endKey)); - } + setScanFilter(scan, rowRangeList); if (sb.length() > 0) { for (String columnName : sb.toString().trim().split(" ")) { String family = StringUtils.substringBefore(columnName, ":"); @@ -150,7 +146,6 @@ public class RowCounter { } } } - scan.setFilter(new FirstKeyOnlyFilter()); scan.setTimeRange(startTime, endTime == 0 ? 
HConstants.LATEST_TIMESTAMP : endTime); job.setOutputFormatClass(NullOutputFormat.class); TableMapReduceUtil.initTableMapperJob(tableName, scan, @@ -159,6 +154,54 @@ public class RowCounter { return job; } + private static List parseRowRangeParameter( + String arg, String rangeSwitch) { + final String[] ranges = arg.substring(rangeSwitch.length()).split(";"); + final List rangeList = new ArrayList<>(); + for (String range : ranges) { + String[] startEnd = range.split(",", 2); + if (startEnd.length != 2 || startEnd[1].contains(",")) { + printUsage("Please specify range in such format as \"--range=a,b\" " + + "or, with only one boundary, \"--range=,b\" or \"--range=a,\""); + throw new IllegalArgumentException("Wrong range specification: " + range); + } + String startKey = startEnd[0]; + String endKey = startEnd[1]; + rangeList.add(new MultiRowRangeFilter.RowRange( + Bytes.toBytesBinary(startKey), true, + Bytes.toBytesBinary(endKey), false)); + } + return rangeList; + } + + /** + * Sets filter {@link FilterBase} to the {@link Scan} instance. + * If provided rowRangeList contains more than one element, + * method sets filter which is instance of {@link MultiRowRangeFilter}. + * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. + * If rowRangeList contains exactly one element, startRow and stopRow are set to the scan. + * @param scan the scan whose filter (and, for a single range, start/stop rows) is set + * @param rowRangeList parsed row ranges; null is treated the same as an empty list + */ + private static void setScanFilter(Scan scan, List rowRangeList) { + final int size = rowRangeList == null ? 0 : rowRangeList.size(); + if (size <= 1) { + scan.setFilter(new FirstKeyOnlyFilter()); + } + if (size == 1) { + MultiRowRangeFilter.RowRange range = rowRangeList.get(0); + scan.setStartRow(range.getStartRow()); //inclusive + scan.setStopRow(range.getStopRow()); //exclusive + } else if (size > 1) { + try { + scan.setFilter(new MultiRowRangeFilter(rowRangeList)); + } catch (IOException e) { + //the IOException should never be thrown. 
see HBASE-16145 + throw new RuntimeException("Cannot instantiate MultiRowRangeFilter", e); + } + } + } + /* * @param errorMessage Can attach a message when error occurs. */ @@ -173,7 +216,7 @@ public class RowCounter { private static void printUsage() { System.err.println("Usage: RowCounter [options] " + "[--starttime=[start] --endtime=[end] " + - "[--range=[startKey],[endKey]] [ ...]"); + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [ ...]"); System.err.println("For performance consider the following options:\n" + "-Dhbase.client.scanner.caching=100\n" + "-Dmapreduce.map.speculative=false"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java index 0e04c673003..f0c4c7bebee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java @@ -83,7 +83,7 @@ public class TestRowCounter { TEST_UTIL.setJobWithoutMRCluster(); TEST_UTIL.startMiniCluster(); Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM)); - writeRows(table); + writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL); table.close(); } @@ -147,6 +147,97 @@ public class TestRowCounter { runRowCount(args, 10); } + /** + * Test a case when a range is specified with single range of start-end keys + * @throws Exception + */ + @Test + public void testRowCounterRowSingleRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=row1,row3" + }; + runRowCount(args, 2); + } + + /** + * Test a case when a range is specified with single range with end key only + * @throws Exception + */ + @Test + public void testRowCounterRowSingleRangeUpperBound() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=,row3" + }; + runRowCount(args, 3); + } + + /** + * Test a case when a range is specified with two 
ranges where one range is with end key only + * @throws Exception + */ + @Test + public void testRowCounterRowMultiRangeUpperBound() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=,row3;row5,row7" + }; + runRowCount(args, 5); + } + /** + * Test a case when a range is specified with multiple ranges of start-end keys + * @throws Exception + */ + @Test + public void testRowCounterRowMultiRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=row1,row3;row5,row8" + }; + runRowCount(args, 5); + } + + /** + * Test a case when a range is specified with multiple ranges of start-end keys; + * one range is filled, another two are not + * @throws Exception + */ + @Test + public void testRowCounterRowMultiEmptyRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=row1,row3;;" + }; + runRowCount(args, 2); + } + + @Test + public void testRowCounter10kRowRange() throws Exception { + String tableName = TABLE_NAME + "10k"; + + try (Table table = TEST_UTIL.createTable( + TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) { + writeRows(table, 10000, 0); + } + String[] args = new String[] { + tableName, "--range=row9872,row9875" + }; + runRowCount(args, 3); + } + + /** + * test case for HBASE-15287 + * @throws Exception + */ + @Test + public void testRowCounterRowRangeBinary() throws Exception { + String tableName = TABLE_NAME + "Binary"; + try (Table table = TEST_UTIL.createTable( + TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) { + writeRows(table, 10, 0, true); + } + String[] args = new String[] { + tableName, "--range=\\x00row5,\\x00row8" + }; + runRowCount(args, 3); + } + /** * Test a case when the timerange is specified with --starttime and --endtime options * @@ -155,6 +246,7 @@ public class TestRowCounter { @Test public void testRowCounterTimeRange() throws Exception { + final String tableName = TABLE_NAME + "TimeRange"; final byte[] family = Bytes.toBytes(COL_FAM); final byte[] 
col1 = Bytes.toBytes(COL1); Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1)); @@ -164,32 +256,35 @@ public class TestRowCounter { long ts; // clean up content of TABLE_NAME - HTable table = TEST_UTIL.deleteTableData(TableName.valueOf(TABLE_NAME)); - ts = System.currentTimeMillis(); - put1.add(family, col1, ts, Bytes.toBytes("val1")); - table.put(put1); - Thread.sleep(100); - ts = System.currentTimeMillis(); - put2.add(family, col1, ts, Bytes.toBytes("val2")); - put3.add(family, col1, ts, Bytes.toBytes("val3")); - table.put(put2); - table.put(put3); - table.close(); + try (Table table = TEST_UTIL.createTable( + TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) { - String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0, + ts = System.currentTimeMillis(); + put1.add(family, col1, ts, Bytes.toBytes("val1")); + table.put(put1); + Thread.sleep(100); + + ts = System.currentTimeMillis(); + put2.add(family, col1, ts, Bytes.toBytes("val2")); + put3.add(family, col1, ts, Bytes.toBytes("val3")); + table.put(put2); + table.put(put3); + } + + String[] args = new String[] {tableName, COL_FAM + ":" + COL1, "--starttime=" + 0, "--endtime=" + ts}; runRowCount(args, 1); - args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0, + args = new String[] {tableName, COL_FAM + ":" + COL1, "--starttime=" + 0, "--endtime=" + (ts - 10)}; runRowCount(args, 1); - args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + ts, + args = new String[] {tableName, COL_FAM + ":" + COL1, "--starttime=" + ts, "--endtime=" + (ts + 1000)}; runRowCount(args, 2); - args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + (ts - 30 * 1000), + args = new String[] {tableName, COL_FAM + ":" + COL1, "--starttime=" + (ts - 30 * 1000), "--endtime=" + (ts + 30 * 1000),}; runRowCount(args, 3); } @@ -207,21 +302,32 @@ public class TestRowCounter { Configuration conf = opts.getConfiguration(); args = opts.getRemainingArgs(); Job job = 
RowCounter.createSubmittableJob(conf, args); + long start = System.currentTimeMillis(); job.waitForCompletion(true); + long duration = System.currentTimeMillis() - start; + LOG.debug("row count duration (ms): " + duration); assertTrue(job.isSuccessful()); Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS); assertEquals(expectedCount, counter.getValue()); } + private static void writeRows(Table table, int totalRows, int rowsWithOneCol) + throws IOException { + writeRows(table, totalRows, rowsWithOneCol, false); + } + /** * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have * two columns, Few have one. - * * @param table + * @param totalRows total number of rows to be added to the table + * @param rowsWithOneCol number of rows with one column to be added to the table + * @param writeBinary whether row prefix has to have \x00 in the beginning * @throws IOException */ - private static void writeRows(Table table) + private static void writeRows(Table table, int totalRows, int rowsWithOneCol, boolean writeBinary) throws IOException { + final String rowPrefix = writeBinary ? 
"\\x00row" : "row"; final byte[] family = Bytes.toBytes(COL_FAM); final byte[] value = Bytes.toBytes("abcd"); final byte[] col1 = Bytes.toBytes(COL1); @@ -230,8 +336,8 @@ public class TestRowCounter { ArrayList rowsUpdate = new ArrayList(); // write few rows with two columns int i = 0; - for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) { - byte[] row = Bytes.toBytes("row" + i); + for (; i < totalRows - rowsWithOneCol; i++) { + byte[] row = Bytes.toBytesBinary(rowPrefix + i); Put put = new Put(row); put.add(family, col1, value); put.add(family, col2, value); @@ -240,8 +346,8 @@ public class TestRowCounter { } // write few rows with only one column - for (; i < TOTAL_ROWS; i++) { - byte[] row = Bytes.toBytes("row" + i); + for (; i < totalRows; i++) { + byte[] row = Bytes.toBytesBinary(rowPrefix + i); Put put = new Put(row); put.add(family, col2, value); rowsUpdate.add(put); @@ -273,7 +379,8 @@ public class TestRowCounter { assertTrue(data.toString().contains("Wrong number of parameters:")); assertTrue(data.toString().contains("Usage: RowCounter [options] " + "[--starttime=[start] --endtime=[end] " + - "[--range=[startKey],[endKey]] " + + "[--range=[startKey],[endKey]" + + "[;[startKey],[endKey]...]] " + "[ ...]")); assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100")); assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false")); @@ -292,7 +399,8 @@ public class TestRowCounter { " \"--range=,b\" or \"--range=a,\"")); assertTrue(data.toString().contains("Usage: RowCounter [options] " + "[--starttime=[start] --endtime=[end] " + - "[--range=[startKey],[endKey]] " + + "[--range=[startKey],[endKey]" + + "[;[startKey],[endKey]...]] " + "[ ...]")); }