diff --git a/CHANGES.txt b/CHANGES.txt
index 635955134c6..eeb553f65f6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -109,6 +109,8 @@ Release 0.19.0 - Unreleased
              (Samuel Guo via Stack)
   HBASE-722  Shutdown and Compactions
   HBASE-983  Declare Perl namespace in Hbase.thrift
+  HBASE-987  We need a Hbase Partitioner for TableMapReduceUtil.initTableReduceJob
+             MR Jobs (Billy Pearson via Stack)

  NEW FEATURES
   HBASE-875  Use MurmurHash instead of JenkinsHash [in bloomfilters]
diff --git a/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java b/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java
new file mode 100644
index 00000000000..d4f42c2c010
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java
@@ -0,0 +1,89 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+
+/**
+ * This is used to partition the output keys into groups of keys.
+ * Keys are grouped according to the regions that currently exist
+ * so that each reducer fills a single region, spreading the load.
+ */
+public class HRegionPartitioner<K2, V2>
+implements Partitioner<ImmutableBytesWritable, V2> {
+  private final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
+  private HTable table;
+  private byte[][] startKeys;
+
+  public void configure(JobConf job) {
+    try {
+      this.table = new HTable(new HBaseConfiguration(job),
+        job.get(TableOutputFormat.OUTPUT_TABLE));
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+
+    try {
+      this.startKeys = this.table.getStartKeys();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  @Override
+  public int getPartition(ImmutableBytesWritable key, V2 value,
+      int numPartitions) {
+    byte[] region = null;
+    // If there is only one region, everything goes to partition 0.
+    if (this.startKeys.length == 1){
+      return 0;
+    }
+    try {
+      // Not sure if this is cached after a split so we could have problems
+      // here if a region splits while mapping
+      region = table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    for (int i = 0; i < this.startKeys.length; i++){
+      if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
+        if (i >= numPartitions-1){
+          // cover the case where we have fewer reducers than regions
+          return (Integer.toString(i).hashCode()
+              & Integer.MAX_VALUE) % numPartitions;
+        } else {
+          return i;
+        }
+      }
+    }
+    // if the above fails to find a matching start key we need to return something
+    return 0;
+  }
+}
diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
index d1a7b91e4bb..29a7221d453 100644
--- a/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ b/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -1,5 +1,28 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.mapred;
 
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Writable;
@@ -7,6 +30,9 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 
+/**
+ * Utility for {@link TableMap} and {@link TableReduce}
+ */
 @SuppressWarnings("unchecked")
 public class TableMapReduceUtil {
   /**
@@ -40,13 +66,40 @@ public class TableMapReduceUtil {
    * @param table
    * @param reducer
    * @param job
+   * @throws IOException
    */
   public static void initTableReduceJob(String table,
-    Class reducer, JobConf job) {
+    Class reducer, JobConf job)
+  throws IOException {
+    initTableReduceJob(table, reducer, job, null);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table
+   * @param reducer
+   * @param job
+   * @param partitioner Partitioner to use. Pass null to use default
+   * partitioner.
+ * @throws IOException + */ + public static void initTableReduceJob(String table, + Class reducer, JobConf job, Class partitioner) + throws IOException { job.setOutputFormat(TableOutputFormat.class); job.setReducerClass(reducer); job.set(TableOutputFormat.OUTPUT_TABLE, table); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(BatchUpdate.class); + if (partitioner != null) { + job.setPartitionerClass(HRegionPartitioner.class); + HTable outputTable = new HTable(new HBaseConfiguration(job), table); + int regions = outputTable.getRegionsInfo().size(); + if (job.getNumReduceTasks() > regions){ + job.setNumReduceTasks(outputTable.getRegionsInfo().size()); + } + } } } diff --git a/src/java/org/apache/hadoop/hbase/mapred/package-info.java b/src/java/org/apache/hadoop/hbase/mapred/package-info.java index c9157f24041..4e13f50b00a 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/package-info.java +++ b/src/java/org/apache/hadoop/hbase/mapred/package-info.java @@ -26,11 +26,10 @@ Input/OutputFormats, a table indexing MapReduce job, and utility

MapReduce jobs deployed to a MapReduce cluster do not by default have access to the HBase configuration under $HBASE_CONF_DIR nor to HBase classes. You could add hbase-site.xml to $HADOOP_HOME/conf and add -hbase-X.X.X.jar -to the $HADOOP_HOME/lib and copy these changes across your cluster but the -cleanest means of adding hbase configuration and classes to the cluster -CLASSPATH -is by uncommenting HADOOP_CLASSPATH in $HADOOP_HOME/conf/hadoop-env.sh +hbase-X.X.X.jar to the $HADOOP_HOME/lib and copy these +changes across your cluster but the cleanest means of adding hbase configuration +and classes to the cluster CLASSPATH is by uncommenting +HADOOP_CLASSPATH in $HADOOP_HOME/conf/hadoop-env.sh and adding the path to the hbase jar and $HBASE_CONF_DIR directory. Then copy the amended configuration around the cluster. You'll probably need to restart the MapReduce cluster if you want it to notice @@ -45,7 +44,7 @@ classes to the hadoop CLASSPATH # export HADOOP_CLASSPATH= export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf -

Expand $HBASE_HOME appropriately in the in accordance with your local environment.

+

Expand $HBASE_HOME in the above appropriately to suit your local environment.

This is how you would run the PerformanceEvaluation MR job to put up 4 clients:
@@ -53,6 +52,8 @@ export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar
The PerformanceEvaluation class will be found on the CLASSPATH because you added
$HBASE_HOME/build/test to HADOOP_CLASSPATH
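+
+For reference, the invocation to put up 4 clients looks something like the following
+(a sketch; 'sequentialWrite' and the trailing client count are the arguments you would vary):
+
+  $HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4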

+

NOTE: While it used to be possible to bundle the hbase.jar up inside the job jar you submit to hadoop, as of
+0.2.0RC2, this is no longer so. See HBASE-797.

HBase as MapReduce job data source and sink

@@ -70,9 +71,18 @@ or review the org.apache.hadoop.hbase.mapred.TestTableMapReduce uni

When running mapreduce jobs that have hbase as source or sink, you'll need to specify the source/sink table and column names in your configuration.
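+
+For instance, a job reading from a table might be configured with something like the
+following sketch (the table name "mytable", the column family "myfamily:", and the
+mapper class MyMapper, assumed to extend TableMap, are made-up names):
+
+  JobConf job = new JobConf(new HBaseConfiguration(), MyMapper.class);
+  // TableMap.initJob installs TableInputFormat and records the source table and
+  // column names in the job configuration; the Text classes are the map output types.
+  TableMap.initJob("mytable", "myfamily:", MyMapper.class,
+    Text.class, Text.class, job);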

-

Reading from hbase, the !TableInputFormat asks hbase for the list of -regions and makes a map-per-region. Writing, its better to have lots of -reducers so load is spread across the hbase cluster. +

Reading from hbase, the TableInputFormat asks hbase for the list of
+regions and makes a map-per-region.
+Writing, it may make sense to avoid the reduce step and write back into hbase from inside your map. You'd do this when your job does not need the sort and collation that MR does inside its reduce; on insert,
+hbase sorts, so there is no point double-sorting (and shuffling data around your MR cluster) unless you need to. If you do not need the reduce, you might just have your map emit counts of records processed so the
+framework can emit that nice report of records processed when the job is done. See example code below. If running the reduce step makes sense in your case, it's better to have lots of reducers so load is spread
+across the hbase cluster.

+ +

There is also a partitioner
+that will run as many reducers as there are currently existing regions. The
+partitioner HRegionPartitioner is suitable when your table is large
+and your upload will not greatly alter the number of existing
+regions once it is done.
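+
+Wiring the partitioner into a reduce job might look something like the following
+sketch (the table name "mytable" and the reducer class MyTableReduce, assumed to
+extend TableReduce, are made-up names):
+
+  JobConf job = new JobConf(new HBaseConfiguration(), MyTableReduce.class);
+  // Passing a non-null partitioner class makes initTableReduceJob install
+  // HRegionPartitioner and cap the number of reducers at the table's region count.
+  TableMapReduceUtil.initTableReduceJob("mytable", MyTableReduce.class, job,
+    HRegionPartitioner.class);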

Example Code

@@ -83,9 +93,269 @@ the hbase MapReduce Driver class. Select 'rowcounter' from the choice of jobs offered.
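+
+The invocation looks something like the following (the jar version depends on your
+build; running the jar with no job name prints the list of available jobs):
+
+  $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/hbase-X.X.X.jar rowcounter <outdir> <tablename> <column1> [<column2>...]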

-

Sample MR Bulk Uploader

-

Read the class comment below for specification of inputs, prerequisites, etc. +

Example to bulk import/load a text file into an HTable +

+ +

Here's a sample program from Allen Day that takes an HDFS text file path and an HBase table name as inputs, and loads the contents of the text file to the table.

+ +
package com.spicylogic.hbase;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class BulkImport implements Tool {
+  private static final String NAME = "BulkImport";
+  private Configuration conf;
+
+  public static class InnerMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
+    private HTable table;
+    private HBaseConfiguration HBconf;
+
+    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+      if ( table == null )
+        throw new IOException("table is null");
+      
+      String [] splits = value.toString().split("\t");
+      if ( splits.length != 4 )
+        return;
+
+      String rowID     = splits[0];
+      int timestamp    = Integer.parseInt( splits[1] );
+      String colID     = splits[2];
+      String cellValue = splits[3];
+
+      reporter.setStatus("Map emitting cell for row='" + rowID + "', column='" + colID + "', time='" + timestamp + "'");
+
+      BatchUpdate bu = new BatchUpdate( rowID );
+      if ( timestamp > 0 )
+        bu.setTimestamp( timestamp );
+
+      bu.put(colID, cellValue.getBytes());      
+      table.commit( bu );      
+    }
+    public void configure(JobConf job) {
+      HBconf = new HBaseConfiguration();
+      try {
+        table = new HTable( HBconf, job.get("input.table") );
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+  }
+  
+  
+  public JobConf createSubmittableJob(String[] args) {
+    JobConf c = new JobConf(getConf(), BulkImport.class);
+    c.setJobName(NAME);
+    c.setInputPath(new Path(args[0]));
+
+    c.set("input.table", args[1]);
+    c.setMapperClass(InnerMap.class);
+    c.setNumReduceTasks(0);
+    c.setOutputFormat(NullOutputFormat.class);
+    return c;
+  }
+  
+  static int printUsage() {
+    System.err.println("Usage: " + NAME + " <input> <table_name>");
+    System.err.println("\twhere <input> is a tab-delimited text file with 4 columns.");
+    System.err.println("\t\tcolumn 1 = row ID");
+    System.err.println("\t\tcolumn 2 = timestamp (use a negative value for current time)");
+    System.err.println("\t\tcolumn 3 = column ID");
+    System.err.println("\t\tcolumn 4 = cell value");
+    return -1;
+  } 
+
+  public int run(@SuppressWarnings("unused") String[] args) throws Exception {
+    // Make sure there are exactly 2 parameters: the input path and the table name.
+    if (args.length != 2) {
+      return printUsage();
+    }
+    JobClient.runJob(createSubmittableJob(args));
+    return 0;
+  }
+
+  public Configuration getConf() {
+    return this.conf;
+  } 
+
+  public void setConf(final Configuration c) {
+    this.conf = c;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int errCode = ToolRunner.run(new Configuration(), new BulkImport(), args);
+    System.exit(errCode);
+  }
+}
+
+ +
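+
+Assuming the class has been packaged into a job jar (the jar name and the paths below
+are made-up), it would be run with something like:
+
+  $HADOOP_HOME/bin/hadoop jar myjobs.jar com.spicylogic.hbase.BulkImport /user/me/input.tsv mytable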

Example to map rows/column families between two HTables +

+ +

Here's another sample program from Allen Day that will iterate over all rows in one table for specified column families and insert those rows/columns into a second table. +

+ +
package com.spicylogic.hbase;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableMap;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class BulkCopy extends TableMap<Text, Text> implements Tool {
+  static final String NAME = "bulkcopy";  
+  private Configuration conf;
+  
+  public void map(ImmutableBytesWritable row, RowResult value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    HTable table = new HTable(new HBaseConfiguration(), conf.get("output.table"));
+    if ( table == null ) {
+      throw new IOException("output table is null");
+    }
+
+    BatchUpdate bu = new BatchUpdate( row.get() );
+
+    for (Map.Entry<byte [], Cell> e: value.entrySet()) {
+      Cell cell = e.getValue();
+      if (cell != null && cell.getValue().length > 0) {
+        bu.put(e.getKey(), cell.getValue());
+      }
+    }
+    table.commit( bu );
+  }
+
+  public JobConf createSubmittableJob(String[] args) throws IOException {
+    JobConf c = new JobConf(getConf(), BulkCopy.class);
+    //table = new HTable(new HBaseConfiguration(), args[2]);
+    c.set("output.table", args[2]);
+    c.setJobName(NAME);
+    // Columns are space delimited
+    StringBuilder sb = new StringBuilder();
+    final int columnoffset = 3;
+    for (int i = columnoffset; i < args.length; i++) {
+      if (i > columnoffset) {
+        sb.append(" ");
+      }
+      sb.append(args[i]);
+    }
+    // Second argument is the table name.
+    TableMap.initJob(args[1], sb.toString(), this.getClass(),
+    Text.class, Text.class, c);
+    c.setReducerClass(IdentityReducer.class);
+    // First arg is the output directory.
+    c.setOutputPath(new Path(args[0]));
+    return c;
+  }
+  
+  static int printUsage() {
+    System.out.println(NAME +" <outputdir> <input tablename> <output tablename> <column1> [<column2>...]");
+    return -1;
+  }
+  
+  public int run(final String[] args) throws Exception {
+    // Make sure there are at least 3 parameters
+    if (args.length < 3) {
+      System.err.println("ERROR: Wrong number of parameters: " + args.length);
+      return printUsage();
+    }
+    JobClient.runJob(createSubmittableJob(args));
+    return 0;
+  }
+
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  public void setConf(final Configuration c) {
+    this.conf = c;
+  }
+
+  public static void main(String[] args) throws Exception {
+    //String[] aa = {"/tmp/foobar", "M2", "M3", "R:"};
+    int errCode = ToolRunner.run(new HBaseConfiguration(), new BulkCopy(), args);
+    System.exit(errCode);
+  }
+}
+
+ +

Sample running HBase inserts out of Map Task +

+ +

Here's sample code from Andrew Purtell that does the HBase insert inside the mapper rather than via TableReduce. +

+ +
+public class MyMap 
+  extends TableMap<ImmutableBytesWritable,MapWritable> // or whatever
+{
+  private HTable table;
+
+  public void configure(JobConf job) {
+    super.configure(job);
+    try {
+      HBaseConfiguration conf = new HBaseConfiguration(job);
+      table = new HTable(conf, "mytable");
+    } catch (Exception e) {
+      // can't do anything about this now
+    }
+  }
+
+  public void map(ImmutableBytesWritable key, RowResult value,
+    OutputCollector<ImmutableBytesWritable,MapWritable> output,
+    Reporter reporter) throws IOException
+  {
+    // now we can report an exception opening the table
+    if (table == null)
+      throw new IOException("could not open mytable");
+
+    // ...
+
+    // commit the result
+    BatchUpdate update = new BatchUpdate(key.get());
+    // ...
+    table.commit(update);
+  }
+}
+
+ +

This assumes that you do this when setting up your job: JobConf conf = new JobConf(new HBaseConfiguration()); +

+ +

Or maybe something like this: +

+      JobConf conf = new JobConf(new Configuration()); 
+      conf.set("hbase.master", myMaster);
+
+

+ +

Sample MR Bulk Uploader +

+

A students/classes example by Naama Kraus. +

+ +

Read the class comment below for specification of inputs, prerequisites, etc. In particular, note that the class comment says that this code is for hbase 0.1.x. +

+
package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
@@ -106,7 +376,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-/**
+/*
  * Sample uploader.
  * 
  * This is EXAMPLE code.  You will need to change it to work for your context.
@@ -143,7 +413,7 @@ import org.apache.hadoop.util.ToolRunner;
  * <p>This code was written against hbase 0.1 branch.
  */
 public class SampleUploader extends MapReduceBase
-implements Mapper, Tool {
+implements Mapper<LongWritable, Text, Text, MapWritable>, Tool {
   private static final String NAME = "SampleUploader";
   private Configuration conf;
 
@@ -160,7 +430,7 @@ implements Mapper, Tool {
   } 
 
   public void map(LongWritable k, Text v,
-    OutputCollector output, Reporter r)
+    OutputCollector<Text, MapWritable> output, Reporter r)
   throws IOException {
     // Lines are space-delimited; first item is row, next the columnname and
     // then the third the cell value.
@@ -179,9 +449,8 @@ implements Mapper, Tool {
   
   public static class TableUploader
   extends TableReduce<Text, MapWritable> {
-    @Override
-    public void reduce(Text k, Iterator v,
-      OutputCollector output, Reporter r)
+    public void reduce(Text k, Iterator<MapWritable> v,
+      OutputCollector<Text, MapWritable> output, Reporter r)
     throws IOException {
       while (v.hasNext()) {
         r.setStatus("Reducer committing " + k);
@@ -191,7 +460,7 @@ implements Mapper, Tool {
   }
   
   static int printUsage() {
-    System.out.println(NAME + "<input> <table_name>");
+    System.out.println(NAME + " <input> <table_name>");
     return -1;
   }