diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index f7ff0e9c802..e88d6dff7bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -65,7 +65,8 @@ public class CopyTable extends Configured implements Tool { String peerAddress = null; String families = null; boolean allCells = false; - + static boolean shuffle = false; + boolean bulkload = false; Path bulkloadDir = null; @@ -86,7 +87,7 @@ public class CopyTable extends Configured implements Tool { if (!doCommandLine(args)) { return null; } - + Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(CopyTable.class); Scan scan = new Scan(); @@ -98,24 +99,27 @@ public class CopyTable extends Configured implements Tool { if (allCells) { scan.setRaw(true); } + if (shuffle) { + job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true"); + } if (versions >= 0) { scan.setMaxVersions(versions); } - + if (startRow != null) { scan.setStartRow(Bytes.toBytes(startRow)); } - + if (stopRow != null) { scan.setStopRow(Bytes.toBytes(stopRow)); } - + if(families != null) { String[] fams = families.split(","); Map cfRenameMap = new HashMap(); for(String fam : fams) { String sourceCf; - if(fam.contains(":")) { + if(fam.contains(":")) { // fam looks like "sourceCfName:destCfName" String[] srcAndDest = fam.split(":", 2); sourceCf = srcAndDest[0]; @@ -123,21 +127,21 @@ public class CopyTable extends Configured implements Tool { cfRenameMap.put(sourceCf, destCf); } else { // fam is just "sourceCf" - sourceCf = fam; + sourceCf = fam; } scan.addFamily(Bytes.toBytes(sourceCf)); } Import.configureCfRenaming(job.getConfiguration(), cfRenameMap); } job.setNumReduceTasks(0); - + if (bulkload) { TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null, null, job); - + // We need to split the inputs by destination tables so that output of Map can be bulk-loaded. TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName)); - + FileSystem fs = FileSystem.get(getConf()); Random rand = new Random(); Path root = new Path(fs.getWorkingDirectory(), "copytable"); @@ -148,7 +152,7 @@ public class CopyTable extends Configured implements Tool { break; } } - + System.out.println("HFiles will be stored at " + this.bulkloadDir); HFileOutputFormat2.setOutputPath(job, bulkloadDir); try (Connection conn = ConnectionFactory.createConnection(getConf()); @@ -158,11 +162,11 @@ public class CopyTable extends Configured implements Tool { } else { TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.Importer.class, null, null, job); - + TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null, null); } - + return job; } @@ -227,19 +231,19 @@ public class CopyTable extends Configured implements Tool { printUsage(null); return false; } - + final String startRowArgKey = "--startrow="; if (cmd.startsWith(startRowArgKey)) { startRow = cmd.substring(startRowArgKey.length()); continue; } - + final String stopRowArgKey = "--stoprow="; if (cmd.startsWith(stopRowArgKey)) { stopRow = cmd.substring(stopRowArgKey.length()); continue; } - + final String startTimeArgKey = "--starttime="; if (cmd.startsWith(startTimeArgKey)) { startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); @@ -280,12 +284,17 @@ public class CopyTable extends Configured implements Tool { allCells = true; continue; } - + if (cmd.startsWith("--bulkload")) { bulkload = true; continue; } + if (cmd.startsWith("--shuffle")) { + shuffle = true; + continue; + } + if (i == args.length-1) { tableName = cmd; } else { @@ -302,12 +311,12 @@ public class CopyTable extends Configured implements Tool { printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime); return false; } - + if (bulkload && peerAddress != null) { printUsage("Remote bulkload is not supported!"); return false; } - + // set dstTableName if necessary if (dstTableName == null) { dstTableName = tableName; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index 56ae349843e..dccaa25fa68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.util.Collections; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,6 +35,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; @@ -80,6 +84,8 @@ implements Configurable { public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows"; /** Set the maximum number of values to return for each call to next(). */ public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize"; + /** Specify if we have to shuffle the map tasks. */ + public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps"; /** The configuration. */ private Configuration conf = null; @@ -113,7 +119,7 @@ implements Configurable { } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); } - + Scan scan = null; if (conf.get(SCAN) != null) { @@ -173,7 +179,7 @@ implements Configurable { setScan(scan); } - + /** * Parses a combined family and qualifier and adds either both or just the * family in case there is no qualifier. This assumes the older colon @@ -210,6 +216,25 @@ implements Configurable { } } + /** + * Calculates the splits that will serve as input for the map tasks. The + * number of splits matches the number of regions in a table. Splits are shuffled if + * required. + * @param context The current job context. + * @return The list of input splits. + * @throws IOException When creating the list of splits fails. + * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( + * org.apache.hadoop.mapreduce.JobContext) + */ + @Override + public List getSplits(JobContext context) throws IOException { + List splits = super.getSplits(context); + if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) { + Collections.shuffle(splits); + } + return splits; + } + /** * Convenience method to parse a string representation of an array of column specifiers. * @@ -235,7 +260,7 @@ implements Configurable { return super.getStartEndKeys(); } - + /** * Sets split table in map-reduce job. */