diff --git a/CHANGES.txt b/CHANGES.txt index 791d71046f5..c861439796d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,8 @@ HBase Change Log Release 0.19.0 - Unreleased INCOMPATIBLE CHANGES + HBASE-885 TableMap and TableReduce should be interfaces + (Doğacan Güney via Stack) BUG FIXES HBASE-891 HRS.validateValuesLength throws IOE, gets caught in the retries diff --git a/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java b/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java index 07e69a60ece..e40c3dabf4a 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java +++ b/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; @@ -37,7 +38,9 @@ import org.apache.hadoop.mapred.Reporter; /** * Extract grouping columns from input record */ -public class GroupingTableMap extends TableMap { +public class GroupingTableMap +extends MapReduceBase +implements TableMap { /** * JobConf parameter to specify the columns used to produce the key passed to @@ -63,7 +66,8 @@ public class GroupingTableMap extends TableMap public static void initJob(String table, String columns, String groupColumns, Class mapper, JobConf job) { - initJob(table, columns, mapper, ImmutableBytesWritable.class, RowResult.class, job); + TableMapReduceUtil.initTableMapJob(table, columns, mapper, + ImmutableBytesWritable.class, RowResult.class, job); job.set(GROUP_COLUMNS, groupColumns); } @@ -83,7 +87,6 @@ public class GroupingTableMap extends TableMap * Pass the new key and value to reduce. * If any of the grouping columns are not found in the value, the record is skipped. */ - @Override public void map(@SuppressWarnings("unused") ImmutableBytesWritable key, RowResult value, OutputCollector output, @SuppressWarnings("unused") Reporter reporter) throws IOException { diff --git a/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java index f4b576ec6e5..97482b988e1 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java +++ b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java @@ -24,13 +24,16 @@ import java.io.IOException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; /** * Pass the given key and record as-is to reduce */ -public class IdentityTableMap extends TableMap { +public class IdentityTableMap +extends MapReduceBase +implements TableMap { /** constructor */ public IdentityTableMap() { @@ -49,14 +52,14 @@ public class IdentityTableMap extends TableMap mapper, JobConf job) { - TableMap.initJob(table, columns, mapper, ImmutableBytesWritable.class, + TableMapReduceUtil.initTableMapJob(table, columns, mapper, + ImmutableBytesWritable.class, RowResult.class, job); } /** * Pass the key, value to reduce */ - @Override public void map(ImmutableBytesWritable key, RowResult value, OutputCollector output, @SuppressWarnings("unused") Reporter reporter) throws IOException { diff --git a/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java index 88e7fcdc3fb..f2815b016ec 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java +++ b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java @@ -25,13 +25,16 @@ import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.io.BatchUpdate;import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; /** * Write to table each key, record pair */ -public class IdentityTableReduce extends TableReduce { +public class IdentityTableReduce +extends MapReduceBase +implements TableReduce { @SuppressWarnings("unused") private static final Log LOG = LogFactory.getLog(IdentityTableReduce.class.getName()); @@ -41,7 +44,6 @@ public class IdentityTableReduce extends TableReduce values, OutputCollector output, @SuppressWarnings("unused") Reporter reporter) diff --git a/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java b/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java index a615639b36b..eae05fb4f77 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java +++ b/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.IdentityReducer; @@ -43,7 +44,9 @@ import org.apache.hadoop.util.ToolRunner; * Map outputs table rows IF the input row has columns that have content. * Uses an {@link IdentityReducer} */ -public class RowCounter extends TableMap implements Tool { +public class RowCounter +extends MapReduceBase +implements TableMap, Tool { /* Name of this 'program' */ static final String NAME = "rowcounter"; @@ -53,7 +56,6 @@ public class RowCounter extends TableMap impl new RowResult(Bytes.toBytes("dummy"),new HbaseMapWritable()); private static enum Counters {ROWS} - @Override public void map(ImmutableBytesWritable row, RowResult value, OutputCollector output, @SuppressWarnings("unused") Reporter reporter) @@ -93,7 +95,7 @@ public class RowCounter extends TableMap impl sb.append(args[i]); } // Second argument is the table name. - TableMap.initJob(args[1], sb.toString(), this.getClass(), + TableMapReduceUtil.initTableMapJob(args[1], sb.toString(), this.getClass(), ImmutableBytesWritable.class, RowResult.class, c); c.setReducerClass(IdentityReducer.class); // First arg is the output directory. diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableMap.java b/src/java/org/apache/hadoop/hbase/mapred/TableMap.java index 5e38f6e619c..b6da2f56dbf 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/TableMap.java +++ b/src/java/org/apache/hadoop/hbase/mapred/TableMap.java @@ -19,18 +19,11 @@ */ package org.apache.hadoop.hbase.mapred; -import java.io.IOException; - import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; /** * Scan an HBase table to sort by a specified sort column. @@ -40,42 +33,7 @@ import org.apache.hadoop.mapred.Reporter; * @param Writable value class */ @SuppressWarnings("unchecked") -public abstract class TableMap - extends MapReduceBase implements Mapper { - /** - * Use this before submitting a TableMap job. It will - * appropriately set up the JobConf. - * - * @param table table name - * @param columns columns to scan - * @param mapper mapper class - * @param outputKeyClass - * @param outputValueClass - * @param job job configuration - */ - public static void initJob(String table, String columns, - Class mapper, - Class outputKeyClass, - Class outputValueClass, JobConf job) { - - job.setInputFormat(TableInputFormat.class); - job.setMapOutputValueClass(outputValueClass); - job.setMapOutputKeyClass(outputKeyClass); - job.setMapperClass(mapper); - FileInputFormat.addInputPaths(job, table); - job.set(TableInputFormat.COLUMN_LIST, columns); - } +public interface TableMap +extends Mapper { - /** - * Call a user defined function on a single HBase record, represented - * by a key and its associated record value. - * - * @param key - * @param value - * @param output - * @param reporter - * @throws IOException - */ - public abstract void map(ImmutableBytesWritable key, RowResult value, - OutputCollector output, Reporter reporter) throws IOException; } diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java new file mode 100644 index 00000000000..d1a7b91e4bb --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -0,0 +1,52 @@ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; + +@SuppressWarnings("unchecked") +public class TableMapReduceUtil { + /** + * Use this before submitting a TableMap job. It will + * appropriately set up the JobConf. + * + * @param table table name + * @param columns columns to scan + * @param mapper mapper class + * @param outputKeyClass + * @param outputValueClass + * @param job job configuration + */ + public static void initTableMapJob(String table, String columns, + Class mapper, + Class outputKeyClass, + Class outputValueClass, JobConf job) { + + job.setInputFormat(TableInputFormat.class); + job.setMapOutputValueClass(outputValueClass); + job.setMapOutputKeyClass(outputKeyClass); + job.setMapperClass(mapper); + FileInputFormat.addInputPaths(job, table); + job.set(TableInputFormat.COLUMN_LIST, columns); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table + * @param reducer + * @param job + */ + public static void initTableReduceJob(String table, + Class reducer, JobConf job) { + job.setOutputFormat(TableOutputFormat.class); + job.setReducerClass(reducer); + job.set(TableOutputFormat.OUTPUT_TABLE, table); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(BatchUpdate.class); + } +} diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java b/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java index 288f7a4329b..7584770b1dd 100644 --- a/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java +++ b/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java @@ -19,18 +19,11 @@ */ package org.apache.hadoop.hbase.mapred; -import java.io.IOException; -import java.util.Iterator; - import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; /** * Write a table, sorting by the input key @@ -39,34 +32,7 @@ import org.apache.hadoop.mapred.Reporter; * @param value class */ @SuppressWarnings("unchecked") -public abstract class TableReduce - extends MapReduceBase implements Reducer { - /** - * Use this before submitting a TableReduce job. It will - * appropriately set up the JobConf. - * - * @param table - * @param reducer - * @param job - */ - public static void initJob(String table, - Class reducer, JobConf job) { - job.setOutputFormat(TableOutputFormat.class); - job.setReducerClass(reducer); - job.set(TableOutputFormat.OUTPUT_TABLE, table); - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(BatchUpdate.class); - } +public interface TableReduce +extends Reducer { - /** - * - * @param key - * @param values - * @param output - * @param reporter - * @throws IOException - */ - public abstract void reduce(K key, Iterator values, - OutputCollector output, Reporter reporter) - throws IOException; } \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java b/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java index 7ac4c0e5f46..de8b4db70d1 100644 --- a/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java +++ b/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; @@ -73,12 +74,13 @@ public class TestTableMapReduce extends MultiRegionTable { /** * Pass the given key and processed record reduce */ - public static class ProcessContentsMapper extends TableMap { + public static class ProcessContentsMapper + extends MapReduceBase + implements TableMap { /** * Pass the key, and reversed value to reduce */ @SuppressWarnings("unchecked") - @Override public void map(ImmutableBytesWritable key, RowResult value, OutputCollector output, @SuppressWarnings("unused") Reporter reporter) @@ -127,10 +129,10 @@ public class TestTableMapReduce extends MultiRegionTable { jobConf = new JobConf(conf, TestTableMapReduce.class); jobConf.setJobName("process column contents"); jobConf.setNumReduceTasks(1); - TableMap.initJob(Bytes.toString(table.getTableName()), INPUT_COLUMN, - ProcessContentsMapper.class, ImmutableBytesWritable.class, - BatchUpdate.class, jobConf); - TableReduce.initJob(Bytes.toString(table.getTableName()), + TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()), + INPUT_COLUMN, ProcessContentsMapper.class, + ImmutableBytesWritable.class, BatchUpdate.class, jobConf); + TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()), IdentityTableReduce.class, jobConf); LOG.info("Started " + table.getTableName());