From 3587fe8324a94351fa2f518d3c4bcb15d33d8ab0 Mon Sep 17 00:00:00 2001 From: stack Date: Tue, 4 Nov 2014 16:25:16 -0800 Subject: [PATCH] HBASE-12423 Use a non-managed Table in TableOutputFormat (Solomon Duskis) --- .../hbase/mapreduce/TableOutputFormat.java | 33 ++++++++----------- .../regionserver/RegionServerServices.java | 6 ++-- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index 52b8e450a24..da40b2ecdf5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -29,6 +29,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; @@ -46,7 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; * while the output value must be either a {@link Put} or a * {@link Delete} instance. * - * @param The type of the key. Ignored in this class. + *

is the type of the key. Ignored in this class. */ @InterfaceAudience.Public @InterfaceStability.Stable @@ -79,28 +81,17 @@ implements Configurable { /** The configuration. */ private Configuration conf = null; - private HTable table; + private Table table; + private Connection connection; /** * Writes the reducer output to an HBase table. * * @param The type of the key. */ - protected static class TableRecordWriter + protected class TableRecordWriter extends RecordWriter { - /** The table to write to. */ - private Table table; - - /** - * Instantiate a TableRecordWriter with the HBase HClient for writing. - * - * @param table The table to write to. - */ - public TableRecordWriter(Table table) { - this.table = table; - } - /** * Closes the writer, in this case flush table commits. * @@ -112,6 +103,7 @@ implements Configurable { public void close(TaskAttemptContext context) throws IOException { table.close(); + connection.close(); } /** @@ -125,8 +117,8 @@ implements Configurable { @Override public void write(KEY key, Mutation value) throws IOException { - if (value instanceof Put) this.table.put(new Put((Put)value)); - else if (value instanceof Delete) this.table.delete(new Delete((Delete)value)); + if (value instanceof Put) table.put(new Put((Put)value)); + else if (value instanceof Delete) table.delete(new Delete((Delete)value)); else throw new IOException("Pass a Delete or a Put"); } } @@ -144,7 +136,7 @@ implements Configurable { public RecordWriter getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { - return new TableRecordWriter(this.table); + return new TableRecordWriter(); } /** @@ -205,8 +197,9 @@ implements Configurable { if (zkClientPort != 0) { this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); } - this.table = new HTable(this.conf, TableName.valueOf(tableName)); - this.table.setAutoFlush(false, true); + this.connection = ConnectionFactory.createConnection(this.conf); + this.table = connection.getTable(TableName.valueOf(tableName)); + ((HTable) this.table).setAutoFlush(false, true); LOG.info("Created table instance for " + tableName); } catch(IOException e) { LOG.error(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 67c499313c8..f02b8baa8fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -139,12 +139,12 @@ public interface RegionServerServices * @return all the online tables in this RS */ Set getOnlineTables(); - - + + /** * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to be * available for handling - * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint + * @param service the {@code Service} subclass instance to expose as a coprocessor endpoint * @return {@code true} if the registration was successful, {@code false} */ boolean registerService(Service service);