HBASE-12459 Use a non-managed Table in mapred.TableOutputFormat
This commit is contained in:
parent
9ea80f8f11
commit
1276af23d5
|
@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
|
@ -44,8 +46,7 @@ import org.apache.hadoop.util.Progressable;
|
|||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public class TableOutputFormat extends
|
||||
FileOutputFormat<ImmutableBytesWritable, Put> {
|
||||
public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
|
||||
|
||||
/** JobConf parameter that specifies the output table */
|
||||
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
|
||||
|
@ -53,55 +54,49 @@ FileOutputFormat<ImmutableBytesWritable, Put> {
|
|||
|
||||
/**
|
||||
* Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
|
||||
* and write to an HBase table
|
||||
* and write to an HBase table.
|
||||
*/
|
||||
protected static class TableRecordWriter
|
||||
implements RecordWriter<ImmutableBytesWritable, Put> {
|
||||
private Table m_table;
|
||||
protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
|
||||
private final Connection conn;
|
||||
private final Table table;
|
||||
|
||||
/**
|
||||
* Instantiate a TableRecordWriter with the HBase HClient for writing.
|
||||
*
|
||||
* @param table
|
||||
* Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the
|
||||
* lifecycle of {@code conn}.
|
||||
*/
|
||||
public TableRecordWriter(Table table) {
|
||||
m_table = table;
|
||||
public TableRecordWriter(Connection conn, TableName tableName) throws IOException {
|
||||
this.conn = conn;
|
||||
this.table = conn.getTable(tableName);
|
||||
((HTable) this.table).setAutoFlush(false, true);
|
||||
}
|
||||
|
||||
public void close(Reporter reporter)
|
||||
throws IOException {
|
||||
m_table.close();
|
||||
public void close(Reporter reporter) throws IOException {
|
||||
table.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
public void write(ImmutableBytesWritable key,
|
||||
Put value) throws IOException {
|
||||
m_table.put(new Put(value));
|
||||
public void write(ImmutableBytesWritable key, Put value) throws IOException {
|
||||
table.put(new Put(value));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public RecordWriter getRecordWriter(FileSystem ignored,
|
||||
JobConf job, String name, Progressable progress) throws IOException {
|
||||
|
||||
// expecting exactly one path
|
||||
|
||||
public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(FileSystem ignored, JobConf job,
|
||||
String name, Progressable progress) throws IOException {
|
||||
TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
|
||||
HTable table = null;
|
||||
Connection conn = null;
|
||||
try {
|
||||
table = new HTable(HBaseConfiguration.create(job), tableName);
|
||||
conn = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
|
||||
} catch(IOException e) {
|
||||
LOG.error(e);
|
||||
throw e;
|
||||
}
|
||||
table.setAutoFlush(false, true);
|
||||
return new TableRecordWriter(table);
|
||||
return new TableRecordWriter(conn, tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkOutputSpecs(FileSystem ignored, JobConf job)
|
||||
throws FileAlreadyExistsException, InvalidJobConfException, IOException {
|
||||
|
||||
String tableName = job.get(OUTPUT_TABLE);
|
||||
if(tableName == null) {
|
||||
throw new IOException("Must specify table name");
|
||||
|
|
Loading…
Reference in New Issue