HBASE-12447 Add support for setTimeRange for RowCounter and CellCounter

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Esteban Gutierrez 2014-11-11 00:22:54 -08:00 committed by stack
parent e287741388
commit 6c2a299657
4 changed files with 245 additions and 11 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@ -222,6 +224,12 @@ public class CellCounter extends Configured implements Tool {
LOG.info("Setting Row Filter for counter.");
s.setFilter(rowFilter);
}
// Set TimeRange if defined
long timeRange[] = getTimeRange(args);
if (timeRange != null) {
LOG.info("Setting TimeRange for counter.");
s.setTimeRange(timeRange[0], timeRange[1]);
}
return s;
}
@ -239,13 +247,37 @@ public class CellCounter extends Configured implements Tool {
return rowFilter;
}
private static long[] getTimeRange(String[] args) throws IOException {
final String startTimeArgKey = "--starttime=";
final String endTimeArgKey = "--endtime=";
long startTime = 0L;
long endTime = 0L;
for (int i = 1; i < args.length; i++) {
System.out.println("i:" + i + "arg[i]" + args[i]);
if (args[i].startsWith(startTimeArgKey)) {
startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
}
if (args[i].startsWith(endTimeArgKey)) {
endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
}
}
if (startTime == 0 && endTime == 0)
return null;
endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime;
return new long [] {startTime, endTime};
}
@Override
public int run(String[] args) throws Exception {
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("ERROR: Wrong number of parameters: " + args.length);
System.err.println("Usage: CellCounter <tablename> <outputDir> <reportSeparator> " +
"[^[regex pattern] or [Prefix] for row filter]] ");
System.err.println("Usage: CellCounter ");
System.err.println(" <outputDir> <reportSeparator> [^[regex pattern] or " +
"[Prefix] for row filter]] <tablename> --starttime=[starttime] --endtime=[endtime]");
System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" Additionally, the following SCAN properties can be specified");
System.err.println(" to get fine grained control on what is counted..");

View File

@ -23,6 +23,7 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -94,9 +95,14 @@ public class RowCounter extends Configured implements Tool {
String tableName = args[0];
String startKey = null;
String endKey = null;
long startTime = 0;
long endTime = 0;
StringBuilder sb = new StringBuilder();
final String rangeSwitch = "--range=";
final String startTimeArgKey = "--starttime=";
final String endTimeArgKey = "--endtime=";
// First argument is table name, starting from second
for (int i = 1; i < args.length; i++) {
@ -110,6 +116,18 @@ public class RowCounter extends Configured implements Tool {
startKey = startEnd[0];
endKey = startEnd[1];
}
if (startTime < endTime) {
printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
return null;
}
if (args[i].startsWith(startTimeArgKey)) {
startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
continue;
}
if (args[i].startsWith(endTimeArgKey)) {
endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
continue;
}
else {
// if no switch, assume column names
sb.append(args[i]);
@ -149,6 +167,7 @@ public class RowCounter extends Configured implements Tool {
} else {
scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
}
scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(tableName, scan,
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
@ -169,6 +188,7 @@ public class RowCounter extends Configured implements Tool {
*/
private static void printUsage() {
System.err.println("Usage: RowCounter [options] <tablename> " +
"[--starttime=[start] --endtime=[end] " +
"[--range=[startKey],[endKey]] [<column1> <column2>...]");
System.err.println("For performance consider the following options:\n"
+ "-Dhbase.client.scanner.caching=100\n"

View File

@ -77,7 +77,7 @@ public class TestCellCounter {
/**
* Test CellCounter all data should print to output
*
*
*/
@Test (timeout=300000)
public void testCellCounter() throws Exception {
@ -97,7 +97,7 @@ public class TestCellCounter {
t.put(p);
String[] args = { sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1" };
runCount(args);
FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
"part-r-00000");
String data = IOUtils.toString(inputStream);
inputStream.close();
@ -110,10 +110,127 @@ public class TestCellCounter {
assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
}finally{
t.close();
FileUtil.fullyDelete(new File(OUTPUT_DIR));
}
}
/**
* Test CellCounter with time range all data should print to output
*/
@Test (timeout=300000)
public void testCellCounterStartTimeRange() throws Exception {
String sourceTable = "testCellCounterStartTimeRange";
byte[][] families = { FAMILY_A, FAMILY_B };
Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families);
try{
Put p = new Put(ROW1);
p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11"));
p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12"));
p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13"));
t.put(p);
p = new Put(ROW2);
p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21"));
p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22"));
p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23"));
t.put(p);
String[] args = {
sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1", "--starttime=" + now,
"--endtime=" + now + 2 };
runCount(args);
FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
"part-r-00000");
String data = IOUtils.toString(inputStream);
inputStream.close();
assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2"));
assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2"));
assertTrue(data.contains("Total ROWS" + "\t" + "1"));
assertTrue(data.contains("b;q" + "\t" + "1"));
assertTrue(data.contains("a;q" + "\t" + "1"));
assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1"));
assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
}finally{
t.close();
FileUtil.fullyDelete(new File(OUTPUT_DIR));
}
}
/**
* Test CellCounter with time range all data should print to output
*/
@Test (timeout=300000)
public void testCellCounteEndTimeRange() throws Exception {
String sourceTable = "testCellCounterEndTimeRange";
byte[][] families = { FAMILY_A, FAMILY_B };
Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families);
try{
Put p = new Put(ROW1);
p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11"));
p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12"));
p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13"));
t.put(p);
p = new Put(ROW2);
p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21"));
p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22"));
p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23"));
t.put(p);
String[] args = {
sourceTable, FQ_OUTPUT_DIR.toString(), ";", "^row1", "--endtime=" + now + 1 };
runCount(args);
FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
"part-r-00000");
String data = IOUtils.toString(inputStream);
inputStream.close();
assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2"));
assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2"));
assertTrue(data.contains("Total ROWS" + "\t" + "1"));
assertTrue(data.contains("b;q" + "\t" + "1"));
assertTrue(data.contains("a;q" + "\t" + "1"));
assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1"));
assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
}finally{
t.close();
FileUtil.fullyDelete(new File(OUTPUT_DIR));
}
}
/**
* Test CellCounter with time range all data should print to output
*/
@Test (timeout=300000)
public void testCellCounteOutOfTimeRange() throws Exception {
String sourceTable = "testCellCounterOutTimeRange";
byte[][] families = { FAMILY_A, FAMILY_B };
Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families);
try{
Put p = new Put(ROW1);
p.add(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11"));
p.add(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12"));
p.add(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13"));
t.put(p);
p = new Put(ROW2);
p.add(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21"));
p.add(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22"));
p.add(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23"));
t.put(p);
String[] args = {
sourceTable, FQ_OUTPUT_DIR.toString(), ";", "--starttime=" + now + 1,
"--endtime=" + now + 2 };
runCount(args);
FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
"part-r-00000");
String data = IOUtils.toString(inputStream);
inputStream.close();
// nothing should hace been emitted to the reducer
assertTrue(data.isEmpty());
}finally{
t.close();
FileUtil.fullyDelete(new File(OUTPUT_DIR));
}
}
private boolean runCount(String[] args) throws IOException, InterruptedException,
ClassNotFoundException {
// need to make a copy of the configuration because to make sure

View File

@ -25,13 +25,16 @@ import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.sql.Time;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.client.Put;
@ -85,7 +88,7 @@ public class TestRowCounter {
/**
* Test a case when no column was specified in command line arguments.
*
*
* @throws Exception
*/
@Test
@ -99,7 +102,7 @@ public class TestRowCounter {
/**
* Test a case when the column specified in command line arguments is
* exclusive for few rows.
*
*
* @throws Exception
*/
@Test
@ -127,7 +130,7 @@ public class TestRowCounter {
/**
* Test a case when the column specified in command line arguments is not part
* of first KV for a row.
*
*
* @throws Exception
*/
@Test
@ -138,9 +141,67 @@ public class TestRowCounter {
runRowCount(args, 10);
}
/**
* Test a case when the timerange is specified with --starttime and --endtime options
*
* @throws Exception
*/
@Test
public void testRowCounterTimeRange() throws Exception {
final byte[] family = Bytes.toBytes(COL_FAM);
final byte[] col1 = Bytes.toBytes(COL1);
Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
long ts;
// clean up content of TABLE_NAME
HTable table = TEST_UTIL.deleteTableData(TableName.valueOf(TABLE_NAME));
ts = System.currentTimeMillis();
put1.add(family, col1, ts, Bytes.toBytes("val1"));
table.put(put1);
Thread.sleep(100);
ts = System.currentTimeMillis();
put2.add(family, col1, ts, Bytes.toBytes("val2"));
put3.add(family, col1, ts, Bytes.toBytes("val3"));
table.put(put2);
table.put(put3);
table.close();
String[] args = new String[] {
TABLE_NAME, COL_FAM + ":" + COL1,
"--starttime=" + 0,
"--endtime=" + ts
};
runRowCount(args, 1);
args = new String[] {
TABLE_NAME, COL_FAM + ":" + COL1,
"--starttime=" + 0,
"--endtime=" + (ts - 10)
};
runRowCount(args, 1);
args = new String[] {
TABLE_NAME, COL_FAM + ":" + COL1,
"--starttime=" + ts,
"--endtime=" + (ts + 1000)
};
runRowCount(args, 2);
args = new String[] {
TABLE_NAME, COL_FAM + ":" + COL1,
"--starttime=" + (ts - 30 * 1000),
"--endtime=" + (ts + 30 * 1000),
};
runRowCount(args, 3);
}
/**
* Run the RowCounter map reduce job and verify the row count.
*
*
* @param args the command line arguments to be used for rowcounter job.
* @param expectedCount the expected row count (result of map reduce job).
* @throws Exception
@ -161,7 +222,7 @@ public class TestRowCounter {
/**
* Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
* two columns, Few have one.
*
*
* @param table
* @throws IOException
*/
@ -215,7 +276,9 @@ public class TestRowCounter {
assertEquals(-1, newSecurityManager.getExitCode());
assertTrue(data.toString().contains("Wrong number of parameters:"));
assertTrue(data.toString().contains(
"Usage: RowCounter [options] <tablename> [--range=[startKey],[endKey]] " +
"Usage: RowCounter [options] <tablename> " +
"[--starttime=[start] --endtime=[end] " +
"[--range=[startKey],[endKey]] " +
"[<column1> <column2>...]"));
assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
@ -233,7 +296,9 @@ public class TestRowCounter {
"Please specify range in such format as \"--range=a,b\" or, with only one boundary," +
" \"--range=,b\" or \"--range=a,\""));
assertTrue(data.toString().contains(
"Usage: RowCounter [options] <tablename> [--range=[startKey],[endKey]] " +
"Usage: RowCounter [options] <tablename> " +
"[--starttime=[start] --endtime=[end] " +
"[--range=[startKey],[endKey]] " +
"[<column1> <column2>...]"));
}