HBASE-5440 Allow Import to optionally use HFileOutputFormat

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1293101 13f79535-47bb-0310-9956-ffa450edef68
larsh 2012-02-24 06:47:29 +00:00
parent e531e2595c
commit a54ec1b7cc
1 changed file with 110 additions and 45 deletions
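The patch adds an optional bulk-load output mode to the Import tool: when the new import.bulk.output property is set, the job maps KeyValues and writes them through HFileOutputFormat instead of sending Puts and Deletes straight to the table. Below is a minimal sketch of opting in, assuming the usual -D-to-Configuration plumbing via GenericOptionsParser; the class name, table name, and paths are made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.GenericOptionsParser;

// Sketch only, not part of the patch: shows how the "import.bulk.output"
// property reaches the job's Configuration. Names and paths are hypothetical.
public class BulkImportExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // GenericOptionsParser copies -D options into conf, the same as running:
    //   hbase org.apache.hadoop.hbase.mapreduce.Import \
    //     -Dimport.bulk.output=/tmp/import-hfiles mytable /user/hbase/export/mytable
    String[] otherArgs = new GenericOptionsParser(conf, new String[] {
        "-Dimport.bulk.output=/tmp/import-hfiles",
        "mytable", "/user/hbase/export/mytable" }).getRemainingArgs();
    // otherArgs now holds {"mytable", "/user/hbase/export/mytable"}, and
    // conf.get("import.bulk.output") returns the HFile output directory,
    // which is what the job-setup code in this patch branches on.
  }
}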


@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
@@ -43,7 +45,42 @@ import org.apache.hadoop.util.GenericOptionsParser;
*/
public class Import {
final static String NAME = "import";
public static final String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
/**
* A mapper that just writes out KeyValues.
*/
static class KeyValueImporter
extends TableMapper<ImmutableBytesWritable, KeyValue> {
private Map<byte[], byte[]> cfRenameMap;
/**
* @param row The current table row key.
* @param value The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
* org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
public void map(ImmutableBytesWritable row, Result value,
Context context)
throws IOException {
try {
for (KeyValue kv : value.raw()) {
context.write(row, convertKv(kv, cfRenameMap));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void setup(Context context) {
cfRenameMap = createCfRenameMap(context.getConfiguration());
}
}
/**
* Write table content out to files in hdfs.
@@ -76,26 +113,7 @@ public class Import {
Put put = null;
Delete delete = null;
for (KeyValue kv : result.raw()) {
if(cfRenameMap != null) {
// If there's a rename mapping for this CF, create a new KeyValue
byte[] newCfName = cfRenameMap.get(kv.getFamily());
if(newCfName != null) {
kv = new KeyValue(kv.getBuffer(), // row buffer
kv.getRowOffset(), // row offset
kv.getRowLength(), // row length
newCfName, // CF buffer
0, // CF offset
newCfName.length, // CF length
kv.getBuffer(), // qualifier buffer
kv.getQualifierOffset(), // qualifier offset
kv.getQualifierLength(), // qualifier length
kv.getTimestamp(), // timestamp
KeyValue.Type.codeToType(kv.getType()), // KV Type
kv.getBuffer(), // value buffer
kv.getValueOffset(), // value offset
kv.getValueLength()); // value length
}
}
kv = convertKv(kv, cfRenameMap);
// Deletes and Puts are gathered and written when finished
if (kv.isDelete()) {
if (delete == null) {
@@ -119,26 +137,56 @@ public class Import {
@Override
public void setup(Context context) {
// Make a map from sourceCfName to destCfName by parsing a config key
cfRenameMap = null;
String allMappingsPropVal = context.getConfiguration().get(CF_RENAME_PROP);
if(allMappingsPropVal != null) {
// The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
String[] allMappings = allMappingsPropVal.split(",");
for (String mapping: allMappings) {
if(cfRenameMap == null) {
cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
}
String [] srcAndDest = mapping.split(":");
if(srcAndDest.length != 2) {
continue;
}
cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
}
}
cfRenameMap = createCfRenameMap(context.getConfiguration());
}
}
// helper: create a new KeyValue based on CF rename map
private static KeyValue convertKv(KeyValue kv, Map<byte[], byte[]> cfRenameMap) {
if(cfRenameMap != null) {
// If there's a rename mapping for this CF, create a new KeyValue
byte[] newCfName = cfRenameMap.get(kv.getFamily());
if(newCfName != null) {
kv = new KeyValue(kv.getBuffer(), // row buffer
kv.getRowOffset(), // row offset
kv.getRowLength(), // row length
newCfName, // CF buffer
0, // CF offset
newCfName.length, // CF length
kv.getBuffer(), // qualifier buffer
kv.getQualifierOffset(), // qualifier offset
kv.getQualifierLength(), // qualifier length
kv.getTimestamp(), // timestamp
KeyValue.Type.codeToType(kv.getType()), // KV Type
kv.getBuffer(), // value buffer
kv.getValueOffset(), // value offset
kv.getValueLength()); // value length
}
}
return kv;
}
// helper: make a map from sourceCfName to destCfName by parsing a config key
private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
Map<byte[], byte[]> cfRenameMap = null;
String allMappingsPropVal = conf.get(CF_RENAME_PROP);
if(allMappingsPropVal != null) {
// The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
String[] allMappings = allMappingsPropVal.split(",");
for (String mapping: allMappings) {
if(cfRenameMap == null) {
cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
}
String [] srcAndDest = mapping.split(":");
if(srcAndDest.length != 2) {
continue;
}
cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
}
}
return cfRenameMap;
}
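For reference, a minimal sketch of the column-family rename configuration in the value format the comments above describe; the family names are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Sketch only, not part of the patch: the CF rename property in the shape
// createCfRenameMap() parses. "oldCf", "newCf", "stats", "metrics" are made up.
public class CfRenameExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.set("HBASE_IMPORTER_RENAME_CFS", "oldCf:newCf,stats:metrics");
    // createCfRenameMap(conf) yields {oldCf -> newCf, stats -> metrics};
    // convertKv() then rewrites each KeyValue's family through that map.
  }
}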
/**
* <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
* the mapper how to rename column families.
@@ -190,11 +238,25 @@ public class Import {
job.setJarByClass(Importer.class);
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(Importer.class);
// No reducers. Just write straight to table. Call initTableReducerJob
// because it sets up the TableOutputFormat.
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
job.setNumReduceTasks(0);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
job.setMapperClass(KeyValueImporter.class);
HTable table = new HTable(conf, tableName);
job.setReducerClass(KeyValueSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
HFileOutputFormat.configureIncrementalLoad(job, table);
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
com.google.common.base.Preconditions.class);
} else {
// No reducers. Just write straight to table. Call initTableReducerJob
// because it sets up the TableOutputFormat.
job.setMapperClass(Importer.class);
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
job.setNumReduceTasks(0);
}
return job;
}
@@ -205,7 +267,10 @@ public class Import {
if (errorMsg != null && errorMsg.length() > 0) {
System.err.println("ERROR: " + errorMsg);
}
System.err.println("Usage: Import <tablename> <inputdir>");
System.err.println("Usage: Import [options] <tablename> <inputdir>");
System.err.println("By default Import will load data directly into HBase. To instead generate");
System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
}
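Note that the HFiles written under the bulk output path are not loaded into the table by this job itself; as the usage text says, they prepare for a bulk data load, which would typically be completed afterwards with HBase's completebulkload (LoadIncrementalHFiles) tool.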
/**