HBASE-25109 Add MR Counters to WALPlayer; currently hard to tell if it is doing anything (#2468)
Add MR counters so operator can see if WALPlayer run actually did anything. Fix bugs in usage (it enforced two args though usage describes allowing one arg only). Clean up usage output. In particular add note on wal file separator as hbase by default uses the ',' in its WAL file names which could befuddle operator trying to do simple import. Signed-off-by: Huaxiang Sun <huaxiangsun@apache.com>
This commit is contained in:
parent
9fc29c4cbf
commit
01876071b9
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
*
|
*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
@ -34,6 +34,7 @@ import org.apache.yetus.audience.InterfaceStability;
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public class Driver {
|
public class Driver {
|
||||||
|
private Driver() {}
|
||||||
|
|
||||||
public static void main(String[] args) throws Throwable {
|
public static void main(String[] args) throws Throwable {
|
||||||
ProgramDriver pgd = new ProgramDriver();
|
ProgramDriver pgd = new ProgramDriver();
|
||||||
|
|
|
@ -154,14 +154,13 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
|
||||||
WALSplit hsplit = (WALSplit)split;
|
WALSplit hsplit = (WALSplit)split;
|
||||||
logFile = new Path(hsplit.getLogFileName());
|
logFile = new Path(hsplit.getLogFileName());
|
||||||
conf = context.getConfiguration();
|
conf = context.getConfiguration();
|
||||||
LOG.info("Opening reader for "+split);
|
LOG.info("Opening {} for {}", logFile, split);
|
||||||
openReader(logFile);
|
openReader(logFile);
|
||||||
this.startTime = hsplit.getStartTime();
|
this.startTime = hsplit.getStartTime();
|
||||||
this.endTime = hsplit.getEndTime();
|
this.endTime = hsplit.getEndTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void openReader(Path path) throws IOException
|
private void openReader(Path path) throws IOException {
|
||||||
{
|
|
||||||
closeReader();
|
closeReader();
|
||||||
reader = AbstractFSWALProvider.openReader(path, conf);
|
reader = AbstractFSWALProvider.openReader(path, conf);
|
||||||
seek();
|
seek();
|
||||||
|
|
|
@ -58,6 +58,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A tool to replay WAL files as a M/R job.
|
* A tool to replay WAL files as a M/R job.
|
||||||
* The WAL can be replayed for a set of tables or all tables,
|
* The WAL can be replayed for a set of tables or all tables,
|
||||||
|
@ -140,7 +142,22 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A mapper that writes out {@link Mutation} to be directly applied to a running HBase instance.
|
* Enum for map metrics. Keep it out here rather than inside in the Map
|
||||||
|
* inner-class so we can find associated properties.
|
||||||
|
*/
|
||||||
|
protected static enum Counter {
|
||||||
|
/** Number of aggregated writes */
|
||||||
|
PUTS,
|
||||||
|
/** Number of aggregated deletes */
|
||||||
|
DELETES,
|
||||||
|
CELLS_READ,
|
||||||
|
CELLS_WRITTEN,
|
||||||
|
WALEDITS
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A mapper that writes out {@link Mutation} to be directly applied to
|
||||||
|
* a running HBase instance.
|
||||||
*/
|
*/
|
||||||
protected static class WALMapper
|
protected static class WALMapper
|
||||||
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
|
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
|
||||||
|
@ -148,6 +165,7 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void map(WALKey key, WALEdit value, Context context) throws IOException {
|
public void map(WALKey key, WALEdit value, Context context) throws IOException {
|
||||||
|
context.getCounter(Counter.WALEDITS).increment(1);
|
||||||
try {
|
try {
|
||||||
if (tables.isEmpty() || tables.containsKey(key.getTableName())) {
|
if (tables.isEmpty() || tables.containsKey(key.getTableName())) {
|
||||||
TableName targetTable =
|
TableName targetTable =
|
||||||
|
@ -157,6 +175,7 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
Delete del = null;
|
Delete del = null;
|
||||||
Cell lastCell = null;
|
Cell lastCell = null;
|
||||||
for (Cell cell : value.getCells()) {
|
for (Cell cell : value.getCells()) {
|
||||||
|
context.getCounter(Counter.CELLS_READ).increment(1);
|
||||||
// Filtering WAL meta marker entries.
|
// Filtering WAL meta marker entries.
|
||||||
if (WALEdit.isMetaEditFamily(cell)) {
|
if (WALEdit.isMetaEditFamily(cell)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -172,9 +191,11 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
// row or type changed, write out aggregate KVs.
|
// row or type changed, write out aggregate KVs.
|
||||||
if (put != null) {
|
if (put != null) {
|
||||||
context.write(tableOut, put);
|
context.write(tableOut, put);
|
||||||
|
context.getCounter(Counter.PUTS).increment(1);
|
||||||
}
|
}
|
||||||
if (del != null) {
|
if (del != null) {
|
||||||
context.write(tableOut, del);
|
context.write(tableOut, del);
|
||||||
|
context.getCounter(Counter.DELETES).increment(1);
|
||||||
}
|
}
|
||||||
if (CellUtil.isDelete(cell)) {
|
if (CellUtil.isDelete(cell)) {
|
||||||
del = new Delete(CellUtil.cloneRow(cell));
|
del = new Delete(CellUtil.cloneRow(cell));
|
||||||
|
@ -187,14 +208,17 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
} else {
|
} else {
|
||||||
put.add(cell);
|
put.add(cell);
|
||||||
}
|
}
|
||||||
|
context.getCounter(Counter.CELLS_WRITTEN).increment(1);
|
||||||
}
|
}
|
||||||
lastCell = cell;
|
lastCell = cell;
|
||||||
}
|
}
|
||||||
// write residual KVs
|
// write residual KVs
|
||||||
if (put != null) {
|
if (put != null) {
|
||||||
context.write(tableOut, put);
|
context.write(tableOut, put);
|
||||||
|
context.getCounter(Counter.PUTS).increment(1);
|
||||||
}
|
}
|
||||||
if (del != null) {
|
if (del != null) {
|
||||||
|
context.getCounter(Counter.DELETES).increment(1);
|
||||||
context.write(tableOut, del);
|
context.write(tableOut, del);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -270,7 +294,7 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
setupTime(conf, WALInputFormat.START_TIME_KEY);
|
setupTime(conf, WALInputFormat.START_TIME_KEY);
|
||||||
setupTime(conf, WALInputFormat.END_TIME_KEY);
|
setupTime(conf, WALInputFormat.END_TIME_KEY);
|
||||||
String inputDirs = args[0];
|
String inputDirs = args[0];
|
||||||
String[] tables = args[1].split(",");
|
String[] tables = args.length == 1? new String [] {}: args[1].split(",");
|
||||||
String[] tableMap;
|
String[] tableMap;
|
||||||
if (args.length > 2) {
|
if (args.length > 2) {
|
||||||
tableMap = args[2].split(",");
|
tableMap = args[2].split(",");
|
||||||
|
@ -278,7 +302,7 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
throw new IOException("The same number of tables and mapping must be provided.");
|
throw new IOException("The same number of tables and mapping must be provided.");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// if not mapping is specified map each table to itself
|
// if no mapping is specified, map each table to itself
|
||||||
tableMap = tables;
|
tableMap = tables;
|
||||||
}
|
}
|
||||||
conf.setStrings(TABLES_KEY, tables);
|
conf.setStrings(TABLES_KEY, tables);
|
||||||
|
@ -349,27 +373,27 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
if (errorMsg != null && errorMsg.length() > 0) {
|
if (errorMsg != null && errorMsg.length() > 0) {
|
||||||
System.err.println("ERROR: " + errorMsg);
|
System.err.println("ERROR: " + errorMsg);
|
||||||
}
|
}
|
||||||
System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
|
System.err.println("Usage: " + NAME + " [options] <WAL inputdir> [<tables> <tableMappings>]");
|
||||||
System.err.println("Replay all WAL files into HBase.");
|
System.err.println(" <WAL inputdir> directory of WALs to replay.");
|
||||||
System.err.println("<tables> is a comma separated list of tables.");
|
System.err.println(" <tables> comma separated list of tables. If no tables specified,");
|
||||||
System.err.println("If no tables (\"\") are specified, all tables are imported.");
|
System.err.println(" all are imported (even hbase:meta if present).");
|
||||||
System.err.println("(Be careful, hbase:meta entries will be imported in this case.)\n");
|
System.err.println(" <tableMappings> WAL entries can be mapped to a new set of tables by passing");
|
||||||
System.err.println("WAL entries can be mapped to new set of tables via <tableMappings>.");
|
System.err.println(" <tableMappings>, a comma separated list of target tables.");
|
||||||
System.err.println("<tableMappings> is a comma separated list of target tables.");
|
System.err.println(" If specified, each table in <tables> must have a mapping.");
|
||||||
System.err.println("If specified, each table in <tables> must have a mapping.\n");
|
System.err.println("To generate HFiles to bulk load instead of loading HBase directly, pass:");
|
||||||
System.err.println("By default " + NAME + " will load data directly into HBase.");
|
System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
|
||||||
System.err.println("To generate HFiles for a bulk data load instead, pass the following option:");
|
System.err.println(" Only one table can be specified, and no mapping allowed!");
|
||||||
System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
|
System.err.println("To specify a time range, pass:");
|
||||||
System.err.println(" (Only one table can be specified, and no mapping is allowed!)");
|
System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
|
||||||
System.err.println("Time range options:");
|
System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
|
||||||
System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
|
System.err.println(" The start and the end date of timerange. The dates can be expressed");
|
||||||
System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
|
System.err.println(" in milliseconds since epoch or in yyyy-MM-dd'T'HH:mm:ss.SS format.");
|
||||||
System.err.println(" (The start and the end date of timerange. The dates can be expressed");
|
System.err.println(" E.g. 1234567890120 or 2009-02-13T23:32:30.12");
|
||||||
System.err.println(" in milliseconds since epoch or in yyyy-MM-dd'T'HH:mm:ss.SS format.");
|
|
||||||
System.err.println(" E.g. 1234567890120 or 2009-02-13T23:32:30.12)");
|
|
||||||
System.err.println("Other options:");
|
System.err.println("Other options:");
|
||||||
System.err.println(" -D" + JOB_NAME_CONF_KEY + "=jobName");
|
System.err.println(" -D" + JOB_NAME_CONF_KEY + "=jobName");
|
||||||
System.err.println(" Use the specified mapreduce job name for the wal player");
|
System.err.println(" Use the specified mapreduce job name for the wal player");
|
||||||
|
System.err.println(" -Dwal.input.separator=' '");
|
||||||
|
System.err.println(" Change WAL filename separator (WAL dir names use default ','.)");
|
||||||
System.err.println("For performance also consider the following options:\n"
|
System.err.println("For performance also consider the following options:\n"
|
||||||
+ " -Dmapreduce.map.speculative=false\n"
|
+ " -Dmapreduce.map.speculative=false\n"
|
||||||
+ " -Dmapreduce.reduce.speculative=false");
|
+ " -Dmapreduce.reduce.speculative=false");
|
||||||
|
@ -387,7 +411,7 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int run(String[] args) throws Exception {
|
public int run(String[] args) throws Exception {
|
||||||
if (args.length < 2) {
|
if (args.length < 1) {
|
||||||
usage("Wrong number of arguments: " + args.length);
|
usage("Wrong number of arguments: " + args.length);
|
||||||
System.exit(-1);
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -223,8 +223,8 @@ public class TestWALPlayer {
|
||||||
} catch (SecurityException e) {
|
} catch (SecurityException e) {
|
||||||
assertEquals(-1, newSecurityManager.getExitCode());
|
assertEquals(-1, newSecurityManager.getExitCode());
|
||||||
assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
|
assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
|
||||||
assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
|
assertTrue(data.toString().contains("Usage: WALPlayer [options] <WAL inputdir>" +
|
||||||
" <tables> [<tableMappings>]"));
|
" [<tables> <tableMappings>]"));
|
||||||
assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output"));
|
assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -914,7 +914,7 @@ see <<_wal_tools>>.
|
||||||
Invoke via:
|
Invoke via:
|
||||||
|
|
||||||
----
|
----
|
||||||
$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer [options] <wal inputdir> <tables> [<tableMappings>]>
|
$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer [options] <WAL inputdir> [<tables> <tableMappings>]>
|
||||||
----
|
----
|
||||||
|
|
||||||
For example:
|
For example:
|
||||||
|
@ -932,29 +932,27 @@ To NOT run WALPlayer as a mapreduce job on your cluster, force it to run all in
|
||||||
Running `WALPlayer` with no arguments prints brief usage information:
|
Running `WALPlayer` with no arguments prints brief usage information:
|
||||||
|
|
||||||
----
|
----
|
||||||
Usage: WALPlayer [options] <wal inputdir> <tables> [<tableMappings>]
|
Usage: WALPlayer [options] <WAL inputdir> [<tables> <tableMappings>]
|
||||||
Replay all WAL files into HBase.
|
<WAL inputdir> directory of WALs to replay.
|
||||||
<tables> is a comma separated list of tables.
|
<tables> comma separated list of tables. If no tables specified,
|
||||||
If no tables ("") are specified, all tables are imported.
|
all are imported (even hbase:meta if present).
|
||||||
(Be careful, hbase:meta entries will be imported in this case.)
|
<tableMappings> WAL entries can be mapped to a new set of tables by passing
|
||||||
|
<tableMappings>, a comma separated list of target tables.
|
||||||
WAL entries can be mapped to new set of tables via <tableMappings>.
|
If specified, each table in <tables> must have a mapping.
|
||||||
<tableMappings> is a comma separated list of target tables.
|
To generate HFiles to bulk load instead of loading HBase directly, pass:
|
||||||
If specified, each table in <tables> must have a mapping.
|
-Dwal.bulk.output=/path/for/output
|
||||||
|
Only one table can be specified, and no mapping allowed!
|
||||||
By default WALPlayer will load data directly into HBase.
|
To specify a time range, pass:
|
||||||
To generate HFiles for a bulk data load instead, pass the following option:
|
-Dwal.start.time=[date|ms]
|
||||||
-Dwal.bulk.output=/path/for/output
|
-Dwal.end.time=[date|ms]
|
||||||
(Only one table can be specified, and no mapping is allowed!)
|
The start and the end date of timerange. The dates can be expressed
|
||||||
Time range options:
|
in milliseconds since epoch or in yyyy-MM-dd'T'HH:mm:ss.SS format.
|
||||||
-Dwal.start.time=[date|ms]
|
E.g. 1234567890120 or 2009-02-13T23:32:30.12
|
||||||
-Dwal.end.time=[date|ms]
|
|
||||||
(The start and the end date of timerange. The dates can be expressed
|
|
||||||
in milliseconds since epoch or in yyyy-MM-dd'T'HH:mm:ss.SS format.
|
|
||||||
E.g. 1234567890120 or 2009-02-13T23:32:30.12)
|
|
||||||
Other options:
|
Other options:
|
||||||
-Dmapreduce.job.name=jobName
|
-Dmapreduce.job.name=jobName
|
||||||
Use the specified mapreduce job name for the wal player
|
Use the specified mapreduce job name for the wal player
|
||||||
|
-Dwal.input.separator=' '
|
||||||
|
Change WAL filename separator (WAL dir names use default ','.)
|
||||||
For performance also consider the following options:
|
For performance also consider the following options:
|
||||||
-Dmapreduce.map.speculative=false
|
-Dmapreduce.map.speculative=false
|
||||||
-Dmapreduce.reduce.speculative=false
|
-Dmapreduce.reduce.speculative=false
|
||||||
|
|
Loading…
Reference in New Issue