HBASE-3001 Ship dependency jars to the cluster for all jobs

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1000261 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-09-22 22:16:23 +00:00
parent a46c74894f
commit 17da1d0369
4 changed files with 114 additions and 97 deletions

View File

@ -926,6 +926,7 @@ Release 0.21.0 - Unreleased
HBASE-2782 QOS for META table access
HBASE-3017 More log pruning
HBASE-3022 Change format of enum messages in o.a.h.h.executor package
HBASE-3001 Ship dependency jars to the cluster for all jobs
NEW FEATURES
HBASE-1961 HBase EC2 scripts

View File

@ -29,6 +29,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
* Utility for {@link TableMap} and {@link TableReduce}
@ -59,6 +63,11 @@ public class TableMapReduceUtil {
job.setMapperClass(mapper);
FileInputFormat.addInputPaths(job, table);
job.set(TableInputFormat.COLUMN_LIST, columns);
try {
addDependencyJars(job);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
@ -105,6 +114,7 @@ public class TableMapReduceUtil {
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
addDependencyJars(job);
}
/**
@ -181,4 +191,22 @@ public class TableMapReduceUtil {
public static void setScannerCaching(JobConf job, int batchSize) {
job.setInt("hbase.client.scanner.caching", batchSize);
}
}
/**
* @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(Job)
*/
public static void addDependencyJars(JobConf job) throws IOException {
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
job,
org.apache.zookeeper.ZooKeeper.class,
com.google.common.base.Function.class,
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
job.getOutputKeyClass(),
job.getOutputValueClass(),
job.getPartitionerClass(),
job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
job.getCombinerClass());
}
}

View File

@ -47,9 +47,6 @@ import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.ZooKeeper;
import com.google.common.base.Function;
/**
* Utility for {@link TableMapper} and {@link TableReducer}
@ -81,6 +78,7 @@ public class TableMapReduceUtil {
job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
job.getConfiguration().set(TableInputFormat.SCAN,
convertScanToString(scan));
addDependencyJars(job);
}
/**
@ -192,6 +190,7 @@ public class TableMapReduceUtil {
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
addDependencyJars(job);
}
/**
@ -246,10 +245,11 @@ public class TableMapReduceUtil {
public static void addDependencyJars(Job job) throws IOException {
try {
addDependencyJars(job.getConfiguration(),
ZooKeeper.class,
Function.class, // Guava collections
org.apache.zookeeper.ZooKeeper.class,
com.google.common.base.Function.class,
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
job.getInputFormatClass(),
job.getOutputKeyClass(),
job.getOutputValueClass(),
job.getOutputFormatClass(),
@ -267,38 +267,38 @@ public class TableMapReduceUtil {
*/
public static void addDependencyJars(Configuration conf,
Class... classes) throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
Set<String> jars = new HashSet<String>();
// Add jars that are already in the tmpjars variable
jars.addAll( conf.getStringCollection("tmpjars") );
// Add jars containing the specified classes
for (Class clazz : classes) {
if (clazz == null) continue;
String pathStr = findContainingJar(clazz);
if (pathStr == null) {
LOG.warn("Could not find jar for class " + clazz +
" in order to ship it to the cluster.");
" in order to ship it to the cluster.");
continue;
}
Path path = new Path(pathStr);
if (!localFs.exists(path)) {
LOG.warn("Could not validate jar file " + path + " for class "
+ clazz);
+ clazz);
continue;
}
jars.add(path.makeQualified(localFs).toString());
}
if (jars.isEmpty()) return;
String tmpJars = conf.get("tmpjars");
if (tmpJars == null) {
tmpJars = StringUtils.arrayToString(jars.toArray(new String[0]));
} else {
tmpJars += "," + StringUtils.arrayToString(jars.toArray(new String[0]));
}
conf.set("tmpjars", tmpJars);
conf.set("tmpjars",
StringUtils.arrayToString(jars.toArray(new String[0])));
}
/**
* Find a jar that contains a class of the same name, if any.
* It will return a jar file, even if that is not the first thing

View File

@ -1,5 +1,5 @@
/*
* Copyright 2008 The Apache Software Foundation
* Copyright 20010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -24,6 +24,7 @@ Input/OutputFormats, a table indexing MapReduce job, and utility
<h2>Table of Contents</h2>
<ul>
<li><a href="#classpath">HBase, MapReduce and the CLASSPATH</a></li>
<li><a href="#driver">Bundled HBase MapReduce Jobs</a></li>
<li><a href="#sink">HBase as MapReduce job data source and sink</a></li>
<li><a href="#bulk">Bulk Import writing HFiles directly</a></li>
<li><a href="#examples">Example Code</a></li>
@ -33,36 +34,67 @@ Input/OutputFormats, a table indexing MapReduce job, and utility
<p>MapReduce jobs deployed to a MapReduce cluster do not by default have access
to the HBase configuration under <code>$HBASE_CONF_DIR</code> nor to HBase classes.
You could add <code>hbase-site.xml</code> to $HADOOP_HOME/conf and add
hbase jars to the <code>$HADOOP_HOME/lib</code> and copy these
changes across your cluster but a cleaner means of adding hbase configuration
and classes to the cluster <code>CLASSPATH</code> is by uncommenting
<code>HADOOP_CLASSPATH</code> in <code>$HADOOP_HOME/conf/hadoop-env.sh</code>
adding hbase dependencies here. For example, here is how you would amend
<code>hadoop-env.sh</code> adding the
built hbase jar, zookeeper (needed by hbase client), hbase conf, and the
<code>PerformanceEvaluation</code> class from the built hbase test jar to the
hadoop <code>CLASSPATH</code>:
You could add <code>hbase-site.xml</code> to
<code>$HADOOP_HOME/conf</code> and add
HBase jars to the <code>$HADOOP_HOME/lib</code> and copy these
changes across your cluster (or edit conf/hadoop-env.sh and add them to the
<code>HADOOP_CLASSPATH</code> variable) but this will pollute your
hadoop install with HBase references; its also obnoxious requiring restart of
the hadoop cluster before it'll notice your HBase additions.</p>
<blockquote><pre># Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=
export HADOOP_CLASSPATH=$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf:${HBASE_HOME}/lib/zookeeper-X.X.X.jar</pre></blockquote>
<p>As of 0.90.x, HBase will just add its dependency jars to the job
configuration; the dependencies just need to be available on the local
<code>CLASSPATH</code>. For example, to run the bundled HBase
{@link RowCounter} mapreduce job against a table named <code>usertable</code>,
type:
<p>Expand <code>$HBASE_HOME</code> in the above appropriately to suit your
local environment.</p>
<blockquote><pre>
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-0.90.0.jar rowcounter usertable
</pre></blockquote>
<p>After copying the above change around your cluster (and restarting), this is
how you would run the PerformanceEvaluation MR job to put up 4 clients (Presumes
a ready mapreduce cluster):
Expand <code>$HBASE_HOME</code> and <code>$HADOOP_HOME</code> in the above
appropriately to suit your local environment. The content of <code>HADOOP_CLASSPATH</code>
is set to the HBase <code>CLASSPATH</code> via backticking the command
<code>${HBASE_HOME}/bin/hbase classpath</code>.
<blockquote><pre>$HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4</pre></blockquote>
<p>When the above runs, internally, the HBase jar finds its zookeeper and
<a href="http://code.google.com/p/guava-libraries/">guava</a>,
etc., dependencies on the passed
</code>HADOOP_CLASSPATH</code> and adds the found jars to the mapreduce
job configuration. See the source at
{@link TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)}
for how this is done.
</p>
<p>The above may not work if you are running your HBase from its build directory;
i.e. you've done <code>$ mvn test install</code> at
<code>${HBASE_HOME}</code> and you are now
trying to use this build in your mapreduce job. If you get
<blockquote><pre>java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper
...
</pre></blockquote>
exception thrown, try doing the following:
<blockquote><pre>
$ HADOOP_CLASSPATH=${HBASE_HOME}/target/hbase-0.90.0-SNAPSHOT.jar:`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/target/hbase-0.90.0-SNAPSHOT.jar rowcounter usertable
</pre></blockquote>
Notice how we preface the backtick invocation setting
<code>HADOOP_CLASSPATH</code> with reference to the built HBase jar over in
the <code>target</code> directory.
</p>
<p>Another possibility, if for example you do not have access to hadoop-env.sh or
are unable to restart the hadoop cluster, is bundling the hbase jars into a mapreduce
job jar adding it and its dependencies under the job jar <code>lib/</code>
directory and the hbase conf into the job jars top-level directory.
</a>
<h2><a name="driver">Bundled HBase MapReduce Jobs</a></h2>
<p>The HBase jar also serves as a Driver for some bundled mapreduce jobs. To
learn about the bundled mapreduce jobs run:
<blockquote><pre>
$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-0.90.0-SNAPSHOT.jar
An example program must be given as the first argument.
Valid program names are:
copytable: Export a table from local cluster to peer cluster
completebulkload: Complete a bulk data load.
export: Write table data to HDFS.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table
</pre></blockquote>
<h2><a name="sink">HBase as MapReduce job data source and sink</a></h2>
@ -79,26 +111,26 @@ involved example, see {@link org.apache.hadoop.hbase.mapreduce.RowCounter}
or review the <code>org.apache.hadoop.hbase.mapreduce.TestTableMapReduce</code> unit test.
</p>
<p>Running mapreduce jobs that have hbase as source or sink, you'll need to
<p>Running mapreduce jobs that have HBase as source or sink, you'll need to
specify source/sink table and column names in your configuration.</p>
<p>Reading from hbase, the TableInputFormat asks hbase for the list of
<p>Reading from HBase, the TableInputFormat asks HBase for the list of
regions and makes a map-per-region or <code>mapred.map.tasks maps</code>,
whichever is smaller (If your job only has two maps, up mapred.map.tasks
to a number &gt; number of regions). Maps will run on the adjacent TaskTracker
if you are running a TaskTracer and RegionServer per node.
Writing, it may make sense to avoid the reduce step and write yourself back into
hbase from inside your map. You'd do this when your job does not need the sort
HBase from inside your map. You'd do this when your job does not need the sort
and collation that mapreduce does on the map emitted data; on insert,
hbase 'sorts' so there is no point double-sorting (and shuffling data around
HBase 'sorts' so there is no point double-sorting (and shuffling data around
your mapreduce cluster) unless you need to. If you do not need the reduce,
you might just have your map emit counts of records processed just so the
framework's report at the end of your job has meaning or set the number of
reduces to zero and use TableOutputFormat. See example code
below. If running the reduce step makes sense in your case, its usually better
to have lots of reducers so load is spread across the hbase cluster.</p>
to have lots of reducers so load is spread across the HBase cluster.</p>
<p>There is also a new hbase partitioner that will run as many reducers as
<p>There is also a new HBase partitioner that will run as many reducers as
currently existing regions. The
{@link org.apache.hadoop.hbase.mapreduce.HRegionPartitioner} is suitable
when your table is large and your upload is not such that it will greatly
@ -110,54 +142,10 @@ partitioner.
<p>If importing into a new table, its possible to by-pass the HBase API
and write your content directly to the filesystem properly formatted as
HBase data files (HFiles). Your import will run faster, perhaps an order of
magnitude faster if not more.
magnitude faster if not more. For more on how this mechanism works, see
<a href="http://hbase.apache.org/docs/current/bulk-loads.html">Bulk Loads</code>
documentation.
</p>
<p>You will need to write a MapReduce job. The map task will know how to
pull from your data source. Your reduce task will need to be hooked up to
{@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}. It expects to receive a row id and a value.
The row id must be formatted as a {@link org.apache.hadoop.hbase.io.ImmutableBytesWritable} and the
value as a {@link org.apache.hadoop.hbase.KeyValue} (A KeyValue holds the value for a cell and
its coordinates; row/family/qualifier/timestamp, etc.). Note that you must
specify a timestamp when you create the KeyValue in your map task
otherwise the KeyValue will be created with the default LATEST_TIMESTAMP (Long.MAX_VALUE).
Use System.currentTimeMillis() if your data does not inherently bear a timestamp.
Your reduce task
will also need to emit the KeyValues in order. See {@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer}
for an example reducer that emits KeyValues in order.
</p>
<p>Most importantly, you will also need to ensure that your MapReduce job
ensures a total ordering among all keys. MapReduce by default distributes
keys among reducers using a Partitioner that hashes on the map task output
key: i.e. the reducer a key ends up in is by default determined as follows
<code> (key.hashCode() &amp; Integer.MAX_VALUE) % numReduceTasks</code>.
Keys are sorted by the MapReduce framework before they are passed to the reducer
BUT the sort is scoped to the particular reducer. Its not a global sort.
Given the default hash Partitioner, if the keys were 0-4 (inclusive), and you
had configured two reducers, reducer 0 would have get keys 0, 2 and 4 whereas
reducer 1 would get keys 1 and 3 (in order). For your bulk import to work,
the keys need to be ordered so reducer 0 gets keys 0-2 and reducer 1 gets keys
3-4 (See TotalOrderPartitioner up in hadoop for more on what this means. See
how it runs a sampler step first. You may need to write one of these).
To achieve total ordering, you will likely need to write a Partitioner
that is intimate with your tables key namespace and that knows how
to distribute keys among the reducers so a total order is maintained. If your
keys are distributed with some regularity across a defined key space -- i.e.
you know the start and end keys -- then the
{@link org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner}
may be all you need.
</p>
<p>See org.apache.hadoop.hbase.mapreduce.TestHFileOutputFormat for an example
that puts together {@link org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer},
{@link org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner}, and
{@link org.apache.hadoop.hbase.mapreduce.HFileOutputFormat}.</p>
<p>HFileOutputFormat writes HFiles. When your MapReduce file finishes, in your
output directory you will have many HFiles. Run the script <code>bin/loadtable.rb</code>
to move the files from the MapReduce output directory under hbase. See head of script
for how to run it. This script
also adds the new table data to the hbase catalog tables. When the script completes,
on the next run of the hbase metascanner -- it usually runs every minute -- your
new table should be visible and populated.</p>
<h2><a name="examples">Example Code</a></h2>
<h3>Sample Row Counter</h3>