diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java index f4a3eb11b03..83dca4bc340 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java @@ -107,11 +107,10 @@ public abstract class MultiTableInputFormatBase extends sc.setStopRow(tSplit.getEndRow()); trr.setScan(sc); trr.setTable(table); + trr.setConnection(connection); } catch (IOException ioe) { // If there is an exception make sure that all // resources are closed and released. - connection.close(); - table.close(); trr.close(); throw ioe; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java index f46f1e32a04..9ff90e78e8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -40,6 +41,7 @@ public class TableRecordReader extends RecordReader { private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl(); + private Connection connection = null; /** * Restart from survivable exceptions by creating a new scanner. @@ -85,8 +87,10 @@ extends RecordReader { * @see org.apache.hadoop.mapreduce.RecordReader#close() */ @Override - public void close() { + public void close() throws IOException { this.recordReaderImpl.close(); + if (this.connection != null) + this.connection.close(); } /** @@ -157,4 +161,8 @@ extends RecordReader { public float getProgress() { return this.recordReaderImpl.getProgress(); } + + public void setConnection(Connection connection) { + this.connection = connection; + } }