HBASE-21773 rowcounter utility should respond to pleas for help
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
9f961ea1d8
commit
f160dd3c10
|
@ -67,7 +67,7 @@ public abstract class AbstractHBaseTool implements Tool {
|
|||
private HashMap<Option, Integer> optionsOrder = new HashMap<>();
|
||||
private int optionsCount = 0;
|
||||
|
||||
private class OptionsOrderComparator implements Comparator<Option> {
|
||||
public class OptionsOrderComparator implements Comparator<Option> {
|
||||
@Override
|
||||
public int compare(Option o1, Option o2) {
|
||||
return optionsOrder.get(o1) - optionsOrder.get(o2);
|
||||
|
|
|
@ -24,12 +24,18 @@ import java.util.ArrayList;
|
|||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
|
@ -40,15 +46,13 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.mapreduce.Counter;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
/**
|
||||
* A job with a just a map phase to count rows. Map outputs table rows IF the
|
||||
* input row has columns that have content.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public class RowCounter extends Configured implements Tool {
|
||||
public class RowCounter extends AbstractHBaseTool {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RowCounter.class);
|
||||
|
||||
|
@ -58,6 +62,18 @@ public class RowCounter extends Configured implements Tool {
|
|||
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
|
||||
private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count";
|
||||
|
||||
private final static String OPT_START_TIME = "starttime";
|
||||
private final static String OPT_END_TIME = "endtime";
|
||||
private final static String OPT_RANGE = "range";
|
||||
private final static String OPT_EXPECTED_COUNT = "expectedCount";
|
||||
|
||||
private String tableName;
|
||||
private List<MultiRowRangeFilter.RowRange> rowRangeList;
|
||||
private long startTime;
|
||||
private long endTime;
|
||||
private long expectedCount;
|
||||
private List<String> columns = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* Mapper that runs the count.
|
||||
*/
|
||||
|
@ -89,75 +105,31 @@ public class RowCounter extends Configured implements Tool {
|
|||
* Sets up the actual job.
|
||||
*
|
||||
* @param conf The current configuration.
|
||||
* @param args The command line parameters.
|
||||
* @return The newly created job.
|
||||
* @throws IOException When setting up the job fails.
|
||||
*/
|
||||
public static Job createSubmittableJob(Configuration conf, String[] args)
|
||||
throws IOException {
|
||||
String tableName = args[0];
|
||||
List<MultiRowRangeFilter.RowRange> rowRangeList = null;
|
||||
long startTime = 0;
|
||||
long endTime = 0;
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
final String rangeSwitch = "--range=";
|
||||
final String startTimeArgKey = "--starttime=";
|
||||
final String endTimeArgKey = "--endtime=";
|
||||
final String expectedCountArg = "--expected-count=";
|
||||
|
||||
// First argument is table name, starting from second
|
||||
for (int i = 1; i < args.length; i++) {
|
||||
if (args[i].startsWith(rangeSwitch)) {
|
||||
try {
|
||||
rowRangeList = parseRowRangeParameter(args[i], rangeSwitch);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return null;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (args[i].startsWith(startTimeArgKey)) {
|
||||
startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
if (args[i].startsWith(endTimeArgKey)) {
|
||||
endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
if (args[i].startsWith(expectedCountArg)) {
|
||||
conf.setLong(EXPECTED_COUNT_KEY,
|
||||
Long.parseLong(args[i].substring(expectedCountArg.length())));
|
||||
continue;
|
||||
}
|
||||
// if no switch, assume column names
|
||||
sb.append(args[i]);
|
||||
sb.append(" ");
|
||||
}
|
||||
if (endTime < startTime) {
|
||||
printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
|
||||
return null;
|
||||
}
|
||||
|
||||
public Job createSubmittableJob(Configuration conf) throws IOException {
|
||||
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
|
||||
job.setJarByClass(RowCounter.class);
|
||||
Scan scan = new Scan();
|
||||
scan.setCacheBlocks(false);
|
||||
setScanFilter(scan, rowRangeList);
|
||||
if (sb.length() > 0) {
|
||||
for (String columnName : sb.toString().trim().split(" ")) {
|
||||
String family = StringUtils.substringBefore(columnName, ":");
|
||||
String qualifier = StringUtils.substringAfter(columnName, ":");
|
||||
|
||||
if (StringUtils.isBlank(qualifier)) {
|
||||
scan.addFamily(Bytes.toBytes(family));
|
||||
}
|
||||
else {
|
||||
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
|
||||
}
|
||||
for (String columnName : this.columns) {
|
||||
String family = StringUtils.substringBefore(columnName, ":");
|
||||
String qualifier = StringUtils.substringAfter(columnName, ":");
|
||||
if (StringUtils.isBlank(qualifier)) {
|
||||
scan.addFamily(Bytes.toBytes(family));
|
||||
} else {
|
||||
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
|
||||
}
|
||||
}
|
||||
scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
|
||||
|
||||
if(this.expectedCount >= 0) {
|
||||
conf.setLong(EXPECTED_COUNT_KEY, this.expectedCount);
|
||||
}
|
||||
|
||||
scan.setTimeRange(startTime, endTime);
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
TableMapReduceUtil.initTableMapperJob(tableName, scan,
|
||||
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
|
||||
|
@ -165,22 +137,20 @@ public class RowCounter extends Configured implements Tool {
|
|||
return job;
|
||||
}
|
||||
|
||||
private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(
|
||||
String arg, String rangeSwitch) {
|
||||
final String[] ranges = arg.substring(rangeSwitch.length()).split(";");
|
||||
private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String arg) {
|
||||
final List<String> rangesSplit = Splitter.on(";").splitToList(arg);
|
||||
final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
|
||||
for (String range : ranges) {
|
||||
String[] startEnd = range.split(",", 2);
|
||||
if (startEnd.length != 2 || startEnd[1].contains(",")) {
|
||||
printUsage("Please specify range in such format as \"--range=a,b\" " +
|
||||
"or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
|
||||
throw new IllegalArgumentException("Wrong range specification: " + range);
|
||||
for (String range : rangesSplit) {
|
||||
if(range!=null && !range.isEmpty()) {
|
||||
List<String> startEnd = Splitter.on(",").splitToList(range);
|
||||
if (startEnd.size() != 2 || startEnd.get(1).contains(",")) {
|
||||
throw new IllegalArgumentException("Wrong range specification: " + range);
|
||||
}
|
||||
String startKey = startEnd.get(0);
|
||||
String endKey = startEnd.get(1);
|
||||
rangeList.add(new MultiRowRangeFilter.RowRange(Bytes.toBytesBinary(startKey),
|
||||
true, Bytes.toBytesBinary(endKey), false));
|
||||
}
|
||||
String startKey = startEnd[0];
|
||||
String endKey = startEnd[1];
|
||||
rangeList.add(new MultiRowRangeFilter.RowRange(
|
||||
Bytes.toBytesBinary(startKey), true,
|
||||
Bytes.toBytesBinary(endKey), false));
|
||||
}
|
||||
return rangeList;
|
||||
}
|
||||
|
@ -208,34 +178,87 @@ public class RowCounter extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @param errorMessage Can attach a message when error occurs.
|
||||
*/
|
||||
private static void printUsage(String errorMessage) {
|
||||
System.err.println("ERROR: " + errorMessage);
|
||||
printUsage();
|
||||
}
|
||||
|
||||
/**
|
||||
* Prints usage without error message.
|
||||
* Note that we don't document --expected-count, because it's intended for test.
|
||||
*/
|
||||
private static void printUsage() {
|
||||
System.err.println("Usage: hbase rowcounter [options] <tablename> "
|
||||
+ "[--starttime=<start> --endtime=<end>] "
|
||||
+ "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
|
||||
System.err.println("For performance consider the following options:\n"
|
||||
+ "-Dhbase.client.scanner.caching=100\n"
|
||||
+ "-Dmapreduce.map.speculative=false");
|
||||
@Override
|
||||
protected void printUsage() {
|
||||
StringBuilder footerBuilder = new StringBuilder();
|
||||
footerBuilder.append("For performance, consider the following configuration properties:\n");
|
||||
footerBuilder.append("-Dhbase.client.scanner.caching=100\n");
|
||||
footerBuilder.append("-Dmapreduce.map.speculative=false\n");
|
||||
printUsage("hbase rowcounter <tablename> [options] [<column1> <column2>...]",
|
||||
"Options:", footerBuilder.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
if (args.length < 1) {
|
||||
printUsage("Wrong number of parameters: " + args.length);
|
||||
return -1;
|
||||
protected void printUsage(final String usageStr, final String usageHeader,
|
||||
final String usageFooter) {
|
||||
HelpFormatter helpFormatter = new HelpFormatter();
|
||||
helpFormatter.setWidth(120);
|
||||
helpFormatter.setOptionComparator(new AbstractHBaseTool.OptionsOrderComparator());
|
||||
helpFormatter.setLongOptSeparator("=");
|
||||
helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addOptions() {
|
||||
Option startTimeOption = Option.builder(null).valueSeparator('=').hasArg(true).
|
||||
desc("starting time filter to start counting rows from.").longOpt(OPT_START_TIME).build();
|
||||
Option endTimeOption = Option.builder(null).valueSeparator('=').hasArg(true).
|
||||
desc("end time filter limit, to only count rows up to this timestamp.").
|
||||
longOpt(OPT_END_TIME).build();
|
||||
Option rangeOption = Option.builder(null).valueSeparator('=').hasArg(true).
|
||||
desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build();
|
||||
Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true).
|
||||
desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build();
|
||||
addOption(startTimeOption);
|
||||
addOption(endTimeOption);
|
||||
addOption(rangeOption);
|
||||
addOption(expectedOption);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processOptions(CommandLine cmd) throws IllegalArgumentException{
|
||||
this.tableName = cmd.getArgList().get(0);
|
||||
if(cmd.getOptionValue(OPT_RANGE)!=null) {
|
||||
this.rowRangeList = parseRowRangeParameter(cmd.getOptionValue(OPT_RANGE));
|
||||
}
|
||||
Job job = createSubmittableJob(getConf(), args);
|
||||
this.endTime = cmd.getOptionValue(OPT_END_TIME) == null ? HConstants.LATEST_TIMESTAMP :
|
||||
Long.parseLong(cmd.getOptionValue(OPT_END_TIME));
|
||||
this.expectedCount = cmd.getOptionValue(OPT_EXPECTED_COUNT) == null ? Long.MIN_VALUE :
|
||||
Long.parseLong(cmd.getOptionValue(OPT_EXPECTED_COUNT));
|
||||
this.startTime = cmd.getOptionValue(OPT_START_TIME) == null ? 0 :
|
||||
Long.parseLong(cmd.getOptionValue(OPT_START_TIME));
|
||||
|
||||
for(int i=1; i<cmd.getArgList().size(); i++){
|
||||
String argument = cmd.getArgList().get(i);
|
||||
if(!argument.startsWith("-")){
|
||||
this.columns.add(argument);
|
||||
}
|
||||
}
|
||||
|
||||
if (endTime < startTime) {
|
||||
throw new IllegalArgumentException("--endtime=" + endTime +
|
||||
" needs to be greater than --starttime=" + startTime);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processOldArgs(List<String> args) {
|
||||
List<String> copiedArgs = new ArrayList<>(args);
|
||||
args.removeAll(copiedArgs);
|
||||
for(String arg : copiedArgs){
|
||||
if(arg.startsWith("-") && arg.contains("=")){
|
||||
String[] kv = arg.split("=");
|
||||
args.add(kv[0]);
|
||||
args.add(kv[1]);
|
||||
} else {
|
||||
args.add(arg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWork() throws Exception {
|
||||
Job job = createSubmittableJob(getConf());
|
||||
if (job == null) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -258,8 +281,22 @@ public class RowCounter extends Configured implements Tool {
|
|||
* @throws Exception When running the job fails.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
int errCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(), args);
|
||||
System.exit(errCode);
|
||||
new RowCounter().doStaticMain(args);
|
||||
}
|
||||
|
||||
static class RowCounterCommandLineParser extends BasicParser {
|
||||
|
||||
@Override
|
||||
protected void checkRequiredOptions() throws MissingOptionException {
|
||||
if(this.cmd.getArgList().size()<1 || this.cmd.getArgList().get(0).startsWith("-")){
|
||||
throw new MissingOptionException("First argument must be a valid table name.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CommandLineParser newParser() {
|
||||
return new RowCounterCommandLineParser();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ import java.io.ByteArrayOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -34,8 +36,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|||
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
|
||||
import org.apache.hadoop.mapreduce.Counter;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -296,14 +296,15 @@ public class TestRowCounter {
|
|||
* @throws Exception
|
||||
*/
|
||||
private void runRowCount(String[] args, int expectedCount) throws Exception {
|
||||
Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args);
|
||||
RowCounter rowCounter = new RowCounter();
|
||||
rowCounter.setConf(TEST_UTIL.getConfiguration());
|
||||
args = Arrays.copyOf(args, args.length+1);
|
||||
args[args.length-1]="--expectedCount=" + expectedCount;
|
||||
long start = System.currentTimeMillis();
|
||||
job.waitForCompletion(true);
|
||||
int result = rowCounter.run(args);
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
LOG.debug("row count duration (ms): " + duration);
|
||||
assertTrue(job.isSuccessful());
|
||||
Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS);
|
||||
assertEquals(expectedCount, counter.getValue());
|
||||
assertTrue(result==0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -347,25 +348,17 @@ public class TestRowCounter {
|
|||
*/
|
||||
@Test
|
||||
public void testImportMain() throws Exception {
|
||||
PrintStream oldPrintStream = System.err;
|
||||
SecurityManager SECURITY_MANAGER = System.getSecurityManager();
|
||||
LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
|
||||
System.setSecurityManager(newSecurityManager);
|
||||
ByteArrayOutputStream data = new ByteArrayOutputStream();
|
||||
String[] args = {};
|
||||
System.setErr(new PrintStream(data));
|
||||
try {
|
||||
System.setErr(new PrintStream(data));
|
||||
|
||||
try {
|
||||
RowCounter.main(args);
|
||||
fail("should be SecurityException");
|
||||
} catch (SecurityException e) {
|
||||
assertEquals(-1, newSecurityManager.getExitCode());
|
||||
assertTrue(data.toString().contains("Wrong number of parameters:"));
|
||||
assertUsageContent(data.toString());
|
||||
assertEquals(RowCounter.EXIT_FAILURE, newSecurityManager.getExitCode());
|
||||
}
|
||||
data.reset();
|
||||
try {
|
||||
args = new String[2];
|
||||
args[0] = "table";
|
||||
|
@ -373,26 +366,58 @@ public class TestRowCounter {
|
|||
RowCounter.main(args);
|
||||
fail("should be SecurityException");
|
||||
} catch (SecurityException e) {
|
||||
assertEquals(-1, newSecurityManager.getExitCode());
|
||||
assertTrue(data.toString().contains(
|
||||
"Please specify range in such format as \"--range=a,b\" or, with only one boundary," +
|
||||
" \"--range=,b\" or \"--range=a,\""));
|
||||
assertUsageContent(data.toString());
|
||||
assertEquals(RowCounter.EXIT_FAILURE, newSecurityManager.getExitCode());
|
||||
}
|
||||
|
||||
} finally {
|
||||
System.setErr(oldPrintStream);
|
||||
System.setSecurityManager(SECURITY_MANAGER);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHelp() throws Exception {
|
||||
PrintStream oldPrintStream = System.out;
|
||||
try {
|
||||
ByteArrayOutputStream data = new ByteArrayOutputStream();
|
||||
PrintStream stream = new PrintStream(data);
|
||||
System.setOut(stream);
|
||||
String[] args = {"-h"};
|
||||
runRowCount(args, 0);
|
||||
assertUsageContent(data.toString());
|
||||
args = new String[]{"--help"};
|
||||
runRowCount(args, 0);
|
||||
assertUsageContent(data.toString());
|
||||
}finally {
|
||||
System.setOut(oldPrintStream);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidTable() throws Exception {
|
||||
try {
|
||||
String[] args = {"invalid"};
|
||||
runRowCount(args, 0);
|
||||
fail("RowCounter should had failed with invalid table.");
|
||||
}catch (Throwable e){
|
||||
assertTrue(e instanceof AssertionError);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertUsageContent(String usage) {
|
||||
assertTrue(usage.contains("Usage: hbase rowcounter [options] <tablename> "
|
||||
+ "[--starttime=<start> --endtime=<end>] "
|
||||
+ "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]"));
|
||||
assertTrue(usage.contains("For performance consider the following options:"));
|
||||
assertTrue(usage.contains("-Dhbase.client.scanner.caching=100"));
|
||||
assertTrue(usage.contains("-Dmapreduce.map.speculative=false"));
|
||||
assertTrue(usage.contains("usage: hbase rowcounter "
|
||||
+ "<tablename> [options] [<column1> <column2>...]"));
|
||||
assertTrue(usage.contains("Options:\n"));
|
||||
assertTrue(usage.contains("--starttime=<arg> "
|
||||
+ "starting time filter to start counting rows from.\n"));
|
||||
assertTrue(usage.contains("--endtime=<arg> "
|
||||
+ "end time filter limit, to only count rows up to this timestamp.\n"));
|
||||
assertTrue(usage.contains("--range=<arg> "
|
||||
+ "[startKey],[endKey][;[startKey],[endKey]...]]\n"));
|
||||
assertTrue(usage.contains("--expectedCount=<arg> expected number of rows to be count.\n"));
|
||||
assertTrue(usage.contains("For performance, "
|
||||
+ "consider the following configuration properties:\n"));
|
||||
assertTrue(usage.contains("-Dhbase.client.scanner.caching=100\n"));
|
||||
assertTrue(usage.contains("-Dmapreduce.map.speculative=false\n"));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue