From cc73c03118df3686779eb9a73e815106576bdaf2 Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 1 Jul 2016 03:41:17 -0700 Subject: [PATCH] HBASE-16108 RowCounter should support multiple key ranges (Konstantin Ryakhovskiy) --- .../hadoop/hbase/mapreduce/RowCounter.java | 74 +++++++++++---- .../hbase/mapreduce/TestRowCounter.java | 92 +++++++++++++++++-- 2 files changed, 143 insertions(+), 23 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index 720d4b16cf9..aca84fd1d67 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.util.List; +import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,7 +33,9 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.MultiRowRangeFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Counter; @@ -95,8 +99,7 @@ public class RowCounter extends Configured implements Tool { public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException { String tableName = args[0]; - String startKey = null; - String endKey = null; + List rowRangeList = null; long startTime = 0; long endTime = 0; @@ -110,14 +113,11 @@ public class RowCounter extends Configured implements Tool { // First argument is table name, starting from second for (int i = 1; i < args.length; i++) { if (args[i].startsWith(rangeSwitch)) { - String[] startEnd = args[i].substring(rangeSwitch.length()).split(",", 2); - if (startEnd.length != 2 || startEnd[1].contains(",")) { - printUsage("Please specify range in such format as \"--range=a,b\" " + - "or, with only one boundary, \"--range=,b\" or \"--range=a,\""); + try { + rowRangeList = parseRowRangeParameter(args[i], rangeSwitch); + } catch (IllegalArgumentException e) { return null; } - startKey = startEnd[0]; - endKey = startEnd[1]; continue; } if (args[i].startsWith(startTimeArgKey)) { @@ -146,12 +146,7 @@ public class RowCounter extends Configured implements Tool { job.setJarByClass(RowCounter.class); Scan scan = new Scan(); scan.setCacheBlocks(false); - if (startKey != null && !startKey.equals("")) { - scan.setStartRow(Bytes.toBytesBinary(startKey)); - } - if (endKey != null && !endKey.equals("")) { - scan.setStopRow(Bytes.toBytesBinary(endKey)); - } + setScanFilter(scan, rowRangeList); if (sb.length() > 0) { for (String columnName : sb.toString().trim().split(" ")) { String family = StringUtils.substringBefore(columnName, ":"); @@ -165,7 +160,6 @@ public class RowCounter extends Configured implements Tool { } } } - scan.setFilter(new FirstKeyOnlyFilter()); scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); job.setOutputFormatClass(NullOutputFormat.class); TableMapReduceUtil.initTableMapperJob(tableName, scan, @@ -174,6 +168,54 @@ public class RowCounter extends Configured implements Tool { return job; } + private static List parseRowRangeParameter( + String arg, String rangeSwitch) { + final String[] ranges = arg.substring(rangeSwitch.length()).split(";"); + final List rangeList = new ArrayList<>(); + for (String range : ranges) { + String[] startEnd = range.split(",", 2); + if (startEnd.length != 2 || startEnd[1].contains(",")) { + printUsage("Please specify range in such format as \"--range=a,b\" " + + "or, with only one boundary, \"--range=,b\" or \"--range=a,\""); + throw new IllegalArgumentException("Wrong range specification: " + range); + } + String startKey = startEnd[0]; + String endKey = startEnd[1]; + rangeList.add(new MultiRowRangeFilter.RowRange( + Bytes.toBytesBinary(startKey), true, + Bytes.toBytesBinary(endKey), false)); + } + return rangeList; + } + + /** + * Sets filter {@link FilterBase} to the {@link Scan} instance. + * If provided rowRangeList contains more than one element, + * method sets filter which is instance of {@link MultiRowRangeFilter}. + * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. + * If rowRangeList contains exactly one element, startRow and stopRow are set to the scan. + * @param scan + * @param rowRangeList + */ + private static void setScanFilter(Scan scan, List rowRangeList) { + final int size = rowRangeList == null ? 0 : rowRangeList.size(); + if (size <= 1) { + scan.setFilter(new FirstKeyOnlyFilter()); + } + if (size == 1) { + MultiRowRangeFilter.RowRange range = rowRangeList.get(0); + scan.setStartRow(range.getStartRow()); //inclusive + scan.setStopRow(range.getStopRow()); //exclusive + } else if (size > 1) { + try { + scan.setFilter(new MultiRowRangeFilter(rowRangeList)); + } catch (IOException e) { + //the IOException should never be thrown. see HBASE-16145 + throw new RuntimeException("Cannot instantiate MultiRowRangeFilter"); + } + } + } + /* * @param errorMessage Can attach a message when error occurs. */ @@ -189,7 +231,7 @@ public class RowCounter extends Configured implements Tool { private static void printUsage() { System.err.println("Usage: RowCounter [options] " + "[--starttime=[start] --endtime=[end] " + - "[--range=[startKey],[endKey]] [ ...]"); + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [ ...]"); System.err.println("For performance consider the following options:\n" + "-Dhbase.client.scanner.caching=100\n" + "-Dmapreduce.map.speculative=false"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java index 4a719dcd02e..cd831998b8f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java @@ -72,7 +72,7 @@ public class TestRowCounter { public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniCluster(); Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM)); - writeRows(table); + writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL); table.close(); } @@ -154,7 +154,82 @@ public class TestRowCounter { runRowCount(args, 8); } - /** + /** + * Test a case when a range is specified with single range of start-end keys + * @throws Exception + */ + @Test + public void testRowCounterRowSingleRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00row1,\\x00row3" + }; + runRowCount(args, 2); + } + + /** + * Test a case when a range is specified with single range with end key only + * @throws Exception + */ + @Test + public void testRowCounterRowSingleRangeUpperBound() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=,\\x00row3" + }; + runRowCount(args, 3); + } + + /** + * Test a case when a range is specified with two ranges where one range is with end key only + * @throws Exception + */ + @Test + public void testRowCounterRowMultiRangeUpperBound() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7" + }; + runRowCount(args, 5); + } + + /** + * Test a case when a range is specified with multiple ranges of start-end keys + * @throws Exception + */ + @Test + public void testRowCounterRowMultiRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8" + }; + runRowCount(args, 5); + } + + /** + * Test a case when a range is specified with multiple ranges of start-end keys; + * one range is filled, another two are not + * @throws Exception + */ + @Test + public void testRowCounterRowMultiEmptyRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00row1,\\x00row3;;" + }; + runRowCount(args, 2); + } + + @Test + public void testRowCounter10kRowRange() throws Exception { + String tableName = TABLE_NAME + "10k"; + + try (Table table = TEST_UTIL.createTable( + TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) { + writeRows(table, 10000, 0); + } + String[] args = new String[] { + tableName, "--range=\\x00row9872,\\x00row9875" + }; + runRowCount(args, 3); + } + + /** * Test a case when the timerange is specified with --starttime and --endtime options * * @throws Exception @@ -222,7 +297,10 @@ public class TestRowCounter { */ private void runRowCount(String[] args, int expectedCount) throws Exception { Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args); + long start = System.currentTimeMillis(); job.waitForCompletion(true); + long duration = System.currentTimeMillis() - start; + LOG.debug("row count duration (ms): " + duration); assertTrue(job.isSuccessful()); Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS); assertEquals(expectedCount, counter.getValue()); @@ -235,7 +313,7 @@ public class TestRowCounter { * @param table * @throws IOException */ - private static void writeRows(Table table) throws IOException { + private static void writeRows(Table table, int totalRows, int rowsWithOneCol) throws IOException { final byte[] family = Bytes.toBytes(COL_FAM); final byte[] value = Bytes.toBytes("abcd"); final byte[] col1 = Bytes.toBytes(COL1); @@ -244,7 +322,7 @@ public class TestRowCounter { ArrayList rowsUpdate = new ArrayList(); // write few rows with two columns int i = 0; - for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) { + for (; i < totalRows - rowsWithOneCol; i++) { // Use binary rows values to test for HBASE-15287. byte[] row = Bytes.toBytesBinary("\\x00row" + i); Put put = new Put(row); @@ -255,7 +333,7 @@ public class TestRowCounter { } // write few rows with only one column - for (; i < TOTAL_ROWS; i++) { + for (; i < totalRows; i++) { byte[] row = Bytes.toBytes("row" + i); Put put = new Put(row); put.addColumn(family, col2, value); @@ -288,7 +366,7 @@ public class TestRowCounter { assertTrue(data.toString().contains( "Usage: RowCounter [options] " + "[--starttime=[start] --endtime=[end] " + - "[--range=[startKey],[endKey]] " + + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] " + "[ ...]")); assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100")); assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false")); @@ -308,7 +386,7 @@ public class TestRowCounter { assertTrue(data.toString().contains( "Usage: RowCounter [options] " + "[--starttime=[start] --endtime=[end] " + - "[--range=[startKey],[endKey]] " + + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] " + "[ ...]")); }