HADOOP-2548 Make TableMap and TableReduce generic

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@611488 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2008-01-12 21:32:46 +00:00
parent d004f6e545
commit 979da3823a
8 changed files with 37 additions and 145 deletions

CHANGES.txt

@@ -164,6 +164,8 @@ Trunk (unreleased changes)
HADOOP-2450 Show version (and svn revision) in hbase web ui
HADOOP-2472 Range selection using filter (Edward Yoon via Stack)
HADOOP-2553 Don't make Long objects calculating hbase type hash codes
HADOOP-2548 Make TableMap and TableReduce generic
(Frederik Hedberg via Stack)

src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java

@@ -27,18 +27,18 @@ import java.util.Map;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Extract grouping columns from input record
*/
public class GroupingTableMap extends TableMap {
public class GroupingTableMap extends TableMap<Text,MapWritable> {
/**
* JobConf parameter to specify the columns used to produce the key passed to
@@ -49,11 +49,6 @@ public class GroupingTableMap extends TableMap {
protected Text[] m_columns;
/** default constructor */
public GroupingTableMap() {
super();
}
/**
* Use this before submitting a TableMap job. It will appropriately set up the
* JobConf.
@@ -65,6 +60,7 @@ public class GroupingTableMap extends TableMap {
* @param mapper map class
* @param job job configuration object
*/
@SuppressWarnings("unchecked")
public static void initJob(String table, String columns, String groupColumns,
Class<? extends TableMap> mapper, JobConf job) {
@@ -89,11 +85,11 @@ public class GroupingTableMap extends TableMap {
* Pass the new key and value to reduce.
* If any of the grouping columns are not found in the value, the record is skipped.
*
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void map(@SuppressWarnings("unused") HStoreKey key,
MapWritable value, TableOutputCollector output,
MapWritable value, OutputCollector<Text,MapWritable> output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
byte[][] keyVals = extractKeyValues(value);
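
A minimal sketch of configuring this mapper under the new generic signature. The table name "mytable" and columns "info:a info:b" below are hypothetical, and the group-column list is assumed to be space-separated:

import org.apache.hadoop.hbase.mapred.GroupingTableMap;
import org.apache.hadoop.mapred.JobConf;

public class GroupingExample {
  public static void configure(JobConf job) {
    // Scan the two (hypothetical) columns and build the key handed to
    // reduce from their values; mapper output is Text/MapWritable.
    GroupingTableMap.initJob("mytable", "info:a info:b", "info:a info:b",
        GroupingTableMap.class, job);
  }
}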

src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java

@@ -24,13 +24,14 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Pass the given key and record as-is to reduce
*/
public class IdentityTableMap extends TableMap {
public class IdentityTableMap extends TableMap<Text, MapWritable> {
/** constructor */
public IdentityTableMap() {
@@ -40,11 +41,11 @@ public class IdentityTableMap extends TableMap {
/**
* Pass the key, value to reduce
*
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
* @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void map(HStoreKey key, MapWritable value,
TableOutputCollector output,
OutputCollector<Text,MapWritable> output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
Text tKey = key.getRow();
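
For comparison, wiring the identity mapper goes through the inherited static TableMap.initJob (shown later in this commit); "mytable" and the column family "info:" are hypothetical stand-ins:

import org.apache.hadoop.hbase.mapred.IdentityTableMap;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.mapred.JobConf;

public class IdentityMapExample {
  public static void configure(JobConf job) {
    // Every scanned row is passed straight through to reduce as a
    // (Text row key, MapWritable columns) pair.
    TableMap.initJob("mytable", "info:", IdentityTableMap.class, job);
  }
}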

src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java

@@ -24,28 +24,23 @@ import java.util.Iterator;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Write to table each key, record pair
*/
public class IdentityTableReduce extends TableReduce {
/** constructor */
public IdentityTableReduce() {
super();
}
public class IdentityTableReduce extends TableReduce<Text, MapWritable> {
/**
* No aggregation, output pairs of (key, record)
*
* @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.Text, java.util.Iterator, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
* @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void reduce(Text key, @SuppressWarnings("unchecked") Iterator values,
TableOutputCollector output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
public void reduce(Text key, Iterator<MapWritable> values,
OutputCollector<Text, MapWritable> output, Reporter reporter)
throws IOException {
while(values.hasNext()) {
MapWritable r = (MapWritable)values.next();
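
The loop body is truncated above; a sketch of what the complete method presumably looks like under the new signature, where the typed iterator makes the old cast unnecessary:

@Override
public void reduce(Text key, Iterator<MapWritable> values,
    OutputCollector<Text, MapWritable> output, Reporter reporter)
    throws IOException {
  // Forward each (key, record) pair unchanged to the output table.
  while (values.hasNext()) {
    output.collect(key, values.next());
  }
}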

src/java/org/apache/hadoop/hbase/mapred/TableMap.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -30,8 +32,6 @@ import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HStoreKey;
/**
* Scan an HBase table to sort by a specified sort column.
@@ -39,14 +39,8 @@ import org.apache.hadoop.hbase.HStoreKey;
*
*/
@SuppressWarnings("unchecked")
public abstract class TableMap extends MapReduceBase implements Mapper {
private TableOutputCollector m_collector;
/** constructor*/
public TableMap() {
m_collector = new TableOutputCollector();
}
public abstract class TableMap<K extends WritableComparable, V extends Writable>
extends MapReduceBase implements Mapper<HStoreKey, MapWritable, K, V> {
/**
* Use this before submitting a TableMap job. It will
* appropriately set up the JobConf.
@@ -56,9 +50,8 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
* @param mapper mapper class
* @param job job configuration
*/
public static void initJob(String table, String columns,
public static void initJob(String table, String columns,
Class<? extends TableMap> mapper, JobConf job) {
job.setInputFormat(TableInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MapWritable.class);
@@ -67,27 +60,6 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
job.set(TableInputFormat.COLUMN_LIST, columns);
}
/**
* Input:
* @param key is of type HStoreKey
* @param value is of type KeyedDataArrayWritable
* @param output output collector
* @param reporter object to use for status updates
* @throws IOException
*
* Output:
* The key is a specific column, including the input key or any value
* The value is of type LabeledData
*/
public void map(WritableComparable key, Writable value,
OutputCollector output, Reporter reporter) throws IOException {
if(m_collector.collector == null) {
m_collector.collector = output;
}
map((HStoreKey)key, (MapWritable)value, m_collector, reporter);
}
/**
* Call a user defined function on a single HBase record, represented
* by a key and its associated record value.
@@ -98,6 +70,6 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
* @param reporter
* @throws IOException
*/
public abstract void map(HStoreKey key, MapWritable value,
TableOutputCollector output, Reporter reporter) throws IOException;
public abstract void map(HStoreKey key, MapWritable value,
OutputCollector<K, V> output, Reporter reporter) throws IOException;
}
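
With TableMap parameterized, a subclass declares its own output key/value types and overrides the typed map directly, with no TableOutputCollector wrapper or casts. A minimal sketch (the class name is made up):

import java.io.IOException;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class PassThroughMap extends TableMap<Text, MapWritable> {
  @Override
  public void map(HStoreKey key, MapWritable value,
      OutputCollector<Text, MapWritable> output, Reporter reporter)
      throws IOException {
    // Emit the row name and its full column map unchanged.
    output.collect(key.getRow(), value);
  }
}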

src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java (deleted)

@@ -1,47 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
/**
* Refine the types that can be collected from a Table Map/Reduce jobs.
*/
public class TableOutputCollector {
/** The collector object */
@SuppressWarnings("unchecked")
public OutputCollector collector;
/**
* Restrict Table Map/Reduce's output to be a Text key and a record.
*
* @param key
* @param value
* @throws IOException
*/
@SuppressWarnings("unchecked")
public void collect(Text key, MapWritable value) throws IOException {
collector.collect(key, value);
}
}
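
The wrapper above can go away because the parameterized Mapper/Reducer interfaces now express the same Text/MapWritable restriction at compile time; what collect() enforced by hand becomes a direct call on the typed collector, roughly:

import java.io.IOException;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;

final class CollectorSketch {
  // Equivalent of TableOutputCollector.collect() under the generic API.
  static void collect(OutputCollector<Text, MapWritable> out, Text key,
      MapWritable value) throws IOException {
    out.collect(key, value);
  }
}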

src/java/org/apache/hadoop/hbase/mapred/TableReduce.java

@@ -22,8 +22,10 @@ package org.apache.hadoop.hbase.mapred;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
@@ -34,14 +36,8 @@ import org.apache.hadoop.mapred.Reporter;
* Write a table, sorting by the input key
*/
@SuppressWarnings("unchecked")
public abstract class TableReduce extends MapReduceBase implements Reducer {
TableOutputCollector m_collector;
/** Constructor */
public TableReduce() {
m_collector = new TableOutputCollector();
}
public abstract class TableReduce<K extends WritableComparable, V extends Writable>
extends MapReduceBase implements Reducer<K, V, Text, MapWritable> {
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
@@ -50,30 +46,13 @@ public abstract class TableReduce extends MapReduceBase implements Reducer {
* @param reducer
* @param job
*/
public static void initJob(String table, Class<? extends TableReduce> reducer,
JobConf job) {
public static void initJob(String table,
Class<? extends TableReduce> reducer, JobConf job) {
job.setOutputFormat(TableOutputFormat.class);
job.setReducerClass(reducer);
job.set(TableOutputFormat.OUTPUT_TABLE, table);
}
/**
* Create a unique key for table insertion by appending a local
* counter the given key.
*
* @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@SuppressWarnings("unchecked")
public void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter) throws IOException {
if(m_collector.collector == null) {
m_collector.collector = output;
}
reduce((Text)key, values, m_collector, reporter);
}
/**
*
* @param key
@@ -82,8 +61,7 @@ public abstract class TableReduce extends MapReduceBase implements Reducer {
* @param reporter
* @throws IOException
*/
@SuppressWarnings("unchecked")
public abstract void reduce(Text key, Iterator values,
TableOutputCollector output, Reporter reporter) throws IOException;
public abstract void reduce(K key, Iterator<V> values,
OutputCollector<Text, MapWritable> output, Reporter reporter)
throws IOException;
}
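
A minimal reducer sketch under the new bound: K and V are whatever the mapper emitted, while the output pair stays fixed at (Text, MapWritable) to match TableOutputFormat. The class name is hypothetical:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class FirstRecordReduce extends TableReduce<Text, MapWritable> {
  @Override
  public void reduce(Text key, Iterator<MapWritable> values,
      OutputCollector<Text, MapWritable> output, Reporter reporter)
      throws IOException {
    // Keep only the first record seen for each row key.
    if (values.hasNext()) {
      output.collect(key, values.next());
    }
  }
}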

src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
/**
* Test Map/Reduce job over HBase tables
@@ -147,13 +148,7 @@ public class TestTableMapReduce extends MultiRegionTable {
/**
* Pass the given key and processed record reduce
*/
public static class ProcessContentsMapper extends TableMap {
/** constructor */
public ProcessContentsMapper() {
super();
}
public static class ProcessContentsMapper extends TableMap<Text, MapWritable> {
/**
* Pass the key, and reversed value to reduce
*
@@ -162,7 +157,7 @@ public class TestTableMapReduce extends MultiRegionTable {
@SuppressWarnings("unchecked")
@Override
public void map(HStoreKey key, MapWritable value,
TableOutputCollector output,
OutputCollector<Text, MapWritable> output,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
Text tKey = key.getRow();
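
Putting the pieces together, a driver for a job like this test might be wired as below; the job name, table name, and "contents:" column are hypothetical stand-ins:

package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ContentsJobDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(ContentsJobDriver.class);
    job.setJobName("process contents");
    // Map: read "contents:" from mytable through the mapper defined in
    // the test above; reduce: write each pair back via the identity
    // reducer.
    TableMap.initJob("mytable", "contents:",
        TestTableMapReduce.ProcessContentsMapper.class, job);
    TableReduce.initJob("mytable", IdentityTableReduce.class, job);
    JobClient.runJob(job);
  }
}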