diff --git a/CHANGES.txt b/CHANGES.txt
index c2788f3c98f..d6790aa559f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -926,6 +926,7 @@ Release 0.21.0 - Unreleased
HBASE-2782 QOS for META table access
HBASE-3017 More log pruning
HBASE-3022 Change format of enum messages in o.a.h.h.executor package
+ HBASE-3001 Ship dependency jars to the cluster for all jobs
NEW FEATURES
HBASE-1961 HBase EC2 scripts
diff --git a/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
index 41748fe6de3..6e7526b2578 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -29,6 +29,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
/**
* Utility for {@link TableMap} and {@link TableReduce}
@@ -59,6 +63,11 @@ public class TableMapReduceUtil {
job.setMapperClass(mapper);
FileInputFormat.addInputPaths(job, table);
job.set(TableInputFormat.COLUMN_LIST, columns);
+ try {
+ addDependencyJars(job);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
/**
@@ -105,6 +114,7 @@ public class TableMapReduceUtil {
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
+ addDependencyJars(job);
}
/**
@@ -181,4 +191,22 @@ public class TableMapReduceUtil {
public static void setScannerCaching(JobConf job, int batchSize) {
job.setInt("hbase.client.scanner.caching", batchSize);
}
-}
\ No newline at end of file
+
+ /**
+ * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(Job)
+ */
+ public static void addDependencyJars(JobConf job) throws IOException {
+ org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
+ job,
+ org.apache.zookeeper.ZooKeeper.class,
+ com.google.common.base.Function.class,
+ job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(),
+ job.getOutputKeyClass(),
+ job.getOutputValueClass(),
+ job.getPartitionerClass(),
+ job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
+ job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
+ job.getCombinerClass());
+ }
+}
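
As a quick, hedged sanity check of the mapred-side change (not part of the patch: the ShipJarsExample class, the "mytable" table and the "info:" column spec are made up for illustration), a minimal old-API driver now picks up dependency-jar shipping automatically through initTableMapJob():

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.IdentityTableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class ShipJarsExample {
  public static void main(String[] args) throws Exception {
    // Placeholder table and column spec; substitute your own.
    JobConf job = new JobConf(HBaseConfiguration.create(), ShipJarsExample.class);
    job.setJobName("scan-mytable");
    job.setNumReduceTasks(0);
    job.setOutputFormat(NullOutputFormat.class);
    // With this patch, initTableMapJob() also calls addDependencyJars(job), so the
    // jars containing HBase, ZooKeeper, Guava and the job's key/value/format classes
    // found on the local CLASSPATH are added to the job's "tmpjars" and shipped.
    TableMapReduceUtil.initTableMapJob("mytable", "info:",
        IdentityTableMap.class, ImmutableBytesWritable.class, Result.class, job);
    JobClient.runJob(job);
  }
}

Run it with HADOOP_CLASSPATH set from `bin/hbase classpath` so the dependency jars can be located locally before being shipped.
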
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index ff0c542e1d3..22533e14334 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -47,9 +47,6 @@ import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.zookeeper.ZooKeeper;
-
-import com.google.common.base.Function;
/**
* Utility for {@link TableMapper} and {@link TableReducer}
@@ -81,6 +78,7 @@ public class TableMapReduceUtil {
job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
job.getConfiguration().set(TableInputFormat.SCAN,
convertScanToString(scan));
+ addDependencyJars(job);
}
/**
@@ -192,6 +190,7 @@ public class TableMapReduceUtil {
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
+ addDependencyJars(job);
}
/**
@@ -246,10 +245,11 @@ public class TableMapReduceUtil {
public static void addDependencyJars(Job job) throws IOException {
try {
addDependencyJars(job.getConfiguration(),
- ZooKeeper.class,
- Function.class, // Guava collections
+ org.apache.zookeeper.ZooKeeper.class,
+ com.google.common.base.Function.class,
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
+ job.getInputFormatClass(),
job.getOutputKeyClass(),
job.getOutputValueClass(),
job.getOutputFormatClass(),
@@ -267,38 +267,38 @@ public class TableMapReduceUtil {
*/
public static void addDependencyJars(Configuration conf,
Class... classes) throws IOException {
-
+
FileSystem localFs = FileSystem.getLocal(conf);
Set<String> jars = new HashSet<String>();

MapReduce jobs deployed to a MapReduce cluster do not by default have access
to the HBase configuration under $HBASE_CONF_DIR nor to HBase classes.
-You could add hbase-site.xml to $HADOOP_HOME/conf and add
-hbase jars to the $HADOOP_HOME/lib and copy these
-changes across your cluster but a cleaner means of adding hbase configuration
-and classes to the cluster CLASSPATH is by uncommenting
-HADOOP_CLASSPATH in $HADOOP_HOME/conf/hadoop-env.sh
-adding hbase dependencies here. For example, here is how you would amend
-hadoop-env.sh adding the
-built hbase jar, zookeeper (needed by hbase client), hbase conf, and the
-PerformanceEvaluation class from the built hbase test jar to the
-hadoop CLASSPATH:
-
-# Extra Java CLASSPATH elements. Optional.
-# export HADOOP_CLASSPATH=
-export HADOOP_CLASSPATH=$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf:${HBASE_HOME}/lib/zookeeper-X.X.X.jar
-
-Expand $HBASE_HOME in the above appropriately to suit your
-local environment.
-
-After copying the above change around your cluster (and restarting), this is
-how you would run the PerformanceEvaluation MR job to put up 4 clients (Presumes
-a ready mapreduce cluster):
-
-$HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4
+You could add hbase-site.xml to $HADOOP_HOME/conf and add
+HBase jars to the $HADOOP_HOME/lib and copy these
+changes across your cluster (or edit conf/hadoop-env.sh and add them to the
+HADOOP_CLASSPATH variable) but this will pollute your
+hadoop install with HBase references; its also obnoxious requiring restart of
+the hadoop cluster before it'll notice your HBase additions.
+
+As of 0.90.x, HBase will just add its dependency jars to the job
+configuration; the dependencies just need to be available on the local
+CLASSPATH. For example, to run the bundled HBase
+{@link RowCounter} mapreduce job against a table named usertable, type:
+
+$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-0.90.0.jar rowcounter usertable
+
+Expand $HBASE_HOME and $HADOOP_HOME in the above
+appropriately to suit your local environment. The content of HADOOP_CLASSPATH
+is set to the HBase CLASSPATH via backticking the command
+${HBASE_HOME}/bin/hbase classpath.
+
+When the above runs, internally, the HBase jar finds its zookeeper and
+guava, etc., dependencies on the passed
+HADOOP_CLASSPATH and adds the found jars to the mapreduce
+job configuration. See the source at
+{@link TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)}
+for how this is done.
+
+The above may not work if you are running your HBase from its build directory;
+i.e. you've done $ mvn test install at ${HBASE_HOME} and you are now
+trying to use this build in your mapreduce job. If you get
+
+java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper
+...
+
+exception thrown, try doing the following:
+
+$ HADOOP_CLASSPATH=${HBASE_HOME}/target/hbase-0.90.0-SNAPSHOT.jar:`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/target/hbase-0.90.0-SNAPSHOT.jar rowcounter usertable
+
+Notice how we preface the backtick invocation setting
+HADOOP_CLASSPATH with reference to the built HBase jar over in
+the target directory.
-Another possibility, if for example you do not have access to hadoop-env.sh or
-are unable to restart the hadoop cluster, is bundling the hbase jars into a mapreduce
-job jar adding it and its dependencies under the job jar lib/
-directory and the hbase conf into the job jars top-level directory.
+
+The HBase jar also serves as a Driver for some bundled mapreduce jobs. To
+learn about the bundled mapreduce jobs run:
+
+$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-0.90.0-SNAPSHOT.jar
+An example program must be given as the first argument.
+Valid program names are:
+  copytable: Export a table from local cluster to peer cluster
+  completebulkload: Complete a bulk data load.
+  export: Write table data to HDFS.
+  import: Import data written by Export.
+  importtsv: Import data in TSV format.
+  rowcounter: Count rows in HBase table
org.apache.hadoop.hbase.mapreduce.TestTableMapReduce unit test.
-Running mapreduce jobs that have hbase as source or sink, you'll need to
+Running mapreduce jobs that have HBase as source or sink, you'll need to
specify source/sink table and column names in your configuration.
-Reading from hbase, the TableInputFormat asks hbase for the list of
+Reading from HBase, the TableInputFormat asks HBase for the list of
regions and makes a map-per-region or mapred.map.tasks maps,
whichever is smaller (If your job only has two maps, up mapred.map.tasks
to a number > number of regions). Maps will run on the adjacent TaskTracker
if you are running a TaskTracker and RegionServer per node.
Writing, it may make sense to avoid the reduce step and write yourself back into
-hbase from inside your map. You'd do this when your job does not need the sort
+HBase from inside your map. You'd do this when your job does not need the sort
and collation that mapreduce does on the map emitted data; on insert,
-hbase 'sorts' so there is no point double-sorting (and shuffling data around
+HBase 'sorts' so there is no point double-sorting (and shuffling data around
your mapreduce cluster) unless you need to. If you do not need the reduce,
you might just have your map emit counts of records processed just so the
framework's report at the end of your job has meaning or set the number of
reduces to zero and use TableOutputFormat. See example code
below. If running the reduce step makes sense in your case, its usually better
-to have lots of reducers so load is spread across the hbase cluster.
+to have lots of reducers so load is spread across the HBase cluster.
-There is also a new hbase partitioner that will run as many reducers as
+There is also a new HBase partitioner that will run as many reducers as
currently existing regions. The
{@link org.apache.hadoop.hbase.mapreduce.HRegionPartitioner} is suitable
when your table is large and your upload is not such that it will greatly
alter the number of existing regions when done; otherwise use the default
partitioner.
@@ -110,54 +142,10 @@ partitioner.
If importing into a new table, its possible to by-pass the HBase API
and write your content directly to the filesystem properly formatted as
HBase data files (HFiles). Your import will run faster, perhaps an order of
-magnitude faster if not more.
+magnitude faster if not more. For more on how this mechanism works, see the
+Bulk Loads documentation.
-You will need to write a MapReduce job. The map task will know how to
-pull from your data source. Your reduce task will need to be hooked up to
-{@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}. It expects to receive a row id and a value.
-The row id must be formatted as a {@link org.apache.hadoop.hbase.io.ImmutableBytesWritable} and the
-value as a {@link org.apache.hadoop.hbase.KeyValue} (A KeyValue holds the value for a cell and
-its coordinates; row/family/qualifier/timestamp, etc.). Note that you must
-specify a timestamp when you create the KeyValue in your map task
-otherwise the KeyValue will be created with the default LATEST_TIMESTAMP (Long.MAX_VALUE).
-Use System.currentTimeMillis() if your data does not inherently bear a timestamp.
-Your reduce task
-will also need to emit the KeyValues in order. See {@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer}
-for an example reducer that emits KeyValues in order.
-
-Most importantly, you will also need to ensure that your MapReduce job
-ensures a total ordering among all keys. MapReduce by default distributes
-keys among reducers using a Partitioner that hashes on the map task output
-key: i.e. the reducer a key ends up in is by default determined as follows
-(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
-Keys are sorted by the MapReduce framework before they are passed to the reducer
-BUT the sort is scoped to the particular reducer. Its not a global sort.
-Given the default hash Partitioner, if the keys were 0-4 (inclusive), and you
-had configured two reducers, reducer 0 would get keys 0, 2 and 4 whereas
-reducer 1 would get keys 1 and 3 (in order). For your bulk import to work,
-the keys need to be ordered so reducer 0 gets keys 0-2 and reducer 1 gets keys
-3-4 (See TotalOrderPartitioner up in hadoop for more on what this means. See
-how it runs a sampler step first. You may need to write one of these).
-To achieve total ordering, you will likely need to write a Partitioner
-that is intimate with your tables key namespace and that knows how
-to distribute keys among the reducers so a total order is maintained. If your
-keys are distributed with some regularity across a defined key space -- i.e.
-you know the start and end keys -- then the
-{@link org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner}
-may be all you need.
-
-See org.apache.hadoop.hbase.mapreduce.TestHFileOutputFormat for an example
-that puts together {@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer},
-{@link org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner}, and
-{@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}.
-
-HFileOutputFormat writes HFiles. When your MapReduce job finishes, in your
-output directory you will have many HFiles. Run the script bin/loadtable.rb
-to move the files from the MapReduce output directory under hbase. See head of script
-for how to run it. This script
-also adds the new table data to the hbase catalog tables. When the script completes,
-on the next run of the hbase metascanner -- it usually runs every minute -- your
-new table should be visible and populated.
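
To make the write-back-from-the-map advice above concrete, here is a hedged sketch against the new org.apache.hadoop.hbase.mapreduce API (the "source" and "target" table names and the MapOnlyCopy class are illustrative, not from the patch): it scans one table and writes Puts straight into another with zero reducers via TableOutputFormat.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class MapOnlyCopy {

  /** Emits one Put per input row; TableOutputFormat applies it to the target table. */
  static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      Put put = new Put(row.get());
      for (KeyValue kv : value.raw()) {
        put.add(kv);
      }
      context.write(row, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "map-only-copy");
    job.setJarByClass(MapOnlyCopy.class);

    Scan scan = new Scan();
    scan.setCaching(500);        // larger scanner caching for MR scans
    scan.setCacheBlocks(false);  // don't churn the block cache from MR scans

    // initTableMapperJob/initTableReducerJob also call addDependencyJars(job),
    // so the HBase, ZooKeeper and Guava jars are shipped with the job.
    TableMapReduceUtil.initTableMapperJob("source", scan, CopyMapper.class,
        ImmutableBytesWritable.class, Put.class, job);
    TableMapReduceUtil.initTableReducerJob("target", null, job);
    job.setNumReduceTasks(0);    // no sort/shuffle; HBase sorts on insert

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Because the util methods now ship the dependency jars, the same HADOOP_CLASSPATH-based invocation shown earlier is enough to launch such a job.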