HBASE-987 We need a Hbase Partitioner for TableMapReduceUtil.initTableReduceJob MR Jobs

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@712956 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2008-11-11 06:22:58 +00:00
parent 4171e16016
commit 2492b8b659
4 changed files with 430 additions and 18 deletions

View File

@ -109,6 +109,8 @@ Release 0.19.0 - Unreleased
(Samuel Guo via Stack)
HBASE-722 Shutdown and Compactions
HBASE-983 Declare Perl namespace in Hbase.thrift
HBASE-987 We need a Hbase Partitioner for TableMapReduceUtil.initTableReduceJob
MR Jobs (Billy Pearson via Stack)
NEW FEATURES
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]

View File

@ -0,0 +1,89 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
/**
* This is used to partition the output keys into groups of keys.
* Keys are grouped according to the regions that currently exist
* so that each reducer fills a single region so load is distributed.
*/
public class HRegionPartitioner<K2,V2>
implements Partitioner<ImmutableBytesWritable, V2> {
private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
private HTable table;
private byte[][] startKeys;
public void configure(JobConf job) {
try {
this.table = new HTable(new HBaseConfiguration(job),
job.get(TableOutputFormat.OUTPUT_TABLE));
} catch (IOException e) {
LOG.error(e);
}
try {
this.startKeys = this.table.getStartKeys();
} catch (IOException e) {
LOG.error(e);
}
}
@Override
public int getPartition(ImmutableBytesWritable key, V2 value,
int numPartitions) {
byte[] region = null;
// Only one region; everything goes to partition 0
if (this.startKeys.length == 1){
return 0;
}
try {
// Not sure if this is cached after a split so we could have problems
// here if a region splits while mapping
region = table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
} catch (IOException e) {
LOG.error(e);
}
for (int i = 0; i < this.startKeys.length; i++){
if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
if (i >= numPartitions-1){
// Cover the case where we have fewer reducers than regions.
return (Integer.toString(i).hashCode()
& Integer.MAX_VALUE) % numPartitions;
} else {
return i;
}
}
}
// If the above fails to find a matching start key, we still need to return something
return 0;
}
}

View File

@ -1,5 +1,28 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
@ -7,6 +30,9 @@ import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
/**
* Utility for {@link TableMap} and {@link TableReduce}
*/
@SuppressWarnings("unchecked")
public class TableMapReduceUtil {
/**
@ -40,13 +66,39 @@ public class TableMapReduceUtil {
* @param table
* @param reducer
* @param job
* @throws IOException
*/
public static void initTableReduceJob(String table,
Class<? extends TableReduce> reducer, JobConf job)
throws IOException {
initTableReduceJob(table, reducer, job, null);
}
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table
* @param reducer
* @param job
* @param partitioner Partitioner to use. Pass null to use default
* partitioner.
* @throws IOException
*/
public static void initTableReduceJob(String table,
Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
throws IOException {
job.setOutputFormat(TableOutputFormat.class);
job.setReducerClass(reducer);
job.set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(BatchUpdate.class);
if (partitioner != null) {
job.setPartitionerClass(partitioner);
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
int regions = outputTable.getRegionsInfo().size();
if (job.getNumReduceTasks() > regions){
job.setNumReduceTasks(regions);
}
}
}
}

View File

@ -26,11 +26,10 @@ Input/OutputFormats, a table indexing MapReduce job, and utility
<p>MapReduce jobs deployed to a MapReduce cluster do not by default have access
to the HBase configuration under <code>$HBASE_CONF_DIR</code> nor to HBase classes.
You could add <code>hbase-site.xml</code> to $HADOOP_HOME/conf and add
<code>hbase-X.X.X.jar</code> to the <code>$HADOOP_HOME/lib</code> and copy these
changes across your cluster but the cleanest means of adding hbase configuration
and classes to the cluster <code>CLASSPATH</code> is by uncommenting
<code>HADOOP_CLASSPATH</code> in <code>$HADOOP_HOME/conf/hadoop-env.sh</code>
and adding the path to the hbase jar and <code>$HBASE_CONF_DIR</code> directory.
Then copy the amended configuration around the cluster.
You'll probably need to restart the MapReduce cluster if you want it to notice
@ -45,7 +44,7 @@ classes to the hadoop <code>CLASSPATH</code>
# export HADOOP_CLASSPATH=
export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf</pre></blockquote>
<p>Expand $HBASE_HOME in the above appropriately to suit your local environment.</p>
<p>This is how you would run the PerformanceEvaluation MR job to put up 4 clients:
@ -53,6 +52,8 @@ export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar
The PerformanceEvaluation class will be found on the CLASSPATH because you added $HBASE_HOME/build/test to HADOOP_CLASSPATH.
</p>
<p>NOTE: While it used to be possible to bundle the hbase.jar inside the job jar you submit to hadoop, as of
0.2.0RC2 this is no longer the case. See <a href="https://issues.apache.org/jira/browse/HBASE-797">HBASE-797</a>.</p>
<h2>HBase as MapReduce job data source and sink</h2>
@ -70,9 +71,18 @@ or review the <code>org.apache.hadoop.hbase.mapred.TestTableMapReduce</code> uni
<p>When running mapreduce jobs that have hbase as a source or sink, you'll need to
specify the source/sink table and column names in your job configuration.</p>
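<p>For example, a job that scans the <code>contents:</code> column family of one table and
writes into another might be wired up as in the sketch below. The table names and the
<code>MyTableMap</code>/<code>MyTableReduce</code> classes are hypothetical placeholders;
complete map and reduce implementations appear in the samples further down.</p>
<blockquote><pre>JobConf job = new JobConf(new HBaseConfiguration());
job.setJobName("copytable");
// Source: scan the 'contents:' family of 'sourcetable'; the map emits Text/Text pairs.
TableMap.initJob("sourcetable", "contents:", MyTableMap.class,
    Text.class, Text.class, job);
// Sink: MyTableReduce turns the map output into BatchUpdates against 'targettable'.
TableMapReduceUtil.initTableReduceJob("targettable", MyTableReduce.class, job);
JobClient.runJob(job);
</pre></blockquote>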
<p>Reading from hbase, the TableInputFormat asks hbase for the list of
regions and makes a map-per-region.
Writing, it may make sense to avoid the reduce step and write back into hbase from
inside your map. You'd do this when your job does not need the sort and collation
that MapReduce does in its reduce step; on insert, hbase sorts anyway, so there is no
point double-sorting (and shuffling data around your MR cluster) unless you need to.
If you do not need the reduce, your map might just emit counts of records processed
so the framework can print its report of records processed when the job is done.
See the example code below. If running the reduce step makes sense in your case,
it's better to have lots of reducers so load is spread across the hbase cluster.</p>
<p>There is also a partitioner, HRegionPartitioner, that routes each output key to
the reducer for the region the key belongs to, so each reducer fills a single region;
the job setup caps the number of reducers at the number of currently existing regions.
It is suitable when your table is large and your upload will not greatly alter the
number of existing regions once it is done.
</p>
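<p>A minimal sketch of enabling it (the table name and the <code>MyTableReduce</code>
class are hypothetical placeholders):</p>
<blockquote><pre>JobConf job = new JobConf(new HBaseConfiguration());
// Passing HRegionPartitioner makes each reducer write into a single region;
// initTableReduceJob also caps the reduce-task count at the table's region count.
TableMapReduceUtil.initTableReduceJob("targettable", MyTableReduce.class, job,
    HRegionPartitioner.class);
</pre></blockquote>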
<h2>Example Code</h2>
@ -83,9 +93,269 @@ the hbase MapReduce Driver class. Select 'rowcounter' from the choice of jobs
offered.
</p>
<h3>Example to bulk import/load a text file into an HTable
</h3>
<p>Here's a sample program from Allen Day that takes an HDFS text file path and an HBase table name as inputs, and loads the contents of the text file into the table.
</p>
<blockquote><pre>package com.spicylogic.hbase;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class BulkImport implements Tool {
private static final String NAME = "BulkImport";
private Configuration conf;
public static class InnerMap extends MapReduceBase implements Mapper&lt;LongWritable, Text, Text, Text> {
private HTable table;
private HBaseConfiguration HBconf;
public void map(LongWritable key, Text value, OutputCollector&lt;Text, Text> output, Reporter reporter) throws IOException {
if ( table == null )
throw new IOException("table is null");
String [] splits = value.toString().split("\t");
if ( splits.length != 4 )
return;
String rowID = splits[0];
int timestamp = Integer.parseInt( splits[1] );
String colID = splits[2];
String cellValue = splits[3];
reporter.setStatus("Map emitting cell for row='" + rowID + "', column='" + colID + "', time='" + timestamp + "'");
BatchUpdate bu = new BatchUpdate( rowID );
if ( timestamp > 0 )
bu.setTimestamp( timestamp );
bu.put(colID, cellValue.getBytes());
table.commit( bu );
}
public void configure(JobConf job) {
HBconf = new HBaseConfiguration();
try {
table = new HTable( HBconf, job.get("input.table") );
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public JobConf createSubmittableJob(String[] args) {
JobConf c = new JobConf(getConf(), BulkImport.class);
c.setJobName(NAME);
c.setInputPath(new Path(args[0]));
c.set("input.table", args[1]);
c.setMapperClass(InnerMap.class);
c.setNumReduceTasks(0);
c.setOutputFormat(NullOutputFormat.class);
return c;
}
static int printUsage() {
System.err.println("Usage: " + NAME + " &lt;input> &lt;table_name>");
System.err.println("\twhere &lt;input> is a tab-delimited text file with 4 columns.");
System.err.println("\t\tcolumn 1 = row ID");
System.err.println("\t\tcolumn 2 = timestamp (use a negative value for current time)");
System.err.println("\t\tcolumn 3 = column ID");
System.err.println("\t\tcolumn 4 = cell value");
return -1;
}
public int run(@SuppressWarnings("unused") String[] args) throws Exception {
// Make sure there are exactly 2 parameters.
if (args.length != 2) {
return printUsage();
}
JobClient.runJob(createSubmittableJob(args));
return 0;
}
public Configuration getConf() {
return this.conf;
}
public void setConf(final Configuration c) {
this.conf = c;
}
public static void main(String[] args) throws Exception {
int errCode = ToolRunner.run(new Configuration(), new BulkImport(), args);
System.exit(errCode);
}
}
</pre></blockquote>
<h3>Example to map rows/column families between two HTables
</h3>
<p>Here is another sample program from Allen Day that iterates over all rows in one table for the specified column families and inserts those rows/columns into a second table.
</p>
<blockquote><pre>package com.spicylogic.hbase;
// The original sample elided all imports but the first; the rest are filled in
// here for completeness. Adjust them to match your hbase/hadoop versions.
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class BulkCopy extends TableMap&lt;Text, Text> implements Tool {
static final String NAME = "bulkcopy";
private Configuration conf;
public void map(ImmutableBytesWritable row, RowResult value, OutputCollector&lt;Text, Text> output, Reporter reporter) throws IOException {
HTable table = new HTable(new HBaseConfiguration(), conf.get("output.table"));
if ( table == null ) {
throw new IOException("output table is null");
}
BatchUpdate bu = new BatchUpdate( row.get() );
boolean content = false;
for (Map.Entry&lt;byte [], Cell> e: value.entrySet()) {
Cell cell = e.getValue();
if (cell != null && cell.getValue().length > 0) {
bu.put(e.getKey(), cell.getValue());
}
}
table.commit( bu );
}
public JobConf createSubmittableJob(String[] args) throws IOException {
JobConf c = new JobConf(getConf(), BulkCopy.class);
//table = new HTable(new HBaseConfiguration(), args[2]);
c.set("output.table", args[2]);
c.setJobName(NAME);
// Columns are space delimited
StringBuilder sb = new StringBuilder();
final int columnoffset = 3;
for (int i = columnoffset; i &lt; args.length; i++) {
if (i > columnoffset) {
sb.append(" ");
}
sb.append(args[i]);
}
// Second argument is the table name.
TableMap.initJob(args[1], sb.toString(), this.getClass(),
Text.class, Text.class, c);
c.setReducerClass(IdentityReducer.class);
// First arg is the output directory.
c.setOutputPath(new Path(args[0]));
return c;
}
static int printUsage() {
System.out.println(NAME +" &lt;outputdir> &lt;input tablename> &lt;output tablename> &lt;column1> [&lt;column2>...]");
return -1;
}
public int run(final String[] args) throws Exception {
// Make sure there are at least 3 parameters
if (args.length &lt; 3) {
System.err.println("ERROR: Wrong number of parameters: " + args.length);
return printUsage();
}
JobClient.runJob(createSubmittableJob(args));
return 0;
}
public Configuration getConf() {
return this.conf;
}
public void setConf(final Configuration c) {
this.conf = c;
}
public static void main(String[] args) throws Exception {
//String[] aa = {"/tmp/foobar", "M2", "M3", "R:"};
int errCode = ToolRunner.run(new HBaseConfiguration(), new BulkCopy(), args);
System.exit(errCode);
}
}
</pre></blockquote>
<h3>Sample running HBase inserts out of Map Task
</h3>
<p>Here is sample code from Andrew Purtell that does the HBase insert inside the mapper rather than via a TableReduce.
</p>
<blockquote><pre>
public class MyMap
extends TableMap&lt;ImmutableBytesWritable,MapWritable> // or whatever
{
private HTable table;
public void configure(JobConf job) {
super.configure(job);
try {
HBaseConfiguration conf = new HBaseConfiguration(job);
table = new HTable(conf, "mytable");
} catch (Exception e) {
// can't do anything about this now
}
}
public void map(ImmutableBytesWritable key, RowResult value,
OutputCollector&lt;ImmutableBytesWritable,MapWritable> output,
Reporter reporter) throws IOException
{
// now we can report an exception opening the table
if (table == null)
throw new IOException("could not open mytable");
// ...
// commit the result
BatchUpdate update = new BatchUpdate();
// ...
table.commit(update);
}
}
</pre></blockquote>
<p>This assumes that you set up your job like so: <code>JobConf conf = new JobConf(new HBaseConfiguration());</code>
</p>
<p>Or maybe something like this:
<blockquote><pre>
JobConf conf = new JobConf(new Configuration());
conf.set("hbase.master", myMaster);
</pre></blockquote>
</p>
<h3>Sample MR Bulk Uploader
</h3>
<p>A students/classes example by Naama Kraus.
</p>
<p>Read the class comment below for specification of inputs, prerequisites, etc. In particular, note that the class comment says that this code is for hbase 0.1.x.
</p>
<blockquote><pre>package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
@ -106,7 +376,7 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*
* Sample uploader.
*
* This is EXAMPLE code. You will need to change it to work for your context.
@ -143,7 +413,7 @@ import org.apache.hadoop.util.ToolRunner;
* &lt;p>This code was written against hbase 0.1 branch.
*&#x2f;
public class SampleUploader extends MapReduceBase
implements Mapper&lt;LongWritable, Text, Text, MapWritable>, Tool {
private static final String NAME = "SampleUploader";
private Configuration conf;
@ -160,7 +430,7 @@ implements Mapper<LongWritable, Text, Text, MapWritable>, Tool {
}
public void map(LongWritable k, Text v,
OutputCollector&lt;Text, MapWritable> output, Reporter r)
throws IOException {
// Lines are space-delimited; first item is row, next the columnname and
// then the third the cell value.
@ -179,9 +449,8 @@ implements Mapper<LongWritable, Text, Text, MapWritable>, Tool {
public static class TableUploader
extends TableReduce&lt;Text, MapWritable> {
&#x40;Override
public void reduce(Text k, Iterator&lt;MapWritable> v,
OutputCollector&lt;Text, MapWritable> output, Reporter r)
throws IOException {
while (v.hasNext()) {
r.setStatus("Reducer committing " + k);