HBASE-1626 Allow emitting Deletes out of new TableReducer
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@792388 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b015cc7fc1
commit
ff272e45ad
|
@ -450,6 +450,8 @@ Release 0.20.0 - Unreleased
|
||||||
HBASE-1620 Need to use special StoreScanner constructor for major compactions
|
HBASE-1620 Need to use special StoreScanner constructor for major compactions
|
||||||
(passed sf, no caching, etc) (Jon Gray via Stack)
|
(passed sf, no caching, etc) (Jon Gray via Stack)
|
||||||
HBASE-1624 Don't sort Puts if only one in list in HCM#processBatchOfRows
|
HBASE-1624 Don't sort Puts if only one in list in HCM#processBatchOfRows
|
||||||
|
HBASE-1626 Allow emitting Deletes out of new TableReducer
|
||||||
|
(Lars George via Stack)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
HBASE-1412 Change values for delete column and column family in KeyValue
|
HBASE-1412 Change values for delete column and column family in KeyValue
|
||||||
|
|
|
@ -70,7 +70,7 @@ public class Delete implements Writable {
|
||||||
|
|
||||||
/** Constructor for Writable. DO NOT USE */
|
/** Constructor for Writable. DO NOT USE */
|
||||||
public Delete() {
|
public Delete() {
|
||||||
this(null);
|
this((byte [])null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -107,6 +107,16 @@ public class Delete implements Writable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param d Delete to clone.
|
||||||
|
*/
|
||||||
|
public Delete(final Delete d) {
|
||||||
|
this.row = d.getRow();
|
||||||
|
this.ts = d.getTimeStamp();
|
||||||
|
this.lockId = d.getLockId();
|
||||||
|
this.familyMap.putAll(d.getFamilyMap());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method to check if the familyMap is empty
|
* Method to check if the familyMap is empty
|
||||||
* @return true if empty, false otherwise
|
* @return true if empty, false otherwise
|
||||||
|
|
|
@ -20,37 +20,59 @@
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Iterator;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convenience class that simply writes each key, record pair to the configured
|
* Convenience class that simply writes all values (which must be
|
||||||
* HBase table.
|
* {@link org.apache.hadoop.hbase.client.Put Put} or
|
||||||
|
* {@link org.apache.hadoop.hbase.client.Delete Delete} instances)
|
||||||
|
* passed to it out to the configured HBase table. This works in combination
|
||||||
|
* with {@link TableOutputFormat} which actually does the writing to HBase.<p>
|
||||||
|
*
|
||||||
|
* Keys are passed along but ignored in TableOutputFormat. However, they can
|
||||||
|
* be used to control how your values will be divided up amongst the specified
|
||||||
|
* number of reducers. <p>
|
||||||
|
*
|
||||||
|
* You can also use the {@link TableMapReduceUtil} class to set up the two
|
||||||
|
* classes in one step:
|
||||||
|
* <blockquote><code>
|
||||||
|
* TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job);
|
||||||
|
* </code></blockquote>
|
||||||
|
* This will also set the proper {@link TableOutputFormat} which is given the
|
||||||
|
* <code>table</code> parameter. The
|
||||||
|
* {@link org.apache.hadoop.hbase.client.Put Put} or
|
||||||
|
* {@link org.apache.hadoop.hbase.client.Delete Delete} define the
|
||||||
|
* row and columns implicitly.
|
||||||
*/
|
*/
|
||||||
public class IdentityTableReducer
|
public class IdentityTableReducer
|
||||||
extends TableReducer<ImmutableBytesWritable, Put> {
|
extends TableReducer<Writable, Writable, Writable> {
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
|
private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes each given record, consisting of the key and the given values, to
|
* Writes each given record, consisting of the row key and the given values,
|
||||||
* the HBase table.
|
* to the configured {@link OutputFormat}. It is emitting the row key and each
|
||||||
|
* {@link org.apache.hadoop.hbase.client.Put Put} or
|
||||||
|
* {@link org.apache.hadoop.hbase.client.Delete Delete} as separate pairs.
|
||||||
*
|
*
|
||||||
* @param key The current row key.
|
* @param key The current row key.
|
||||||
* @param values The values for the given row.
|
* @param values The {@link org.apache.hadoop.hbase.client.Put Put} or
|
||||||
|
* {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given
|
||||||
|
* row.
|
||||||
* @param context The context of the reduce.
|
* @param context The context of the reduce.
|
||||||
* @throws IOException When writing the record fails.
|
* @throws IOException When writing the record fails.
|
||||||
* @throws InterruptedException When the job gets interrupted.
|
* @throws InterruptedException When the job gets interrupted.
|
||||||
*/
|
*/
|
||||||
public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
|
@Override
|
||||||
|
public void reduce(Writable key, Iterable<Writable> values,
|
||||||
Context context) throws IOException, InterruptedException {
|
Context context) throws IOException, InterruptedException {
|
||||||
while(values.hasNext()) {
|
for(Writable putOrDelete : values) {
|
||||||
context.write(key, values.next());
|
context.write(key, putOrDelete);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,29 +24,36 @@ import java.io.IOException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||||
|
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||||
import org.apache.hadoop.mapreduce.RecordWriter;
|
import org.apache.hadoop.mapreduce.RecordWriter;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert Map/Reduce output and write it to an HBase table.
|
* Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
|
||||||
|
* while the output value <u>must</u> be either a {@link Put} or a
|
||||||
|
* {@link Delete} instance.
|
||||||
|
*
|
||||||
|
* @param <KEY> The type of the key. Ignored in this class.
|
||||||
*/
|
*/
|
||||||
public class TableOutputFormat extends
|
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable> {
|
||||||
FileOutputFormat<ImmutableBytesWritable, Put> {
|
|
||||||
|
|
||||||
private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
|
private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
|
||||||
/** Job parameter that specifies the output table. */
|
/** Job parameter that specifies the output table. */
|
||||||
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
|
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
|
* Writes the reducer output to an HBase table.
|
||||||
* and write to an HBase table
|
*
|
||||||
|
* @param <KEY> The type of the key.
|
||||||
*/
|
*/
|
||||||
protected static class TableRecordWriter
|
protected static class TableRecordWriter<KEY>
|
||||||
extends RecordWriter<ImmutableBytesWritable, Put> {
|
extends RecordWriter<KEY, Writable> {
|
||||||
|
|
||||||
/** The table to write to. */
|
/** The table to write to. */
|
||||||
private HTable table;
|
private HTable table;
|
||||||
|
@ -82,9 +89,11 @@ public class TableOutputFormat extends
|
||||||
* @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
|
* @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void write(ImmutableBytesWritable key, Put value)
|
public void write(KEY key, Writable value)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
table.put(new Put(value));
|
if (value instanceof Put) this.table.put(new Put((Put)value));
|
||||||
|
else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
|
||||||
|
else throw new IOException("Pass a Delete or a Put");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,7 +106,7 @@ public class TableOutputFormat extends
|
||||||
* @throws InterruptedException When the jobs is cancelled.
|
* @throws InterruptedException When the jobs is cancelled.
|
||||||
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
|
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
|
||||||
*/
|
*/
|
||||||
public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(
|
public RecordWriter<KEY, Writable> getRecordWriter(
|
||||||
TaskAttemptContext context)
|
TaskAttemptContext context)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
// expecting exactly one path
|
// expecting exactly one path
|
||||||
|
@ -111,7 +120,37 @@ public class TableOutputFormat extends
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
table.setAutoFlush(false);
|
table.setAutoFlush(false);
|
||||||
return new TableRecordWriter(table);
|
return new TableRecordWriter<KEY>(table);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if the output target exists.
|
||||||
|
*
|
||||||
|
* @param context The current context.
|
||||||
|
* @throws IOException When the check fails.
|
||||||
|
* @throws InterruptedException When the job is aborted.
|
||||||
|
* @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void checkOutputSpecs(JobContext context) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
// TODO Check if the table exists?
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the output committer.
|
||||||
|
*
|
||||||
|
* @param context The current context.
|
||||||
|
* @return The committer.
|
||||||
|
* @throws IOException When creating the committer fails.
|
||||||
|
* @throws InterruptedException When the job is aborted.
|
||||||
|
* @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return new TableOutputCommitter();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,19 +19,26 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
|
||||||
import org.apache.hadoop.mapreduce.Reducer;
|
import org.apache.hadoop.mapreduce.Reducer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extends the basic <code>Reducer</code> class to add the required key and
|
* Extends the basic <code>Reducer</code> class to add the required key and
|
||||||
* value output classes.
|
* value input/output classes. While the input key and value as well as the
|
||||||
|
* output key can be anything handed in from the previous map phase the output
|
||||||
|
* value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put}
|
||||||
|
* or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
|
||||||
|
* using the {@link TableOutputFormat} class.
|
||||||
|
* <p>
|
||||||
|
* This class is extended by {@link IdentityTableReducer} but can also be
|
||||||
|
* subclassed to implement similar features or any custom code needed. It has
|
||||||
|
* the advantage to enforce the output value to a specific basic type.
|
||||||
*
|
*
|
||||||
* @param <KEYIN> The type of the key.
|
* @param <KEYIN> The type of the input key.
|
||||||
* @param <VALUEIN> The type of the value.
|
* @param <VALUEIN> The type of the input value.
|
||||||
|
* @param <KEYOUT> The type of the output key.
|
||||||
* @see org.apache.hadoop.mapreduce.Reducer
|
* @see org.apache.hadoop.mapreduce.Reducer
|
||||||
*/
|
*/
|
||||||
public abstract class TableReducer<KEYIN, VALUEIN>
|
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
|
||||||
extends Reducer<KEYIN, VALUEIN, ImmutableBytesWritable, Put> {
|
extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue