HBASE-991 Update the mapred package document examples so they work with TRUNK/0.19.0.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@718159 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 16973669dc
commit f759be8c0d
@@ -72,6 +72,8 @@ Release 0.19.0 - Unreleased
   HBASE-990   NoSuchElementException in flushSomeRegions
   HBASE-602   HBase Crash when network card has a IPv6 address
   HBASE-996   Migration script to up the versions in catalog tables
   HBASE-991   Update the mapred package document examples so they work with
               TRUNK/0.19.0.

  IMPROVEMENTS
   HBASE-901   Add a limit to key length, check key and value length on client side
@@ -0,0 +1,137 @@
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * Sample uploader.
 *
 * This is EXAMPLE code. You will need to change it to work for your context.
 *
 * Uses TableReduce to put the data into hbase. Change the InputFormat to suit
 * your data. Use the map to massage the input so it fits hbase. Currently its
 * just a pass-through map. In the reduce, you need to output a row and a
 * map of columns to cells. Change map and reduce to suit your input.
 *
 * <p>The below is wired up to handle an input whose format is a text file
 * which has a line format as follow:
 * <pre>
 * row columnname columndata
 * </pre>
 *
 * <p>The table and columnfamily we're to insert into must preexist.
 *
 * <p>Do the following to start the MR job:
 * <pre>
 * ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.txt TABLE_NAME
 * </pre>
 *
 * <p>This code was written against hbase 0.1 branch.
 */
public class SampleUploader extends MapReduceBase
implements Mapper<LongWritable, Text, ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>>,
    Tool {
  private static final String NAME = "SampleUploader";
  private Configuration conf;

  public JobConf createSubmittableJob(String[] args)
  throws IOException {
    JobConf c = new JobConf(getConf(), SampleUploader.class);
    c.setJobName(NAME);
    FileInputFormat.setInputPaths(c, new Path(args[0]));
    c.setMapperClass(this.getClass());
    c.setMapOutputKeyClass(ImmutableBytesWritable.class);
    c.setMapOutputValueClass(HbaseMapWritable.class);
    c.setReducerClass(TableUploader.class);
    TableMapReduceUtil.initTableReduceJob(args[1], TableUploader.class, c);
    return c;
  }

  public void map(LongWritable k, Text v,
    OutputCollector<ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>> output,
    Reporter r)
  throws IOException {
    // Lines are space-delimited; first item is row, next the columnname and
    // then the third the cell value.
    String tmp = v.toString();
    if (tmp.length() == 0) {
      return;
    }
    String [] splits = v.toString().split(" ");
    HbaseMapWritable<byte [], byte []> mw =
      new HbaseMapWritable<byte [], byte []>();
    mw.put(Bytes.toBytes(splits[1]), Bytes.toBytes(splits[2]));
    byte [] row = Bytes.toBytes(splits[0]);
    r.setStatus("Map emitting " + splits[0] + " for record " + k.toString());
    output.collect(new ImmutableBytesWritable(row), mw);
  }

  public static class TableUploader extends MapReduceBase
  implements TableReduce<ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>> {
    public void reduce(ImmutableBytesWritable k, Iterator<HbaseMapWritable<byte [], byte []>> v,
        OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
        Reporter r)
    throws IOException {
      while (v.hasNext()) {
        r.setStatus("Reducer committing " + k);
        BatchUpdate bu = new BatchUpdate(k.get());
        while (v.hasNext()) {
          HbaseMapWritable<byte [], byte []> hmw = v.next();
          for (Entry<byte [], byte []> e: hmw.entrySet()) {
            bu.put(e.getKey(), e.getValue());
          }
        }
        output.collect(k, bu);
      }
    }
  }

  static int printUsage() {
    System.out.println(NAME + " <input> <table_name>");
    return -1;
  }

  public int run(@SuppressWarnings("unused") String[] args) throws Exception {
    // Make sure there are exactly 2 parameters left.
    if (args.length != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
        args.length + " instead of 2.");
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }

  public Configuration getConf() {
    return this.conf;
  }

  public void setConf(final Configuration c) {
    this.conf = c;
  }

  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(new Configuration(), new SampleUploader(),
      args);
    System.exit(errCode);
  }
}
@@ -42,12 +42,11 @@ import org.apache.hadoop.util.ReflectionUtils;
/**
 * A Writable Map.
 * Like {@link org.apache.hadoop.io.MapWritable} but dumb. It will fail
 * if passed a Writable it has not already been told about. Its also been
 * primed with hbase Writables. Keys are always byte arrays. Thats other
 * difference from MapWritable.
 * if passed a value type that it has not already been told about. Its been
 * primed with hbase Writables and byte []. Keys are always byte arrays.
 *
 * @param <K> key
 * @param <V> value
 * @param <byte []> key
 * @param <V> value Expects a Writable or byte [].
 */
public class HbaseMapWritable <K, V>
implements SortedMap<byte [], V>, Writable, Configurable {
@@ -192,8 +191,14 @@ implements SortedMap<byte [], V>, Writable, Configurable {
    // Then write out each key/value pair
    for (Map.Entry<byte [], V> e: instance.entrySet()) {
      Bytes.writeByteArray(out, e.getKey());
      out.writeByte(getId(e.getValue().getClass()));
      ((Writable)e.getValue()).write(out);
      Byte id = getId(e.getValue().getClass());
      out.writeByte(id);
      Object value = e.getValue();
      if (value instanceof byte []) {
        Bytes.writeByteArray(out, (byte [])value);
      } else {
        ((Writable)value).write(out);
      }
    }
  }

@@ -209,11 +214,18 @@ implements SortedMap<byte [], V>, Writable, Configurable {
    // Then read each key/value pair
    for (int i = 0; i < entries; i++) {
      byte [] key = Bytes.readByteArray(in);
      Writable value = (Writable)ReflectionUtils.
        newInstance(getClass(in.readByte()), getConf());
      value.readFields(in);
      V v = (V)value;
      this.instance.put(key, v);
      Class clazz = getClass(in.readByte());
      V value = null;
      if (clazz.equals(byte [].class)) {
        byte [] bytes = Bytes.readByteArray(in);
        value = (V)bytes;
      } else {
        Writable w = (Writable)ReflectionUtils.
          newInstance(clazz, getConf());
        w.readFields(in);
        value = (V)w;
      }
      this.instance.put(key, value);
    }
  }
}
@@ -21,7 +21,14 @@
Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
Input/OutputFormats, a table indexing MapReduce job, and utility

<h2> HBase, MapReduce and the CLASSPATH </h2>
<h2>Table of Contents</h2>
<ul>
<li><a href="#classpath">HBase, MapReduce and the CLASSPATH</a></li>
<li><a href="#sink">HBase as MapReduce job data source and sink</a></li>
<li><a href="#examples">Example Code</a></li>
</ul>

<h2><a name="classpath">HBase, MapReduce and the CLASSPATH</a></h2>

<p>MapReduce jobs deployed to a MapReduce cluster do not by default have access
to the HBase configuration under <code>$HBASE_CONF_DIR</code> nor to HBase classes.
@@ -33,29 +40,35 @@ and classes to the cluster <code>CLASSPATH</code> is by uncommenting
and adding the path to the hbase jar and <code>$HBASE_CONF_DIR</code> directory.
Then copy the amended configuration around the cluster.
You'll probably need to restart the MapReduce cluster if you want it to notice
the new configuration (You may not have to).
the new configuration.
</p>

<p>For example, here is how you would amend <code>hadoop-env.sh</code> adding
the hbase jar, conf, and the <code>PerformanceEvaluation</code> class from hbase test
classes to the hadoop <code>CLASSPATH<code>
<p>For example, here is how you would amend <code>hadoop-env.sh</code> adding the
built hbase jar, hbase conf, and the <code>PerformanceEvaluation</code> class from
the built hbase test jar to the hadoop <code>CLASSPATH</code>:

<blockquote><pre># Extra Java CLASSPATH elements. Optional.
<blockquote><pre># Extra Java CLASSPATH elements.  Optional.
# export HADOOP_CLASSPATH=
export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf</pre></blockquote>

<p>Expand $HBASE_HOME in the above appropriately to suit your local environment.</p>
<p>Expand <code>$HBASE_HOME</code> in the above appropriately to suit your
local environment.</p>

<p>This is how you would run the PerformanceEvaluation MR job to put up 4 clients:
<p>After copying the above change around your cluster, this is how you would run
the PerformanceEvaluation MR job to put up 4 clients (Presumes a ready mapreduce
cluster):

<blockquote><pre>$HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4</pre></blockquote>

The PerformanceEvaluation class wil be found on the CLASSPATH because you added $HBASE_HOME/build/test to HADOOP_CLASSPATH
The PerformanceEvaluation class will be found on the CLASSPATH because you
added <code>$HBASE_HOME/build/test</code> to HADOOP_CLASSPATH
</p>
<p>NOTE: While previous it used to be possible to bundle the hbase.jar up inside the job jar you submit to hadoop, as of
0.2.0RC2, this is no longer so. See <a href="https://issues.apache.org/jira/browse/HBASE-797">HBASE-797</a>.
<p>NOTE: While previously it was possible to bundle the hbase.jar up inside the
job jar you submit to hadoop, as of hbase 0.2.0RC2, this is no longer so.
See <a href="https://issues.apache.org/jira/browse/HBASE-797">HBASE-797</a>
for details.

<h2>HBase as MapReduce job data source and sink</h2>
<h2><a name="sink">HBase as MapReduce job data source and sink</a></h2>

<p>HBase can be used as a data source, {@link org.apache.hadoop.hbase.mapred.TableInputFormat TableInputFormat},
and data sink, {@link org.apache.hadoop.hbase.mapred.TableOutputFormat TableOutputFormat}, for MapReduce jobs.
@@ -73,58 +86,83 @@ specify source/sink table and column names in your configuration.</p>

<p>Reading from hbase, the TableInputFormat asks hbase for the list of
regions and makes a map-per-region.
Writing, it may make sense to avoid the reduce step and write back into hbase from inside your map. You'd do this when your job does not need the sort and collation that MR does inside in its reduce; on insert,
hbase sorts so no point double-sorting (and shuffling data around your MR cluster) unless you need to. If you do not need the reduce, you might just have your map emit counts of records processed just so the
framework can emit that nice report of records processed when the job is done. See example code below. If running the reduce step makes sense in your case, its better to have lots of reducers so load is spread
across the hbase cluster.</p>
Writing, it may make sense to avoid the reduce step and write yourself back into
hbase from inside your map. You'd do this when your job does not need the sort
and collation that mapreduce does on the map emitted data; on insert,
hbase 'sorts' so there is no point double-sorting (and shuffling data around
your mapreduce cluster) unless you need to. If you do not need the reduce,
you might just have your map emit counts of records processed just so the
framework's report at the end of your job has meaning. See example code
below. If running the reduce step makes sense in your case, it's usually better
to have lots of reducers so load is spread across the hbase cluster.</p>

<p>There is also a partitioner
that will run as many reducers as currently existing regions. The
partitioner HRegionPartitioner is suitable when your table is large
and your upload is not such that it will greatly alter the number of existing
regions after its done.
<p>There is also a new hbase partitioner that will run as many reducers as
currently existing regions. The
{@link org.apache.hadoop.hbase.mapred.HRegionPartitioner} is suitable
when your table is large and your upload is not such that it will greatly
alter the number of existing regions when done; otherwise use the default
partitioner.
</p>
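
<p>A minimal sketch of wiring in this partitioner (a hand-written illustration,
not code from this commit; <code>MyUploader</code> and
<code>numberOfRegions</code> are placeholders for your own job class and for a
region count you compute yourself):</p>
<blockquote><pre>JobConf c = new JobConf(getConf(), MyUploader.class); // reduce writes via TableOutputFormat
// Route each emitted row key to the reducer that covers that key's region.
c.setPartitionerClass(org.apache.hadoop.hbase.mapred.HRegionPartitioner.class);
// Run one reducer per existing region of the target table.
c.setNumReduceTasks(numberOfRegions);</pre></blockquote>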

<h2>Example Code</h2>
<h2><a name="examples">Example Code</a></h2>
<h3>Sample Row Counter</h3>
<p>See {@link org.apache.hadoop.hbase.mapred.RowCounter}. You should be able to run
it by doing: <code>% ./bin/hadoop jar hbase-X.X.X.jar</code>. This will invoke
the hbase MapReduce Driver class. Select 'rowcounter' from the choice of jobs
offered.
</p>
<h3>PerformanceEvaluation</h3>
<p>See org.apache.hadoop.hbase.PerformanceEvaluation from hbase src/test. It runs
a mapreduce job to run concurrent clients reading and writing hbase.
</p>

<h3>Sample MR Bulk Uploader</h3>
<p>A students/classes example based on a contribution by Naama Kraus with lots of
documentation can be found over in src/examples/mapred.
It's the <code>org.apache.hadoop.hbase.mapred.SampleUploader</code> class.
Just copy it under src/java/org/apache/hadoop/hbase/mapred to compile and try it
(until we start generating an hbase examples jar). The class reads a data file
from HDFS and per line, does an upload to HBase using TableReduce.
Read the class comment for specification of inputs, prerequisites, etc.
</p>
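
<p>For orientation, the heart of that example's job setup is reproduced below in
condensed form (lifted from the SampleUploader source added by this commit; only
the wiring is shown):</p>
<blockquote><pre>JobConf c = new JobConf(getConf(), SampleUploader.class);
c.setJobName(NAME);
FileInputFormat.setInputPaths(c, new Path(args[0]));   // the HDFS input file
c.setMapperClass(SampleUploader.class);                // pass-through map
c.setMapOutputKeyClass(ImmutableBytesWritable.class);
c.setMapOutputValueClass(HbaseMapWritable.class);
c.setReducerClass(TableUploader.class);                // reduce writes BatchUpdates
// Wires in the hbase sink and points the job at the target table (args[1]).
TableMapReduceUtil.initTableReduceJob(args[1], TableUploader.class, c);
JobClient.runJob(c);</pre></blockquote>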

<h3>Example to bulk import/load a text file into an HTable
</h3>

<p>Here's a sample program from [WWW] Allen Day that takes an HDFS text file path and an HBase table name as inputs, and loads the contents of the text file to the table.
<p>Here's a sample program from
<a href="http://www.spicylogic.com/allenday/blog/category/computing/distributed-systems/hadoop/hbase/">Allen Day</a>
that takes an HDFS text file path and an HBase table name as inputs, and loads the contents of the text file to the table
all up in the map phase.
</p>

<blockquote><pre>package com.spicylogic.hbase;
<blockquote><pre>
package com.spicylogic.hbase;
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Class that adds the parsed line from the input to hbase
 * in the map function. Map has no emissions and job
 * has no reduce.
 */
public class BulkImport implements Tool {
  private static final String NAME = "BulkImport";
  private Configuration conf;
@@ -133,20 +171,24 @@ public class BulkImport implements Tool {
    private HTable table;
    private HBaseConfiguration HBconf;

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
    throws IOException {
      if ( table == null )
        throw new IOException("table is null");

      // Split input line on tab character
      String [] splits = value.toString().split("\t");
      if ( splits.length != 4 )
        return;

      String rowID = splits[0];
      int timestamp = Integer.parseInt( splits[1] );
      String colID = splits[2];

      String rowID = splits[0];
      int timestamp = Integer.parseInt( splits[1] );
      String colID = splits[2];
      String cellValue = splits[3];

      reporter.setStatus("Map emitting cell for row='" + rowID + "', column='" + colID + "', time='" + timestamp + "'");
      reporter.setStatus("Map emitting cell for row='" + rowID +
        "', column='" + colID + "', time='" + timestamp + "'");

      BatchUpdate bu = new BatchUpdate( rowID );
      if ( timestamp > 0 )
@@ -155,8 +197,9 @@ public class BulkImport implements Tool {
      bu.put(colID, cellValue.getBytes());
      table.commit( bu );
    }

    public void configure(JobConf job) {
      HBconf = new HBaseConfiguration();
      HBconf = new HBaseConfiguration(job);
      try {
        table = new HTable( HBconf, job.get("input.table") );
      } catch (IOException e) {
@@ -166,11 +209,10 @@ public class BulkImport implements Tool {
      }
    }


  public JobConf createSubmittableJob(String[] args) {
    JobConf c = new JobConf(getConf(), BulkImport.class);
    c.setJobName(NAME);
    c.setInputPath(new Path(args[0]));
    FileInputFormat.setInputPaths(c, new Path(args[0]));

    c.set("input.table", args[1]);
    c.setMapperClass(InnerMap.class);
@@ -213,282 +255,5 @@ public class BulkImport implements Tool {
  }
</pre></blockquote>

<h3>Example to map rows/column families between two HTables
</h3>

<p>Here another sample program from [WWW] Allen Day that will iterate over all rows in one table for specified column families and insert those rows/columns to a second table.
</p>

<blockquote><pre>package com.spicylogic.hbase;
import java.io.IOException;

public class BulkCopy extends TableMap<Text, Text> implements Tool {
  static final String NAME = "bulkcopy";
  private Configuration conf;

  public void map(ImmutableBytesWritable row, RowResult value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    HTable table = new HTable(new HBaseConfiguration(), conf.get("output.table"));
    if ( table == null ) {
      throw new IOException("output table is null");
    }

    BatchUpdate bu = new BatchUpdate( row.get() );

    boolean content = false;
    for (Map.Entry<byte [], Cell> e: value.entrySet()) {
      Cell cell = e.getValue();
      if (cell != null && cell.getValue().length > 0) {
        bu.put(e.getKey(), cell.getValue());
      }
    }
    table.commit( bu );
  }

  public JobConf createSubmittableJob(String[] args) throws IOException {
    JobConf c = new JobConf(getConf(), BulkExport.class);
    //table = new HTable(new HBaseConfiguration(), args[2]);
    c.set("output.table", args[2]);
    c.setJobName(NAME);
    // Columns are space delimited
    StringBuilder sb = new StringBuilder();
    final int columnoffset = 3;
    for (int i = columnoffset; i < args.length; i++) {
      if (i > columnoffset) {
        sb.append(" ");
      }
      sb.append(args[i]);
    }
    // Second argument is the table name.
    TableMap.initJob(args[1], sb.toString(), this.getClass(),
      Text.class, Text.class, c);
    c.setReducerClass(IdentityReducer.class);
    // First arg is the output directory.
    c.setOutputPath(new Path(args[0]));
    return c;
  }

  static int printUsage() {
    System.out.println(NAME +" <outputdir> <input tablename> <output tablename> <column1> [<column2>...]");
    return -1;
  }

  public int run(final String[] args) throws Exception {
    // Make sure there are at least 3 parameters
    if (args.length < 3) {
      System.err.println("ERROR: Wrong number of parameters: " + args.length);
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }

  public Configuration getConf() {
    return this.conf;
  }

  public void setConf(final Configuration c) {
    this.conf = c;
  }

  public static void main(String[] args) throws Exception {
    //String[] aa = {"/tmp/foobar", "M2", "M3", "R:"};
    int errCode = ToolRunner.run(new HBaseConfiguration(), new BulkCopy(), args);
    System.exit(errCode);
  }
}
</pre></blockquote>

<h3>Sample running HBase inserts out of Map Task
</h3>

<p>Here's sample code from Andrew Purtell that does HBase insert inside in the mapper rather than via TableReduce.
</p>

<blockquote><pre>
public class MyMap
  extends TableMap<ImmutableBytesWritable,MapWritable> // or whatever
{
  private HTable table;

  public void configure(JobConf job) {
    super.configure(job);
    try {
      HBaseConfiguration conf = new HBaseConfiguration(job);
      table = new HTable(conf, "mytable");
    } catch (Exception) {
      // can't do anything about this now
    }
  }

  public void map(ImmutableBytesWritable key, RowResult value,
    OutputCollector<ImmutableBytesWritable,MapWritable> output,
    Reporter reporter) throws IOException
  {
    // now we can report an exception opening the table
    if (table == null)
      throw new IOException("could not open mytable");

    // ...

    // commit the result
    BatchUpdate update = new BatchUpdate();
    // ...
    table.commit(update);
  }
}
</pre></blockquote>

<p>This assumes that you do this when setting up your job: JobConf conf = new JobConf(new HBaseConfiguration());
</p>

<p>Or maybe something like this:
<blockquote><pre>
JobConf conf = new JobConf(new Configuration());
conf.set("hbase.master", myMaster);
</pre></blockquote>
</p>

<h3>Sample MR Bulk Uploader
</h3>
<p>A students/classes example by Naama Kraus.
</p>

<p>Read the class comment below for specification of inputs, prerequisites, etc. In particular, note that the class comment says that this code is for hbase 0.1.x.
</p>

<blockquote><pre>package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * Sample uploader.
 *
 * This is EXAMPLE code. You will need to change it to work for your context.
 *
 * Uses TableReduce to put the data into hbase. Change the InputFormat to suit
 * your data. Use the map to massage the input so it fits hbase. Currently its
 * just a pass-through map. In the reduce, you need to output a row and a
 * map of columns to cells. Change map and reduce to suit your input.
 *
 * <p>The below is wired up to handle an input whose format is a text file
 * which has a line format as follow:
 * <pre>
 * row columnname columndata
 * </pre>
 *
 * <p>The table and columnfamily we're to insert into must preexist.
 *
 * <p> To run, edit your hadoop-env.sh and add hbase classes and conf to your
 * HADOOP_CLASSPATH. For example:
 * <pre>
 * export HADOOP_CLASSPATH=/Users/stack/Documents/checkouts/hbase/branches/0.1/build/classes:/Users/stack/Documents/checkouts/hbase/branches/0.1/conf
 * </pre>
 * <p>Restart your MR cluster after making the following change (You need to
 * be running in pseudo-distributed mode at a minimum for the hadoop to see
 * the above additions to your CLASSPATH).
 *
 * <p>Start up your hbase cluster.
 *
 * <p>Next do the following to start the MR job:
 * <pre>
 * ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.txt TABLE_NAME
 * </pre>
 *
 * <p>This code was written against hbase 0.1 branch.
 */
public class SampleUploader extends MapReduceBase
implements Mapper<LongWritable, Text, Text, MapWritable>, Tool {
  private static final String NAME = "SampleUploader";
  private Configuration conf;

  public JobConf createSubmittableJob(String[] args) {
    JobConf c = new JobConf(getConf(), SampleUploader.class);
    c.setJobName(NAME);
    c.setInputPath(new Path(args[0]));
    c.setMapperClass(this.getClass());
    c.setMapOutputKeyClass(Text.class);
    c.setMapOutputValueClass(MapWritable.class);
    c.setReducerClass(TableUploader.class);
    TableReduce.initJob(args[1], TableUploader.class, c);
    return c;
  }

  public void map(LongWritable k, Text v,
    OutputCollector<Text, MapWritable> output, Reporter r)
  throws IOException {
    // Lines are space-delimited; first item is row, next the columnname and
    // then the third the cell value.
    String tmp = v.toString();
    if (tmp.length() == 0) {
      return;
    }
    String [] splits = v.toString().split(" ");
    MapWritable mw = new MapWritable();
    mw.put(new Text(splits[1]),
      new ImmutableBytesWritable(splits[2].getBytes()));
    String row = splits[0];
    r.setStatus("Map emitting " + row + " for record " + k.toString());
    output.collect(new Text(row), mw);
  }

  public static class TableUploader
  extends TableReduce<Text, MapWritable> {
    public void reduce(Text k, Iterator<MapWritable> v,
      OutputCollector<Text, MapWritable> output, Reporter r)
    throws IOException {
      while (v.hasNext()) {
        r.setStatus("Reducer committing " + k);
        output.collect(k, v.next());
      }
    }
  }

  static int printUsage() {
    System.out.println(NAME + " <input> <table_name>");
    return -1;
  }

  public int run(@SuppressWarnings("unused") String[] args) throws Exception {
    // Make sure there are exactly 2 parameters left.
    if (args.length != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
        args.length + " instead of 2.");
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }

  public Configuration getConf() {
    return this.conf;
  }

  public void setConf(final Configuration c) {
    this.conf = c;
  }

  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(new Configuration(), new SampleUploader(),
      args);
    System.exit(errCode);
  }
}
</pre></blockquote>
*/
package org.apache.hadoop.hbase.mapred;
@@ -41,6 +41,17 @@ public class TestSerialization extends HBaseTestCase {
    super.tearDown();
  }

  public void testHbaseMapWritable() throws Exception {
    HbaseMapWritable<byte [], byte []> hmw =
      new HbaseMapWritable<byte[], byte[]>();
    hmw.put("key".getBytes(), "value".getBytes());
    byte [] bytes = Writables.getBytes(hmw);
    hmw = (HbaseMapWritable<byte[], byte[]>)
      Writables.getWritable(bytes, new HbaseMapWritable<byte [], byte []>());
    assertTrue(hmw.size() == 1);
    assertTrue(Bytes.equals("value".getBytes(), hmw.get("key".getBytes())));
  }

  public void testHMsg() throws Exception {
    HMsg m = HMsg.REGIONSERVER_QUIESCE;
    byte [] mb = Writables.getBytes(m);