HBASE-14394 Properly close the connection after reading records from table: addendum

This commit is contained in:
Jerry He 2015-09-29 19:48:02 -07:00
parent c04d18970e
commit e6905a1444
2 changed files with 45 additions and 17 deletions

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionLocator;
@ -92,29 +91,65 @@ public abstract class MultiTableInputFormatBase extends
+ " previous error. Please look at the previous logs lines from" + " previous error. Please look at the previous logs lines from"
+ " the task's full log for more details."); + " the task's full log for more details.");
} }
Connection connection = ConnectionFactory.createConnection(context.getConfiguration()); final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
Table table = connection.getTable(tSplit.getTable()); Table table = connection.getTable(tSplit.getTable());
TableRecordReader trr = this.tableRecordReader; if (this.tableRecordReader == null) {
this.tableRecordReader = new TableRecordReader();
}
final TableRecordReader trr = this.tableRecordReader;
try { try {
// if no table record reader was provided use default
if (trr == null) {
trr = new TableRecordReader();
}
Scan sc = tSplit.getScan(); Scan sc = tSplit.getScan();
sc.setStartRow(tSplit.getStartRow()); sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow()); sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc); trr.setScan(sc);
trr.setTable(table); trr.setTable(table);
trr.setConnection(connection); return new RecordReader<ImmutableBytesWritable, Result>() {
@Override
public void close() throws IOException {
trr.close();
if (connection != null) {
connection.close();
}
}
@Override
public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
return trr.getCurrentKey();
}
@Override
public Result getCurrentValue() throws IOException, InterruptedException {
return trr.getCurrentValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
return trr.getProgress();
}
@Override
public void initialize(InputSplit inputsplit, TaskAttemptContext context)
throws IOException, InterruptedException {
trr.initialize(inputsplit, context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return trr.nextKeyValue();
}
};
} catch (IOException ioe) { } catch (IOException ioe) {
// If there is an exception make sure that all // If there is an exception make sure that all
// resources are closed and released. // resources are closed and released.
trr.close(); trr.close();
if (connection != null) {
connection.close();
}
throw ioe; throw ioe;
} }
return trr;
} }
/** /**

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
@ -41,7 +40,6 @@ public class TableRecordReader
extends RecordReader<ImmutableBytesWritable, Result> { extends RecordReader<ImmutableBytesWritable, Result> {
private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl(); private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
private Connection connection = null;
/** /**
* Restart from survivable exceptions by creating a new scanner. * Restart from survivable exceptions by creating a new scanner.
@ -87,10 +85,8 @@ extends RecordReader<ImmutableBytesWritable, Result> {
* @see org.apache.hadoop.mapreduce.RecordReader#close() * @see org.apache.hadoop.mapreduce.RecordReader#close()
*/ */
@Override @Override
public void close() throws IOException { public void close() {
this.recordReaderImpl.close(); this.recordReaderImpl.close();
if (this.connection != null)
this.connection.close();
} }
/** /**
@ -162,7 +158,4 @@ extends RecordReader<ImmutableBytesWritable, Result> {
return this.recordReaderImpl.getProgress(); return this.recordReaderImpl.getProgress();
} }
public void setConnection(Connection connection) {
this.connection = connection;
}
} }