HBASE-12459 Use a non-managed Table in mapred.TableOutputFormat

This commit is contained in:
Nick Dimiduk 2014-11-17 08:36:08 +01:00
parent dcba045526
commit f38efa79e6
1 changed file with 24 additions and 29 deletions

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
@ -44,8 +46,7 @@ import org.apache.hadoop.util.Progressable;
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public class TableOutputFormat extends public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
FileOutputFormat<ImmutableBytesWritable, Put> {
/** JobConf parameter that specifies the output table */ /** JobConf parameter that specifies the output table */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
@ -53,55 +54,49 @@ FileOutputFormat<ImmutableBytesWritable, Put> {
/** /**
* Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
* and write to an HBase table * and write to an HBase table.
*/ */
protected static class TableRecordWriter protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
implements RecordWriter<ImmutableBytesWritable, Put> { private final Connection conn;
private Table m_table; private final Table table;
/** /**
* Instantiate a TableRecordWriter with the HBase HClient for writing. * Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the
* * lifecycle of {@code conn}.
* @param table
*/ */
public TableRecordWriter(Table table) { public TableRecordWriter(Connection conn, TableName tableName) throws IOException {
m_table = table; this.conn = conn;
this.table = conn.getTable(tableName);
((HTable) this.table).setAutoFlush(false, true);
} }
public void close(Reporter reporter) public void close(Reporter reporter) throws IOException {
throws IOException { table.close();
m_table.close(); conn.close();
} }
public void write(ImmutableBytesWritable key, public void write(ImmutableBytesWritable key, Put value) throws IOException {
Put value) throws IOException { table.put(new Put(value));
m_table.put(new Put(value));
} }
} }
@Override @Override
@SuppressWarnings("unchecked") public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(FileSystem ignored, JobConf job,
public RecordWriter getRecordWriter(FileSystem ignored, String name, Progressable progress) throws IOException {
JobConf job, String name, Progressable progress) throws IOException {
// expecting exactly one path
TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE)); TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
HTable table = null; Connection conn = null;
try { try {
table = new HTable(HBaseConfiguration.create(job), tableName); conn = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
} catch(IOException e) { } catch(IOException e) {
LOG.error(e); LOG.error(e);
throw e; throw e;
} }
table.setAutoFlush(false, true); return new TableRecordWriter(conn, tableName);
return new TableRecordWriter(table);
} }
@Override @Override
public void checkOutputSpecs(FileSystem ignored, JobConf job) public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws FileAlreadyExistsException, InvalidJobConfException, IOException { throws FileAlreadyExistsException, InvalidJobConfException, IOException {
String tableName = job.get(OUTPUT_TABLE); String tableName = job.get(OUTPUT_TABLE);
if(tableName == null) { if(tableName == null) {
throw new IOException("Must specify table name"); throw new IOException("Must specify table name");