From 6c2a2996570d7239dd27481eb2900833d5537e16 Mon Sep 17 00:00:00 2001 From: Esteban Gutierrez Date: Tue, 11 Nov 2014 00:22:54 -0800 Subject: [PATCH] HBASE-12447 Add support for setTimeRange for RowCounter and CellCounter Signed-off-by: stack --- .../hadoop/hbase/mapreduce/CellCounter.java | 36 +++++- .../hadoop/hbase/mapreduce/RowCounter.java | 20 +++ .../hbase/mapreduce/TestCellCounter.java | 121 +++++++++++++++++- .../hbase/mapreduce/TestRowCounter.java | 79 +++++++++++- 4 files changed, 245 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java index 08018ab6dd5..9588916e102 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -222,6 +224,12 @@ public class CellCounter extends Configured implements Tool { LOG.info("Setting Row Filter for counter."); s.setFilter(rowFilter); } + // Set TimeRange if defined + long timeRange[] = getTimeRange(args); + if (timeRange != null) { + LOG.info("Setting TimeRange for counter."); + s.setTimeRange(timeRange[0], timeRange[1]); + } return s; } @@ -239,13 +247,37 @@ public class CellCounter extends Configured implements Tool { return rowFilter; } + private static long[] getTimeRange(String[] args) throws IOException { + final String startTimeArgKey = "--starttime="; + final String endTimeArgKey = "--endtime="; + long startTime = 0L; + long endTime = 0L; + + for (int i = 1; i < args.length; i++) { + System.out.println("i:" + i + "arg[i]" + args[i]); + if (args[i].startsWith(startTimeArgKey)) { + startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); + } + if (args[i].startsWith(endTimeArgKey)) { + endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); + } + } + + if (startTime == 0 && endTime == 0) + return null; + + endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime; + return new long [] {startTime, endTime}; + } + @Override public int run(String[] args) throws Exception { String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("ERROR: Wrong number of parameters: " + args.length); - System.err.println("Usage: CellCounter " + - "[^[regex pattern] or [Prefix] for row filter]] "); + System.err.println("Usage: CellCounter "); + System.err.println(" [^[regex pattern] or " + + "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]"); System.err.println(" Note: -D properties will be applied to the conf used. "); System.err.println(" Additionally, the following SCAN properties can be specified"); System.err.println(" to get fine grained control on what is counted.."); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index 17f5ea4b758..5a506e1f22a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.TreeSet; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -94,9 +95,14 @@ public class RowCounter extends Configured implements Tool { String tableName = args[0]; String startKey = null; String endKey = null; + long startTime = 0; + long endTime = 0; + StringBuilder sb = new StringBuilder(); final String rangeSwitch = "--range="; + final String startTimeArgKey = "--starttime="; + final String endTimeArgKey = "--endtime="; // First argument is table name, starting from second for (int i = 1; i < args.length; i++) { @@ -110,6 +116,18 @@ public class RowCounter extends Configured implements Tool { startKey = startEnd[0]; endKey = startEnd[1]; } + if (startTime < endTime) { + printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); + return null; + } + if (args[i].startsWith(startTimeArgKey)) { + startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); + continue; + } + if (args[i].startsWith(endTimeArgKey)) { + endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); + continue; + } else { // if no switch, assume column names sb.append(args[i]); @@ -149,6 +167,7 @@ public class RowCounter extends Configured implements Tool { } else { scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers)); } + scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); job.setOutputFormatClass(NullOutputFormat.class); TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job); @@ -169,6 +188,7 @@ public class RowCounter extends Configured implements Tool { */ private static void printUsage() { System.err.println("Usage: RowCounter [options] " + + "[--starttime=[start] --endtime=[end] " + "[--range=[startKey],[endKey]] [ ...]"); System.err.println("For performance consider the following options:\n" + "-Dhbase.client.scanner.caching=100\n" diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java index 1dab9ddc68f..22bc33041ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java @@ -77,7 +77,7 @@ public class TestCellCounter { /** * Test CellCounter all data should print to output - * + * */ @Test (timeout=300000) public void testCellCounter() throws Exception { @@ -97,7 +97,7 @@ public class TestCellCounter { t.put(p); String[] args = { sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1" }; runCount(args); - FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + "part-r-00000"); String data = IOUtils.toString(inputStream); inputStream.close(); @@ -110,10 +110,127 @@ public class TestCellCounter { assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1")); }finally{ t.close(); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); } } + /** + * Test CellCounter with time range all data should print to output + */ + @Test (timeout=300000) + public void testCellCounterStartTimeRange() throws Exception { + String sourceTable = "testCellCounterStartTimeRange"; + byte[][] families = { FAMILY_A, FAMILY_B }; + Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families); + try{ + Put p = new Put(ROW1); + p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { + sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1", "--starttime=" + now, + "--endtime=" + now + 2 }; + runCount(args); + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total ROWS" + "\t" + "1")); + assertTrue(data.contains("b;q" + "\t" + "1")); + assertTrue(data.contains("a;q" + "\t" + "1")); + assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1")); + assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1")); + }finally{ + t.close(); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + } + } + + /** + * Test CellCounter with time range all data should print to output + */ + @Test (timeout=300000) + public void testCellCounteEndTimeRange() throws Exception { + String sourceTable = "testCellCounterEndTimeRange"; + byte[][] families = { FAMILY_A, FAMILY_B }; + Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families); + try{ + Put p = new Put(ROW1); + p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { + sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1", "--endtime=" + now + 1 }; + runCount(args); + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total ROWS" + "\t" + "1")); + assertTrue(data.contains("b;q" + "\t" + "1")); + assertTrue(data.contains("a;q" + "\t" + "1")); + assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1")); + assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1")); + }finally{ + t.close(); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + } + } + + /** + * Test CellCounter with time range all data should print to output + */ + @Test (timeout=300000) + public void testCellCounteOutOfTimeRange() throws Exception { + String sourceTable = "testCellCounterOutTimeRange"; + byte[][] families = { FAMILY_A, FAMILY_B }; + Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families); + try{ + Put p = new Put(ROW1); + p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { + sourceTable, FQ_OUTPUT_DIR.toString(), ";", "--starttime=" + now + 1, + "--endtime=" + now + 2 }; + + runCount(args); + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + // nothing should hace been emitted to the reducer + assertTrue(data.isEmpty()); + }finally{ + t.close(); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + } + } + + private boolean runCount(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // need to make a copy of the configuration because to make sure diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java index 99fdfd44afe..59854ee1690 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java @@ -25,13 +25,16 @@ import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.sql.Time; import java.util.ArrayList; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.client.Put; @@ -85,7 +88,7 @@ public class TestRowCounter { /** * Test a case when no column was specified in command line arguments. - * + * * @throws Exception */ @Test @@ -99,7 +102,7 @@ public class TestRowCounter { /** * Test a case when the column specified in command line arguments is * exclusive for few rows. - * + * * @throws Exception */ @Test @@ -127,7 +130,7 @@ public class TestRowCounter { /** * Test a case when the column specified in command line arguments is not part * of first KV for a row. - * + * * @throws Exception */ @Test @@ -138,9 +141,67 @@ public class TestRowCounter { runRowCount(args, 10); } + /** + * Test a case when the timerange is specified with --starttime and --endtime options + * + * @throws Exception + */ + @Test + public void testRowCounterTimeRange() throws Exception { + final byte[] family = Bytes.toBytes(COL_FAM); + final byte[] col1 = Bytes.toBytes(COL1); + Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1)); + Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2)); + Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3)); + + long ts; + + // clean up content of TABLE_NAME + HTable table = TEST_UTIL.deleteTableData(TableName.valueOf(TABLE_NAME)); + ts = System.currentTimeMillis(); + put1.add(family, col1, ts, Bytes.toBytes("val1")); + table.put(put1); + Thread.sleep(100); + + ts = System.currentTimeMillis(); + put2.add(family, col1, ts, Bytes.toBytes("val2")); + put3.add(family, col1, ts, Bytes.toBytes("val3")); + table.put(put2); + table.put(put3); + table.close(); + + String[] args = new String[] { + TABLE_NAME, COL_FAM + ":" + COL1, + "--starttime=" + 0, + "--endtime=" + ts + }; + runRowCount(args, 1); + + args = new String[] { + TABLE_NAME, COL_FAM + ":" + COL1, + "--starttime=" + 0, + "--endtime=" + (ts - 10) + }; + runRowCount(args, 1); + + args = new String[] { + TABLE_NAME, COL_FAM + ":" + COL1, + "--starttime=" + ts, + "--endtime=" + (ts + 1000) + }; + runRowCount(args, 2); + + args = new String[] { + TABLE_NAME, COL_FAM + ":" + COL1, + "--starttime=" + (ts - 30 * 1000), + "--endtime=" + (ts + 30 * 1000), + }; + runRowCount(args, 3); + } + /** * Run the RowCounter map reduce job and verify the row count. - * + * * @param args the command line arguments to be used for rowcounter job. * @param expectedCount the expected row count (result of map reduce job). * @throws Exception @@ -161,7 +222,7 @@ public class TestRowCounter { /** * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have * two columns, Few have one. - * + * * @param table * @throws IOException */ @@ -215,7 +276,9 @@ public class TestRowCounter { assertEquals(-1, newSecurityManager.getExitCode()); assertTrue(data.toString().contains("Wrong number of parameters:")); assertTrue(data.toString().contains( - "Usage: RowCounter [options] [--range=[startKey],[endKey]] " + + "Usage: RowCounter [options] " + + "[--starttime=[start] --endtime=[end] " + + "[--range=[startKey],[endKey]] " + "[ ...]")); assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100")); assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false")); @@ -233,7 +296,9 @@ public class TestRowCounter { "Please specify range in such format as \"--range=a,b\" or, with only one boundary," + " \"--range=,b\" or \"--range=a,\"")); assertTrue(data.toString().contains( - "Usage: RowCounter [options] [--range=[startKey],[endKey]] " + + "Usage: RowCounter [options] " + + "[--starttime=[start] --endtime=[end] " + + "[--range=[startKey],[endKey]] " + "[ ...]")); }