HBASE-885 TableMap and TableReduce should be interfaces

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@698089 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2008-09-23 05:04:12 +00:00
parent aa419f96dd
commit 2dbdd19ccc
9 changed files with 87 additions and 97 deletions

CHANGES.txt

@@ -1,6 +1,8 @@
 HBase Change Log
 Release 0.19.0 - Unreleased
   INCOMPATIBLE CHANGES
+  HBASE-885   TableMap and TableReduce should be interfaces
+              (Doğacan Güney via Stack)
   BUG FIXES
   HBASE-891   HRS.validateValuesLength throws IOE, gets caught in the retries

GroupingTableMap.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
@@ -37,7 +38,9 @@ import org.apache.hadoop.mapred.Reporter;
 /**
  * Extract grouping columns from input record
  */
-public class GroupingTableMap extends TableMap<ImmutableBytesWritable,RowResult> {
+public class GroupingTableMap
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable,RowResult> {
 
   /**
    * JobConf parameter to specify the columns used to produce the key passed to
@@ -63,7 +66,8 @@ public class GroupingTableMap extends TableMap<ImmutableBytesWritable,RowResult>
   public static void initJob(String table, String columns, String groupColumns,
     Class<? extends TableMap> mapper, JobConf job) {
-    initJob(table, columns, mapper, ImmutableBytesWritable.class, RowResult.class, job);
+    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
+      ImmutableBytesWritable.class, RowResult.class, job);
     job.set(GROUP_COLUMNS, groupColumns);
   }
@@ -83,7 +87,6 @@ public class GroupingTableMap extends TableMap<ImmutableBytesWritable,RowResult>
    * Pass the new key and value to reduce.
    * If any of the grouping columns are not found in the value, the record is skipped.
    */
-  @Override
   public void map(@SuppressWarnings("unused") ImmutableBytesWritable key,
     RowResult value, OutputCollector<ImmutableBytesWritable,RowResult> output,
     @SuppressWarnings("unused") Reporter reporter) throws IOException {
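For orientation, a hedged sketch (not part of the commit) of how the updated GroupingTableMap.initJob helper above might be wired into a job. The table name "mytable", the "info:a info:b" column list, and the output path are invented placeholders; the call signatures follow the diff above.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.GroupingTableMap;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;

/** Hypothetical job that re-keys rows of "mytable" by two of their columns. */
public class GroupingJobExample {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(new HBaseConfiguration(), GroupingJobExample.class);
    job.setJobName("group mytable by info:a and info:b");
    // Scan the two columns and also use them (space separated) to build the map output key.
    GroupingTableMap.initJob("mytable", "info:a info:b", "info:a info:b",
        GroupingTableMap.class, job);
    job.setReducerClass(IdentityReducer.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(RowResult.class);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/grouping-out"));
    JobClient.runJob(job);
  }
}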

IdentityTableMap.java

@@ -24,13 +24,16 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Pass the given key and record as-is to reduce
  */
-public class IdentityTableMap extends TableMap<ImmutableBytesWritable, RowResult> {
+public class IdentityTableMap
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable, RowResult> {
 
   /** constructor */
   public IdentityTableMap() {
@@ -49,14 +52,14 @@ public class IdentityTableMap extends TableMap<ImmutableBytesWritable, RowResult
   @SuppressWarnings("unchecked")
   public static void initJob(String table, String columns,
     Class<? extends TableMap> mapper, JobConf job) {
-    TableMap.initJob(table, columns, mapper, ImmutableBytesWritable.class,
+    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
+      ImmutableBytesWritable.class,
       RowResult.class, job);
   }
 
   /**
    * Pass the key, value to reduce
    */
-  @Override
   public void map(ImmutableBytesWritable key, RowResult value,
     OutputCollector<ImmutableBytesWritable,RowResult> output,
     @SuppressWarnings("unused") Reporter reporter) throws IOException {

IdentityTableReduce.java

@@ -25,13 +25,16 @@ import java.util.Iterator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.io.BatchUpdate;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Write to table each key, record pair
  */
-public class IdentityTableReduce extends TableReduce<ImmutableBytesWritable, BatchUpdate> {
+public class IdentityTableReduce
+extends MapReduceBase
+implements TableReduce<ImmutableBytesWritable, BatchUpdate> {
   @SuppressWarnings("unused")
   private static final Log LOG =
     LogFactory.getLog(IdentityTableReduce.class.getName());
@@ -41,7 +44,6 @@ public class IdentityTableReduce extends TableReduce<ImmutableBytesWritable, Bat
    *
    * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
    */
-  @Override
   public void reduce(ImmutableBytesWritable key, Iterator<BatchUpdate> values,
     OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
     @SuppressWarnings("unused") Reporter reporter)

RowCounter.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -43,7 +44,9 @@ import org.apache.hadoop.util.ToolRunner;
  * Map outputs table rows IF the input row has columns that have content.
  * Uses an {@link IdentityReducer}
  */
-public class RowCounter extends TableMap<ImmutableBytesWritable, RowResult> implements Tool {
+public class RowCounter
+extends MapReduceBase
+implements TableMap<ImmutableBytesWritable, RowResult>, Tool {
   /* Name of this 'program'
    */
   static final String NAME = "rowcounter";
@@ -53,7 +56,6 @@ public class RowCounter extends TableMap<ImmutableBytesWritable, RowResult> impl
     new RowResult(Bytes.toBytes("dummy"),new HbaseMapWritable<byte [], Cell>());
   private static enum Counters {ROWS}
 
-  @Override
   public void map(ImmutableBytesWritable row, RowResult value,
     OutputCollector<ImmutableBytesWritable, RowResult> output,
     @SuppressWarnings("unused") Reporter reporter)
@@ -93,7 +95,7 @@ public class RowCounter extends TableMap<ImmutableBytesWritable, RowResult> impl
       sb.append(args[i]);
     }
     // Second argument is the table name.
-    TableMap.initJob(args[1], sb.toString(), this.getClass(),
+    TableMapReduceUtil.initTableMapJob(args[1], sb.toString(), this.getClass(),
       ImmutableBytesWritable.class, RowResult.class, c);
     c.setReducerClass(IdentityReducer.class);
     // First arg is the output directory.
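RowCounter still implements Tool, so it can be driven through ToolRunner. A hedged sketch, assuming the argument order implied by the comments above (output directory first, then the table name, then one or more columns); the path, table, and column names are placeholders.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.RowCounter;
import org.apache.hadoop.util.ToolRunner;

public class RowCounterExample {
  public static void main(String[] args) throws Exception {
    // "/tmp/rowcounter-out", "mytable" and "info:" are placeholders.
    int exitCode = ToolRunner.run(new HBaseConfiguration(), new RowCounter(),
        new String[] {"/tmp/rowcounter-out", "mytable", "info:"});
    System.exit(exitCode);
  }
}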

TableMap.java

@@ -19,18 +19,11 @@
  */
 package org.apache.hadoop.hbase.mapred;
 
-import java.io.IOException;
-
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Scan an HBase table to sort by a specified sort column.
@@ -40,42 +33,7 @@ import org.apache.hadoop.mapred.Reporter;
  * @param <V> Writable value class
  */
 @SuppressWarnings("unchecked")
-public abstract class TableMap<K extends WritableComparable, V extends Writable>
-  extends MapReduceBase implements Mapper<ImmutableBytesWritable, RowResult, K, V> {
-
-  /**
-   * Use this before submitting a TableMap job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table table name
-   * @param columns columns to scan
-   * @param mapper mapper class
-   * @param outputKeyClass
-   * @param outputValueClass
-   * @param job job configuration
-   */
-  public static void initJob(String table, String columns,
-    Class<? extends TableMap> mapper,
-    Class<? extends WritableComparable> outputKeyClass,
-    Class<? extends Writable> outputValueClass, JobConf job) {
-    job.setInputFormat(TableInputFormat.class);
-    job.setMapOutputValueClass(outputValueClass);
-    job.setMapOutputKeyClass(outputKeyClass);
-    job.setMapperClass(mapper);
-    FileInputFormat.addInputPaths(job, table);
-    job.set(TableInputFormat.COLUMN_LIST, columns);
-  }
-
-  /**
-   * Call a user defined function on a single HBase record, represented
-   * by a key and its associated record value.
-   *
-   * @param key
-   * @param value
-   * @param output
-   * @param reporter
-   * @throws IOException
-   */
-  public abstract void map(ImmutableBytesWritable key, RowResult value,
-    OutputCollector<K, V> output, Reporter reporter) throws IOException;
+public interface TableMap<K extends WritableComparable, V extends Writable>
+extends Mapper<ImmutableBytesWritable, RowResult, K, V> {
 }
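With TableMap reduced to a plain interface, an implementing class now picks its own superclass; the classes in this commit all use MapReduceBase. A minimal sketch of a user mapper under that pattern (PassThroughTableMap is a made-up name, not part of the commit):

import java.io.IOException;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Hypothetical mapper: emits every scanned row unchanged. */
public class PassThroughTableMap extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, RowResult> {

  public void map(ImmutableBytesWritable key, RowResult value,
      OutputCollector<ImmutableBytesWritable, RowResult> output, Reporter reporter)
      throws IOException {
    output.collect(key, value);
  }
}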

TableMapReduceUtil.java (new file)

@@ -0,0 +1,52 @@
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+
+@SuppressWarnings("unchecked")
+public class TableMapReduceUtil {
+
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table table name
+   * @param columns columns to scan
+   * @param mapper mapper class
+   * @param outputKeyClass
+   * @param outputValueClass
+   * @param job job configuration
+   */
+  public static void initTableMapJob(String table, String columns,
+    Class<? extends TableMap> mapper,
+    Class<? extends WritableComparable> outputKeyClass,
+    Class<? extends Writable> outputValueClass, JobConf job) {
+    job.setInputFormat(TableInputFormat.class);
+    job.setMapOutputValueClass(outputValueClass);
+    job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapperClass(mapper);
+    FileInputFormat.addInputPaths(job, table);
+    job.set(TableInputFormat.COLUMN_LIST, columns);
+  }
+
+  /**
+   * Use this before submitting a TableReduce job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table
+   * @param reducer
+   * @param job
+   */
+  public static void initTableReduceJob(String table,
+    Class<? extends TableReduce> reducer, JobConf job) {
+    job.setOutputFormat(TableOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+}
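To show how the two new helpers fit together, here is a hedged end-to-end sketch of a table-to-table copy job. The source and destination table names and the "info:" column are placeholders, CopyMapper is a made-up class, and the RowResult/BatchUpdate calls assume the 0.19-era io API; only the TableMapReduceUtil calls come directly from this commit.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Hypothetical job that copies the scanned columns of one table into another. */
public class CopyTableExample {

  /** Turns each scanned RowResult into a BatchUpdate against the same row key. */
  public static class CopyMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, BatchUpdate> {
    public void map(ImmutableBytesWritable key, RowResult value,
        OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter)
        throws IOException {
      BatchUpdate update = new BatchUpdate(key.get());
      for (Map.Entry<byte [], Cell> e : value.entrySet()) {
        update.put(e.getKey(), e.getValue().getValue());
      }
      output.collect(key, update);
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(new HBaseConfiguration(), CopyTableExample.class);
    job.setJobName("copy info: from srctable to desttable");
    // Map side: scan "srctable" and emit a BatchUpdate per row.
    TableMapReduceUtil.initTableMapJob("srctable", "info:", CopyMapper.class,
        ImmutableBytesWritable.class, BatchUpdate.class, job);
    // Reduce side: write the BatchUpdates into "desttable".
    TableMapReduceUtil.initTableReduceJob("desttable", IdentityTableReduce.class, job);
    JobClient.runJob(job);
  }
}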

TableReduce.java

@@ -19,18 +19,11 @@
  */
 package org.apache.hadoop.hbase.mapred;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Write a table, sorting by the input key
@@ -39,34 +32,7 @@ import org.apache.hadoop.mapred.Reporter;
  * @param <V> value class
  */
 @SuppressWarnings("unchecked")
-public abstract class TableReduce<K extends WritableComparable, V extends Writable>
-  extends MapReduceBase implements Reducer<K, V, ImmutableBytesWritable, BatchUpdate> {
-
-  /**
-   * Use this before submitting a TableReduce job. It will
-   * appropriately set up the JobConf.
-   *
-   * @param table
-   * @param reducer
-   * @param job
-   */
-  public static void initJob(String table,
-    Class<? extends TableReduce> reducer, JobConf job) {
-    job.setOutputFormat(TableOutputFormat.class);
-    job.setReducerClass(reducer);
-    job.set(TableOutputFormat.OUTPUT_TABLE, table);
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(BatchUpdate.class);
-  }
-
-  /**
-   *
-   * @param key
-   * @param values
-   * @param output
-   * @param reporter
-   * @throws IOException
-   */
-  public abstract void reduce(K key, Iterator<V> values,
-    OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter)
-    throws IOException;
+public interface TableReduce<K extends WritableComparable, V extends Writable>
+extends Reducer<K, V, ImmutableBytesWritable, BatchUpdate> {
 }
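Similarly, TableReduce is now just a Reducer whose output types are fixed to ImmutableBytesWritable and BatchUpdate. A minimal sketch of an implementation that stores a per-key value count back into the table; the class name and the "stats:count" column are invented for illustration:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Hypothetical reducer: sums the IntWritables seen for a row and stores the total. */
public class CountTableReduce extends MapReduceBase
    implements TableReduce<ImmutableBytesWritable, IntWritable> {

  // Invented column name; any "family:qualifier" the target table defines would do.
  private static final byte [] COUNT_COLUMN = Bytes.toBytes("stats:count");

  public void reduce(ImmutableBytesWritable key, Iterator<IntWritable> values,
      OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter)
      throws IOException {
    int total = 0;
    while (values.hasNext()) {
      total += values.next().get();
    }
    BatchUpdate update = new BatchUpdate(key.get());
    update.put(COUNT_COLUMN, Bytes.toBytes(Integer.toString(total)));
    output.collect(key, update);
  }
}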

TestTableMapReduce.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
@@ -73,12 +74,13 @@ public class TestTableMapReduce extends MultiRegionTable {
   /**
    * Pass the given key and processed record reduce
    */
-  public static class ProcessContentsMapper extends TableMap<ImmutableBytesWritable, BatchUpdate> {
+  public static class ProcessContentsMapper
+  extends MapReduceBase
+  implements TableMap<ImmutableBytesWritable, BatchUpdate> {
     /**
      * Pass the key, and reversed value to reduce
      */
     @SuppressWarnings("unchecked")
-    @Override
     public void map(ImmutableBytesWritable key, RowResult value,
       OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
       @SuppressWarnings("unused") Reporter reporter)
@@ -127,10 +129,10 @@ public class TestTableMapReduce extends MultiRegionTable {
     jobConf = new JobConf(conf, TestTableMapReduce.class);
     jobConf.setJobName("process column contents");
     jobConf.setNumReduceTasks(1);
-    TableMap.initJob(Bytes.toString(table.getTableName()), INPUT_COLUMN,
-      ProcessContentsMapper.class, ImmutableBytesWritable.class,
-      BatchUpdate.class, jobConf);
-    TableReduce.initJob(Bytes.toString(table.getTableName()),
+    TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
+      INPUT_COLUMN, ProcessContentsMapper.class,
+      ImmutableBytesWritable.class, BatchUpdate.class, jobConf);
+    TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
       IdentityTableReduce.class, jobConf);
     LOG.info("Started " + table.getTableName());