HBASE-8139 Allow job names to be overridden

Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
Ashish Singhi 2014-09-17 20:59:33 +05:30 committed by Elliott Clark
parent b26ec4e3df
commit 759922ba7e
8 changed files with 29 additions and 9 deletions

View File

@ -81,6 +81,8 @@ public class CellCounter extends Configured implements Tool {
*/ */
static final String NAME = "CellCounter"; static final String NAME = "CellCounter";
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
/** /**
* Mapper that runs the count. * Mapper that runs the count.
*/ */
@ -188,7 +190,7 @@ public class CellCounter extends Configured implements Tool {
Path outputDir = new Path(args[1]); Path outputDir = new Path(args[1]);
String reportSeparatorString = (args.length > 2) ? args[2]: ":"; String reportSeparatorString = (args.length > 2) ? args[2]: ":";
conf.set("ReportSeparator", reportSeparatorString); conf.set("ReportSeparator", reportSeparatorString);
Job job = Job.getInstance(conf, NAME + "_" + tableName); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(CellCounter.class); job.setJarByClass(CellCounter.class);
Scan scan = getConfiguredScanForJob(conf, args); Scan scan = getConfiguredScanForJob(conf, args);
TableMapReduceUtil.initTableMapperJob(tableName, scan, TableMapReduceUtil.initTableMapperJob(tableName, scan,

View File

@ -55,7 +55,9 @@ public class CopyTable extends Configured implements Tool {
static String peerAddress = null; static String peerAddress = null;
static String families = null; static String families = null;
static boolean allCells = false; static boolean allCells = false;
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
public CopyTable(Configuration conf) { public CopyTable(Configuration conf) {
super(conf); super(conf);
} }
@ -72,7 +74,7 @@ public class CopyTable extends Configured implements Tool {
if (!doCommandLine(args)) { if (!doCommandLine(args)) {
return null; return null;
} }
Job job = Job.getInstance(conf, NAME + "_" + tableName); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(CopyTable.class); job.setJarByClass(CopyTable.class);
Scan scan = new Scan(); Scan scan = new Scan();
scan.setCacheBlocks(false); scan.setCacheBlocks(false);

View File

@ -58,6 +58,8 @@ public class Export extends Configured implements Tool {
final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows"; final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
final static String EXPORT_BATCHING = "hbase.export.scanner.batch"; final static String EXPORT_BATCHING = "hbase.export.scanner.batch";
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
/** /**
* Sets up the actual job. * Sets up the actual job.
* *
@ -70,7 +72,7 @@ public class Export extends Configured implements Tool {
throws IOException { throws IOException {
String tableName = args[0]; String tableName = args[0];
Path outputDir = new Path(args[1]); Path outputDir = new Path(args[1]);
Job job = Job.getInstance(conf, NAME + "_" + tableName); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJobName(NAME + "_" + tableName); job.setJobName(NAME + "_" + tableName);
job.setJarByClass(Export.class); job.setJarByClass(Export.class);
// Set optional scan parameters // Set optional scan parameters
@ -166,6 +168,8 @@ public class Export extends Configured implements Tool {
System.err.println(" -D " + RAW_SCAN + "=true"); System.err.println(" -D " + RAW_SCAN + "=true");
System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>"); System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>"); System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
System.err.println(" -D " + JOB_NAME_CONF_KEY
+ "=jobName - use the specified mapreduce job name for the export");
System.err.println("For performance consider the following properties:\n" System.err.println("For performance consider the following properties:\n"
+ " -Dhbase.client.scanner.caching=100\n" + " -Dhbase.client.scanner.caching=100\n"
+ " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.map.speculative=false\n"

View File

@ -78,6 +78,8 @@ public class Import extends Configured implements Tool {
public final static String TABLE_NAME = "import.table.name"; public final static String TABLE_NAME = "import.table.name";
public final static String WAL_DURABILITY = "import.wal.durability"; public final static String WAL_DURABILITY = "import.wal.durability";
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
/** /**
* A mapper that just writes out KeyValues. * A mapper that just writes out KeyValues.
*/ */
@ -402,7 +404,7 @@ public class Import extends Configured implements Tool {
String tableName = args[0]; String tableName = args[0];
conf.set(TABLE_NAME, tableName); conf.set(TABLE_NAME, tableName);
Path inputDir = new Path(args[1]); Path inputDir = new Path(args[1]);
Job job = Job.getInstance(conf, NAME + "_" + tableName); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(Importer.class); job.setJarByClass(Importer.class);
FileInputFormat.setInputPaths(job, inputDir); FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(SequenceFileInputFormat.class); job.setInputFormatClass(SequenceFileInputFormat.class);
@ -461,6 +463,8 @@ public class Import extends Configured implements Tool {
+ " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;" + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+ " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including" + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
+ " the KeyValue."); + " the KeyValue.");
System.err.println(" -D " + JOB_NAME_CONF_KEY
+ "=jobName - use the specified mapreduce job name for the import");
System.err.println("For performance consider the following options:\n" System.err.println("For performance consider the following options:\n"
+ " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.map.speculative=false\n"
+ " -Dmapreduce.reduce.speculative=false\n" + " -Dmapreduce.reduce.speculative=false\n"

View File

@ -417,7 +417,7 @@ public class ImportTsv extends Configured implements Tool {
String tableName = args[0]; String tableName = args[0];
Path inputDir = new Path(args[1]); Path inputDir = new Path(args[1]);
String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName); String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName);
Job job = Job.getInstance(conf, NAME + "_" + tableName); Job job = Job.getInstance(conf, jobName);
job.setJarByClass(mapperClass); job.setJarByClass(mapperClass);
FileInputFormat.setInputPaths(job, inputDir); FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(TextInputFormat.class); job.setInputFormatClass(TextInputFormat.class);

View File

@ -51,6 +51,8 @@ public class RowCounter extends Configured implements Tool {
/** Name of this 'program'. */ /** Name of this 'program'. */
static final String NAME = "rowcounter"; static final String NAME = "rowcounter";
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
/** /**
* Mapper that runs the count. * Mapper that runs the count.
*/ */
@ -115,7 +117,7 @@ public class RowCounter extends Configured implements Tool {
} }
} }
Job job = Job.getInstance(conf, NAME + "_" + tableName); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(RowCounter.class); job.setJarByClass(RowCounter.class);
Scan scan = new Scan(); Scan scan = new Scan();
scan.setCacheBlocks(false); scan.setCacheBlocks(false);

View File

@ -69,6 +69,8 @@ public class WALPlayer extends Configured implements Tool {
final static String TABLES_KEY = "hlog.input.tables"; final static String TABLES_KEY = "hlog.input.tables";
final static String TABLE_MAP_KEY = "hlog.input.tablesmap"; final static String TABLE_MAP_KEY = "hlog.input.tablesmap";
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
/** /**
* A mapper that just writes out KeyValues. * A mapper that just writes out KeyValues.
* This one can be used together with {@link KeyValueSortReducer} * This one can be used together with {@link KeyValueSortReducer}
@ -234,7 +236,7 @@ public class WALPlayer extends Configured implements Tool {
} }
conf.setStrings(TABLES_KEY, tables); conf.setStrings(TABLES_KEY, tables);
conf.setStrings(TABLE_MAP_KEY, tableMap); conf.setStrings(TABLE_MAP_KEY, tableMap);
Job job = Job.getInstance(conf, NAME + "_" + inputDir); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + inputDir));
job.setJarByClass(WALPlayer.class); job.setJarByClass(WALPlayer.class);
FileInputFormat.setInputPaths(job, inputDir); FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(HLogInputFormat.class); job.setInputFormatClass(HLogInputFormat.class);
@ -288,6 +290,8 @@ public class WALPlayer extends Configured implements Tool {
System.err.println("Other options: (specify time range to WAL edit to consider)"); System.err.println("Other options: (specify time range to WAL edit to consider)");
System.err.println(" -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]"); System.err.println(" -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]");
System.err.println(" -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]"); System.err.println(" -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]");
System.err.println(" -D " + JOB_NAME_CONF_KEY
+ "=jobName - use the specified mapreduce job name for the wal player");
System.err.println("For performance also consider the following options:\n" System.err.println("For performance also consider the following options:\n"
+ " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.map.speculative=false\n"
+ " -Dmapreduce.reduce.speculative=false"); + " -Dmapreduce.reduce.speculative=false");

View File

@ -75,6 +75,8 @@ public class VerifyReplication extends Configured implements Tool {
static String families = null; static String families = null;
static String peerId = null; static String peerId = null;
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
/** /**
* Map-only comparator for 2 tables * Map-only comparator for 2 tables
*/ */
@ -208,7 +210,7 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
LOG.info("Peer Quorum Address: " + peerQuorumAddress); LOG.info("Peer Quorum Address: " + peerQuorumAddress);
Job job = new Job(conf, NAME + "_" + tableName); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(VerifyReplication.class); job.setJarByClass(VerifyReplication.class);
Scan scan = new Scan(); Scan scan = new Scan();