HBASE-12423 Use a non-managed Table in TableOutputFormat (Solomon Duskis)

This commit is contained in:
stack 2014-11-04 16:25:16 -08:00
parent 7442e5bd62
commit 3587fe8324
2 changed files with 16 additions and 23 deletions

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
@ -46,7 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
* while the output value <u>must</u> be either a {@link Put} or a
* {@link Delete} instance.
*
* @param <KEY> The type of the key. Ignored in this class.
* <p><KEY> is the type of the key. Ignored in this class.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@ -79,28 +81,17 @@ implements Configurable {
/** The configuration. */
private Configuration conf = null;
private HTable table;
private Table table;
private Connection connection;
/**
* Writes the reducer output to an HBase table.
*
* @param <KEY> The type of the key.
*/
protected static class TableRecordWriter<KEY>
protected class TableRecordWriter
extends RecordWriter<KEY, Mutation> {
/** The table to write to. */
private Table table;
/**
* Instantiate a TableRecordWriter with the HBase HClient for writing.
*
* @param table The table to write to.
*/
public TableRecordWriter(Table table) {
this.table = table;
}
/**
* Closes the writer, in this case flush table commits.
*
@ -112,6 +103,7 @@ implements Configurable {
public void close(TaskAttemptContext context)
throws IOException {
table.close();
connection.close();
}
/**
@ -125,8 +117,8 @@ implements Configurable {
@Override
public void write(KEY key, Mutation value)
throws IOException {
if (value instanceof Put) this.table.put(new Put((Put)value));
else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
if (value instanceof Put) table.put(new Put((Put)value));
else if (value instanceof Delete) table.delete(new Delete((Delete)value));
else throw new IOException("Pass a Delete or a Put");
}
}
@ -144,7 +136,7 @@ implements Configurable {
public RecordWriter<KEY, Mutation> getRecordWriter(
TaskAttemptContext context)
throws IOException, InterruptedException {
return new TableRecordWriter<KEY>(this.table);
return new TableRecordWriter();
}
/**
@ -205,8 +197,9 @@ implements Configurable {
if (zkClientPort != 0) {
this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
}
this.table = new HTable(this.conf, TableName.valueOf(tableName));
this.table.setAutoFlush(false, true);
this.connection = ConnectionFactory.createConnection(this.conf);
this.table = connection.getTable(TableName.valueOf(tableName));
((HTable) this.table).setAutoFlush(false, true);
LOG.info("Created table instance for " + tableName);
} catch(IOException e) {
LOG.error(e);

View File

@ -139,12 +139,12 @@ public interface RegionServerServices
* @return all the online tables in this RS
*/
Set<TableName> getOnlineTables();
/**
* Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to be
* available for handling
* @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
* @param service the {@code Service} subclass instance to expose as a coprocessor endpoint
* @return {@code true} if the registration was successful, {@code false}
*/
boolean registerService(Service service);