From 27a27a05d41756917d8bdb4fc7961a043ba45064 Mon Sep 17 00:00:00 2001
From: jyates
Date: Wed, 30 Jan 2013 22:33:55 +0000
Subject: [PATCH] HBASE-7702: Add filtering to Import jobs

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1440717 13f79535-47bb-0310-9956-ffa450edef68
---
 .../apache/hadoop/hbase/mapreduce/Import.java      | 154 +++++++++++++++++-
 .../hbase/mapreduce/TestImportExport.java          | 101 ++++++++++++
 2 files changed, 247 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index ad7815014dd..6155043534d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -19,20 +19,31 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -50,10 +61,15 @@ import org.apache.zookeeper.KeeperException;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class Import {
+  private static final Log LOG = LogFactory.getLog(Import.class);
   final static String NAME = "import";
   final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
   final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
-  private static final Log LOG = LogFactory.getLog(Import.class);
+  final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+  final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+
+  // Optional filter to use for mappers
+  private static Filter filter;
 
   /**
    * A mapper that just writes out KeyValues.
@@ -76,6 +92,10 @@ public class Import {
     throws IOException {
       try {
         for (KeyValue kv : value.raw()) {
+          kv = filterKv(kv);
+          // skip if we filtered it out
+          if (kv == null) continue;
+
           context.write(row, convertKv(kv, cfRenameMap));
         }
       } catch (InterruptedException e) {
@@ -86,6 +106,7 @@ public class Import {
     @Override
     public void setup(Context context) {
       cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
     }
   }
 
@@ -121,6 +142,10 @@ public class Import {
       Put put = null;
       Delete delete = null;
       for (KeyValue kv : result.raw()) {
+        kv = filterKv(kv);
+        // skip if we filtered it out
+        if (kv == null) continue;
+
         kv = convertKv(kv, cfRenameMap);
         // Deletes and Puts are gathered and written when finished
         if (kv.isDelete()) {
@@ -149,6 +174,8 @@ public class Import {
     public void setup(Context context) {
       Configuration conf = context.getConfiguration();
       cfRenameMap = createCfRenameMap(conf);
+      filter = instantiateFilter(conf);
+
       try {
         HConnection connection = HConnectionManager.getConnection(conf);
         ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
@@ -165,6 +192,77 @@ public class Import {
     }
   }
 
+  /**
+   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) so
+   * they can optionally be excluded from the job output
+   * @param conf {@link Configuration} from which to load the filter
+   * @return the filter to use for the task, or null if no filter should be used
+   * @throws IllegalArgumentException if the filter is misconfigured
+   */
+  private static Filter instantiateFilter(Configuration conf) {
+    // get the filter, if it was configured
+    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+    if (filterClass == null) {
+      LOG.debug("No configured filter class, accepting all keyvalues.");
+      return null;
+    }
+    LOG.debug("Attempting to create filter: " + filterClass);
+
+    try {
+      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
+      return (Filter) m.invoke(null, getFilterArgs(conf));
+    } catch (IllegalAccessException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (SecurityException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (NoSuchMethodException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (IllegalArgumentException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
+    ArrayList<byte[]> args = new ArrayList<byte[]>();
+    String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
+    for (String arg : sargs) {
+      // all the filters' instantiation methods expect quoted args since they are coming from
+      // the shell, so add them here, though it shouldn't really be needed :-/
+      args.add(Bytes.toBytes("'" + arg + "'"));
+    }
+    return args;
+  }
+
+  /**
+   * Attempt to filter out the keyvalue
+   * @param kv {@link KeyValue} on which to apply the filter
+   * @return null if the key should not be written, otherwise returns the original
+   *         {@link KeyValue}
+   */
+  private static KeyValue filterKv(KeyValue kv) {
+    // apply the filter and skip this kv if the filter doesn't apply
+    if (filter != null) {
+      Filter.ReturnCode code = filter.filterKeyValue(kv);
+      LOG.debug("Filter returned: " + code);
+      // if it's not an accept type, then skip this kv
+      if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
+          .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping key: " + kv + " from filter decision: " + code);
+        }
+        return null;
+      }
+    }
+    return kv;
+  }
+
   // helper: create a new KeyValue based on CF rename map
   private static KeyValue convertKv(KeyValue kv, Map<byte[], byte[]> cfRenameMap) {
     if(cfRenameMap != null) {
@@ -244,13 +342,33 @@ public class Import {
     }
     conf.set(CF_RENAME_PROP, sb.toString());
   }
-
+
+  /**
+   * Add a Filter to be instantiated on import
+   * @param conf Configuration to update (will be passed to the job)
+   * @param clazz {@link Filter} subclass to instantiate on the server.
+   * @param args List of arguments to pass to the filter on instantiation
+   */
+  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
+      List<String> args) {
+    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
+
+    // build the param string for the key
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < args.size(); i++) {
+      String arg = args.get(i);
+      builder.append(arg);
+      if (i != args.size() - 1) {
+        builder.append(",");
+      }
+    }
+    conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
+  }
 
   /**
    * Sets up the actual job.
-   *
-   * @param conf The current configuration.
-   * @param args The command line parameters.
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
@@ -263,6 +381,17 @@ public class Import {
     FileInputFormat.setInputPaths(job, inputDir);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+    // make sure we get the filter in the jars
+    try {
+      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+      if (filter != null) {
+        TableMapReduceUtil.addDependencyJars(conf, filter);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
     if (hfileOutPath != null) {
       job.setMapperClass(KeyValueImporter.class);
       HTable table = new HTable(conf, tableName);
@@ -295,6 +424,15 @@ public class Import {
     System.err.println("By default Import will load data directly into HBase. To instead generate");
     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err
+        .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
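
Usage sketch (not part of the patch itself): the new addFilterAndArguments()
hook is meant to be called from a driver before the job is created. The sketch
below assumes HBase's stock PrefixFilter (which implements the
createFilterFromArguments() contract the patch invokes by reflection) and
Import's existing createSubmittableJob(Configuration, String[]) entry point;
the table name and input path are placeholders.

    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.filter.PrefixFilter;
    import org.apache.hadoop.hbase.mapreduce.Import;
    import org.apache.hadoop.mapreduce.Job;

    public class FilteredImportExample {
      public static void main(String[] cmdArgs) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Keep only KeyValues whose row key starts with "row3". The argument
        // list is handed to PrefixFilter.createFilterFromArguments() via
        // reflection, which is why Import quotes each argument on the way in.
        Import.addFilterAndArguments(conf, PrefixFilter.class, Arrays.asList("row3"));
        // "mytable" and "/export/mytable" are placeholder job arguments.
        Job job = Import.createSubmittableJob(conf, new String[] { "mytable", "/export/mytable" });
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

The same filtering is reachable from the command line through the two -D keys
the usage text above documents, e.g.
-D import.filter.class=org.apache.hadoop.hbase.filter.PrefixFilter
-D import.filter.args=row3.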