HBASE-12447 Add support for setTimeRange for RowCounter and CellCounter

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Esteban Gutierrez 2014-11-11 13:38:59 -08:00 committed by stack
parent d7f51e1cfa
commit bebf0181c8
4 changed files with 275 additions and 45 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@ -217,6 +219,12 @@ public class CellCounter {
LOG.info("Setting Row Filter for counter.");
s.setFilter(rowFilter);
}
// Set TimeRange if defined
long timeRange[] = getTimeRange(args);
if (timeRange != null) {
LOG.info("Setting TimeRange for counter.");
s.setTimeRange(timeRange[0], timeRange[1]);
}
return s;
}
@ -234,6 +242,28 @@ public class CellCounter {
return rowFilter;
}
/**
 * Extracts the optional scan time range from the command-line arguments.
 * <p>
 * Recognizes <code>--starttime=&lt;long&gt;</code> and <code>--endtime=&lt;long&gt;</code>. The
 * first argument (index 0) is never inspected. If an option is repeated, the last occurrence
 * wins. A value of 0 is treated the same as "not specified".
 *
 * @param args command-line arguments
 * @return <code>null</code> when neither bound is set (or both are 0); otherwise a two-element
 *         array <code>{startTime, endTime}</code> where a missing end time is replaced by
 *         {@link HConstants#LATEST_TIMESTAMP}
 * @throws IOException declared for callers; not thrown by the current implementation
 * @throws NumberFormatException if an option value is not a parsable long
 */
private static long[] getTimeRange(String[] args) throws IOException {
  final String startTimeArgKey = "--starttime=";
  final String endTimeArgKey = "--endtime=";
  long startTime = 0L;
  long endTime = 0L;

  // Start at index 1: args[0] is skipped (presumably the table name; confirm against main()).
  for (int i = 1; i < args.length; i++) {
    if (args[i].startsWith(startTimeArgKey)) {
      startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
    } else if (args[i].startsWith(endTimeArgKey)) {
      endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
    }
  }

  // Neither bound supplied: signal "no time range" so the caller leaves the scan untouched.
  if (startTime == 0 && endTime == 0) {
    return null;
  }

  // Only a start time was supplied: leave the range open-ended.
  endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime;
  return new long[] {startTime, endTime};
}
/**
* Main entry point.
*
@ -245,8 +275,9 @@ public class CellCounter {
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("ERROR: Wrong number of parameters: " + args.length);
System.err.println("Usage: CellCounter <tablename> <outputDir> <reportSeparator> " +
"[^[regex pattern] or [Prefix] for row filter]] ");
System.err.println("Usage: CellCounter ");
System.err.println(" <outputDir> <reportSeparator> [^[regex pattern] or " +
"[Prefix] for row filter]] <tablename> --starttime=[starttime] --endtime=[endtime]");
System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" Additionally, the following SCAN properties can be specified");
System.err.println(" to get fine grained control on what is counted..");

View File

@ -23,6 +23,7 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -89,9 +90,14 @@ public class RowCounter {
String tableName = args[0];
String startKey = null;
String endKey = null;
long startTime = 0;
long endTime = 0;
StringBuilder sb = new StringBuilder();
final String rangeSwitch = "--range=";
final String startTimeArgKey = "--starttime=";
final String endTimeArgKey = "--endtime=";
// First argument is table name, starting from second
for (int i = 1; i < args.length; i++) {
@ -105,6 +111,18 @@ public class RowCounter {
startKey = startEnd[0];
endKey = startEnd[1];
}
if (startTime < endTime) {
printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
return null;
}
if (args[i].startsWith(startTimeArgKey)) {
startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
continue;
}
if (args[i].startsWith(endTimeArgKey)) {
endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
continue;
}
else {
// if no switch, assume column names
sb.append(args[i]);
@ -144,6 +162,7 @@ public class RowCounter {
} else {
scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
}
scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(tableName, scan,
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
@ -164,6 +183,7 @@ public class RowCounter {
*/
private static void printUsage() {
System.err.println("Usage: RowCounter [options] <tablename> " +
"[--starttime=[start] --endtime=[end] " +
"[--range=[startKey],[endKey]] [<column1> <column2>...]");
System.err.println("For performance consider the following options:\n"
+ "-Dhbase.client.scanner.caching=100\n"

View File

@ -107,10 +107,127 @@ public class TestCellCounter {
assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
}finally{
t.close();
FileUtil.fullyDelete(new File(OUTPUT_DIR));
}
}
/**
 * Test CellCounter with both --starttime and --endtime given, using a window that covers
 * every cell written by this test; all data for the filtered row should print to output.
 */
@Test (timeout=300000)
public void testCellCounterStartTimeRange() throws Exception {
  String sourceTable = "testCellCounterStartTimeRange";
  byte[][] families = { FAMILY_A, FAMILY_B };
  Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families);
  try {
    Put p = new Put(ROW1);
    p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11"));
    p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12"));
    p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13"));
    t.put(p);
    p = new Put(ROW2);
    p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21"));
    p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22"));
    p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23"));
    t.put(p);
    // The end bound must be parenthesized: "--endtime=" + now + 2 is string concatenation and
    // appends the digit 2 to the text of now instead of adding to it. now + 3 lies just past
    // the newest cell (now + 2), so the window still contains every cell written above
    // (the upper bound of a scan time range is exclusive).
    String[] args = {
        sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1", "--starttime=" + now,
        "--endtime=" + (now + 3) };
    runCount(args);
    FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
        "part-r-00000");
    String data;
    try {
      data = IOUtils.toString(inputStream);
    } finally {
      // Release the file handle even if reading fails.
      inputStream.close();
    }
    assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2"));
    assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2"));
    assertTrue(data.contains("Total ROWS" + "\t" + "1"));
    assertTrue(data.contains("b;q" + "\t" + "1"));
    assertTrue(data.contains("a;q" + "\t" + "1"));
    assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1"));
    assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
  } finally {
    t.close();
    FileUtil.fullyDelete(new File(OUTPUT_DIR));
  }
}
/**
 * Test CellCounter with only --endtime given, using an end bound that lies past every cell
 * written by this test; all data for the filtered row should print to output.
 */
@Test (timeout=300000)
public void testCellCounteEndTimeRange() throws Exception {
  String sourceTable = "testCellCounterEndTimeRange";
  byte[][] families = { FAMILY_A, FAMILY_B };
  Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families);
  try {
    Put p = new Put(ROW1);
    p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11"));
    p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12"));
    p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13"));
    t.put(p);
    p = new Put(ROW2);
    p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21"));
    p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22"));
    p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23"));
    t.put(p);
    // The end bound must be parenthesized: "--endtime=" + now + 1 is string concatenation and
    // appends the digit 1 to the text of now instead of adding to it. now + 3 lies just past
    // the newest cell (now + 2), so every cell written above stays inside the window
    // (the upper bound of a scan time range is exclusive).
    String[] args = {
        sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1", "--endtime=" + (now + 3) };
    runCount(args);
    FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
        "part-r-00000");
    String data;
    try {
      data = IOUtils.toString(inputStream);
    } finally {
      // Release the file handle even if reading fails.
      inputStream.close();
    }
    assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2"));
    assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2"));
    assertTrue(data.contains("Total ROWS" + "\t" + "1"));
    assertTrue(data.contains("b;q" + "\t" + "1"));
    assertTrue(data.contains("a;q" + "\t" + "1"));
    assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1"));
    assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
  } finally {
    t.close();
    FileUtil.fullyDelete(new File(OUTPUT_DIR));
  }
}
/**
 * Test CellCounter with a time range that lies entirely after every cell written by this
 * test; no data should be emitted to the output.
 */
@Test (timeout=300000)
public void testCellCounteOutOfTimeRange() throws Exception {
  String sourceTable = "testCellCounterOutTimeRange";
  byte[][] families = { FAMILY_A, FAMILY_B };
  Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families);
  try {
    Put p = new Put(ROW1);
    p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11"));
    p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12"));
    p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13"));
    t.put(p);
    p = new Put(ROW2);
    p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21"));
    p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22"));
    p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23"));
    t.put(p);
    // Both bounds must be parenthesized: "--starttime=" + now + 1 is string concatenation and
    // appends a digit to the text of now instead of adding to it. The window starts at
    // now + 3, which is later than the newest cell written above (now + 2), so it is
    // genuinely out of range.
    String[] args = {
        sourceTable, FQ_OUTPUT_DIR.toString(), ";", "--starttime=" + (now + 3),
        "--endtime=" + (now + 4) };
    runCount(args);
    FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
        "part-r-00000");
    String data;
    try {
      data = IOUtils.toString(inputStream);
    } finally {
      // Release the file handle even if reading fails.
      inputStream.close();
    }
    // nothing should have been emitted to the reducer
    assertTrue(data.isEmpty());
  } finally {
    t.close();
    FileUtil.fullyDelete(new File(OUTPUT_DIR));
  }
}
private boolean runCount(String[] args) throws IOException, InterruptedException,
ClassNotFoundException {
// need to make a copy of the configuration because to make sure

View File

@ -25,7 +25,9 @@ import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.sql.Time;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -33,7 +35,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
import org.apache.hadoop.hbase.util.Bytes;
@ -52,20 +59,29 @@ import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestRowCounter {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static String TABLE_NAME = "testRowCounter";
private final static String COL_FAM = "col_fam";
private final static String COL1 = "c1";
private final static String COL2 = "c2";
private final static String COMPOSITE_COLUMN = "C:A:A";
private final static int TOTAL_ROWS = 10;
private final static int ROWS_WITH_ONE_COL = 2;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
public static void setUpBeforeClass()
throws Exception {
TEST_UTIL.startMiniCluster();
TEST_UTIL.startMiniMapReduceCluster();
Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
@ -77,7 +93,8 @@ public class TestRowCounter {
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
public static void tearDownAfterClass()
throws Exception {
TEST_UTIL.shutdownMiniCluster();
TEST_UTIL.shutdownMiniMapReduceCluster();
}
@ -88,10 +105,9 @@ public class TestRowCounter {
* @throws Exception
*/
@Test
public void testRowCounterNoColumn() throws Exception {
String[] args = new String[] {
TABLE_NAME
};
public void testRowCounterNoColumn()
throws Exception {
String[] args = new String[] {TABLE_NAME};
runRowCount(args, 10);
}
@ -102,10 +118,9 @@ public class TestRowCounter {
* @throws Exception
*/
@Test
public void testRowCounterExclusiveColumn() throws Exception {
String[] args = new String[] {
TABLE_NAME, COL_FAM + ":" + COL1
};
public void testRowCounterExclusiveColumn()
throws Exception {
String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1};
runRowCount(args, 8);
}
@ -116,10 +131,9 @@ public class TestRowCounter {
* @throws Exception
*/
@Test
public void testRowCounterColumnWithColonInQualifier() throws Exception {
String[] args = new String[] {
TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN
};
public void testRowCounterColumnWithColonInQualifier()
throws Exception {
String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN};
runRowCount(args, 8);
}
@ -130,13 +144,59 @@ public class TestRowCounter {
* @throws Exception
*/
@Test
public void testRowCounterHiddenColumn() throws Exception {
String[] args = new String[] {
TABLE_NAME, COL_FAM + ":" + COL2
};
public void testRowCounterHiddenColumn()
throws Exception {
String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL2};
runRowCount(args, 10);
}
/**
 * Check that the --starttime and --endtime options limit which rows RowCounter counts.
 * Three rows are written with two distinct explicit timestamps, then the job is run over
 * several windows and the resulting row count is verified for each.
 *
 * @throws Exception
 */
@Test
public void testRowCounterTimeRange()
    throws Exception {
  final byte[] cf = Bytes.toBytes(COL_FAM);
  final byte[] qualifier = Bytes.toBytes(COL1);
  final String columnArg = COL_FAM + ":" + COL1;

  Put firstPut = new Put(Bytes.toBytes("row_timerange_" + 1));
  Put secondPut = new Put(Bytes.toBytes("row_timerange_" + 2));
  Put thirdPut = new Put(Bytes.toBytes("row_timerange_" + 3));

  // start from an empty TABLE_NAME
  HTable table = TEST_UTIL.truncateTable(TableName.valueOf(TABLE_NAME));

  // the first row carries the earlier timestamp
  long ts = System.currentTimeMillis();
  firstPut.add(cf, qualifier, ts, Bytes.toBytes("val1"));
  table.put(firstPut);

  // wait so that the second timestamp is clearly later than the first
  Thread.sleep(100);

  // the remaining two rows share the later timestamp; ts keeps this value from here on
  ts = System.currentTimeMillis();
  secondPut.add(cf, qualifier, ts, Bytes.toBytes("val2"));
  thirdPut.add(cf, qualifier, ts, Bytes.toBytes("val3"));
  table.put(secondPut);
  table.put(thirdPut);
  table.close();

  // window ending at the later timestamp
  String[] args = new String[] {TABLE_NAME, columnArg, "--starttime=" + 0, "--endtime=" + ts};
  runRowCount(args, 1);

  // window ending shortly before the later timestamp
  args = new String[] {TABLE_NAME, columnArg, "--starttime=" + 0, "--endtime=" + (ts - 10)};
  runRowCount(args, 1);

  // window starting at the later timestamp
  args = new String[] {TABLE_NAME, columnArg, "--starttime=" + ts, "--endtime=" + (ts + 1000)};
  runRowCount(args, 2);

  // wide window around both timestamps
  args = new String[] {TABLE_NAME, columnArg, "--starttime=" + (ts - 30 * 1000),
      "--endtime=" + (ts + 30 * 1000)};
  runRowCount(args, 3);
}
/**
* Run the RowCounter map reduce job and verify the row count.
*
@ -144,16 +204,15 @@ public class TestRowCounter {
* @param expectedCount the expected row count (result of map reduce job).
* @throws Exception
*/
private void runRowCount(String[] args, int expectedCount) throws Exception {
GenericOptionsParser opts = new GenericOptionsParser(
TEST_UTIL.getConfiguration(), args);
private void runRowCount(String[] args, int expectedCount)
throws Exception {
GenericOptionsParser opts = new GenericOptionsParser(TEST_UTIL.getConfiguration(), args);
Configuration conf = opts.getConfiguration();
args = opts.getRemainingArgs();
Job job = RowCounter.createSubmittableJob(conf, args);
job.waitForCompletion(true);
assertTrue(job.isSuccessful());
Counter counter = job.getCounters().findCounter(
RowCounterMapper.Counters.ROWS);
Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
assertEquals(expectedCount, counter.getValue());
}
@ -164,7 +223,8 @@ public class TestRowCounter {
* @param table
* @throws IOException
*/
private static void writeRows(Table table) throws IOException {
private static void writeRows(Table table)
throws IOException {
final byte[] family = Bytes.toBytes(COL_FAM);
final byte[] value = Bytes.toBytes("abcd");
final byte[] col1 = Bytes.toBytes(COL1);
@ -196,7 +256,8 @@ public class TestRowCounter {
* test main method. Import should print help and call System.exit
*/
@Test
public void testImportMain() throws Exception {
public void testImportMain()
throws Exception {
PrintStream oldPrintStream = System.err;
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
@ -213,8 +274,9 @@ public class TestRowCounter {
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains("Wrong number of parameters:"));
assertTrue(data.toString().contains(
"Usage: RowCounter [options] <tablename> [--range=[startKey],[endKey]] " +
assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
"[--starttime=[start] --endtime=[end] " +
"[--range=[startKey],[endKey]] " +
"[<column1> <column2>...]"));
assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
@ -228,11 +290,12 @@ public class TestRowCounter {
fail("should be SecurityException");
} catch (SecurityException e) {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains(
"Please specify range in such format as \"--range=a,b\" or, with only one boundary," +
assertTrue(data.toString().contains("Please specify range in such format as \"--range=a,b\" or, with only one boundary," +
" \"--range=,b\" or \"--range=a,\""));
assertTrue(data.toString().contains(
"Usage: RowCounter [options] <tablename> [--range=[startKey],[endKey]] " +
assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
"[--starttime=[start] --endtime=[end] " +
"[--range=[startKey],[endKey]] " +
"[<column1> <column2>...]"));
}
@ -242,5 +305,4 @@ public class TestRowCounter {
}
}
}