From 017c37451980480948467fd25fe1d0d0772d4c35 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Tue, 23 Jun 2020 15:13:53 +0100 Subject: [PATCH] =?UTF-8?q?HBASE-21773=20-=20Addendum=20-=20Bring=20back?= =?UTF-8?q?=20"public=20static=20Job=20createSubmitta=E2=80=A6=20(#1953)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Dimiduk Signed-off-by: Viraj Jasani (cherry picked from commit edf75f2535b6f6853a997525efe3f6f4ef4984b1) --- .../hadoop/hbase/mapreduce/RowCounter.java | 97 +++++++ .../hbase/mapreduce/TestRowCounter.java | 272 ++++++++++++++++-- 2 files changed, 343 insertions(+), 26 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index 0b879c598c6..50d726b12cd 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -137,6 +137,103 @@ public class RowCounter extends AbstractHBaseTool { return job; } + /** + * Sets up the actual job. + * + * @param conf The current configuration. + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. + * @deprecated as of release 2.3.0. Will be removed on 4.0.0. Please use main method instead. + */ + @Deprecated + public static Job createSubmittableJob(Configuration conf, String[] args) + throws IOException { + String tableName = args[0]; + List rowRangeList = null; + long startTime = 0; + long endTime = 0; + + StringBuilder sb = new StringBuilder(); + + final String rangeSwitch = "--range="; + final String startTimeArgKey = "--starttime="; + final String endTimeArgKey = "--endtime="; + final String expectedCountArg = "--expected-count="; + + // First argument is table name, starting from second + for (int i = 1; i < args.length; i++) { + if (args[i].startsWith(rangeSwitch)) { + try { + rowRangeList = parseRowRangeParameter( + args[i].substring(args[1].indexOf(rangeSwitch)+rangeSwitch.length())); + } catch (IllegalArgumentException e) { + return null; + } + continue; + } + if (args[i].startsWith(startTimeArgKey)) { + startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); + continue; + } + if (args[i].startsWith(endTimeArgKey)) { + endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); + continue; + } + if (args[i].startsWith(expectedCountArg)) { + conf.setLong(EXPECTED_COUNT_KEY, + Long.parseLong(args[i].substring(expectedCountArg.length()))); + continue; + } + // if no switch, assume column names + sb.append(args[i]); + sb.append(" "); + } + if (endTime < startTime) { + printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); + return null; + } + + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); + job.setJarByClass(RowCounter.class); + Scan scan = new Scan(); + scan.setCacheBlocks(false); + setScanFilter(scan, rowRangeList); + if (sb.length() > 0) { + for (String columnName : sb.toString().trim().split(" ")) { + String family = StringUtils.substringBefore(columnName, ":"); + String qualifier = StringUtils.substringAfter(columnName, ":"); + + if (StringUtils.isBlank(qualifier)) { + scan.addFamily(Bytes.toBytes(family)); + } + else { + scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); + } + } + } + scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); + job.setOutputFormatClass(NullOutputFormat.class); + TableMapReduceUtil.initTableMapperJob(tableName, scan, + RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job); + job.setNumReduceTasks(0); + return job; + } + + /** + * Prints usage without error message. + * Note that we don't document --expected-count, because it's intended for test. + */ + private static void printUsage(String errorMessage) { + System.err.println("ERROR: " + errorMessage); + System.err.println("Usage: hbase rowcounter [options] " + + "[--starttime= --endtime=] " + + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [ ...]"); + System.err.println("For performance consider the following options:\n" + + "-Dhbase.client.scanner.caching=100\n" + + "-Dmapreduce.map.speculative=false"); + } + private static List parseRowRangeParameter(String arg) { final List rangesSplit = Splitter.on(";").splitToList(arg); final List rangeList = new ArrayList<>(); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java index 12d121c8e56..add1b58fa12 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LauncherSecurityManager; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -52,7 +54,7 @@ public class TestRowCounter { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestRowCounter.class); + HBaseClassTestRule.forClass(TestRowCounter.class); private static final Logger LOG = LoggerFactory.getLogger(TestRowCounter.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -92,7 +94,7 @@ public class TestRowCounter { @Test public void testRowCounterNoColumn() throws Exception { String[] args = new String[] { - TABLE_NAME + TABLE_NAME }; runRowCount(args, 10); } @@ -106,7 +108,7 @@ public class TestRowCounter { @Test public void testRowCounterExclusiveColumn() throws Exception { String[] args = new String[] { - TABLE_NAME, COL_FAM + ":" + COL1 + TABLE_NAME, COL_FAM + ":" + COL1 }; runRowCount(args, 8); } @@ -120,7 +122,7 @@ public class TestRowCounter { @Test public void testRowCounterColumnWithColonInQualifier() throws Exception { String[] args = new String[] { - TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN + TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN }; runRowCount(args, 8); } @@ -134,7 +136,7 @@ public class TestRowCounter { @Test public void testRowCounterHiddenColumn() throws Exception { String[] args = new String[] { - TABLE_NAME, COL_FAM + ":" + COL2 + TABLE_NAME, COL_FAM + ":" + COL2 }; runRowCount(args, 10); } @@ -149,7 +151,7 @@ public class TestRowCounter { @Test public void testRowCounterColumnAndRowRange() throws Exception { String[] args = new String[] { - TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1 + TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1 }; runRowCount(args, 8); } @@ -161,7 +163,7 @@ public class TestRowCounter { @Test public void testRowCounterRowSingleRange() throws Exception { String[] args = new String[] { - TABLE_NAME, "--range=\\x00row1,\\x00row3" + TABLE_NAME, "--range=\\x00row1,\\x00row3" }; runRowCount(args, 2); } @@ -197,7 +199,7 @@ public class TestRowCounter { @Test public void testRowCounterRowMultiRange() throws Exception { String[] args = new String[] { - TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8" + TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8" }; runRowCount(args, 5); } @@ -210,7 +212,7 @@ public class TestRowCounter { @Test public void testRowCounterRowMultiEmptyRange() throws Exception { String[] args = new String[] { - TABLE_NAME, "--range=\\x00row1,\\x00row3;;" + TABLE_NAME, "--range=\\x00row1,\\x00row3;;" }; runRowCount(args, 2); } @@ -260,30 +262,30 @@ public class TestRowCounter { table.close(); String[] args = new String[] { - TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, - "--starttime=" + 0, - "--endtime=" + ts + TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, + "--starttime=" + 0, + "--endtime=" + ts }; runRowCount(args, 1); args = new String[] { - TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, - "--starttime=" + 0, - "--endtime=" + (ts - 10) + TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, + "--starttime=" + 0, + "--endtime=" + (ts - 10) }; runRowCount(args, 1); args = new String[] { - TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, - "--starttime=" + ts, - "--endtime=" + (ts + 1000) + TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, + "--starttime=" + ts, + "--endtime=" + (ts + 1000) }; runRowCount(args, 2); args = new String[] { - TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, - "--starttime=" + (ts - 30 * 1000), - "--endtime=" + (ts + 30 * 1000), + TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, + "--starttime=" + (ts - 30 * 1000), + "--endtime=" + (ts + 30 * 1000), }; runRowCount(args, 3); } @@ -307,6 +309,224 @@ public class TestRowCounter { assertTrue(result==0); } + /** + * Run the RowCounter map reduce job and verify the row count. + * + * @param args the command line arguments to be used for rowcounter job. + * @param expectedCount the expected row count (result of map reduce job). + * @throws Exception in case of any unexpected error. + */ + private void runCreateSubmittableJobWithArgs(String[] args, int expectedCount) throws Exception { + Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args); + long start = System.currentTimeMillis(); + job.waitForCompletion(true); + long duration = System.currentTimeMillis() - start; + LOG.debug("row count duration (ms): " + duration); + assertTrue(job.isSuccessful()); + Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS); + assertEquals(expectedCount, counter.getValue()); + } + + @Test + public void testCreateSubmittableJobWithArgsNoColumn() throws Exception { + String[] args = new String[] { + TABLE_NAME + }; + runCreateSubmittableJobWithArgs(args, 10); + } + + /** + * Test a case when the column specified in command line arguments is + * exclusive for few rows. + * + * @throws Exception in case of any unexpected error. + */ + @Test + public void testCreateSubmittableJobWithArgsExclusiveColumn() throws Exception { + String[] args = new String[] { + TABLE_NAME, COL_FAM + ":" + COL1 + }; + runCreateSubmittableJobWithArgs(args, 8); + } + + /** + * Test a case when the column specified in command line arguments is + * one for which the qualifier contains colons. + * + * @throws Exception in case of any unexpected error. + */ + @Test + public void testCreateSubmittableJobWithArgsColumnWithColonInQualifier() throws Exception { + String[] args = new String[] { + TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN + }; + runCreateSubmittableJobWithArgs(args, 8); + } + + /** + * Test a case when the column specified in command line arguments is not part + * of first KV for a row. + * + * @throws Exception in case of any unexpected error. + */ + @Test + public void testCreateSubmittableJobWithArgsHiddenColumn() throws Exception { + String[] args = new String[] { + TABLE_NAME, COL_FAM + ":" + COL2 + }; + runCreateSubmittableJobWithArgs(args, 10); + } + + + /** + * Test a case when the column specified in command line arguments is + * exclusive for few rows and also a row range filter is specified + * + * @throws Exception in case of any unexpected error. + */ + @Test + public void testCreateSubmittableJobWithArgsColumnAndRowRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1 + }; + runCreateSubmittableJobWithArgs(args, 8); + } + + /** + * Test a case when a range is specified with single range of start-end keys + * @throws Exception in case of any unexpected error. + */ + @Test + public void testCreateSubmittableJobWithArgsRowSingleRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00row1,\\x00row3" + }; + runCreateSubmittableJobWithArgs(args, 2); + } + + /** + * Test a case when a range is specified with single range with end key only + * @throws Exception in case of any unexpected error. + */ + @Test + public void testCreateSubmittableJobWithArgsRowSingleRangeUpperBound() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=,\\x00row3" + }; + runCreateSubmittableJobWithArgs(args, 3); + } + + /** + * Test a case when a range is specified with two ranges where one range is with end key only + * @throws Exception in case of any unexpected error. + */ + @Test + public void testCreateSubmittableJobWithArgsRowMultiRangeUpperBound() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7" + }; + runCreateSubmittableJobWithArgs(args, 5); + } + + /** + * Test a case when a range is specified with multiple ranges of start-end keys + * @throws Exception in case of any unexpected error. + */ + @Test + public void testCreateSubmittableJobWithArgsRowMultiRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8" + }; + runCreateSubmittableJobWithArgs(args, 5); + } + + /** + * Test a case when a range is specified with multiple ranges of start-end keys; + * one range is filled, another two are not + * @throws Exception in case of any unexpected error. + */ + @Test + public void testCreateSubmittableJobWithArgsRowMultiEmptyRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00row1,\\x00row3;;" + }; + runCreateSubmittableJobWithArgs(args, 2); + } + + @Test + public void testCreateSubmittableJobWithArgs10kRowRange() throws Exception { + String tableName = TABLE_NAME + "CreateSubmittableJobWithArgs10kRowRange"; + + try (Table table = TEST_UTIL.createTable( + TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) { + writeRows(table, 10000, 0); + } + String[] args = new String[] { + tableName, "--range=\\x00row9872,\\x00row9875" + }; + runCreateSubmittableJobWithArgs(args, 3); + } + + /** + * Test a case when the timerange is specified with --starttime and --endtime options + * + * @throws Exception in case of any unexpected error. + */ + @Test + public void testCreateSubmittableJobWithArgsTimeRange() throws Exception { + final byte[] family = Bytes.toBytes(COL_FAM); + final byte[] col1 = Bytes.toBytes(COL1); + Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1)); + Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2)); + Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3)); + + long ts; + + String tableName = TABLE_NAME_TS_RANGE+"CreateSubmittableJobWithArgs"; + // clean up content of TABLE_NAME + Table table = TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(COL_FAM)); + + ts = System.currentTimeMillis(); + put1.addColumn(family, col1, ts, Bytes.toBytes("val1")); + table.put(put1); + Thread.sleep(100); + + ts = System.currentTimeMillis(); + put2.addColumn(family, col1, ts, Bytes.toBytes("val2")); + put3.addColumn(family, col1, ts, Bytes.toBytes("val3")); + table.put(put2); + table.put(put3); + table.close(); + + String[] args = new String[] { + tableName, COL_FAM + ":" + COL1, + "--starttime=" + 0, + "--endtime=" + ts + }; + runCreateSubmittableJobWithArgs(args, 1); + + args = new String[] { + tableName, COL_FAM + ":" + COL1, + "--starttime=" + 0, + "--endtime=" + (ts - 10) + }; + runCreateSubmittableJobWithArgs(args, 1); + + args = new String[] { + tableName, COL_FAM + ":" + COL1, + "--starttime=" + ts, + "--endtime=" + (ts + 1000) + }; + runCreateSubmittableJobWithArgs(args, 2); + + args = new String[] { + tableName, COL_FAM + ":" + COL1, + "--starttime=" + (ts - 30 * 1000), + "--endtime=" + (ts + 30 * 1000), + }; + runCreateSubmittableJobWithArgs(args, 3); + } + /** * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have * two columns, Few have one. @@ -405,17 +625,17 @@ public class TestRowCounter { private void assertUsageContent(String usage) { assertTrue(usage.contains("usage: hbase rowcounter " - + " [options] [ ...]")); + + " [options] [ ...]")); assertTrue(usage.contains("Options:\n")); assertTrue(usage.contains("--starttime= " - + "starting time filter to start counting rows from.\n")); + + "starting time filter to start counting rows from.\n")); assertTrue(usage.contains("--endtime= " - + "end time filter limit, to only count rows up to this timestamp.\n")); + + "end time filter limit, to only count rows up to this timestamp.\n")); assertTrue(usage.contains("--range= " - + "[startKey],[endKey][;[startKey],[endKey]...]]\n")); + + "[startKey],[endKey][;[startKey],[endKey]...]]\n")); assertTrue(usage.contains("--expectedCount= expected number of rows to be count.\n")); assertTrue(usage.contains("For performance, " - + "consider the following configuration properties:\n")); + + "consider the following configuration properties:\n")); assertTrue(usage.contains("-Dhbase.client.scanner.caching=100\n")); assertTrue(usage.contains("-Dmapreduce.map.speculative=false\n")); }