HBASE-10416 Improvements to the import flow (Vasu Mariyala)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1562343 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@ -57,6 +59,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.zookeeper.KeeperException;
* Import data written by {@link Export}.
@ -65,40 +68,38 @@ import org.apache.zookeeper.KeeperException;
public class Import {
private static final Log LOG = LogFactory.getLog(Import.class);
final static String NAME = "import";
final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
// Optional filter to use for mappers
private static Filter filter;
public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
public final static String TABLE_NAME = "import.table.name";
public final static String WAL_DURABILITY = "import.wal.durability";
* A mapper that just writes out KeyValues.
static class KeyValueImporter
extends TableMapper<ImmutableBytesWritable, KeyValue> {
public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
private Map<byte[], byte[]> cfRenameMap;
private Filter filter;
* @param row The current table row key.
* @param value The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
* org.apache.hadoop.mapreduce.Mapper.Context)
public void map(ImmutableBytesWritable row, Result value,
Context context)
throws IOException {
try {
for (Cell kv : value.rawCells()) {
kv = filterKv(kv);
// skip if we filtered it out
if (kv == null) continue;
// TODO get rid of ensureKeyValue
context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
for (Cell kv : value.rawCells()) {
kv = filterKv(filter, kv);
// skip if we filtered it out
if (kv == null) continue;
// TODO get rid of ensureKeyValue
context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
} catch (InterruptedException e) {
@ -115,18 +116,17 @@ public class Import {
* Write table content out to files in hdfs.
static class Importer
extends TableMapper<ImmutableBytesWritable, Mutation> {
public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
private Map<byte[], byte[]> cfRenameMap;
private List<UUID> clusterIds;
private Filter filter;
private Durability durability;
* @param row The current table row key.
* @param value The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
* org.apache.hadoop.mapreduce.Mapper.Context)
public void map(ImmutableBytesWritable row, Result value,
@ -143,32 +143,40 @@ public class Import {
throws IOException, InterruptedException {
Put put = null;
Delete delete = null;
for (Cell kv : result.rawCells()) {
kv = filterKv(kv);
// skip if we filter it out
if (kv == null) continue;
if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
for (Cell kv : result.rawCells()) {
kv = filterKv(filter, kv);
// skip if we filter it out
if (kv == null) continue;
kv = convertKv(kv, cfRenameMap);
// Deletes and Puts are gathered and written when finished
if (CellUtil.isDelete(kv)) {
if (delete == null) {
delete = new Delete(key.get());
kv = convertKv(kv, cfRenameMap);
// Deletes and Puts are gathered and written when finished
if (CellUtil.isDelete(kv)) {
if (delete == null) {
delete = new Delete(key.get());
} else {
if (put == null) {
put = new Put(key.get());
} else {
if (put == null) {
put = new Put(key.get());
if (put != null) {
context.write(key, put);
if (delete != null) {
context.write(key, delete);
if (put != null) {
if (durability != null) {
context.write(key, put);
if (delete != null) {
if (durability != null) {
context.write(key, delete);
@ -177,6 +185,10 @@ public class Import {
Configuration conf = context.getConfiguration();
cfRenameMap = createCfRenameMap(conf);
filter = instantiateFilter(conf);
String durabilityStr = conf.get(WAL_DURABILITY);
if(durabilityStr != null){
durability = Durability.valueOf(durabilityStr.toUpperCase());
// TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
ZooKeeperWatcher zkw = null;
try {
@ -201,18 +213,19 @@ public class Import {
* @return the filter to use for the task, or <tt>null</tt> if no filter should be used
* @throws IllegalArgumentException if the filter is misconfigured
private static Filter instantiateFilter(Configuration conf) {
// get the filter, if it was configured
public static Filter instantiateFilter(Configuration conf) {
// get the filter, if it was configured
Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
if (filterClass == null) {
LOG.debug("No configured filter class, accepting all keyvalues.");
return null;
LOG.debug("Attempting to create filter:" + filterClass);
String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
try {
Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
return (Filter) m.invoke(null, getFilterArgs(conf));
return (Filter) m.invoke(null, quotedArgs);
} catch (IllegalAccessException e) {
LOG.error("Couldn't instantiate filter!", e);
throw new RuntimeException(e);
@ -231,15 +244,14 @@ public class Import {
private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
ArrayList<byte[]> args = new ArrayList<byte[]>();
String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
for (String arg : sargs) {
private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
for (String stringArg : stringArgs) {
// all the filters' instantiation methods expected quoted args since they are coming from
// the shell, so add them here, though its shouldn't really be needed :-/
args.add(Bytes.toBytes("'" + arg + "'"));
// the shell, so add them here, though it shouldn't really be needed :-/
quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
return args;
return quotedArgs;
@ -248,7 +260,7 @@ public class Import {
* @return <tt>null</tt> if the key should not be written, otherwise returns the original
* {@link KeyValue}
private static Cell filterKv(Cell kv) throws IOException {
public static Cell filterKv(Filter filter, Cell kv) throws IOException {
// apply the filter and skip this kv if the filter doesn't apply
if (filter != null) {
Filter.ReturnCode code = filter.filterKeyValue(kv);
@ -347,22 +359,12 @@ public class Import {
* Add a Filter to be instantiated on import
* @param conf Configuration to update (will be passed to the job)
* @param clazz {@link Filter} subclass to instantiate on the server.
* @param args List of arguments to pass to the filter on instantiation
* @param filterArgs List of arguments to pass to the filter on instantiation
public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
List<String> args) {
List<String> filterArgs) throws IOException {
conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
// build the param string for the key
StringBuilder builder = new StringBuilder();
for (int i = 0; i < args.size(); i++) {
String arg = args.get(i);
if (i != args.size() - 1) {
conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
@ -375,6 +377,7 @@ public class Import {
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
conf.set(TABLE_NAME, tableName);
Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
@ -430,12 +433,42 @@ public class Import {
System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+ CF_RENAME_PROP + " property. Futher, filters will only use the"
+ "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+ " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
+ "the KeyValue.");
+ " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
+ " whether the current row needs to be ignored completely for processing and "
+ " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+ " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
+ " the KeyValue.");
System.err.println("For performance consider the following options:\n"
+ " -Dmapred.map.tasks.speculative.execution=false\n"
+ " -Dmapred.reduce.tasks.speculative.execution=false");
+ " -Dmapred.reduce.tasks.speculative.execution=false\n"
+ " -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
+" Allowed values are the supported durability values"
* If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
* need to flush all the regions of the table as the data is held in memory and is also not
* present in the Write Ahead Log to replay in the event of a crash. This method flushes all the
* regions of the table when data has been imported to hbase with {@link Durability#SKIP_WAL}
public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
InterruptedException {
String tableName = conf.get(TABLE_NAME);
HBaseAdmin hAdmin = null;
String durability = conf.get(WAL_DURABILITY);
// Need to flush if the data is written to hbase and skip wal is enabled.
if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
&& Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
try {
hAdmin = new HBaseAdmin(conf);
} finally {
if (hAdmin != null) {
@ -456,6 +489,11 @@ public class Import {
conf.set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
Job job = createSubmittableJob(conf, otherArgs);
boolean isJobSuccessful = job.waitForCompletion(true);
// Flush all the regions of the table
System.exit(job.waitForCompletion(true) ? 0 : 1);
@ -40,11 +40,13 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@ -56,6 +58,10 @@ import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Job;
@ -353,6 +359,13 @@ public class TestImportExport {
p.add(FAMILYA, QUAL, now + 4, QUAL);
// Having another row would actually test the filter.
p = new Put(ROW2);
p.add(FAMILYA, QUAL, now, QUAL);
// Flush the commits.
// Export the simple table
String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
@ -512,7 +525,7 @@ public class TestImportExport {
* parameters into Configuration
public void testAddFilterAndArguments() {
public void testAddFilterAndArguments() throws IOException {
Configuration configuration = new Configuration();
List<String> args = new ArrayList<String>();
@ -524,4 +537,120 @@ public class TestImportExport {
assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
public void testDurability() throws IOException, InterruptedException, ClassNotFoundException {
// Create an export table.
String exportTableName = "exporttestDurability";
HTable exportTable = UTIL.createTable(Bytes.toBytes(exportTableName), FAMILYA, 3);
// Insert some data
Put put = new Put(ROW1);
put.add(FAMILYA, QUAL, now, QUAL);
put.add(FAMILYA, QUAL, now + 1, QUAL);
put.add(FAMILYA, QUAL, now + 2, QUAL);
put = new Put(ROW2);
put.add(FAMILYA, QUAL, now, QUAL);
put.add(FAMILYA, QUAL, now + 1, QUAL);
put.add(FAMILYA, QUAL, now + 2, QUAL);
// Run the export
String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
// Create the table for import
String importTableName = "importTestDurability1";
HTable importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
// Register the hlog listener for the import table
TableWALActionListener walListener = new TableWALActionListener(importTableName);
HLog hLog = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL();
// Run the import with SKIP_WAL
args =
new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
importTableName, FQ_OUTPUT_DIR };
//Assert that the wal is not visited
//Ensure that the count is 2 (only one version of key value is obtained)
assertTrue(getCount(importTable, null) == 2);
// Run the import with the default durability option
importTableName = "importTestDurability2";
importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
walListener = new TableWALActionListener(importTableName);
args = new String[] { importTableName, FQ_OUTPUT_DIR };
//Assert that the wal is visited
//Ensure that the count is 2 (only one version of key value is obtained)
assertTrue(getCount(importTable, null) == 2);
* This listens to the {@link #visitLogEntryBeforeWrite(HTableDescriptor, HLogKey, WALEdit)} to
* identify that an entry is written to the Write Ahead Log for the given table.
private static class TableWALActionListener implements WALActionsListener {
private String tableName;
private boolean isVisited = false;
public TableWALActionListener(String tableName) {
this.tableName = tableName;
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
// Not interested in this method.
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
// Not interested in this method.
public void preLogArchive(Path oldPath, Path newPath) throws IOException {
// Not interested in this method.
public void postLogArchive(Path oldPath, Path newPath) throws IOException {
// Not interested in this method.
public void logRollRequested() {
// Not interested in this method.
public void logCloseRequested() {
// Not interested in this method.
public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
// Not interested in this method.
public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
if (tableName.equalsIgnoreCase(htd.getNameAsString())) {
isVisited = true;
public boolean isWALVisited() {
return isVisited;
Reference in New Issue
Block a user