HBASE-21773 - Addendum - Bring back "public static Job createSubmitta… (#1953)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
(cherry picked from commit edf75f2535
)
This commit is contained in:
parent
c4042f4927
commit
017c374519
|
@ -137,6 +137,103 @@ public class RowCounter extends AbstractHBaseTool {
|
|||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the actual job.
|
||||
*
|
||||
* @param conf The current configuration.
|
||||
* @param args The command line parameters.
|
||||
* @return The newly created job.
|
||||
* @throws IOException When setting up the job fails.
|
||||
* @deprecated as of release 2.3.0. Will be removed on 4.0.0. Please use main method instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public static Job createSubmittableJob(Configuration conf, String[] args)
|
||||
throws IOException {
|
||||
String tableName = args[0];
|
||||
List<MultiRowRangeFilter.RowRange> rowRangeList = null;
|
||||
long startTime = 0;
|
||||
long endTime = 0;
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
final String rangeSwitch = "--range=";
|
||||
final String startTimeArgKey = "--starttime=";
|
||||
final String endTimeArgKey = "--endtime=";
|
||||
final String expectedCountArg = "--expected-count=";
|
||||
|
||||
// First argument is table name, starting from second
|
||||
for (int i = 1; i < args.length; i++) {
|
||||
if (args[i].startsWith(rangeSwitch)) {
|
||||
try {
|
||||
rowRangeList = parseRowRangeParameter(
|
||||
args[i].substring(args[1].indexOf(rangeSwitch)+rangeSwitch.length()));
|
||||
} catch (IllegalArgumentException e) {
|
||||
return null;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (args[i].startsWith(startTimeArgKey)) {
|
||||
startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
if (args[i].startsWith(endTimeArgKey)) {
|
||||
endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
if (args[i].startsWith(expectedCountArg)) {
|
||||
conf.setLong(EXPECTED_COUNT_KEY,
|
||||
Long.parseLong(args[i].substring(expectedCountArg.length())));
|
||||
continue;
|
||||
}
|
||||
// if no switch, assume column names
|
||||
sb.append(args[i]);
|
||||
sb.append(" ");
|
||||
}
|
||||
if (endTime < startTime) {
|
||||
printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
|
||||
return null;
|
||||
}
|
||||
|
||||
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
|
||||
job.setJarByClass(RowCounter.class);
|
||||
Scan scan = new Scan();
|
||||
scan.setCacheBlocks(false);
|
||||
setScanFilter(scan, rowRangeList);
|
||||
if (sb.length() > 0) {
|
||||
for (String columnName : sb.toString().trim().split(" ")) {
|
||||
String family = StringUtils.substringBefore(columnName, ":");
|
||||
String qualifier = StringUtils.substringAfter(columnName, ":");
|
||||
|
||||
if (StringUtils.isBlank(qualifier)) {
|
||||
scan.addFamily(Bytes.toBytes(family));
|
||||
}
|
||||
else {
|
||||
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
|
||||
}
|
||||
}
|
||||
}
|
||||
scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
TableMapReduceUtil.initTableMapperJob(tableName, scan,
|
||||
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
|
||||
job.setNumReduceTasks(0);
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prints usage without error message.
|
||||
* Note that we don't document --expected-count, because it's intended for test.
|
||||
*/
|
||||
private static void printUsage(String errorMessage) {
|
||||
System.err.println("ERROR: " + errorMessage);
|
||||
System.err.println("Usage: hbase rowcounter [options] <tablename> "
|
||||
+ "[--starttime=<start> --endtime=<end>] "
|
||||
+ "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
|
||||
System.err.println("For performance consider the following options:\n"
|
||||
+ "-Dhbase.client.scanner.caching=100\n"
|
||||
+ "-Dmapreduce.map.speculative=false");
|
||||
}
|
||||
|
||||
private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String arg) {
|
||||
final List<String> rangesSplit = Splitter.on(";").splitToList(arg);
|
||||
final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
|
||||
|
|
|
@ -36,6 +36,8 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|||
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
|
||||
import org.apache.hadoop.mapreduce.Counter;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -52,7 +54,7 @@ public class TestRowCounter {
|
|||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRowCounter.class);
|
||||
HBaseClassTestRule.forClass(TestRowCounter.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestRowCounter.class);
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
@ -92,7 +94,7 @@ public class TestRowCounter {
|
|||
@Test
|
||||
public void testRowCounterNoColumn() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME
|
||||
TABLE_NAME
|
||||
};
|
||||
runRowCount(args, 10);
|
||||
}
|
||||
|
@ -106,7 +108,7 @@ public class TestRowCounter {
|
|||
@Test
|
||||
public void testRowCounterExclusiveColumn() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, COL_FAM + ":" + COL1
|
||||
TABLE_NAME, COL_FAM + ":" + COL1
|
||||
};
|
||||
runRowCount(args, 8);
|
||||
}
|
||||
|
@ -120,7 +122,7 @@ public class TestRowCounter {
|
|||
@Test
|
||||
public void testRowCounterColumnWithColonInQualifier() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN
|
||||
TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN
|
||||
};
|
||||
runRowCount(args, 8);
|
||||
}
|
||||
|
@ -134,7 +136,7 @@ public class TestRowCounter {
|
|||
@Test
|
||||
public void testRowCounterHiddenColumn() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, COL_FAM + ":" + COL2
|
||||
TABLE_NAME, COL_FAM + ":" + COL2
|
||||
};
|
||||
runRowCount(args, 10);
|
||||
}
|
||||
|
@ -149,7 +151,7 @@ public class TestRowCounter {
|
|||
@Test
|
||||
public void testRowCounterColumnAndRowRange() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1
|
||||
TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1
|
||||
};
|
||||
runRowCount(args, 8);
|
||||
}
|
||||
|
@ -161,7 +163,7 @@ public class TestRowCounter {
|
|||
@Test
|
||||
public void testRowCounterRowSingleRange() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, "--range=\\x00row1,\\x00row3"
|
||||
TABLE_NAME, "--range=\\x00row1,\\x00row3"
|
||||
};
|
||||
runRowCount(args, 2);
|
||||
}
|
||||
|
@ -197,7 +199,7 @@ public class TestRowCounter {
|
|||
@Test
|
||||
public void testRowCounterRowMultiRange() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8"
|
||||
TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8"
|
||||
};
|
||||
runRowCount(args, 5);
|
||||
}
|
||||
|
@ -210,7 +212,7 @@ public class TestRowCounter {
|
|||
@Test
|
||||
public void testRowCounterRowMultiEmptyRange() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, "--range=\\x00row1,\\x00row3;;"
|
||||
TABLE_NAME, "--range=\\x00row1,\\x00row3;;"
|
||||
};
|
||||
runRowCount(args, 2);
|
||||
}
|
||||
|
@ -260,30 +262,30 @@ public class TestRowCounter {
|
|||
table.close();
|
||||
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + 0,
|
||||
"--endtime=" + ts
|
||||
TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + 0,
|
||||
"--endtime=" + ts
|
||||
};
|
||||
runRowCount(args, 1);
|
||||
|
||||
args = new String[] {
|
||||
TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + 0,
|
||||
"--endtime=" + (ts - 10)
|
||||
TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + 0,
|
||||
"--endtime=" + (ts - 10)
|
||||
};
|
||||
runRowCount(args, 1);
|
||||
|
||||
args = new String[] {
|
||||
TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + ts,
|
||||
"--endtime=" + (ts + 1000)
|
||||
TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + ts,
|
||||
"--endtime=" + (ts + 1000)
|
||||
};
|
||||
runRowCount(args, 2);
|
||||
|
||||
args = new String[] {
|
||||
TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + (ts - 30 * 1000),
|
||||
"--endtime=" + (ts + 30 * 1000),
|
||||
TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + (ts - 30 * 1000),
|
||||
"--endtime=" + (ts + 30 * 1000),
|
||||
};
|
||||
runRowCount(args, 3);
|
||||
}
|
||||
|
@ -307,6 +309,224 @@ public class TestRowCounter {
|
|||
assertTrue(result==0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the RowCounter map reduce job and verify the row count.
|
||||
*
|
||||
* @param args the command line arguments to be used for rowcounter job.
|
||||
* @param expectedCount the expected row count (result of map reduce job).
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
private void runCreateSubmittableJobWithArgs(String[] args, int expectedCount) throws Exception {
|
||||
Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args);
|
||||
long start = System.currentTimeMillis();
|
||||
job.waitForCompletion(true);
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
LOG.debug("row count duration (ms): " + duration);
|
||||
assertTrue(job.isSuccessful());
|
||||
Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS);
|
||||
assertEquals(expectedCount, counter.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsNoColumn() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 10);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case when the column specified in command line arguments is
|
||||
* exclusive for few rows.
|
||||
*
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsExclusiveColumn() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, COL_FAM + ":" + COL1
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 8);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case when the column specified in command line arguments is
|
||||
* one for which the qualifier contains colons.
|
||||
*
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsColumnWithColonInQualifier() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 8);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case when the column specified in command line arguments is not part
|
||||
* of first KV for a row.
|
||||
*
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsHiddenColumn() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, COL_FAM + ":" + COL2
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 10);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test a case when the column specified in command line arguments is
|
||||
* exclusive for few rows and also a row range filter is specified
|
||||
*
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsColumnAndRowRange() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 8);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case when a range is specified with single range of start-end keys
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsRowSingleRange() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, "--range=\\x00row1,\\x00row3"
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case when a range is specified with single range with end key only
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsRowSingleRangeUpperBound() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, "--range=,\\x00row3"
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 3);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case when a range is specified with two ranges where one range is with end key only
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsRowMultiRangeUpperBound() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7"
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 5);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case when a range is specified with multiple ranges of start-end keys
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsRowMultiRange() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8"
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 5);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case when a range is specified with multiple ranges of start-end keys;
|
||||
* one range is filled, another two are not
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsRowMultiEmptyRange() throws Exception {
|
||||
String[] args = new String[] {
|
||||
TABLE_NAME, "--range=\\x00row1,\\x00row3;;"
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgs10kRowRange() throws Exception {
|
||||
String tableName = TABLE_NAME + "CreateSubmittableJobWithArgs10kRowRange";
|
||||
|
||||
try (Table table = TEST_UTIL.createTable(
|
||||
TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) {
|
||||
writeRows(table, 10000, 0);
|
||||
}
|
||||
String[] args = new String[] {
|
||||
tableName, "--range=\\x00row9872,\\x00row9875"
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 3);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case when the timerange is specified with --starttime and --endtime options
|
||||
*
|
||||
* @throws Exception in case of any unexpected error.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSubmittableJobWithArgsTimeRange() throws Exception {
|
||||
final byte[] family = Bytes.toBytes(COL_FAM);
|
||||
final byte[] col1 = Bytes.toBytes(COL1);
|
||||
Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
|
||||
Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
|
||||
Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
|
||||
|
||||
long ts;
|
||||
|
||||
String tableName = TABLE_NAME_TS_RANGE+"CreateSubmittableJobWithArgs";
|
||||
// clean up content of TABLE_NAME
|
||||
Table table = TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(COL_FAM));
|
||||
|
||||
ts = System.currentTimeMillis();
|
||||
put1.addColumn(family, col1, ts, Bytes.toBytes("val1"));
|
||||
table.put(put1);
|
||||
Thread.sleep(100);
|
||||
|
||||
ts = System.currentTimeMillis();
|
||||
put2.addColumn(family, col1, ts, Bytes.toBytes("val2"));
|
||||
put3.addColumn(family, col1, ts, Bytes.toBytes("val3"));
|
||||
table.put(put2);
|
||||
table.put(put3);
|
||||
table.close();
|
||||
|
||||
String[] args = new String[] {
|
||||
tableName, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + 0,
|
||||
"--endtime=" + ts
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 1);
|
||||
|
||||
args = new String[] {
|
||||
tableName, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + 0,
|
||||
"--endtime=" + (ts - 10)
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 1);
|
||||
|
||||
args = new String[] {
|
||||
tableName, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + ts,
|
||||
"--endtime=" + (ts + 1000)
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 2);
|
||||
|
||||
args = new String[] {
|
||||
tableName, COL_FAM + ":" + COL1,
|
||||
"--starttime=" + (ts - 30 * 1000),
|
||||
"--endtime=" + (ts + 30 * 1000),
|
||||
};
|
||||
runCreateSubmittableJobWithArgs(args, 3);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
|
||||
* two columns, Few have one.
|
||||
|
@ -405,17 +625,17 @@ public class TestRowCounter {
|
|||
|
||||
private void assertUsageContent(String usage) {
|
||||
assertTrue(usage.contains("usage: hbase rowcounter "
|
||||
+ "<tablename> [options] [<column1> <column2>...]"));
|
||||
+ "<tablename> [options] [<column1> <column2>...]"));
|
||||
assertTrue(usage.contains("Options:\n"));
|
||||
assertTrue(usage.contains("--starttime=<arg> "
|
||||
+ "starting time filter to start counting rows from.\n"));
|
||||
+ "starting time filter to start counting rows from.\n"));
|
||||
assertTrue(usage.contains("--endtime=<arg> "
|
||||
+ "end time filter limit, to only count rows up to this timestamp.\n"));
|
||||
+ "end time filter limit, to only count rows up to this timestamp.\n"));
|
||||
assertTrue(usage.contains("--range=<arg> "
|
||||
+ "[startKey],[endKey][;[startKey],[endKey]...]]\n"));
|
||||
+ "[startKey],[endKey][;[startKey],[endKey]...]]\n"));
|
||||
assertTrue(usage.contains("--expectedCount=<arg> expected number of rows to be count.\n"));
|
||||
assertTrue(usage.contains("For performance, "
|
||||
+ "consider the following configuration properties:\n"));
|
||||
+ "consider the following configuration properties:\n"));
|
||||
assertTrue(usage.contains("-Dhbase.client.scanner.caching=100\n"));
|
||||
assertTrue(usage.contains("-Dmapreduce.map.speculative=false\n"));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue