HBASE-11562 CopyTable should provide an option to shuffle the mapper tasks (Jean-Marc Spaggiari)

This commit is contained in:
stack 2014-10-28 11:28:54 -07:00
parent bb81b9fde5
commit 64b6109ce9
2 changed files with 56 additions and 22 deletions

View File

@ -65,7 +65,8 @@ public class CopyTable extends Configured implements Tool {
String peerAddress = null;
String families = null;
boolean allCells = false;
static boolean shuffle = false;
boolean bulkload = false;
Path bulkloadDir = null;
@ -86,7 +87,7 @@ public class CopyTable extends Configured implements Tool {
if (!doCommandLine(args)) {
return null;
}
Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(CopyTable.class);
Scan scan = new Scan();
@ -98,24 +99,27 @@ public class CopyTable extends Configured implements Tool {
if (allCells) {
scan.setRaw(true);
}
if (shuffle) {
job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
}
if (versions >= 0) {
scan.setMaxVersions(versions);
}
if (startRow != null) {
scan.setStartRow(Bytes.toBytes(startRow));
}
if (stopRow != null) {
scan.setStopRow(Bytes.toBytes(stopRow));
}
if(families != null) {
String[] fams = families.split(",");
Map<String,String> cfRenameMap = new HashMap<String,String>();
for(String fam : fams) {
String sourceCf;
if(fam.contains(":")) {
if(fam.contains(":")) {
// fam looks like "sourceCfName:destCfName"
String[] srcAndDest = fam.split(":", 2);
sourceCf = srcAndDest[0];
@ -123,21 +127,21 @@ public class CopyTable extends Configured implements Tool {
cfRenameMap.put(sourceCf, destCf);
} else {
// fam is just "sourceCf"
sourceCf = fam;
sourceCf = fam;
}
scan.addFamily(Bytes.toBytes(sourceCf));
}
Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
}
job.setNumReduceTasks(0);
if (bulkload) {
TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
null, job);
// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
FileSystem fs = FileSystem.get(getConf());
Random rand = new Random();
Path root = new Path(fs.getWorkingDirectory(), "copytable");
@ -148,7 +152,7 @@ public class CopyTable extends Configured implements Tool {
break;
}
}
System.out.println("HFiles will be stored at " + this.bulkloadDir);
HFileOutputFormat2.setOutputPath(job, bulkloadDir);
try (Connection conn = ConnectionFactory.createConnection(getConf());
@ -158,11 +162,11 @@ public class CopyTable extends Configured implements Tool {
} else {
TableMapReduceUtil.initTableMapperJob(tableName, scan,
Import.Importer.class, null, null, job);
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
null);
}
return job;
}
@ -227,19 +231,19 @@ public class CopyTable extends Configured implements Tool {
printUsage(null);
return false;
}
final String startRowArgKey = "--startrow=";
if (cmd.startsWith(startRowArgKey)) {
startRow = cmd.substring(startRowArgKey.length());
continue;
}
final String stopRowArgKey = "--stoprow=";
if (cmd.startsWith(stopRowArgKey)) {
stopRow = cmd.substring(stopRowArgKey.length());
continue;
}
final String startTimeArgKey = "--starttime=";
if (cmd.startsWith(startTimeArgKey)) {
startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
@ -280,12 +284,17 @@ public class CopyTable extends Configured implements Tool {
allCells = true;
continue;
}
if (cmd.startsWith("--bulkload")) {
bulkload = true;
continue;
}
if (cmd.startsWith("--shuffle")) {
shuffle = true;
continue;
}
if (i == args.length-1) {
tableName = cmd;
} else {
@ -302,12 +311,12 @@ public class CopyTable extends Configured implements Tool {
printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
return false;
}
if (bulkload && peerAddress != null) {
printUsage("Remote bulkload is not supported!");
return false;
}
// set dstTableName if necessary
if (dstTableName == null) {
dstTableName = tableName;

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -33,6 +35,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
@ -80,6 +84,8 @@ implements Configurable {
public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
/** Set the maximum number of values to return for each call to next(). */
public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
/** Specify if we have to shuffle the map tasks. */
public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
/** The configuration. */
private Configuration conf = null;
@ -113,7 +119,7 @@ implements Configurable {
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
Scan scan = null;
if (conf.get(SCAN) != null) {
@ -173,7 +179,7 @@ implements Configurable {
setScan(scan);
}
/**
* Parses a combined family and qualifier and adds either both or just the
* family in case there is no qualifier. This assumes the older colon
@ -210,6 +216,25 @@ implements Configurable {
}
}
/**
* Calculates the splits that will serve as input for the map tasks. The
* number of splits matches the number of regions in a table. Splits are shuffled if
* required.
* @param context The current job context.
* @return The list of input splits.
* @throws IOException When creating the list of splits fails.
* @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
* org.apache.hadoop.mapreduce.JobContext)
*/
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
List<InputSplit> splits = super.getSplits(context);
if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) {
Collections.shuffle(splits);
}
return splits;
}
/**
* Convenience method to parse a string representation of an array of column specifiers.
*
@ -235,7 +260,7 @@ implements Configurable {
return super.getStartEndKeys();
}
/**
* Sets split table in map-reduce job.
*/