HBASE-12610 Close connection in TableInputFormatBase (Solomon Duskis)

This commit is contained in:
stack 2014-12-02 09:28:02 -08:00
parent 5cc0714840
commit 7b10847ec8
1 changed files with 47 additions and 7 deletions

View File

@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -109,6 +110,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
private HashMap<InetAddress, String> reverseDNSCacheMap =
new HashMap<InetAddress, String>();
private Connection connection;
/**
* Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
* the default.
@ -132,19 +135,55 @@ extends InputFormat<ImmutableBytesWritable, Result> {
}
TableSplit tSplit = (TableSplit) split;
LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
TableRecordReader trr = this.tableRecordReader;
// if no table record reader was provided use default
if (trr == null) {
trr = new TableRecordReader();
}
final TableRecordReader trr =
this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
Scan sc = new Scan(this.scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
trr.setTable(table);
return trr;
return new RecordReader<ImmutableBytesWritable, Result>() {
@Override
public void close() throws IOException {
trr.close();
close(admin, table, regionLocator, connection);
}
private void close(Closeable... closables) throws IOException {
for (Closeable c : closables) {
if(c != null) { c.close(); }
}
}
@Override
public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
return trr.getCurrentKey();
}
@Override
public Result getCurrentValue() throws IOException, InterruptedException {
return trr.getCurrentValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
return trr.getProgress();
}
@Override
public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException,
InterruptedException {
trr.initialize(inputsplit, context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return trr.nextKeyValue();
}
};
}
protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
return regionLocator.getStartEndKeys();
}
@ -331,6 +370,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
this.table = connection.getTable(tableName);
this.regionLocator = connection.getRegionLocator(tableName);
this.admin = connection.getAdmin();
this.connection = connection;
}
/**