diff --git a/CHANGES.txt b/CHANGES.txt
index 2a540b35c5c..3231f127c88 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -450,6 +450,8 @@ Release 0.20.0 - Unreleased
HBASE-1620 Need to use special StoreScanner constructor for major compactions
(passed sf, no caching, etc) (Jon Gray via Stack)
HBASE-1624 Don't sort Puts if only one in list in HCM#processBatchOfRows
+ HBASE-1626 Allow emitting Deletes out of new TableReducer
+ (Lars George via Stack)
OPTIMIZATIONS
HBASE-1412 Change values for delete column and column family in KeyValue
diff --git a/src/java/org/apache/hadoop/hbase/client/Delete.java b/src/java/org/apache/hadoop/hbase/client/Delete.java
index 56900b63e37..c38ab639b5d 100644
--- a/src/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/src/java/org/apache/hadoop/hbase/client/Delete.java
@@ -70,7 +70,7 @@ public class Delete implements Writable {
/** Constructor for Writable. DO NOT USE */
public Delete() {
- this(null);
+ this((byte [])null);
}
/**
@@ -107,6 +107,16 @@ public class Delete implements Writable {
}
}
+ /**
+ * @param d Delete to clone.
+ */
+ public Delete(final Delete d) {
+ this.row = d.getRow();
+ this.ts = d.getTimeStamp();
+ this.lockId = d.getLockId();
+ this.familyMap.putAll(d.getFamilyMap());
+ }
+
/**
* Method to check if the familyMap is empty
* @return true if empty, false otherwise
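
For orientation, a minimal sketch of how the new copy constructor could be exercised; the row, family and qualifier names here are invented for the example:

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DeleteCloneExample {
      public static void main(String[] args) {
        // Build a Delete for a hypothetical row and column.
        Delete original = new Delete(Bytes.toBytes("row-1"));
        original.deleteColumns(Bytes.toBytes("info"), Bytes.toBytes("col"));
        // The copy constructor carries over the row, timestamp, lock id
        // and the accumulated family map of the source Delete.
        Delete copy = new Delete(original);
      }
    }
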
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java b/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
index b4c51c2fbba..eb7609cec6d 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
@@ -20,38 +20,60 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
-import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.OutputFormat;
/**
- * Convenience class that simply writes each key, record pair to the configured
- * HBase table.
+ * Convenience class that simply writes all values (which must be
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} instances)
+ * passed to it out to the configured HBase table. This works in combination
+ * with {@link TableOutputFormat} which actually does the writing to HBase.
+ *
+ * Keys are passed along but ignored in TableOutputFormat. However, they can
+ * be used to control how your values will be divided up amongst the specified
+ * number of reducers.
+ *
+ * You can also use the {@link TableMapReduceUtil} class to set up the two
+ * classes in one step:
+ *
+ * TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job);
+ *
+ * This will also set the proper {@link TableOutputFormat} which is given the
+ * <code>table</code> parameter. The
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} define the
+ * row and columns implicitly.
*/
public class IdentityTableReducer
-extends TableReducer<ImmutableBytesWritable, Put> {
+extends TableReducer<Writable, Writable, Writable> {
@SuppressWarnings("unused")
private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
/**
- * Writes each given record, consisting of the key and the given values, to
- * the HBase table.
+ * Writes each given record, consisting of the row key and the given values,
+ * to the configured {@link OutputFormat}. The row key and each
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} are emitted as
+ * separate (key, value) pairs.
*
- * @param key The current row key.
- * @param values The values for the given row.
+ * @param key The current row key.
+ * @param values The {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given
+ * row.
* @param context The context of the reduce.
* @throws IOException When writing the record fails.
* @throws InterruptedException When the job gets interrupted.
*/
- public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
+ @Override
+ public void reduce(Writable key, Iterable<Writable> values,
Context context) throws IOException, InterruptedException {
- while(values.hasNext()) {
- context.write(key, values.next());
+ for(Writable putOrDelete : values) {
+ context.write(key, putOrDelete);
}
}
-
+
}
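
As the javadoc above suggests, a hedged job-setup sketch using IdentityTableReducer; the job name and "target_table" are placeholders, and a real job would also configure its map phase:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class IdentityReducerJobExample {
      public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        Job job = new Job(conf, "pass-through-puts-and-deletes");
        job.setJarByClass(IdentityReducerJobExample.class);
        // Sets IdentityTableReducer as the reduce class and wires in
        // TableOutputFormat pointed at the placeholder table name.
        TableMapReduceUtil.initTableReducerJob("target_table",
            IdentityTableReducer.class, job);
        job.waitForCompletion(true);
      }
    }
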
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index b21545c65c9..b31b0c7fac3 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -24,29 +24,36 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
- * Convert Map/Reduce output and write it to an HBase table.
+ * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
+ * while the output value must be either a {@link Put} or a
+ * {@link Delete} instance.
+ *
+ * @param <KEY> The type of the key. Ignored in this class.
*/
-public class TableOutputFormat extends
-    FileOutputFormat<ImmutableBytesWritable, Put> {
+public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
/** Job parameter that specifies the output table. */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
/**
- * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
- * and write to an HBase table
+ * Writes the reducer output to an HBase table.
+ *
+ * @param <KEY> The type of the key.
*/
- protected static class TableRecordWriter
- extends RecordWriter<ImmutableBytesWritable, Put> {
+ protected static class TableRecordWriter<KEY>
+ extends RecordWriter<KEY, Writable> {
/** The table to write to. */
private HTable table;
@@ -82,9 +89,11 @@ public class TableOutputFormat extends
* @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
*/
@Override
- public void write(ImmutableBytesWritable key, Put value)
+ public void write(KEY key, Writable value)
throws IOException {
- table.put(new Put(value));
+ if (value instanceof Put) this.table.put(new Put((Put)value));
+ else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
+ else throw new IOException("Pass a Delete or a Put");
}
}
@@ -97,7 +106,7 @@ public class TableOutputFormat extends
* @throws InterruptedException When the job is cancelled.
* @see org.apache.hadoop.mapreduce.OutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
- public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(
+ public RecordWriter<KEY, Writable> getRecordWriter(
TaskAttemptContext context)
throws IOException, InterruptedException {
// expecting exactly one path
@@ -111,7 +120,37 @@ public class TableOutputFormat extends
throw e;
}
table.setAutoFlush(false);
- return new TableRecordWriter(table);
+ return new TableRecordWriter<KEY>(table);
+ }
+
+ /**
+ * Checks if the output target exists.
+ *
+ * @param context The current context.
+ * @throws IOException When the check fails.
+ * @throws InterruptedException When the job is aborted.
+ * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
+ */
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException,
+ InterruptedException {
+ // TODO Check if the table exists?
+
+ }
+
+ /**
+ * Returns the output committer.
+ *
+ * @param context The current context.
+ * @return The committer.
+ * @throws IOException When creating the committer fails.
+ * @throws InterruptedException When the job is aborted.
+ * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TableOutputCommitter();
}
}
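
For intuition, the dispatch in TableRecordWriter.write() above roughly corresponds to these plain client calls (a sketch only; the table, row and column names are invented):

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WriteDispatchExample {
      public static void main(String[] args) throws Exception {
        HTable table = new HTable("target_table");
        // The record writer disables per-put flushing the same way.
        table.setAutoFlush(false);
        // A Put value is applied via table.put(...).
        Put put = new Put(Bytes.toBytes("row-1"));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("col"), Bytes.toBytes("v"));
        table.put(put);
        // A Delete value is applied via table.delete(...).
        table.delete(new Delete(Bytes.toBytes("row-2")));
        table.flushCommits();
      }
    }
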
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
index fcfc14498a5..64540ac73c3 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java
@@ -19,19 +19,26 @@
*/
package org.apache.hadoop.hbase.mapreduce;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Extends the basic <code>Reducer</code> class to add the required key and
- * value output classes.
+ * value input/output classes. While the input key and value as well as the
+ * output key can be anything handed in from the previous map phase the output
+ * value must be either a {@link org.apache.hadoop.hbase.client.Put Put}
+ * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
+ * using the {@link TableOutputFormat} class.
+ *
+ * This class is extended by {@link IdentityTableReducer} but can also be
+ * subclassed to implement similar features or any custom code needed. It has
+ * the advantage of enforcing a specific basic type for the output value.
*
- * @param <KEY> The type of the key.
- * @param <VALUE> The type of the value.
+ * @param <KEYIN> The type of the input key.
+ * @param <VALUEIN> The type of the input value.
+ * @param <KEYOUT> The type of the output key.
* @see org.apache.hadoop.mapreduce.Reducer
*/
-public abstract class TableReducer<KEY, VALUE>
-extends Reducer<KEY, VALUE, ImmutableBytesWritable, Put> {
-
+public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
+extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
}
\ No newline at end of file
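
To round this off, a hedged sketch of a custom subclass under the new three-parameter signature; the PruneReducer name, the hit-counting logic and the column names are invented, but it shows what the change enables, namely emitting Puts and Deletes from the same reducer:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;

    public class PruneReducer
    extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {
      @Override
      public void reduce(ImmutableBytesWritable row, Iterable<IntWritable> hits,
          Context context) throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable hit : hits) total += hit.get();
        if (total == 0) {
          // Rows that saw no hits are deleted from the table.
          context.write(row, new Delete(row.get()));
        } else {
          // Otherwise the aggregated count is written back.
          Put put = new Put(row.get());
          put.add(Bytes.toBytes("stats"), Bytes.toBytes("hits"),
              Bytes.toBytes(total));
          context.write(row, put);
        }
      }
    }
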