HBASE-15773 Improvements to CellCounter job

This commit is contained in:
Gary Helmling 2016-05-05 12:40:47 -07:00
parent d90f0571e6
commit 86ca09e0e5
2 changed files with 131 additions and 86 deletions

View File

@ -92,7 +92,30 @@ public class CellCounter extends Configured implements Tool {
* Counter enumeration to count the actual rows.
*/
public static enum Counters {
ROWS
ROWS,
CELLS
}
// Job configuration; assigned in setup() from the task context.
private Configuration conf;
// Separator used when building report keys (row key / family / qualifier); read in
// setup() from the "ReportSeparator" property, with ":" as the fallback.
private String separator;
// state of current row, family, column needs to persist across map() invocations
// in order to properly handle scanner batching, where a single qualifier may have too
// many versions for a single map() call
// Row key bytes of the most recently seen Result; map() compares against it to detect
// the start of a new row.
private byte[] lastRow;
// Printable form of the current row key, produced with Bytes.toStringBinary.
private String currentRowKey;
// Family bytes of the cell most recently processed; reset to null when the row changes.
byte[] currentFamily = null;
// Printable form of currentFamily, produced with Bytes.toStringBinary.
String currentFamilyName = null;
// Qualifier bytes of the cell most recently processed; reset to null when the row or
// the family changes.
byte[] currentQualifier = null;
// family + qualifier
String currentQualifierName = null;
// rowkey + family + qualifier
String currentRowQualifierName = null;
/**
 * Captures the task configuration and resolves the report separator once, so that
 * later code can use the cached fields.
 *
 * @param context the task context supplying the job configuration
 */
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  this.conf = context.getConfiguration();
  // Falls back to ":" when the "ReportSeparator" property is not set.
  this.separator = this.conf.get("ReportSeparator", ":");
}
/**
@ -112,49 +135,45 @@ public class CellCounter extends Configured implements Tool {
throws IOException {
Preconditions.checkState(values != null,
"values passed to the map is null");
String currentFamilyName = null;
String currentQualifierName = null;
String currentRowKey = null;
Configuration config = context.getConfiguration();
String separator = config.get("ReportSeparator",":");
try {
context.getCounter(Counters.ROWS).increment(1);
context.write(new Text("Total ROWS"), new IntWritable(1));
if (values != null && !values.isEmpty()) {
byte[] currentRow = values.getRow();
if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
lastRow = currentRow;
currentRowKey = Bytes.toStringBinary(currentRow);
currentFamily = null;
currentQualifier = null;
context.getCounter(Counters.ROWS).increment(1);
context.write(new Text("Total ROWS"), new IntWritable(1));
}
if (!values.isEmpty()) {
int cellCount = 0;
for (Cell value : values.listCells()) {
currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value));
String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value));
if (!thisRowFamilyName.equals(currentFamilyName)) {
currentFamilyName = thisRowFamilyName;
context.getCounter("CF", thisRowFamilyName).increment(1);
if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) {
cellCount++;
if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) {
currentFamily = CellUtil.cloneFamily(value);
currentFamilyName = Bytes.toStringBinary(currentFamily);
currentQualifier = null;
context.getCounter("CF", currentFamilyName).increment(1);
if (1 == context.getCounter("CF", currentFamilyName).getValue()) {
context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
context.write(new Text(thisRowFamilyName), new IntWritable(1));
context.write(new Text(currentFamily), new IntWritable(1));
}
}
String thisRowQualifierName = thisRowFamilyName + separator
+ Bytes.toStringBinary(CellUtil.cloneQualifier(value));
if (!thisRowQualifierName.equals(currentQualifierName)) {
currentQualifierName = thisRowQualifierName;
context.getCounter("CFQL", thisRowQualifierName).increment(1);
if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) {
currentQualifier = CellUtil.cloneQualifier(value);
currentQualifierName = currentFamilyName + separator +
Bytes.toStringBinary(currentQualifier);
currentRowQualifierName = currentRowKey + separator + currentQualifierName;
context.write(new Text("Total Qualifiers across all Rows"),
new IntWritable(1));
context.write(new Text(thisRowQualifierName), new IntWritable(1));
// Initialize versions
context.getCounter("QL_VERSIONS", currentRowKey + separator +
thisRowQualifierName).increment(1);
context.write(new Text(currentRowKey + separator
+ thisRowQualifierName + "_Versions"), new IntWritable(1));
} else {
// Increment versions
currentQualifierName = thisRowQualifierName;
context.getCounter("QL_VERSIONS", currentRowKey + separator +
thisRowQualifierName).increment(1);
context.write(new Text(currentRowKey + separator
+ thisRowQualifierName + "_Versions"), new IntWritable(1));
context.write(new Text(currentQualifierName), new IntWritable(1));
}
// Increment versions
context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1));
}
context.getCounter(Counters.CELLS).increment(cellCount);
}
} catch (InterruptedException e) {
e.printStackTrace();
@ -208,15 +227,16 @@ public class CellCounter extends Configured implements Tool {
return job;
}
private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
Scan s = new Scan();
private static Scan getConfiguredScanForJob(Configuration conf, String[] args)
throws IOException {
// create scan with any properties set from TableInputFormat
Scan s = TableInputFormat.createScanFromConfiguration(conf);
// Set Scan Versions
s.setMaxVersions(Integer.MAX_VALUE);
s.setCacheBlocks(false);
// Set Scan Column Family
if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
if (conf.get(TableInputFormat.SCAN_MAXVERSIONS) == null) {
// default to all versions unless explicitly set
s.setMaxVersions(Integer.MAX_VALUE);
}
s.setCacheBlocks(false);
// Set RowFilter or Prefix Filter if applicable.
Filter rowFilter = getRowFilter(args);
if (rowFilter!= null) {
@ -277,9 +297,18 @@ public class CellCounter extends Configured implements Tool {
System.err.println(" <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
"[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" Additionally, the following SCAN properties can be specified");
System.err.println(" to get fine grained control on what is counted..");
System.err.println(" Additionally, all of the SCAN properties from TableInputFormat");
System.err.println(" can be specified to get fine grained control on what is counted..");
System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<rowkey>");
System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<rowkey>");
System.err.println(" -D " + TableInputFormat.SCAN_COLUMNS + "=\"<col1> <col2>...\"");
System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
System.err.println(" -D " + TableInputFormat.SCAN_TIMESTAMP + "=<timestamp>");
System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_START + "=<timestamp>");
System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_END + "=<timestamp>");
System.err.println(" -D " + TableInputFormat.SCAN_MAXVERSIONS + "=<count>");
System.err.println(" -D " + TableInputFormat.SCAN_CACHEDROWS + "=<count>");
System.err.println(" -D " + TableInputFormat.SCAN_BATCHSIZE + "=<count>");
System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
"string : used to separate the rowId/column family name and qualifier name.");
System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +

View File

@ -126,48 +126,7 @@ implements Configurable {
}
} else {
try {
scan = new Scan();
if (conf.get(SCAN_ROW_START) != null) {
scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
}
if (conf.get(SCAN_ROW_STOP) != null) {
scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
}
if (conf.get(SCAN_COLUMNS) != null) {
addColumns(scan, conf.get(SCAN_COLUMNS));
}
if (conf.get(SCAN_COLUMN_FAMILY) != null) {
scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
}
if (conf.get(SCAN_TIMESTAMP) != null) {
scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
}
if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
scan.setTimeRange(
Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
}
if (conf.get(SCAN_MAXVERSIONS) != null) {
scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
}
if (conf.get(SCAN_CACHEDROWS) != null) {
scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
}
if (conf.get(SCAN_BATCHSIZE) != null) {
scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
}
// false by default, full table scans generate too much BC churn
scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
scan = createScanFromConfiguration(conf);
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
@ -176,6 +135,63 @@ implements Configurable {
setScan(scan);
}
/**
 * Builds a {@link Scan} from the scan-related configuration properties whose keys are
 * the {@code SCAN_*} constants of {@code TableInputFormat}. Each property is optional;
 * a property that is absent leaves the matching setting as created by {@code new Scan()}.
 * The properties cover:
 * <ul>
 *   <li>start and stop rows</li>
 *   <li>column qualifiers or a column family</li>
 *   <li>a single timestamp, or a time range (applied only when both the start and the
 *       end of the range are present)</li>
 *   <li>maximum number of versions</li>
 *   <li>scanner caching and batch size</li>
 *   <li>block caching, which is switched off unless explicitly enabled</li>
 * </ul>
 *
 * @param conf the configuration to read the scan properties from
 * @return a new {@link Scan} carrying the configured settings
 * @throws IOException declared by the signature; presumably propagated from the
 *         {@link Scan} setters (confirm against the Scan API)
 * @throws NumberFormatException if a numeric property cannot be parsed
 */
public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
  final Scan result = new Scan();

  final String startRow = conf.get(SCAN_ROW_START);
  if (startRow != null) {
    result.setStartRow(Bytes.toBytesBinary(startRow));
  }

  final String stopRow = conf.get(SCAN_ROW_STOP);
  if (stopRow != null) {
    result.setStopRow(Bytes.toBytesBinary(stopRow));
  }

  final String columns = conf.get(SCAN_COLUMNS);
  if (columns != null) {
    addColumns(result, columns);
  }

  final String family = conf.get(SCAN_COLUMN_FAMILY);
  if (family != null) {
    result.addFamily(Bytes.toBytes(family));
  }

  final String timestamp = conf.get(SCAN_TIMESTAMP);
  if (timestamp != null) {
    result.setTimeStamp(Long.parseLong(timestamp));
  }

  // A time range is only applied when both of its bounds are configured.
  final String rangeStart = conf.get(SCAN_TIMERANGE_START);
  final String rangeEnd = conf.get(SCAN_TIMERANGE_END);
  if (rangeStart != null && rangeEnd != null) {
    final long minStamp = Long.parseLong(rangeStart);
    final long maxStamp = Long.parseLong(rangeEnd);
    result.setTimeRange(minStamp, maxStamp);
  }

  final String maxVersions = conf.get(SCAN_MAXVERSIONS);
  if (maxVersions != null) {
    result.setMaxVersions(Integer.parseInt(maxVersions));
  }

  final String cachedRows = conf.get(SCAN_CACHEDROWS);
  if (cachedRows != null) {
    result.setCaching(Integer.parseInt(cachedRows));
  }

  final String batchSize = conf.get(SCAN_BATCHSIZE);
  if (batchSize != null) {
    result.setBatch(Integer.parseInt(batchSize));
  }

  // false by default, full table scans generate too much BC churn
  final boolean cacheBlocks = conf.getBoolean(SCAN_CACHEBLOCKS, false);
  result.setCacheBlocks(cacheBlocks);

  return result;
}
@Override
protected void initialize(JobContext context) throws IOException {
// Do we have to worry about mis-matches between the Configuration from setConf and the one