HBASE-15773 Improvements to CellCounter job
This commit is contained in:
parent
0964884b92
commit
e44c360350
|
@ -88,7 +88,30 @@ public class CellCounter {
|
|||
* Counter enumeration to count the actual rows.
|
||||
*/
|
||||
public static enum Counters {
|
||||
ROWS
|
||||
ROWS,
|
||||
CELLS
|
||||
}
|
||||
|
||||
private Configuration conf;
|
||||
private String separator;
|
||||
|
||||
// state of current row, family, column needs to persist across map() invocations
|
||||
// in order to properly handle scanner batching, where a single qualifier may have too
|
||||
// many versions for a single map() call
|
||||
private byte[] lastRow;
|
||||
private String currentRowKey;
|
||||
byte[] currentFamily = null;
|
||||
String currentFamilyName = null;
|
||||
byte[] currentQualifier = null;
|
||||
// family + qualifier
|
||||
String currentQualifierName = null;
|
||||
// rowkey + family + qualifier
|
||||
String currentRowQualifierName = null;
|
||||
|
||||
@Override
|
||||
protected void setup(Context context) throws IOException, InterruptedException {
|
||||
conf = context.getConfiguration();
|
||||
separator = conf.get("ReportSeparator",":");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -108,48 +131,45 @@ public class CellCounter {
|
|||
throws IOException {
|
||||
Preconditions.checkState(values != null,
|
||||
"values passed to the map is null");
|
||||
String currentFamilyName = null;
|
||||
String currentQualifierName = null;
|
||||
String currentRowKey = null;
|
||||
Configuration config = context.getConfiguration();
|
||||
String separator = config.get("ReportSeparator",":");
|
||||
|
||||
try {
|
||||
context.getCounter(Counters.ROWS).increment(1);
|
||||
context.write(new Text("Total ROWS"), new IntWritable(1));
|
||||
|
||||
for (Cell value : values.listCells()) {
|
||||
currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value));
|
||||
String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value));
|
||||
if (!thisRowFamilyName.equals(currentFamilyName)) {
|
||||
currentFamilyName = thisRowFamilyName;
|
||||
context.getCounter("CF", thisRowFamilyName).increment(1);
|
||||
if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) {
|
||||
context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
|
||||
context.write(new Text(thisRowFamilyName), new IntWritable(1));
|
||||
byte[] currentRow = values.getRow();
|
||||
if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
|
||||
lastRow = currentRow;
|
||||
currentRowKey = Bytes.toStringBinary(currentRow);
|
||||
currentFamily = null;
|
||||
currentQualifier = null;
|
||||
context.getCounter(Counters.ROWS).increment(1);
|
||||
context.write(new Text("Total ROWS"), new IntWritable(1));
|
||||
}
|
||||
if (!values.isEmpty()) {
|
||||
int cellCount = 0;
|
||||
for (Cell value : values.listCells()) {
|
||||
cellCount++;
|
||||
if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) {
|
||||
currentFamily = CellUtil.cloneFamily(value);
|
||||
currentFamilyName = Bytes.toStringBinary(currentFamily);
|
||||
currentQualifier = null;
|
||||
context.getCounter("CF", currentFamilyName).increment(1);
|
||||
if (1 == context.getCounter("CF", currentFamilyName).getValue()) {
|
||||
context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
|
||||
context.write(new Text(currentFamily), new IntWritable(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
String thisRowQualifierName = thisRowFamilyName + separator
|
||||
+ Bytes.toStringBinary(CellUtil.cloneQualifier(value));
|
||||
if (!thisRowQualifierName.equals(currentQualifierName)) {
|
||||
currentQualifierName = thisRowQualifierName;
|
||||
context.getCounter("CFQL", thisRowQualifierName).increment(1);
|
||||
context.write(new Text("Total Qualifiers across all Rows"),
|
||||
new IntWritable(1));
|
||||
context.write(new Text(thisRowQualifierName), new IntWritable(1));
|
||||
// Intialize versions
|
||||
context.getCounter("QL_VERSIONS", currentRowKey + separator +
|
||||
thisRowQualifierName).increment(1);
|
||||
context.write(new Text(currentRowKey + separator
|
||||
+ thisRowQualifierName + "_Versions"), new IntWritable(1));
|
||||
if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) {
|
||||
currentQualifier = CellUtil.cloneQualifier(value);
|
||||
currentQualifierName = currentFamilyName + separator +
|
||||
Bytes.toStringBinary(currentQualifier);
|
||||
currentRowQualifierName = currentRowKey + separator + currentQualifierName;
|
||||
|
||||
} else {
|
||||
context.write(new Text("Total Qualifiers across all Rows"),
|
||||
new IntWritable(1));
|
||||
context.write(new Text(currentQualifierName), new IntWritable(1));
|
||||
}
|
||||
// Increment versions
|
||||
currentQualifierName = thisRowQualifierName;
|
||||
context.getCounter("QL_VERSIONS", currentRowKey + separator +
|
||||
thisRowQualifierName).increment(1);
|
||||
context.write(new Text(currentRowKey + separator
|
||||
+ thisRowQualifierName + "_Versions"), new IntWritable(1));
|
||||
context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1));
|
||||
}
|
||||
context.getCounter(Counters.CELLS).increment(cellCount);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
|
@ -203,15 +223,16 @@ public class CellCounter {
|
|||
return job;
|
||||
}
|
||||
|
||||
private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
|
||||
Scan s = new Scan();
|
||||
private static Scan getConfiguredScanForJob(Configuration conf, String[] args)
|
||||
throws IOException {
|
||||
// create scan with any properties set from TableInputFormat
|
||||
Scan s = TableInputFormat.createScanFromConfiguration(conf);
|
||||
// Set Scan Versions
|
||||
s.setMaxVersions(Integer.MAX_VALUE);
|
||||
s.setCacheBlocks(false);
|
||||
// Set Scan Column Family
|
||||
if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
|
||||
s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
|
||||
if (conf.get(TableInputFormat.SCAN_MAXVERSIONS) == null) {
|
||||
// default to all versions unless explicitly set
|
||||
s.setMaxVersions(Integer.MAX_VALUE);
|
||||
}
|
||||
s.setCacheBlocks(false);
|
||||
// Set RowFilter or Prefix Filter if applicable.
|
||||
Filter rowFilter = getRowFilter(args);
|
||||
if (rowFilter!= null) {
|
||||
|
@ -278,9 +299,18 @@ public class CellCounter {
|
|||
System.err.println(" <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
|
||||
"[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
|
||||
System.err.println(" Note: -D properties will be applied to the conf used. ");
|
||||
System.err.println(" Additionally, the following SCAN properties can be specified");
|
||||
System.err.println(" to get fine grained control on what is counted..");
|
||||
System.err.println(" Additionally, all of the SCAN properties from TableInputFormat");
|
||||
System.err.println(" can be specified to get fine grained control on what is counted..");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<rowkey>");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<rowkey>");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_COLUMNS + "=\"<col1> <col2>...\"");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_TIMESTAMP + "=<timestamp>");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_START + "=<timestamp>");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_END + "=<timestamp>");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_MAXVERSIONS + "=<count>");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_CACHEDROWS + "=<count>");
|
||||
System.err.println(" -D " + TableInputFormat.SCAN_BATCHSIZE + "=<count>");
|
||||
System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
|
||||
"string : used to separate the rowId/column family name and qualifier name.");
|
||||
System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
|
||||
|
|
|
@ -126,48 +126,7 @@ implements Configurable {
|
|||
}
|
||||
} else {
|
||||
try {
|
||||
scan = new Scan();
|
||||
|
||||
if (conf.get(SCAN_ROW_START) != null) {
|
||||
scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_ROW_STOP) != null) {
|
||||
scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_COLUMNS) != null) {
|
||||
addColumns(scan, conf.get(SCAN_COLUMNS));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_COLUMN_FAMILY) != null) {
|
||||
scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_TIMESTAMP) != null) {
|
||||
scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
|
||||
scan.setTimeRange(
|
||||
Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
|
||||
Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_MAXVERSIONS) != null) {
|
||||
scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_CACHEDROWS) != null) {
|
||||
scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_BATCHSIZE) != null) {
|
||||
scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
|
||||
}
|
||||
|
||||
// false by default, full table scans generate too much BC churn
|
||||
scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
|
||||
scan = createScanFromConfiguration(conf);
|
||||
} catch (Exception e) {
|
||||
LOG.error(StringUtils.stringifyException(e));
|
||||
}
|
||||
|
@ -176,6 +135,63 @@ implements Configurable {
|
|||
setScan(scan);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up a {@link Scan} instance, applying settings from the configuration property
|
||||
* constants defined in {@code TableInputFormat}. This allows specifying things such as:
|
||||
* <ul>
|
||||
* <li>start and stop rows</li>
|
||||
* <li>column qualifiers or families</li>
|
||||
* <li>timestamps or timerange</li>
|
||||
* <li>scanner caching and batch size</li>
|
||||
* </ul>
|
||||
*/
|
||||
public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
|
||||
Scan scan = new Scan();
|
||||
|
||||
if (conf.get(SCAN_ROW_START) != null) {
|
||||
scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_ROW_STOP) != null) {
|
||||
scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_COLUMNS) != null) {
|
||||
addColumns(scan, conf.get(SCAN_COLUMNS));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_COLUMN_FAMILY) != null) {
|
||||
scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_TIMESTAMP) != null) {
|
||||
scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
|
||||
scan.setTimeRange(
|
||||
Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
|
||||
Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_MAXVERSIONS) != null) {
|
||||
scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_CACHEDROWS) != null) {
|
||||
scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
|
||||
}
|
||||
|
||||
if (conf.get(SCAN_BATCHSIZE) != null) {
|
||||
scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
|
||||
}
|
||||
|
||||
// false by default, full table scans generate too much BC churn
|
||||
scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
|
||||
|
||||
return scan;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initialize(JobContext context) throws IOException {
|
||||
// Do we have to worry about mis-matches between the Configuration from setConf and the one
|
||||
|
|
Loading…
Reference in New Issue