From 1fd899d9d7b7e9ab042fac5201e8c11b83f8b524 Mon Sep 17 00:00:00 2001
From: Doug Meil
Date: Wed, 31 Aug 2011 20:02:06 +0000
Subject: [PATCH] HBASE-4316 book.xml - overhauled mapreduce examples

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1163781 13f79535-47bb-0310-9956-ffa450edef68
---
 src/docbkx/book.xml | 221 ++++++++++++++++++++++++++++++++++++++------
 1 file changed, 194 insertions(+), 27 deletions(-)

diff --git a/src/docbkx/book.xml b/src/docbkx/book.xml
index a4a4bf70c5c..86ffcff35e4 100644
--- a/src/docbkx/book.xml
+++ b/src/docbkx/book.xml
@@ -81,38 +81,199 @@
 See HBase and MapReduce up in javadocs. Start there. Below is some additional help.
- The default HBase MapReduce Splitter
- When TableInputFormat,
- is used to source an HBase table in a MapReduce job,
- its splitter will make a map task for each region of the table.
- Thus, if there are 100 regions in the table, there will be
- 100 map-tasks for the job - regardless of how many column families are selected in the Scan.
+ Map-Task Splitting
+
+ The Default HBase MapReduce Splitter
+ When TableInputFormat
+ is used to source an HBase table in a MapReduce job,
+ its splitter will make a map task for each region of the table.
+ Thus, if there are 100 regions in the table, there will be
+ 100 map-tasks for the job - regardless of how many column families are selected in the Scan.
+
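As an editorial aside (not part of this patch), the one-map-task-per-region behavior means the number of map tasks can be estimated directly from the table's region count. A minimal sketch, in which the table name "myTable" is purely illustrative:

HTable table = new HTable(HBaseConfiguration.create(), "myTable");   // "myTable" is a hypothetical table name
int regionCount = table.getStartKeys().length;   // one start key per region
System.out.println("Default splitter will create " + regionCount + " map tasks");
table.close();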
+
+ Custom Splitters
+ For those interested in implementing custom splitters, see the method getSplits in
+ TableInputFormatBase.
+ That is where the logic for map-task assignment resides.
+
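As a rough, editorial sketch of that idea (not part of this patch): a custom input format could extend TableInputFormat and post-process the region-based splits produced by TableInputFormatBase. The class name MyCustomTableInputFormat and the filtering logic are illustrative assumptions.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

public class MyCustomTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    // default behavior from TableInputFormatBase: one split per region
    List<InputSplit> regionSplits = super.getSplits(context);

    // custom logic would go here, e.g., dropping regions the job does not
    // need to scan, or sub-dividing very large regions
    return new ArrayList<InputSplit>(regionSplits);
  }
}

A job could then select it with job.setInputFormatClass(MyCustomTableInputFormat.class) after the usual TableMapReduceUtil setup, though that wiring is also an assumption rather than something this patch prescribes.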
- HBase Input MapReduce Example
- To use HBase as a MapReduce source,
- the job would be configured via TableMapReduceUtil in the following manner...
- Job job = ...;
+ HBase MapReduce Examples
+
+ HBase MapReduce Read Example
+ The following is an example of using HBase as a MapReduce source in a read-only manner. Specifically,
+ there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper. The job would be defined
+ as follows...
+
+Configuration config = HBaseConfiguration.create();
+Job job = new Job(config, "ExampleRead");
+job.setJarByClass(MyReadJob.class);     // class that contains mapper
+
 Scan scan = new Scan();
-scan.setCaching(500);            // 1 is the default in Scan, which will be bad for MapReduce jobs
-scan.setCacheBlocks(false);
-// Now set other scan attrs
+scan.setCaching(500);            // 1 is the default in Scan, which will be bad for MapReduce jobs
+scan.setCacheBlocks(false);      // don't set to true for MR jobs
+// set other scan attrs
 ...
 TableMapReduceUtil.initTableMapperJob(
-  tableName,            // input HBase table name
-  scan,                 // Scan instance to control CF and attribute selection
-  MyMapper.class,       // mapper
-  Text.class,           // reducer key
-  LongWritable.class,   // reducer value
-  job                   // job instance
-  );
+  tableName,        // input HBase table name
+  scan,             // Scan instance to control CF and attribute selection
+  MyMapper.class,   // mapper
+  null,             // mapper output key
+  null,             // mapper output value
+  job);
+job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper
+
+boolean b = job.waitForCompletion(true);
+if ( b == false) {
+  throw new IOException("error with job!");
+}
+
 ...and the mapper instance would extend TableMapper...
-public class MyMapper extends TableMapper<Text, LongWritable> {
+public static class MyMapper extends TableMapper<Text, Text> {
+
  public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
-    // process data for the row from the Result instance.
-
+    // process data for the row from the Result instance.
+  }
+}
+
+
+
+ HBase MapReduce Read/Write Example
+ The following is an example of using HBase both as a source and as a sink with MapReduce.
+ This example will simply copy data from one table to another.
+
+Configuration config = HBaseConfiguration.create();
+Job job = new Job(config,"ExampleReadWrite");
+job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper
+
+Scan scan = new Scan();
+scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
+scan.setCacheBlocks(false);  // don't set to true for MR jobs
+// set other scan attrs
+
+TableMapReduceUtil.initTableMapperJob(
+  sourceTable,      // input table
+  scan,             // Scan instance to control CF and attribute selection
+  MyMapper.class,   // mapper class
+  null,             // mapper output key
+  null,             // mapper output value
+  job);
+TableMapReduceUtil.initTableReducerJob(
+  targetTable,      // output table
+  null,             // reducer class
+  job);
+job.setNumReduceTasks(0);
+
+boolean b = job.waitForCompletion(true);
+if ( b == false) {
+  throw new IOException("error with job!");
+}
+
+ It is worth explaining what TableMapReduceUtil is doing, especially with the reducer.
+ TableOutputFormat is being used
+ as the outputFormat class, and several parameters are being set on the config (e.g., TableOutputFormat.OUTPUT_TABLE), as
+ well as setting the reducer output key to ImmutableBytesWritable and the value to Writable.
+ These could be set by the programmer on the job and conf, but TableMapReduceUtil tries to make things easier.
+ The following is the example mapper, which will create a Put matching the input Result
+ and emit it. Note: this is what the CopyTable utility does.
+
+
+public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put>  {
+
+  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
+    // this example is just copying the data from the source table...
+    context.write(row, resultToPut(row,value));
+  }
+
+  private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
+    Put put = new Put(key.get());
+    for (KeyValue kv : result.raw()) {
+      put.add(kv);
+    }
+    return put;
+  }
+}
+
+ There isn't actually a reducer step, so TableOutputFormat takes care of sending the Put
+ to the target table.
+
+ This is just an example; developers could choose not to use TableOutputFormat and connect to the
+ target table themselves.
+
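To sketch that alternative (an editorial illustration, not part of this patch): the mapper can open its own HTable against the target table, write the Puts directly, and emit nothing, in which case the job would use NullOutputFormat as in the read example above. The class name MyDirectWriteMapper and the table name "targetTable" are hypothetical.

public static class MyDirectWriteMapper extends TableMapper<Text, Text> {

  private HTable targetTable;

  @Override
  public void setup(Context context) throws IOException {
    // open a connection to the (hypothetical) target table for the lifetime of this task
    targetTable = new HTable(HBaseConfiguration.create(), "targetTable");
  }

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // copy the source row into a Put and write it straight to the target table
    Put put = new Put(row.get());
    for (KeyValue kv : value.raw()) {
      put.add(kv);
    }
    targetTable.put(put);
  }

  @Override
  public void cleanup(Context context) throws IOException {
    targetTable.close();   // flushes any buffered writes and releases the table
  }
}

Nothing is emitted from this mapper, so the trade-off is giving up the configuration help that TableOutputFormat provides in exchange for direct control over the writes.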
+
+ HBase MapReduce Summary Example
+ The following example uses HBase as a MapReduce source and sink with a summarization step. This example will
+ count the number of distinct instances of a value in a table and write those summarized counts to another table.
+
+Configuration config = HBaseConfiguration.create();
+Job job = new Job(config,"ExampleSummary");
+job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer
+
+Scan scan = new Scan();
+scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
+scan.setCacheBlocks(false);  // don't set to true for MR jobs
+// set other scan attrs
+
+TableMapReduceUtil.initTableMapperJob(
+  sourceTable,        // input table
+  scan,               // Scan instance to control CF and attribute selection
+  MyMapper.class,     // mapper class
+  Text.class,         // mapper output key
+  IntWritable.class,  // mapper output value
+  job);
+TableMapReduceUtil.initTableReducerJob(
+  targetTable,        // output table
+  MyReducer.class,    // reducer class
+  job);
+job.setNumReduceTasks(1);   // at least one, adjust as required
+
+boolean b = job.waitForCompletion(true);
+if ( b == false) {
+  throw new IOException("error with job!");
+}
+
+ In this example mapper, a column with a String value is chosen as the value to summarize upon.
+ This value is used as the key to emit from the mapper, and an IntWritable represents an instance counter.
+
+public static class MyMapper extends TableMapper<Text, IntWritable> {
+
+  private final IntWritable ONE = new IntWritable(1);
+  private Text text = new Text();
+
+  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
+    String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1")));
+    text.set(val);   // we can only emit Writables...
+
+    context.write(text, ONE);
+  }
+}
+
+ In the reducer, the "ones" are counted (just like any other MR example that does this), and then a Put is emitted.
+
+public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
+
+  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
+    int i = 0;
+    for (IntWritable val : values) {
+      i += val.get();
+    }
+    Put put = new Put(Bytes.toBytes(key.toString()));
+    put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));
+
+    context.write(null, put);
+  }
+}
+
+
+
 Accessing Other HBase Tables in a MapReduce Job
@@ -123,10 +284,16 @@ public class MyMapper extends TableMapper<Text, LongWritable> {
 public class MyMapper extends TableMapper<Text, LongWritable> {
   private HTable myOtherTable;
-
   @Override
   public void setup(Context context) {
     myOtherTable = new HTable("myOtherTable");
-  }
+  }
+
+  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
+    // process Result...
+    // use 'myOtherTable' for lookups
+  }
+
+
@@ -381,7 +548,7 @@ admin.enableTable(table);
 A secondary index could be created in another table which is periodically updated via a MapReduce job.
 The job could be executed intra-day, but depending on load-strategy it could still potentially be out of sync
 with the main data table.
-See for more information.
+See for more information.
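As an editorial sketch of such an index-building job (not part of this patch): the mapper could read the main table and emit, for each row, a Put into the index table keyed by the indexed value, with the job wired like the read/write example above (initTableMapperJob on the main table, initTableReducerJob on the index table with a null reducer). The column family "cf", the attribute "attr1", and the index layout are illustrative assumptions.

public static class MyIndexMapper extends TableMapper<ImmutableBytesWritable, Put> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    byte[] indexedValue = value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1"));
    if (indexedValue == null) {
      return;   // this row has no value for the indexed attribute
    }
    // index-table row key = the indexed value; each main-table row key becomes a column,
    // so many main rows can share one index row without overwriting each other
    Put put = new Put(indexedValue);
    put.add(Bytes.toBytes("cf"), row.get(), row.get());
    context.write(new ImmutableBytesWritable(indexedValue), put);
  }
}

Because such a job only runs periodically, the index lags the main table between runs, which is the staleness trade-off the paragraph above describes.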
@@ -396,7 +563,7 @@ admin.enableTable(table);
 Where time-ranges are very wide (e.g., year-long report) and where the data is voluminous,
 summary tables are a common approach. These would be generated with MapReduce jobs into another table.
-See for more information.
+See for more information.