diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index 89ff2863ec7..5f2d16fca9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -20,20 +20,30 @@ package org.apache.hadoop.hbase.mapreduce; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; /** * Tool used to copy a table to another one which can be on a different setup. @@ -43,36 +53,43 @@ import java.util.Map; @InterfaceAudience.Public @InterfaceStability.Stable public class CopyTable extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(CopyTable.class); final static String NAME = "copytable"; - static long startTime = 0; - static long endTime = 0; - static int versions = -1; - static String tableName = null; - static String startRow = null; - static String stopRow = null; - static String newTableName = null; - static String peerAddress = null; - static String families = null; - static boolean allCells = false; + long startTime = 0; + long endTime = 0; + int versions = -1; + String tableName = null; + String startRow = null; + String stopRow = null; + String dstTableName = null; + String peerAddress = null; + String families = null; + boolean allCells = false; + boolean bulkload = false; + Path bulkloadDir = null; + + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + public CopyTable(Configuration conf) { super(conf); } /** * Sets up the actual job. * - * @param conf The current configuration. * @param args The command line parameters. * @return The newly created job. * @throws IOException When setting up the job fails. */ - public static Job createSubmittableJob(Configuration conf, String[] args) + public Job createSubmittableJob(String[] args) throws IOException { if (!doCommandLine(args)) { return null; } - Job job = new Job(conf, NAME + "_" + tableName); + + Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); + job.setJarByClass(CopyTable.class); Scan scan = new Scan(); scan.setCacheBlocks(false); @@ -114,12 +131,40 @@ public class CopyTable extends Configured implements Tool { } Import.configureCfRenaming(job.getConfiguration(), cfRenameMap); } - TableMapReduceUtil.initTableMapperJob(tableName, scan, - Import.Importer.class, null, null, job); - TableMapReduceUtil.initTableReducerJob( - newTableName == null ? tableName : newTableName, null, job, - null, peerAddress, null, null); job.setNumReduceTasks(0); + + if (bulkload) { + TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null, + null, job); + + // We need to split the inputs by destination tables so that output of Map can be bulk-loaded. + TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName)); + + FileSystem fs = FileSystem.get(getConf()); + Random rand = new Random(); + Path root = new Path(fs.getWorkingDirectory(), "copytable"); + fs.mkdirs(root); + while (true) { + bulkloadDir = new Path(root, "" + rand.nextLong()); + if (!fs.exists(bulkloadDir)) { + break; + } + } + + System.out.println("HFiles will be stored at " + this.bulkloadDir); + HFileOutputFormat2.setOutputPath(job, bulkloadDir); + try (Connection conn = ConnectionFactory.createConnection(getConf()); + Table htable = conn.getTable(TableName.valueOf(dstTableName))) { + HFileOutputFormat2.configureIncrementalLoadMap(job, htable); + } + } else { + TableMapReduceUtil.initTableMapperJob(tableName, scan, + Import.Importer.class, null, null, job); + + TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null, + null); + } + return job; } @@ -150,6 +195,8 @@ public class CopyTable extends Configured implements Tool { System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. "); System.err.println(" To keep the same name, just give \"cfName\""); System.err.println(" all.cells also copy delete markers and deleted cells"); + System.err.println(" bulkload Write input into HFiles and bulk load to the destination " + + "table"); System.err.println(); System.err.println("Args:"); System.err.println(" tablename Name of the table to copy"); @@ -168,7 +215,7 @@ public class CopyTable extends Configured implements Tool { + " -Dmapreduce.map.speculative=false"); } - private static boolean doCommandLine(final String[] args) { + private boolean doCommandLine(final String[] args) { // Process command-line args. TODO: Better cmd-line processing // (but hopefully something not as painful as cli options). if (args.length < 1) { @@ -215,7 +262,7 @@ public class CopyTable extends Configured implements Tool { final String newNameArgKey = "--new.name="; if (cmd.startsWith(newNameArgKey)) { - newTableName = cmd.substring(newNameArgKey.length()); + dstTableName = cmd.substring(newNameArgKey.length()); continue; } @@ -235,6 +282,11 @@ public class CopyTable extends Configured implements Tool { allCells = true; continue; } + + if (cmd.startsWith("--bulkload")) { + bulkload = true; + continue; + } if (i == args.length-1) { tableName = cmd; @@ -243,7 +295,7 @@ public class CopyTable extends Configured implements Tool { return false; } } - if (newTableName == null && peerAddress == null) { + if (dstTableName == null && peerAddress == null) { printUsage("At least a new table name or a " + "peer address must be specified"); return false; @@ -252,6 +304,16 @@ public class CopyTable extends Configured implements Tool { printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime); return false; } + + if (bulkload && peerAddress != null) { + printUsage("Remote bulkload is not supported!"); + return false; + } + + // set dstTableName if necessary + if (dstTableName == null) { + dstTableName = tableName; + } } catch (Exception e) { e.printStackTrace(); printUsage("Can't start because " + e.getMessage()); @@ -274,8 +336,29 @@ public class CopyTable extends Configured implements Tool { @Override public int run(String[] args) throws Exception { String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); - Job job = createSubmittableJob(getConf(), otherArgs); + Job job = createSubmittableJob(otherArgs); if (job == null) return 1; - return job.waitForCompletion(true) ? 0 : 1; + if (!job.waitForCompletion(true)) { + LOG.info("Map-reduce job failed!"); + if (bulkload) { + LOG.info("Files are not bulkloaded!"); + } + return 1; + } + int code = 0; + if (bulkload) { + code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(), + this.dstTableName}); + if (code == 0) { + // bulkloadDir is deleted only LoadIncrementalHFiles was successful so that one can rerun + // LoadIncrementalHFiles. + FileSystem fs = FileSystem.get(this.getConf()); + if (!fs.delete(this.bulkloadDir, true)) { + LOG.error("Deleting folder " + bulkloadDir + " failed!"); + code = 1; + } + } + } + return code; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index f8f9b4d5770..2c0efc8356d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -404,6 +404,24 @@ public class HFileOutputFormat2 LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured."); } + + public static void configureIncrementalLoadMap(Job job, Table table) throws IOException { + Configuration conf = job.getConfiguration(); + + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + job.setOutputFormatClass(HFileOutputFormat2.class); + + // Set compression algorithms based on column families + configureCompression(table, conf); + configureBloomType(table, conf); + configureBlockSize(table, conf); + configureDataBlockEncoding(table, conf); + + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + LOG.info("Incremental table " + table.getName() + " output configured."); + } /** * Runs inside the task to deserialize column family to compression algorithm diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index c7fa29e20e5..42a231b4680 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -22,15 +22,20 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; /** @@ -45,6 +50,11 @@ implements Configurable { /** Job parameter that specifies the input table. */ public static final String INPUT_TABLE = "hbase.mapreduce.inputtable"; + /** + * If specified, use start keys of this table to split. + * This is useful when you are preparing data for bulkload. + */ + private static final String SPLIT_TABLE = "hbase.mapreduce.splittable"; /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified. * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details. */ @@ -103,7 +113,7 @@ implements Configurable { } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); } - + Scan scan = null; if (conf.get(SCAN) != null) { @@ -214,4 +224,23 @@ implements Configurable { } } + @Override + protected Pair getStartEndKeys() throws IOException { + if (conf.get(SPLIT_TABLE) != null) { + TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE)); + try (Connection conn = ConnectionFactory.createConnection(getConf()); + RegionLocator rl = conn.getRegionLocator(splitTableName)) { + return rl.getStartEndKeys(); + } + } + + return super.getStartEndKeys(); + } + + /** + * Sets split table in map-reduce job. + */ + public static void configureSplitTable(Job job, TableName tableName) { + job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index c196eedc9e2..5dcfe009a58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -101,8 +101,7 @@ extends InputFormat { private RegionLocator regionLocator; /** The reader scanning the table, can be a custom one. */ private TableRecordReader tableRecordReader = null; - - + /** The reverse DNS lookup cache mapping: IPAddress => HostName */ private HashMap reverseDNSCacheMap = new HashMap(); @@ -142,6 +141,10 @@ extends InputFormat { trr.setTable(table); return trr; } + + protected Pair getStartEndKeys() throws IOException { + return regionLocator.getStartEndKeys(); + } /** * Calculates the splits that will serve as input for the map tasks. The @@ -160,8 +163,8 @@ extends InputFormat { } RegionSizeCalculator sizeCalculator = new RegionSizeCalculator((HTable) table); - - Pair keys = regionLocator.getStartEndKeys(); + + Pair keys = getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java index 937feaff9e8..471c535e491 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -52,7 +51,6 @@ import org.junit.experimental.categories.Category; @Category(LargeTests.class) public class TestCopyTable { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static MiniHBaseCluster cluster; private static final byte[] ROW1 = Bytes.toBytes("row1"); private static final byte[] ROW2 = Bytes.toBytes("row2"); private static final String FAMILY_A_STRING = "a"; @@ -64,7 +62,7 @@ public class TestCopyTable { @BeforeClass public static void beforeClass() throws Exception { - cluster = TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniMapReduceCluster(); } @@ -74,12 +72,7 @@ public class TestCopyTable { TEST_UTIL.shutdownMiniCluster(); } - /** - * Simple end-to-end test - * @throws Exception - */ - @Test - public void testCopyTable() throws Exception { + private void doCopyTableTest(boolean bulkload) throws Exception { final TableName TABLENAME1 = TableName.valueOf("testCopyTable1"); final TableName TABLENAME2 = TableName.valueOf("testCopyTable2"); final byte[] FAMILY = Bytes.toBytes("family"); @@ -97,10 +90,15 @@ public class TestCopyTable { CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration()); - assertEquals( - 0, - copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(), - TABLENAME1.getNameAsString() })); + int code; + if (bulkload) { + code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(), + "--bulkload", TABLENAME1.getNameAsString() }); + } else { + code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(), + TABLENAME1.getNameAsString() }); + } + assertEquals("copy job failed", 0, code); // verify the data was copied into table 2 for (int i = 0; i < 10; i++) { @@ -116,6 +114,23 @@ public class TestCopyTable { TEST_UTIL.deleteTable(TABLENAME2); } + /** + * Simple end-to-end test + * @throws Exception + */ + @Test + public void testCopyTable() throws Exception { + doCopyTableTest(false); + } + + /** + * Simple end-to-end test with bulkload. + */ + @Test + public void testCopyTableWithBulkload() throws Exception { + doCopyTableTest(true); + } + @Test public void testStartStopRow() throws Exception { final TableName TABLENAME1 = TableName.valueOf("testStartStopRow1"); @@ -195,7 +210,6 @@ public class TestCopyTable { "--starttime=" + (currentTime - 100000), "--endtime=" + (currentTime + 100000), "--versions=1", sourceTable }; assertNull(t2.get(new Get(ROW1)).getRow()); - clean(); assertTrue(runCopy(args)); @@ -244,24 +258,8 @@ public class TestCopyTable { new Configuration(TEST_UTIL.getConfiguration()), args); Configuration configuration = opts.getConfiguration(); args = opts.getRemainingArgs(); - clean(); - Job job = CopyTable.createSubmittableJob(configuration, args); + Job job = new CopyTable(configuration).createSubmittableJob(args); job.waitForCompletion(false); return job.isSuccessful(); } - - - private void clean() { - - CopyTable.startTime = 0; - CopyTable.endTime = 0; - CopyTable.versions = -1; - CopyTable.tableName = null; - CopyTable.startRow = null; - CopyTable.stopRow = null; - CopyTable.newTableName = null; - CopyTable.peerAddress = null; - CopyTable.families = null; - CopyTable.allCells = false; - } }