HBASE-4316 book.xml - overhauled mapreduce examples
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1163781 13f79535-47bb-0310-9956-ffa450edef68
parent e6cc54fa4d
commit 1fd899d9d7
@@ -81,38 +81,199 @@
<para>See <link xlink:href="http://hbase.org/apidocs/org/apache/hadoop/hbase/mapreduce/package-summary.html#package_description">HBase and MapReduce</link> up in javadocs.
Start there. Below is some additional help.</para>
<section xml:id="splitter">
  <title>Map-Task Splitting</title>
  <section xml:id="splitter.default">
    <title>The Default HBase MapReduce Splitter</title>
    <para>When <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html">TableInputFormat</link>
    is used to source an HBase table in a MapReduce job,
    its splitter will make a map task for each region of the table.
    Thus, if there are 100 regions in the table, there will be
    100 map-tasks for the job - regardless of how many column families are selected in the Scan.</para>
  </section>
  <section xml:id="splitter.custom">
    <title>Custom Splitters</title>
    <para>For those interested in implementing custom splitters, see the method <code>getSplits</code> in
    <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html">TableInputFormatBase</link>.
    That is where the logic for map-task assignment resides.
    </para>
  </section>
</section>
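To make the custom-splitter idea concrete, here is a rough sketch (not part of this commit; the class name and the even/odd filter are invented purely for illustration) of subclassing the input format and post-processing the default one-split-per-region list:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

// Hypothetical example: start from the default splits (one per region)
// and keep only every other one, e.g. to sample half of the table.
public class MyTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> defaultSplits = super.getSplits(context);   // one split per region
    List<InputSplit> customSplits = new ArrayList<InputSplit>();
    for (int i = 0; i < defaultSplits.size(); i++) {
      if (i % 2 == 0) {              // illustrative filter only
        customSplits.add(defaultSplits.get(i));
      }
    }
    return customSplits;
  }
}

A job would opt into such a class with job.setInputFormatClass(MyTableInputFormat.class) after TableMapReduceUtil.initTableMapperJob(...) has configured the scan.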
<section xml:id="mapreduce.example">
  <title>HBase MapReduce Examples</title>
  <section xml:id="mapreduce.example.read">
    <title>HBase MapReduce Read Example</title>
    <para>The following is an example of using HBase as a MapReduce source in a read-only manner. Specifically,
    there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper. The job would be defined
    as follows...
    <programlisting>
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);     // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);         // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);   // don't set to true for MR jobs
// set other scan attrs
...

TableMapReduceUtil.initTableMapperJob(
  tableName,        // input HBase table name
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper
  null,             // mapper output key
  null,             // mapper output value
  job);
job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from the mapper

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
    </programlisting>
    ...and the mapper instance would extend <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapper.html">TableMapper</link>...
    <programlisting>
public static class MyMapper extends TableMapper<Text, Text> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
    // process data for the row from the Result instance.
  }
}
    </programlisting>
    </para>
  </section>
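To make the mapper body a little more concrete, a minimal sketch of processing the Result follows (the column family "cf" and qualifier "attr1" are invented for illustration; nothing is written to the job output, matching the NullOutputFormat above):

public void map(ImmutableBytesWritable row, Result value, Context context)
    throws InterruptedException, IOException {
  // read one column of the current row
  byte[] bytes = value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1"));
  if (bytes != null) {
    String attr1 = Bytes.toString(bytes);
    // do something read-only with the value, e.g. bump a job counter
    context.getCounter("example", attr1.isEmpty() ? "empty" : "nonEmpty").increment(1);
  }
}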
  <section xml:id="mapreduce.example.readwrite">
    <title>HBase MapReduce Read/Write Example</title>
    <para>The following is an example of using HBase both as a source and as a sink with MapReduce.
    This example will simply copy data from one table to another.
    <programlisting>
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);         // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);   // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,      // input table
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,      // output table
  null,             // reducer class
  job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
    </programlisting>
    An explanation is required of what <classname>TableMapReduceUtil</classname> is doing, especially with the reducer.
    <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.html">TableOutputFormat</link> is being used
    as the outputFormat class, and several parameters are being set on the config (e.g., TableOutputFormat.OUTPUT_TABLE), as
    well as setting the reducer output key to <classname>ImmutableBytesWritable</classname> and the value to <classname>Writable</classname>.
    These could be set by the programmer on the job and conf, but <classname>TableMapReduceUtil</classname> tries to make things easier.
    </para>
    <para>The following is the example mapper, which will create a <classname>Put</classname> matching the input <classname>Result</classname>
    and emit it. Note: this is what the CopyTable utility does.
    </para>
    <programlisting>
public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // this example is just copying the data from the source table...
    context.write(row, resultToPut(row, value));
  }

  private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
    Put put = new Put(key.get());
    for (KeyValue kv : result.raw()) {
      put.add(kv);
    }
    return put;
  }
}
    </programlisting>
    <para>There isn't actually a reducer step, so <classname>TableOutputFormat</classname> takes care of sending the <classname>Put</classname>
    to the target table.
    </para>
    <para>This is just an example; developers could choose not to use <classname>TableOutputFormat</classname> and connect to the
    target table themselves.
    </para>
  </section>
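To make the preceding explanation concrete, the following sketch shows roughly what TableMapReduceUtil.initTableReducerJob(targetTable, null, job) sets up on the sink side (an illustration, not the utility's literal source; it assumes the job and targetTable variables from the example above):

// which table TableOutputFormat should write to
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, targetTable);
// route emitted Puts/Deletes through TableOutputFormat
job.setOutputFormatClass(TableOutputFormat.class);
// output key/value types expected by TableOutputFormat
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);

A job that skips TableOutputFormat altogether, as the last paragraph above suggests, would instead open its own HTable against the target table (for example in the mapper's setup()) and call put() on it directly.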
  <section xml:id="mapreduce.example.summary">
    <title>HBase MapReduce Summary Example</title>
    <para>The following example uses HBase as a MapReduce source and sink with a summarization step. This example will
    count the number of distinct instances of a value in a table and write those summarized counts to another table.
    <programlisting>
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);         // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);   // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,        // output table
  MyReducer.class,    // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
    </programlisting>
    In this example mapper, a column with a String value is chosen as the value to summarize upon.
    This value is used as the key to emit from the mapper, and an <classname>IntWritable</classname> represents an instance counter.
    <programlisting>
public static class MyMapper extends TableMapper<Text, IntWritable> {

  private final IntWritable ONE = new IntWritable(1);
  private Text text = new Text();

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1")));
    text.set(val);   // we can only emit Writables...
    context.write(text, ONE);
  }
}
    </programlisting>
    In the reducer, the "ones" are counted (just like any other MR example that does this), and then a <classname>Put</classname> is emitted.
    <programlisting>
public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));

    context.write(null, put);
  }
}
    </programlisting>
    </para>
  </section>
</section>
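The listings above are fragments without a driver; a minimal sketch of a runnable driver class follows (it reuses the MySummaryJob name referenced by setJarByClass above; the Tool/ToolRunner wiring is standard Hadoop, and the job body is elided):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver wrapping the summary-example job configuration.
public class MySummaryJob extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    Configuration config = HBaseConfiguration.create(getConf());
    Job job = new Job(config, "ExampleSummary");
    job.setJarByClass(MySummaryJob.class);
    // ... configure the Scan, mapper and reducer exactly as in the summary example above ...
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new MySummaryJob(), args));
  }
}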
<section xml:id="mapreduce.htable.access">
  <title>Accessing Other HBase Tables in a MapReduce Job</title>
@@ -123,10 +284,16 @@ public class MyMapper extends TableMapper<Text, LongWritable> {
<programlisting>public class MyMapper extends TableMapper<Text, LongWritable> {
  private HTable myOtherTable;

  @Override
  public void setup(Context context) {
    myOtherTable = new HTable("myOtherTable");
  }

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // process Result...
    // use 'myOtherTable' for lookups
  }
}
</programlisting>
</para>
</section>
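As a sketch of how the second table might actually be used inside map() (the lookup by the same row key is invented for illustration and is not part of the commit):

public void map(ImmutableBytesWritable row, Result value, Context context)
    throws IOException, InterruptedException {
  // look up a related row in the second table; the key derivation is illustrative only
  Result other = myOtherTable.get(new Get(row.get()));
  if (!other.isEmpty()) {
    // combine 'value' and 'other' as needed...
  }
}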
<section xml:id="mapreduce.specex">
@@ -381,7 +548,7 @@ admin.enableTable(table);
</title>
<para>A secondary index could be created in another table which is periodically updated via a MapReduce job. The job could be executed intra-day, but depending on
load-strategy it could still potentially be out of sync with the main data table.</para>
<para>See <xref linkend="mapreduce.example.readwrite"/> for more information.</para>
</section>
<section xml:id="secondary.indexes.dualwrite">
<title>
@@ -396,7 +563,7 @@
</title>
<para>Where time-ranges are very wide (e.g., a year-long report) and where the data is voluminous, summary tables are a common approach.
These would be generated with MapReduce jobs into another table.</para>
<para>See <xref linkend="mapreduce.example.summary"/> for more information.</para>
</section>
<section xml:id="secondary.indexes.coproc">
<title>