HBASE-11997 CopyTable with bulkload (Yi Deng)

This commit is contained in:
Ted Yu 2014-10-07 19:34:31 +00:00
parent 408de0fbb3
commit e1b69bd548
5 changed files with 192 additions and 65 deletions

View File

@ -21,14 +21,23 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Random;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.GenericOptionsParser;
@ -43,18 +52,22 @@ import org.apache.hadoop.util.ToolRunner;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public class CopyTable extends Configured implements Tool { public class CopyTable extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(CopyTable.class);
final static String NAME = "copytable"; final static String NAME = "copytable";
static long startTime = 0; long startTime = 0;
static long endTime = 0; long endTime = 0;
static int versions = -1; int versions = -1;
static String tableName = null; String tableName = null;
static String startRow = null; String startRow = null;
static String stopRow = null; String stopRow = null;
static String newTableName = null; String dstTableName = null;
static String peerAddress = null; String peerAddress = null;
static String families = null; String families = null;
static boolean allCells = false; boolean allCells = false;
boolean bulkload = false;
Path bulkloadDir = null;
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
@ -64,17 +77,17 @@ public class CopyTable extends Configured implements Tool {
/** /**
* Sets up the actual job. * Sets up the actual job.
* *
* @param conf The current configuration.
* @param args The command line parameters. * @param args The command line parameters.
* @return The newly created job. * @return The newly created job.
* @throws IOException When setting up the job fails. * @throws IOException When setting up the job fails.
*/ */
public static Job createSubmittableJob(Configuration conf, String[] args) public Job createSubmittableJob(String[] args)
throws IOException { throws IOException {
if (!doCommandLine(args)) { if (!doCommandLine(args)) {
return null; return null;
} }
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(CopyTable.class); job.setJarByClass(CopyTable.class);
Scan scan = new Scan(); Scan scan = new Scan();
scan.setCacheBlocks(false); scan.setCacheBlocks(false);
@ -116,12 +129,40 @@ public class CopyTable extends Configured implements Tool {
} }
Import.configureCfRenaming(job.getConfiguration(), cfRenameMap); Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
} }
TableMapReduceUtil.initTableMapperJob(tableName, scan,
Import.Importer.class, null, null, job);
TableMapReduceUtil.initTableReducerJob(
newTableName == null ? tableName : newTableName, null, job,
null, peerAddress, null, null);
job.setNumReduceTasks(0); job.setNumReduceTasks(0);
if (bulkload) {
TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
null, job);
// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
FileSystem fs = FileSystem.get(getConf());
Random rand = new Random();
Path root = new Path(fs.getWorkingDirectory(), "copytable");
fs.mkdirs(root);
while (true) {
bulkloadDir = new Path(root, "" + rand.nextLong());
if (!fs.exists(bulkloadDir)) {
break;
}
}
System.out.println("HFiles will be stored at " + this.bulkloadDir);
HFileOutputFormat2.setOutputPath(job, bulkloadDir);
try (Connection conn = ConnectionFactory.createConnection(getConf());
Table htable = conn.getTable(TableName.valueOf(dstTableName))) {
HFileOutputFormat2.configureIncrementalLoadMap(job, htable);
}
} else {
TableMapReduceUtil.initTableMapperJob(tableName, scan,
Import.Importer.class, null, null, job);
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
null);
}
return job; return job;
} }
@ -152,6 +193,8 @@ public class CopyTable extends Configured implements Tool {
System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. "); System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. ");
System.err.println(" To keep the same name, just give \"cfName\""); System.err.println(" To keep the same name, just give \"cfName\"");
System.err.println(" all.cells also copy delete markers and deleted cells"); System.err.println(" all.cells also copy delete markers and deleted cells");
System.err.println(" bulkload Write input into HFiles and bulk load to the destination "
+ "table");
System.err.println(); System.err.println();
System.err.println("Args:"); System.err.println("Args:");
System.err.println(" tablename Name of the table to copy"); System.err.println(" tablename Name of the table to copy");
@ -170,7 +213,7 @@ public class CopyTable extends Configured implements Tool {
+ " -Dmapreduce.map.speculative=false"); + " -Dmapreduce.map.speculative=false");
} }
private static boolean doCommandLine(final String[] args) { private boolean doCommandLine(final String[] args) {
// Process command-line args. TODO: Better cmd-line processing // Process command-line args. TODO: Better cmd-line processing
// (but hopefully something not as painful as cli options). // (but hopefully something not as painful as cli options).
if (args.length < 1) { if (args.length < 1) {
@ -217,7 +260,7 @@ public class CopyTable extends Configured implements Tool {
final String newNameArgKey = "--new.name="; final String newNameArgKey = "--new.name=";
if (cmd.startsWith(newNameArgKey)) { if (cmd.startsWith(newNameArgKey)) {
newTableName = cmd.substring(newNameArgKey.length()); dstTableName = cmd.substring(newNameArgKey.length());
continue; continue;
} }
@ -237,6 +280,11 @@ public class CopyTable extends Configured implements Tool {
allCells = true; allCells = true;
continue; continue;
} }
if (cmd.startsWith("--bulkload")) {
bulkload = true;
continue;
}
if (i == args.length-1) { if (i == args.length-1) {
tableName = cmd; tableName = cmd;
@ -245,7 +293,7 @@ public class CopyTable extends Configured implements Tool {
return false; return false;
} }
} }
if (newTableName == null && peerAddress == null) { if (dstTableName == null && peerAddress == null) {
printUsage("At least a new table name or a " + printUsage("At least a new table name or a " +
"peer address must be specified"); "peer address must be specified");
return false; return false;
@ -254,6 +302,16 @@ public class CopyTable extends Configured implements Tool {
printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime); printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
return false; return false;
} }
if (bulkload && peerAddress != null) {
printUsage("Remote bulkload is not supported!");
return false;
}
// set dstTableName if necessary
if (dstTableName == null) {
dstTableName = tableName;
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
printUsage("Can't start because " + e.getMessage()); printUsage("Can't start because " + e.getMessage());
@ -276,8 +334,29 @@ public class CopyTable extends Configured implements Tool {
@Override @Override
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
Job job = createSubmittableJob(getConf(), otherArgs); Job job = createSubmittableJob(otherArgs);
if (job == null) return 1; if (job == null) return 1;
return job.waitForCompletion(true) ? 0 : 1; if (!job.waitForCompletion(true)) {
LOG.info("Map-reduce job failed!");
if (bulkload) {
LOG.info("Files are not bulkloaded!");
}
return 1;
}
int code = 0;
if (bulkload) {
code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
this.dstTableName});
if (code == 0) {
// bulkloadDir is deleted only LoadIncrementalHFiles was successful so that one can rerun
// LoadIncrementalHFiles.
FileSystem fs = FileSystem.get(this.getConf());
if (!fs.delete(this.bulkloadDir, true)) {
LOG.error("Deleting folder " + bulkloadDir + " failed!");
code = 1;
}
}
}
return code;
} }
} }

View File

@ -404,6 +404,24 @@ public class HFileOutputFormat2
LOG.info("Incremental table " + Bytes.toString(table.getTableName()) LOG.info("Incremental table " + Bytes.toString(table.getTableName())
+ " output configured."); + " output configured.");
} }
public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
// Set compression algorithms based on column families
configureCompression(table, conf);
configureBloomType(table, conf);
configureBlockSize(table, conf);
configureDataBlockEncoding(table, conf);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + table.getName() + " output configured.");
}
/** /**
* Runs inside the task to deserialize column family to compression algorithm * Runs inside the task to deserialize column family to compression algorithm

View File

@ -22,15 +22,20 @@ import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
@ -45,6 +50,11 @@ implements Configurable {
/** Job parameter that specifies the input table. */ /** Job parameter that specifies the input table. */
public static final String INPUT_TABLE = "hbase.mapreduce.inputtable"; public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
/**
* If specified, use start keys of this table to split.
* This is useful when you are preparing data for bulkload.
*/
private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
/** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified. /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
* See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details. * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
*/ */
@ -103,7 +113,7 @@ implements Configurable {
} catch (Exception e) { } catch (Exception e) {
LOG.error(StringUtils.stringifyException(e)); LOG.error(StringUtils.stringifyException(e));
} }
Scan scan = null; Scan scan = null;
if (conf.get(SCAN) != null) { if (conf.get(SCAN) != null) {
@ -214,4 +224,23 @@ implements Configurable {
} }
} }
@Override
protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
if (conf.get(SPLIT_TABLE) != null) {
TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
try (Connection conn = ConnectionFactory.createConnection(getConf());
RegionLocator rl = conn.getRegionLocator(splitTableName)) {
return rl.getStartEndKeys();
}
}
return super.getStartEndKeys();
}
/**
* Sets split table in map-reduce job.
*/
public static void configureSplitTable(Job job, TableName tableName) {
job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
}
} }

View File

@ -101,8 +101,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
private RegionLocator regionLocator; private RegionLocator regionLocator;
/** The reader scanning the table, can be a custom one. */ /** The reader scanning the table, can be a custom one. */
private TableRecordReader tableRecordReader = null; private TableRecordReader tableRecordReader = null;
/** The reverse DNS lookup cache mapping: IPAddress => HostName */ /** The reverse DNS lookup cache mapping: IPAddress => HostName */
private HashMap<InetAddress, String> reverseDNSCacheMap = private HashMap<InetAddress, String> reverseDNSCacheMap =
new HashMap<InetAddress, String>(); new HashMap<InetAddress, String>();
@ -142,6 +141,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
trr.setTable(table); trr.setTable(table);
return trr; return trr;
} }
protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
return regionLocator.getStartEndKeys();
}
/** /**
* Calculates the splits that will serve as input for the map tasks. The * Calculates the splits that will serve as input for the map tasks. The
@ -160,8 +163,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
} }
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator((HTable) table); RegionSizeCalculator sizeCalculator = new RegionSizeCalculator((HTable) table);
Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); Pair<byte[][], byte[][]> keys = getStartEndKeys();
if (keys == null || keys.getFirst() == null || if (keys == null || keys.getFirst() == null ||
keys.getFirst().length == 0) { keys.getFirst().length == 0) {
HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);

View File

@ -31,13 +31,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager; import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
@ -53,7 +52,6 @@ import org.junit.experimental.categories.Category;
@Category({MapReduceTests.class, LargeTests.class}) @Category({MapReduceTests.class, LargeTests.class})
public class TestCopyTable { public class TestCopyTable {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static MiniHBaseCluster cluster;
private static final byte[] ROW1 = Bytes.toBytes("row1"); private static final byte[] ROW1 = Bytes.toBytes("row1");
private static final byte[] ROW2 = Bytes.toBytes("row2"); private static final byte[] ROW2 = Bytes.toBytes("row2");
private static final String FAMILY_A_STRING = "a"; private static final String FAMILY_A_STRING = "a";
@ -65,7 +63,7 @@ public class TestCopyTable {
@BeforeClass @BeforeClass
public static void beforeClass() throws Exception { public static void beforeClass() throws Exception {
cluster = TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
TEST_UTIL.startMiniMapReduceCluster(); TEST_UTIL.startMiniMapReduceCluster();
} }
@ -75,12 +73,7 @@ public class TestCopyTable {
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
} }
/** private void doCopyTableTest(boolean bulkload) throws Exception {
* Simple end-to-end test
* @throws Exception
*/
@Test
public void testCopyTable() throws Exception {
final TableName TABLENAME1 = TableName.valueOf("testCopyTable1"); final TableName TABLENAME1 = TableName.valueOf("testCopyTable1");
final TableName TABLENAME2 = TableName.valueOf("testCopyTable2"); final TableName TABLENAME2 = TableName.valueOf("testCopyTable2");
final byte[] FAMILY = Bytes.toBytes("family"); final byte[] FAMILY = Bytes.toBytes("family");
@ -98,10 +91,15 @@ public class TestCopyTable {
CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration()); CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration());
assertEquals( int code;
0, if (bulkload) {
copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(), code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
TABLENAME1.getNameAsString() })); "--bulkload", TABLENAME1.getNameAsString() });
} else {
code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
TABLENAME1.getNameAsString() });
}
assertEquals("copy job failed", 0, code);
// verify the data was copied into table 2 // verify the data was copied into table 2
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
@ -117,6 +115,23 @@ public class TestCopyTable {
TEST_UTIL.deleteTable(TABLENAME2); TEST_UTIL.deleteTable(TABLENAME2);
} }
/**
* Simple end-to-end test
* @throws Exception
*/
@Test
public void testCopyTable() throws Exception {
doCopyTableTest(false);
}
/**
* Simple end-to-end test with bulkload.
*/
@Test
public void testCopyTableWithBulkload() throws Exception {
doCopyTableTest(true);
}
@Test @Test
public void testStartStopRow() throws Exception { public void testStartStopRow() throws Exception {
final TableName TABLENAME1 = TableName.valueOf("testStartStopRow1"); final TableName TABLENAME1 = TableName.valueOf("testStartStopRow1");
@ -196,7 +211,6 @@ public class TestCopyTable {
"--starttime=" + (currentTime - 100000), "--endtime=" + (currentTime + 100000), "--starttime=" + (currentTime - 100000), "--endtime=" + (currentTime + 100000),
"--versions=1", sourceTable }; "--versions=1", sourceTable };
assertNull(t2.get(new Get(ROW1)).getRow()); assertNull(t2.get(new Get(ROW1)).getRow());
clean();
assertTrue(runCopy(args)); assertTrue(runCopy(args));
@ -245,24 +259,8 @@ public class TestCopyTable {
new Configuration(TEST_UTIL.getConfiguration()), args); new Configuration(TEST_UTIL.getConfiguration()), args);
Configuration configuration = opts.getConfiguration(); Configuration configuration = opts.getConfiguration();
args = opts.getRemainingArgs(); args = opts.getRemainingArgs();
clean(); Job job = new CopyTable(configuration).createSubmittableJob(args);
Job job = CopyTable.createSubmittableJob(configuration, args);
job.waitForCompletion(false); job.waitForCompletion(false);
return job.isSuccessful(); return job.isSuccessful();
} }
private void clean() {
CopyTable.startTime = 0;
CopyTable.endTime = 0;
CopyTable.versions = -1;
CopyTable.tableName = null;
CopyTable.startRow = null;
CopyTable.stopRow = null;
CopyTable.newTableName = null;
CopyTable.peerAddress = null;
CopyTable.families = null;
CopyTable.allCells = false;
}
} }