From ff272e45ad6c6280c11fdf2242df41105ad30986 Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Thu, 9 Jul 2009 04:08:20 +0000
Subject: [PATCH] HBASE-1626 Allow emitting Deletes out of new TableReducer

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@792388 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                   |  2 +
 .../apache/hadoop/hbase/client/Delete.java    | 12 +++-
 .../hbase/mapreduce/IdentityTableReducer.java | 50 ++++++++++----
 .../hbase/mapreduce/TableOutputFormat.java    | 65 +++++++++++++++----
 .../hadoop/hbase/mapreduce/TableReducer.java  | 23 ++++---
 5 files changed, 116 insertions(+), 36 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2a540b35c5c..3231f127c88 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -450,6 +450,8 @@ Release 0.20.0 - Unreleased
    HBASE-1620  Need to use special StoreScanner constructor for major
                compactions (passed sf, no caching, etc) (Jon Gray via Stack)
    HBASE-1624  Don't sort Puts if only one in list in HCM#processBatchOfRows
+   HBASE-1626  Allow emitting Deletes out of new TableReducer
+               (Lars George via Stack)
 
   OPTIMIZATIONS
    HBASE-1412  Change values for delete column and column family in KeyValue
diff --git a/src/java/org/apache/hadoop/hbase/client/Delete.java b/src/java/org/apache/hadoop/hbase/client/Delete.java
index 56900b63e37..c38ab639b5d 100644
--- a/src/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/src/java/org/apache/hadoop/hbase/client/Delete.java
@@ -70,7 +70,7 @@ public class Delete implements Writable {
 
   /** Constructor for Writable. DO NOT USE */
   public Delete() {
-    this(null);
+    this((byte [])null);
   }
 
   /**
@@ -107,6 +107,16 @@ public class Delete implements Writable {
     }
   }
 
+  /**
+   * @param d Delete to clone.
+   */
+  public Delete(final Delete d) {
+    this.row = d.getRow();
+    this.ts = d.getTimeStamp();
+    this.lockId = d.getLockId();
+    this.familyMap.putAll(d.getFamilyMap());
+  }
+
   /**
    * Method to check if the familyMap is empty
    * @return true if empty, false otherwise
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java b/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
index b4c51c2fbba..eb7609cec6d 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
@@ -20,38 +20,60 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.OutputFormat;
 
 /**
- * Convenience class that simply writes each key, record pair to the configured
- * HBase table.
+ * Convenience class that simply writes all values (which must be
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} instances)
+ * passed to it out to the configured HBase table. This works in combination
+ * with {@link TableOutputFormat}, which actually does the writing to HBase.<p>
+ *
+ * Keys are passed along but ignored in TableOutputFormat. However, they can
+ * be used to control how your values will be divided up amongst the specified
+ * number of reducers.<p>
+ *
+ * You can also use the {@link TableMapReduceUtil} class to set up the two
+ * classes in one step:
+ * <blockquote><code>
+ * TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job);
+ * </code></blockquote>
+ * This will also set the proper {@link TableOutputFormat}, which is given the
+ * <code>table</code> parameter. The
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} define the
+ * row and columns implicitly.
  */
 public class IdentityTableReducer
-extends TableReducer<ImmutableBytesWritable, Put> {
+extends TableReducer<Writable, Writable, Writable> {
 
   @SuppressWarnings("unused")
   private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
 
   /**
-   * Writes each given record, consisting of the key and the given values, to
-   * the HBase table.
+   * Writes each given record, consisting of the row key and the given values,
+   * to the configured {@link OutputFormat}. It emits the row key and each
+   * {@link org.apache.hadoop.hbase.client.Put Put} or
+   * {@link org.apache.hadoop.hbase.client.Delete Delete} as a separate pair.
    *
-   * @param key The current row key.
-   * @param values The values for the given row.
+   * @param key  The current row key.
+   * @param values  The {@link org.apache.hadoop.hbase.client.Put Put} or
+   *   {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given
+   *   row.
    * @param context The context of the reduce.
    * @throws IOException When writing the record fails.
   * @throws InterruptedException When the job gets interrupted.
    */
-  public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
+  @Override
+  public void reduce(Writable key, Iterable<Writable> values,
       Context context) throws IOException, InterruptedException {
-    while(values.hasNext()) {
-      context.write(key, values.next());
+    for(Writable putOrDelete : values) {
+      context.write(key, putOrDelete);
     }
   }
-
 }
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index b21545c65c9..b31b0c7fac3 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -24,29 +24,36 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
- * Convert Map/Reduce output and write it to an HBase table.
+ * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
+ * while the output value must be either a {@link Put} or a
+ * {@link Delete} instance.
+ *
+ * @param <KEY>  The type of the key. Ignored in this class.
  */
-public class TableOutputFormat extends
-    FileOutputFormat<ImmutableBytesWritable, Put> {
+public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
 
   private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
 
   /** Job parameter that specifies the output table. */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
 
   /**
-   * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
-   * and write to an HBase table
+   * Writes the reducer output to an HBase table.
+   *
+   * @param <KEY>  The type of the key.
    */
-  protected static class TableRecordWriter
-    extends RecordWriter<ImmutableBytesWritable, Put> {
+  protected static class TableRecordWriter<KEY>
+  extends RecordWriter<KEY, Writable> {
 
     /** The table to write to. */
     private HTable table;
@@ -82,9 +89,11 @@ public class TableOutputFormat extends
      * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
      */
     @Override
-    public void write(ImmutableBytesWritable key, Put value)
+    public void write(KEY key, Writable value)
     throws IOException {
-      table.put(new Put(value));
+      if (value instanceof Put) this.table.put(new Put((Put)value));
+      else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
+      else throw new IOException("Pass a Delete or a Put");
     }
   }
 
@@ -97,7 +106,7 @@ public class TableOutputFormat extends
    * @throws InterruptedException When the job is cancelled.
    * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
    */
-  public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(
+  public RecordWriter<KEY, Writable> getRecordWriter(
     TaskAttemptContext context)
   throws IOException, InterruptedException {
     // expecting exactly one path
@@ -111,7 +120,37 @@ public class TableOutputFormat extends
       throw e;
     }
     table.setAutoFlush(false);
-    return new TableRecordWriter(table);
+    return new TableRecordWriter<KEY>(table);
+  }
+
+  /**
+   * Checks if the output target exists.
+   *
+   * @param context  The current context.
+   * @throws IOException When the check fails.
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
+   */
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException,
+      InterruptedException {
+    // TODO Check if the table exists?
+
+  }
+
+  /**
+   * Returns the output committer.
+   *
+   * @param context  The current context.
+   * @return The committer.
+   * @throws IOException When creating the committer fails.
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+  throws IOException, InterruptedException {
+    return new TableOutputCommitter();
   }
 }
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
index fcfc14498a5..64540ac73c3 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
@@ -19,19 +19,26 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
 
 /**
  * Extends the basic <code>Reducer</code> class to add the required key and
- * value output classes.
+ * value input/output classes. While the input key and value as well as the
+ * output key can be anything handed in from the previous map phase, the
+ * output value must be either a {@link org.apache.hadoop.hbase.client.Put Put}
+ * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
+ * using the {@link TableOutputFormat} class.
+ * <p>
+ * This class is extended by {@link IdentityTableReducer} but can also be
+ * subclassed to implement similar features or any custom code needed. It has
+ * the advantage of enforcing a specific basic type for the output value.
  *
- * @param <KEYIN>  The type of the key.
- * @param <VALUEIN>  The type of the value.
+ * @param <KEYIN>  The type of the input key.
+ * @param <VALUEIN>  The type of the input value.
+ * @param <KEYOUT>  The type of the output key.
  * @see org.apache.hadoop.mapreduce.Reducer
  */
-public abstract class TableReducer<KEYIN, VALUEIN>
-extends Reducer<KEYIN, VALUEIN, ImmutableBytesWritable, Put> {
-
+public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
+extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
 }
\ No newline at end of file
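
For illustration, here is the kind of reducer this patch enables: one that emits Delete instances instead of Puts. This is a minimal sketch, not part of the patch; the class name and the whole-row purge behaviour are hypothetical, while TableReducer's type parameters, TableOutputFormat's Put/Delete dispatch, and the Delete row constructor come from the code above.

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

/**
 * Sketch of a reducer that removes rows instead of writing them: every
 * row key routed here is turned into a whole-row Delete, which the new
 * TableOutputFormat dispatches to HTable.delete().
 */
public class RowPurgeReducer
extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<Put> values,
      Context context) throws IOException, InterruptedException {
    // The incoming Puts only served to route the key to this reducer;
    // emitting a Delete for the row is what HBASE-1626 makes possible.
    context.write(key, new Delete(key.get()));
  }
}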
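A matching driver would wire such a reducer to TableOutputFormat through the TableMapReduceUtil call quoted in the IdentityTableReducer javadoc. Again a sketch under stated assumptions: the job name, the table name "target_table", and the omitted mapper are placeholders.

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Job;

/** Driver sketch for the reducer above. */
public class RowPurgeDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new HBaseConfiguration(), "purge rows");
    job.setJarByClass(RowPurgeDriver.class);
    // A mapper selecting the rows to purge would be configured here.
    // initTableReducerJob sets the proper TableOutputFormat on the job
    // and hands it the output table name, as described in the javadoc.
    TableMapReduceUtil.initTableReducerJob("target_table",
      RowPurgeReducer.class, job);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}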