From bebf0181c8609870034dbcbe21f1bd45b3db3ea7 Mon Sep 17 00:00:00 2001 From: Esteban Gutierrez Date: Tue, 11 Nov 2014 13:38:59 -0800 Subject: [PATCH] HBASE-12447 Add support for setTimeRange for RowCounter and CellCounter Signed-off-by: stack --- .../hadoop/hbase/mapreduce/CellCounter.java | 35 ++++- .../hadoop/hbase/mapreduce/RowCounter.java | 20 +++ .../hbase/mapreduce/TestCellCounter.java | 121 ++++++++++++++- .../hbase/mapreduce/TestRowCounter.java | 144 +++++++++++++----- 4 files changed, 275 insertions(+), 45 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java index df97f633280..b36f3d24463 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -217,6 +219,12 @@ public class CellCounter { LOG.info("Setting Row Filter for counter."); s.setFilter(rowFilter); } + // Set TimeRange if defined + long timeRange[] = getTimeRange(args); + if (timeRange != null) { + LOG.info("Setting TimeRange for counter."); + s.setTimeRange(timeRange[0], timeRange[1]); + } return s; } @@ -234,6 +242,28 @@ public class CellCounter { return rowFilter; } + private static long[] getTimeRange(String[] args) throws IOException { + final String startTimeArgKey = "--starttime="; + final String endTimeArgKey = "--endtime="; + long startTime = 0L; + long endTime = 0L; + + for (int i = 1; i < args.length; i++) { + System.out.println("i:" + i + "arg[i]" + args[i]); + if (args[i].startsWith(startTimeArgKey)) { + startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); + } + if (args[i].startsWith(endTimeArgKey)) { + endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); + } + } + + if (startTime == 0 && endTime == 0) + return null; + + endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime; + return new long [] {startTime, endTime}; + } /** * Main entry point. * @@ -245,8 +275,9 @@ public class CellCounter { String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("ERROR: Wrong number of parameters: " + args.length); - System.err.println("Usage: CellCounter " + - "[^[regex pattern] or [Prefix] for row filter]] "); + System.err.println("Usage: CellCounter "); + System.err.println(" [^[regex pattern] or " + + "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]"); System.err.println(" Note: -D properties will be applied to the conf used. "); System.err.println(" Additionally, the following SCAN properties can be specified"); System.err.println(" to get fine grained control on what is counted.."); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index c2b627cbaf6..04450f885fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.TreeSet; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -89,9 +90,14 @@ public class RowCounter { String tableName = args[0]; String startKey = null; String endKey = null; + long startTime = 0; + long endTime = 0; + StringBuilder sb = new StringBuilder(); final String rangeSwitch = "--range="; + final String startTimeArgKey = "--starttime="; + final String endTimeArgKey = "--endtime="; // First argument is table name, starting from second for (int i = 1; i < args.length; i++) { @@ -105,6 +111,18 @@ public class RowCounter { startKey = startEnd[0]; endKey = startEnd[1]; } + if (startTime < endTime) { + printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); + return null; + } + if (args[i].startsWith(startTimeArgKey)) { + startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); + continue; + } + if (args[i].startsWith(endTimeArgKey)) { + endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); + continue; + } else { // if no switch, assume column names sb.append(args[i]); @@ -144,6 +162,7 @@ public class RowCounter { } else { scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers)); } + scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); job.setOutputFormatClass(NullOutputFormat.class); TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job); @@ -164,6 +183,7 @@ public class RowCounter { */ private static void printUsage() { System.err.println("Usage: RowCounter [options] " + + "[--starttime=[start] --endtime=[end] " + "[--range=[startKey],[endKey]] [ ...]"); System.err.println("For performance consider the following options:\n" + "-Dhbase.client.scanner.caching=100\n" diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java index 64c9d984f93..16fd013fabb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java @@ -74,7 +74,7 @@ public class TestCellCounter { /** * Test CellCounter all data should print to output - * + * */ @Test (timeout=300000) public void testCellCounter() throws Exception { @@ -94,7 +94,7 @@ public class TestCellCounter { t.put(p); String[] args = { sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1" }; runCount(args); - FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + "part-r-00000"); String data = IOUtils.toString(inputStream); inputStream.close(); @@ -107,10 +107,127 @@ public class TestCellCounter { assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1")); }finally{ t.close(); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); } } + /** + * Test CellCounter with time range all data should print to output + */ + @Test (timeout=300000) + public void testCellCounterStartTimeRange() throws Exception { + String sourceTable = "testCellCounterStartTimeRange"; + byte[][] families = { FAMILY_A, FAMILY_B }; + Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families); + try{ + Put p = new Put(ROW1); + p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { + sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1", "--starttime=" + now, + "--endtime=" + now + 2 }; + runCount(args); + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total ROWS" + "\t" + "1")); + assertTrue(data.contains("b;q" + "\t" + "1")); + assertTrue(data.contains("a;q" + "\t" + "1")); + assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1")); + assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1")); + }finally{ + t.close(); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + } + } + + /** + * Test CellCounter with time range all data should print to output + */ + @Test (timeout=300000) + public void testCellCounteEndTimeRange() throws Exception { + String sourceTable = "testCellCounterEndTimeRange"; + byte[][] families = { FAMILY_A, FAMILY_B }; + Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families); + try{ + Put p = new Put(ROW1); + p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { + sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1", "--endtime=" + now + 1 }; + runCount(args); + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total ROWS" + "\t" + "1")); + assertTrue(data.contains("b;q" + "\t" + "1")); + assertTrue(data.contains("a;q" + "\t" + "1")); + assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1")); + assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1")); + }finally{ + t.close(); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + } + } + + /** + * Test CellCounter with time range all data should print to output + */ + @Test (timeout=300000) + public void testCellCounteOutOfTimeRange() throws Exception { + String sourceTable = "testCellCounterOutTimeRange"; + byte[][] families = { FAMILY_A, FAMILY_B }; + Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families); + try{ + Put p = new Put(ROW1); + p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { + sourceTable, FQ_OUTPUT_DIR.toString(), ";", "--starttime=" + now + 1, + "--endtime=" + now + 2 }; + + runCount(args); + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + // nothing should hace been emitted to the reducer + assertTrue(data.isEmpty()); + }finally{ + t.close(); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + } + } + + private boolean runCount(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // need to make a copy of the configuration because to make sure diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java index 4027d6d7e6c..5e8c2161987 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java @@ -25,7 +25,9 @@ import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.sql.Time; import java.util.ArrayList; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,7 +35,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper; import org.apache.hadoop.hbase.util.Bytes; @@ -52,20 +59,29 @@ import org.junit.experimental.categories.Category; @Category(MediumTests.class) public class TestRowCounter { final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static String TABLE_NAME = "testRowCounter"; + private final static String COL_FAM = "col_fam"; + private final static String COL1 = "c1"; + private final static String COL2 = "c2"; + private final static String COMPOSITE_COLUMN = "C:A:A"; + private final static int TOTAL_ROWS = 10; + private final static int ROWS_WITH_ONE_COL = 2; /** * @throws java.lang.Exception */ @BeforeClass - public static void setUpBeforeClass() throws Exception { + public static void setUpBeforeClass() + throws Exception { TEST_UTIL.startMiniCluster(); TEST_UTIL.startMiniMapReduceCluster(); Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM)); @@ -77,35 +93,34 @@ public class TestRowCounter { * @throws java.lang.Exception */ @AfterClass - public static void tearDownAfterClass() throws Exception { + public static void tearDownAfterClass() + throws Exception { TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniMapReduceCluster(); } /** * Test a case when no column was specified in command line arguments. - * + * * @throws Exception */ @Test - public void testRowCounterNoColumn() throws Exception { - String[] args = new String[] { - TABLE_NAME - }; + public void testRowCounterNoColumn() + throws Exception { + String[] args = new String[] {TABLE_NAME}; runRowCount(args, 10); } /** * Test a case when the column specified in command line arguments is * exclusive for few rows. - * + * * @throws Exception */ @Test - public void testRowCounterExclusiveColumn() throws Exception { - String[] args = new String[] { - TABLE_NAME, COL_FAM + ":" + COL1 - }; + public void testRowCounterExclusiveColumn() + throws Exception { + String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1}; runRowCount(args, 8); } @@ -116,55 +131,100 @@ public class TestRowCounter { * @throws Exception */ @Test - public void testRowCounterColumnWithColonInQualifier() throws Exception { - String[] args = new String[] { - TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN - }; + public void testRowCounterColumnWithColonInQualifier() + throws Exception { + String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN}; runRowCount(args, 8); } /** * Test a case when the column specified in command line arguments is not part * of first KV for a row. - * + * * @throws Exception */ @Test - public void testRowCounterHiddenColumn() throws Exception { - String[] args = new String[] { - TABLE_NAME, COL_FAM + ":" + COL2 - }; + public void testRowCounterHiddenColumn() + throws Exception { + String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL2}; runRowCount(args, 10); } + /** + * Test a case when the timerange is specified with --starttime and --endtime options + * + * @throws Exception + */ + @Test + public void testRowCounterTimeRange() + throws Exception { + final byte[] family = Bytes.toBytes(COL_FAM); + final byte[] col1 = Bytes.toBytes(COL1); + Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1)); + Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2)); + Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3)); + + long ts; + + // clean up content of TABLE_NAME + HTable table = TEST_UTIL.truncateTable(TableName.valueOf(TABLE_NAME)); + ts = System.currentTimeMillis(); + put1.add(family, col1, ts, Bytes.toBytes("val1")); + table.put(put1); + Thread.sleep(100); + + ts = System.currentTimeMillis(); + put2.add(family, col1, ts, Bytes.toBytes("val2")); + put3.add(family, col1, ts, Bytes.toBytes("val3")); + table.put(put2); + table.put(put3); + table.close(); + + String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0, + "--endtime=" + ts}; + runRowCount(args, 1); + + args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0, + "--endtime=" + (ts - 10)}; + runRowCount(args, 1); + + args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + ts, + "--endtime=" + (ts + 1000)}; + runRowCount(args, 2); + + args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + (ts - 30 * 1000), + "--endtime=" + (ts + 30 * 1000),}; + runRowCount(args, 3); + } + /** * Run the RowCounter map reduce job and verify the row count. - * + * * @param args the command line arguments to be used for rowcounter job. * @param expectedCount the expected row count (result of map reduce job). * @throws Exception */ - private void runRowCount(String[] args, int expectedCount) throws Exception { - GenericOptionsParser opts = new GenericOptionsParser( - TEST_UTIL.getConfiguration(), args); + private void runRowCount(String[] args, int expectedCount) + throws Exception { + GenericOptionsParser opts = new GenericOptionsParser(TEST_UTIL.getConfiguration(), args); Configuration conf = opts.getConfiguration(); args = opts.getRemainingArgs(); Job job = RowCounter.createSubmittableJob(conf, args); job.waitForCompletion(true); assertTrue(job.isSuccessful()); - Counter counter = job.getCounters().findCounter( - RowCounterMapper.Counters.ROWS); + Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS); assertEquals(expectedCount, counter.getValue()); } /** * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have * two columns, Few have one. - * + * * @param table * @throws IOException */ - private static void writeRows(Table table) throws IOException { + private static void writeRows(Table table) + throws IOException { final byte[] family = Bytes.toBytes(COL_FAM); final byte[] value = Bytes.toBytes("abcd"); final byte[] col1 = Bytes.toBytes(COL1); @@ -196,10 +256,11 @@ public class TestRowCounter { * test main method. Import should print help and call System.exit */ @Test - public void testImportMain() throws Exception { + public void testImportMain() + throws Exception { PrintStream oldPrintStream = System.err; SecurityManager SECURITY_MANAGER = System.getSecurityManager(); - LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); + LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); System.setSecurityManager(newSecurityManager); ByteArrayOutputStream data = new ByteArrayOutputStream(); String[] args = {}; @@ -213,9 +274,10 @@ public class TestRowCounter { } catch (SecurityException e) { assertEquals(-1, newSecurityManager.getExitCode()); assertTrue(data.toString().contains("Wrong number of parameters:")); - assertTrue(data.toString().contains( - "Usage: RowCounter [options] [--range=[startKey],[endKey]] " + - "[ ...]")); + assertTrue(data.toString().contains("Usage: RowCounter [options] " + + "[--starttime=[start] --endtime=[end] " + + "[--range=[startKey],[endKey]] " + + "[ ...]")); assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100")); assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false")); } @@ -228,12 +290,13 @@ public class TestRowCounter { fail("should be SecurityException"); } catch (SecurityException e) { assertEquals(-1, newSecurityManager.getExitCode()); - assertTrue(data.toString().contains( - "Please specify range in such format as \"--range=a,b\" or, with only one boundary," + - " \"--range=,b\" or \"--range=a,\"")); - assertTrue(data.toString().contains( - "Usage: RowCounter [options] [--range=[startKey],[endKey]] " + - "[ ...]")); + assertTrue(data.toString().contains("Please specify range in such format as \"--range=a,b\" or, with only one boundary," + + + " \"--range=,b\" or \"--range=a,\"")); + assertTrue(data.toString().contains("Usage: RowCounter [options] " + + "[--starttime=[start] --endtime=[end] " + + "[--range=[startKey],[endKey]] " + + "[ ...]")); } } finally { @@ -242,5 +305,4 @@ public class TestRowCounter { } } - }