HBASE-7702: Add filtering to Import jobs

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1440717 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jyates 2013-01-30 22:33:55 +00:00
parent 1b5576c693
commit 27a27a05d4
2 changed files with 247 additions and 8 deletions

View File

@ -19,20 +19,31 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
@ -50,10 +61,15 @@ import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Import {
private static final Log LOG = LogFactory.getLog(Import.class);
final static String NAME = "import";
final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
private static final Log LOG = LogFactory.getLog(Import.class);
final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
// Optional filter to use for mappers
private static Filter filter;
/**
* A mapper that just writes out KeyValues.
@ -76,6 +92,10 @@ public class Import {
throws IOException {
try {
for (KeyValue kv : value.raw()) {
kv = filterKv(kv);
// skip if we filtered it out
if (kv == null) continue;
context.write(row, convertKv(kv, cfRenameMap));
}
} catch (InterruptedException e) {
@ -86,6 +106,7 @@ public class Import {
@Override
public void setup(Context context) {
cfRenameMap = createCfRenameMap(context.getConfiguration());
filter = instantiateFilter(context.getConfiguration());
}
}
@ -121,6 +142,10 @@ public class Import {
Put put = null;
Delete delete = null;
for (KeyValue kv : result.raw()) {
kv = filterKv(kv);
// skip if we filter it out
if (kv == null) continue;
kv = convertKv(kv, cfRenameMap);
// Deletes and Puts are gathered and written when finished
if (kv.isDelete()) {
@ -149,6 +174,8 @@ public class Import {
public void setup(Context context) {
Configuration conf = context.getConfiguration();
cfRenameMap = createCfRenameMap(conf);
filter = instantiateFilter(conf);
try {
HConnection connection = HConnectionManager.getConnection(conf);
ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
@ -165,6 +192,77 @@ public class Import {
}
}
/**
* Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
* optionally not include in the job output
* @param conf {@link Configuration} from which to load the filter
* @return the filter to use for the task, or <tt>null</tt> if no filter to should be used
* @throws IllegalArgumentException if the filter is misconfigured
*/
private static Filter instantiateFilter(Configuration conf) {
// get the filter, if it was configured
Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
if (filterClass == null) {
LOG.debug("No configured filter class, accepting all keyvalues.");
return null;
}
LOG.debug("Attempting to create filter:" + filterClass);
try {
Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
return (Filter) m.invoke(null, getFilterArgs(conf));
} catch (IllegalAccessException e) {
LOG.error("Couldn't instantiate filter!", e);
throw new RuntimeException(e);
} catch (SecurityException e) {
LOG.error("Couldn't instantiate filter!", e);
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
LOG.error("Couldn't instantiate filter!", e);
throw new RuntimeException(e);
} catch (IllegalArgumentException e) {
LOG.error("Couldn't instantiate filter!", e);
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
LOG.error("Couldn't instantiate filter!", e);
throw new RuntimeException(e);
}
}
private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
ArrayList<byte[]> args = new ArrayList<byte[]>();
String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
for (String arg : sargs) {
// all the filters' instantiation methods expected quoted args since they are coming from
// the shell, so add them here, though its shouldn't really be needed :-/
args.add(Bytes.toBytes("'" + arg + "'"));
}
return args;
}
/**
* Attempt to filter out the keyvalue
* @param kv {@link KeyValue} on which to apply the filter
* @return <tt>null</tt> if the key should not be written, otherwise returns the original
* {@link KeyValue}
*/
private static KeyValue filterKv(KeyValue kv) {
// apply the filter and skip this kv if the filter doesn't apply
if (filter != null) {
Filter.ReturnCode code = filter.filterKeyValue(kv);
System.out.println("Filter returned:" + code);
// if its not an accept type, then skip this kv
if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
.equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
if (LOG.isDebugEnabled()) {
System.out.println("Skipping key: " + kv + " from filter decision: " + code);
}
return null;
}
}
return kv;
}
// helper: create a new KeyValue based on CF rename map
private static KeyValue convertKv(KeyValue kv, Map<byte[], byte[]> cfRenameMap) {
if(cfRenameMap != null) {
@ -245,10 +343,30 @@ public class Import {
conf.set(CF_RENAME_PROP, sb.toString());
}
/**
* Add a Filter to be instantiated on import
* @param conf Configuration to update (will be passed to the job)
* @param clazz {@link Filter} subclass to instantiate on the server.
* @param args List of arguments to pass to the filter on instantiation
*/
public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
List<String> args) {
conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
// build the param string for the key
StringBuilder builder = new StringBuilder();
for (int i = 0; i < args.size(); i++) {
String arg = args.get(i);
builder.append(arg);
if (i != args.size() - 1) {
builder.append(",");
}
}
conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
}
/**
* Sets up the actual job.
*
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
@ -263,6 +381,17 @@ public class Import {
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(SequenceFileInputFormat.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
// make sure we get the filter in the jars
try {
Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
if (filter != null) {
TableMapReduceUtil.addDependencyJars(conf, filter);
}
} catch (Exception e) {
throw new IOException(e);
}
if (hfileOutPath != null) {
job.setMapperClass(KeyValueImporter.class);
HTable table = new HTable(conf, tableName);
@ -295,6 +424,15 @@ public class Import {
System.err.println("By default Import will load data directly into HBase. To instead generate");
System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
System.err
.println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
System.err.println(" -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+ CF_RENAME_PROP + " property. Futher, filters will only use the"
+ "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+ " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
+ "the KeyValue.");
System.err.println("For performance consider the following options:\n"
+ " -Dmapred.map.tasks.speculative.execution=false\n"
+ " -Dmapred.reduce.tasks.speculative.execution=false");

View File

@ -18,8 +18,11 @@
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -36,11 +39,14 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -322,4 +328,99 @@ public class TestImportExport {
assertEquals(now, res[6].getTimestamp());
t.close();
}
@Test
public void testWithFilter() throws Exception {
String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
UTIL.getHBaseAdmin().createTable(desc);
HTable exportTable = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
Put p = new Put(ROW1);
p.add(FAMILYA, QUAL, now, QUAL);
p.add(FAMILYA, QUAL, now + 1, QUAL);
p.add(FAMILYA, QUAL, now + 2, QUAL);
p.add(FAMILYA, QUAL, now + 3, QUAL);
p.add(FAMILYA, QUAL, now + 4, QUAL);
exportTable.put(p);
String[] args = new String[] { EXPORT_TABLE, OUTPUT_DIR, "1000" };
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(
cluster.getConfiguration()), args);
Configuration conf = opts.getConfiguration();
args = opts.getRemainingArgs();
Job job = Export.createSubmittableJob(conf, args);
job.getConfiguration().set("mapreduce.framework.name", "yarn");
job.waitForCompletion(false);
assertTrue(job.isSuccessful());
String IMPORT_TABLE = "importWithFilter";
desc = new HTableDescriptor(IMPORT_TABLE);
desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
UTIL.getHBaseAdmin().createTable(desc);
HTable importTable = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, OUTPUT_DIR,
"1000" };
opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
conf = opts.getConfiguration();
args = opts.getRemainingArgs();
job = Import.createSubmittableJob(conf, args);
job.getConfiguration().set("mapreduce.framework.name", "yarn");
job.waitForCompletion(false);
assertTrue(job.isSuccessful());
// get the count of the source table for that time range
PrefixFilter filter = new PrefixFilter(ROW1);
int count = getCount(exportTable, filter);
Assert.assertEquals("Unexpected row count between export and import tables", count,
getCount(importTable, null));
// and then test that a broken command doesn't bork everything - easier here because we don't
// need to re-run the export job
args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
"-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
OUTPUT_DIR, "1000" };
opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
conf = opts.getConfiguration();
args = opts.getRemainingArgs();
job = Import.createSubmittableJob(conf, args);
job.getConfiguration().set("mapreduce.framework.name", "yarn");
job.waitForCompletion(false);
assertFalse("Job succeeedd, but it had a non-instantiable filter!", job.isSuccessful());
// cleanup
exportTable.close();
importTable.close();
}
/**
* Count the number of keyvalues in the specified table for the given timerange
* @param start
* @param end
* @param table
* @return
* @throws IOException
*/
private int getCount(HTable table, Filter filter) throws IOException {
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner results = table.getScanner(scan);
int count = 0;
for (Result res : results) {
count += res.size();
}
results.close();
return count;
}
}