HBASE-16108 RowCounter should support multiple key ranges (Konstantin Ryakhovskiy)
This commit is contained in:
parent
139f0ed53c
commit
cc73c03118
|
@ -19,6 +19,8 @@
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -31,7 +33,9 @@ import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||||
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
||||||
|
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.mapreduce.Counter;
|
import org.apache.hadoop.mapreduce.Counter;
|
||||||
|
@ -95,8 +99,7 @@ public class RowCounter extends Configured implements Tool {
|
||||||
public static Job createSubmittableJob(Configuration conf, String[] args)
|
public static Job createSubmittableJob(Configuration conf, String[] args)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
String tableName = args[0];
|
String tableName = args[0];
|
||||||
String startKey = null;
|
List<MultiRowRangeFilter.RowRange> rowRangeList = null;
|
||||||
String endKey = null;
|
|
||||||
long startTime = 0;
|
long startTime = 0;
|
||||||
long endTime = 0;
|
long endTime = 0;
|
||||||
|
|
||||||
|
@ -110,14 +113,11 @@ public class RowCounter extends Configured implements Tool {
|
||||||
// First argument is table name, starting from second
|
// First argument is table name, starting from second
|
||||||
for (int i = 1; i < args.length; i++) {
|
for (int i = 1; i < args.length; i++) {
|
||||||
if (args[i].startsWith(rangeSwitch)) {
|
if (args[i].startsWith(rangeSwitch)) {
|
||||||
String[] startEnd = args[i].substring(rangeSwitch.length()).split(",", 2);
|
try {
|
||||||
if (startEnd.length != 2 || startEnd[1].contains(",")) {
|
rowRangeList = parseRowRangeParameter(args[i], rangeSwitch);
|
||||||
printUsage("Please specify range in such format as \"--range=a,b\" " +
|
} catch (IllegalArgumentException e) {
|
||||||
"or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
startKey = startEnd[0];
|
|
||||||
endKey = startEnd[1];
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (args[i].startsWith(startTimeArgKey)) {
|
if (args[i].startsWith(startTimeArgKey)) {
|
||||||
|
@ -146,12 +146,7 @@ public class RowCounter extends Configured implements Tool {
|
||||||
job.setJarByClass(RowCounter.class);
|
job.setJarByClass(RowCounter.class);
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
scan.setCacheBlocks(false);
|
scan.setCacheBlocks(false);
|
||||||
if (startKey != null && !startKey.equals("")) {
|
setScanFilter(scan, rowRangeList);
|
||||||
scan.setStartRow(Bytes.toBytesBinary(startKey));
|
|
||||||
}
|
|
||||||
if (endKey != null && !endKey.equals("")) {
|
|
||||||
scan.setStopRow(Bytes.toBytesBinary(endKey));
|
|
||||||
}
|
|
||||||
if (sb.length() > 0) {
|
if (sb.length() > 0) {
|
||||||
for (String columnName : sb.toString().trim().split(" ")) {
|
for (String columnName : sb.toString().trim().split(" ")) {
|
||||||
String family = StringUtils.substringBefore(columnName, ":");
|
String family = StringUtils.substringBefore(columnName, ":");
|
||||||
|
@ -165,7 +160,6 @@ public class RowCounter extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
scan.setFilter(new FirstKeyOnlyFilter());
|
|
||||||
scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
|
scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
|
||||||
job.setOutputFormatClass(NullOutputFormat.class);
|
job.setOutputFormatClass(NullOutputFormat.class);
|
||||||
TableMapReduceUtil.initTableMapperJob(tableName, scan,
|
TableMapReduceUtil.initTableMapperJob(tableName, scan,
|
||||||
|
@ -174,6 +168,54 @@ public class RowCounter extends Configured implements Tool {
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(
|
||||||
|
String arg, String rangeSwitch) {
|
||||||
|
final String[] ranges = arg.substring(rangeSwitch.length()).split(";");
|
||||||
|
final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
|
||||||
|
for (String range : ranges) {
|
||||||
|
String[] startEnd = range.split(",", 2);
|
||||||
|
if (startEnd.length != 2 || startEnd[1].contains(",")) {
|
||||||
|
printUsage("Please specify range in such format as \"--range=a,b\" " +
|
||||||
|
"or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
|
||||||
|
throw new IllegalArgumentException("Wrong range specification: " + range);
|
||||||
|
}
|
||||||
|
String startKey = startEnd[0];
|
||||||
|
String endKey = startEnd[1];
|
||||||
|
rangeList.add(new MultiRowRangeFilter.RowRange(
|
||||||
|
Bytes.toBytesBinary(startKey), true,
|
||||||
|
Bytes.toBytesBinary(endKey), false));
|
||||||
|
}
|
||||||
|
return rangeList;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets filter {@link FilterBase} to the {@link Scan} instance.
|
||||||
|
* If provided rowRangeList contains more than one element,
|
||||||
|
* method sets filter which is instance of {@link MultiRowRangeFilter}.
|
||||||
|
* Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}.
|
||||||
|
* If rowRangeList contains exactly one element, startRow and stopRow are set to the scan.
|
||||||
|
* @param scan
|
||||||
|
* @param rowRangeList
|
||||||
|
*/
|
||||||
|
private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
|
||||||
|
final int size = rowRangeList == null ? 0 : rowRangeList.size();
|
||||||
|
if (size <= 1) {
|
||||||
|
scan.setFilter(new FirstKeyOnlyFilter());
|
||||||
|
}
|
||||||
|
if (size == 1) {
|
||||||
|
MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
|
||||||
|
scan.setStartRow(range.getStartRow()); //inclusive
|
||||||
|
scan.setStopRow(range.getStopRow()); //exclusive
|
||||||
|
} else if (size > 1) {
|
||||||
|
try {
|
||||||
|
scan.setFilter(new MultiRowRangeFilter(rowRangeList));
|
||||||
|
} catch (IOException e) {
|
||||||
|
//the IOException should never be thrown. see HBASE-16145
|
||||||
|
throw new RuntimeException("Cannot instantiate MultiRowRangeFilter");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* @param errorMessage Can attach a message when error occurs.
|
* @param errorMessage Can attach a message when error occurs.
|
||||||
*/
|
*/
|
||||||
|
@ -189,7 +231,7 @@ public class RowCounter extends Configured implements Tool {
|
||||||
private static void printUsage() {
|
private static void printUsage() {
|
||||||
System.err.println("Usage: RowCounter [options] <tablename> " +
|
System.err.println("Usage: RowCounter [options] <tablename> " +
|
||||||
"[--starttime=[start] --endtime=[end] " +
|
"[--starttime=[start] --endtime=[end] " +
|
||||||
"[--range=[startKey],[endKey]] [<column1> <column2>...]");
|
"[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
|
||||||
System.err.println("For performance consider the following options:\n"
|
System.err.println("For performance consider the following options:\n"
|
||||||
+ "-Dhbase.client.scanner.caching=100\n"
|
+ "-Dhbase.client.scanner.caching=100\n"
|
||||||
+ "-Dmapreduce.map.speculative=false");
|
+ "-Dmapreduce.map.speculative=false");
|
||||||
|
|
|
@ -72,7 +72,7 @@ public class TestRowCounter {
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
TEST_UTIL.startMiniCluster();
|
TEST_UTIL.startMiniCluster();
|
||||||
Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
|
Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
|
||||||
writeRows(table);
|
writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL);
|
||||||
table.close();
|
table.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,7 +154,82 @@ public class TestRowCounter {
|
||||||
runRowCount(args, 8);
|
runRowCount(args, 8);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Test a case when a range is specified with single range of start-end keys
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRowCounterRowSingleRange() throws Exception {
|
||||||
|
String[] args = new String[] {
|
||||||
|
TABLE_NAME, "--range=\\x00row1,\\x00row3"
|
||||||
|
};
|
||||||
|
runRowCount(args, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case when a range is specified with single range with end key only
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRowCounterRowSingleRangeUpperBound() throws Exception {
|
||||||
|
String[] args = new String[] {
|
||||||
|
TABLE_NAME, "--range=,\\x00row3"
|
||||||
|
};
|
||||||
|
runRowCount(args, 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case when a range is specified with two ranges where one range is with end key only
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRowCounterRowMultiRangeUpperBound() throws Exception {
|
||||||
|
String[] args = new String[] {
|
||||||
|
TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7"
|
||||||
|
};
|
||||||
|
runRowCount(args, 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case when a range is specified with multiple ranges of start-end keys
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRowCounterRowMultiRange() throws Exception {
|
||||||
|
String[] args = new String[] {
|
||||||
|
TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8"
|
||||||
|
};
|
||||||
|
runRowCount(args, 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test a case when a range is specified with multiple ranges of start-end keys;
|
||||||
|
* one range is filled, another two are not
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRowCounterRowMultiEmptyRange() throws Exception {
|
||||||
|
String[] args = new String[] {
|
||||||
|
TABLE_NAME, "--range=\\x00row1,\\x00row3;;"
|
||||||
|
};
|
||||||
|
runRowCount(args, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRowCounter10kRowRange() throws Exception {
|
||||||
|
String tableName = TABLE_NAME + "10k";
|
||||||
|
|
||||||
|
try (Table table = TEST_UTIL.createTable(
|
||||||
|
TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) {
|
||||||
|
writeRows(table, 10000, 0);
|
||||||
|
}
|
||||||
|
String[] args = new String[] {
|
||||||
|
tableName, "--range=\\x00row9872,\\x00row9875"
|
||||||
|
};
|
||||||
|
runRowCount(args, 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
* Test a case when the timerange is specified with --starttime and --endtime options
|
* Test a case when the timerange is specified with --starttime and --endtime options
|
||||||
*
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
|
@ -222,7 +297,10 @@ public class TestRowCounter {
|
||||||
*/
|
*/
|
||||||
private void runRowCount(String[] args, int expectedCount) throws Exception {
|
private void runRowCount(String[] args, int expectedCount) throws Exception {
|
||||||
Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args);
|
Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args);
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
job.waitForCompletion(true);
|
job.waitForCompletion(true);
|
||||||
|
long duration = System.currentTimeMillis() - start;
|
||||||
|
LOG.debug("row count duration (ms): " + duration);
|
||||||
assertTrue(job.isSuccessful());
|
assertTrue(job.isSuccessful());
|
||||||
Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS);
|
Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS);
|
||||||
assertEquals(expectedCount, counter.getValue());
|
assertEquals(expectedCount, counter.getValue());
|
||||||
|
@ -235,7 +313,7 @@ public class TestRowCounter {
|
||||||
* @param table
|
* @param table
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private static void writeRows(Table table) throws IOException {
|
private static void writeRows(Table table, int totalRows, int rowsWithOneCol) throws IOException {
|
||||||
final byte[] family = Bytes.toBytes(COL_FAM);
|
final byte[] family = Bytes.toBytes(COL_FAM);
|
||||||
final byte[] value = Bytes.toBytes("abcd");
|
final byte[] value = Bytes.toBytes("abcd");
|
||||||
final byte[] col1 = Bytes.toBytes(COL1);
|
final byte[] col1 = Bytes.toBytes(COL1);
|
||||||
|
@ -244,7 +322,7 @@ public class TestRowCounter {
|
||||||
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
|
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
|
||||||
// write few rows with two columns
|
// write few rows with two columns
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) {
|
for (; i < totalRows - rowsWithOneCol; i++) {
|
||||||
// Use binary rows values to test for HBASE-15287.
|
// Use binary rows values to test for HBASE-15287.
|
||||||
byte[] row = Bytes.toBytesBinary("\\x00row" + i);
|
byte[] row = Bytes.toBytesBinary("\\x00row" + i);
|
||||||
Put put = new Put(row);
|
Put put = new Put(row);
|
||||||
|
@ -255,7 +333,7 @@ public class TestRowCounter {
|
||||||
}
|
}
|
||||||
|
|
||||||
// write few rows with only one column
|
// write few rows with only one column
|
||||||
for (; i < TOTAL_ROWS; i++) {
|
for (; i < totalRows; i++) {
|
||||||
byte[] row = Bytes.toBytes("row" + i);
|
byte[] row = Bytes.toBytes("row" + i);
|
||||||
Put put = new Put(row);
|
Put put = new Put(row);
|
||||||
put.addColumn(family, col2, value);
|
put.addColumn(family, col2, value);
|
||||||
|
@ -288,7 +366,7 @@ public class TestRowCounter {
|
||||||
assertTrue(data.toString().contains(
|
assertTrue(data.toString().contains(
|
||||||
"Usage: RowCounter [options] <tablename> " +
|
"Usage: RowCounter [options] <tablename> " +
|
||||||
"[--starttime=[start] --endtime=[end] " +
|
"[--starttime=[start] --endtime=[end] " +
|
||||||
"[--range=[startKey],[endKey]] " +
|
"[--range=[startKey],[endKey][;[startKey],[endKey]...]] " +
|
||||||
"[<column1> <column2>...]"));
|
"[<column1> <column2>...]"));
|
||||||
assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
|
assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
|
||||||
assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
|
assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
|
||||||
|
@ -308,7 +386,7 @@ public class TestRowCounter {
|
||||||
assertTrue(data.toString().contains(
|
assertTrue(data.toString().contains(
|
||||||
"Usage: RowCounter [options] <tablename> " +
|
"Usage: RowCounter [options] <tablename> " +
|
||||||
"[--starttime=[start] --endtime=[end] " +
|
"[--starttime=[start] --endtime=[end] " +
|
||||||
"[--range=[startKey],[endKey]] " +
|
"[--range=[startKey],[endKey][;[startKey],[endKey]...]] " +
|
||||||
"[<column1> <column2>...]"));
|
"[<column1> <column2>...]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue