HBASE-16017 HBase TableOutputFormat has connection leak in getRecordWriter (Zhan Zhang)
This commit is contained in:
parent
db234bf15d
commit
6c60bc9f6c
|
@ -53,7 +53,7 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
|
||||||
*/
|
*/
|
||||||
protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
|
protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
|
||||||
private BufferedMutator m_mutator;
|
private BufferedMutator m_mutator;
|
||||||
|
private Connection connection;
|
||||||
/**
|
/**
|
||||||
* Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the
|
* Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the
|
||||||
* lifecycle of {@code conn}.
|
* lifecycle of {@code conn}.
|
||||||
|
@ -62,8 +62,19 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
|
||||||
this.m_mutator = mutator;
|
this.m_mutator = mutator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TableRecordWriter(JobConf job) throws IOException {
|
||||||
|
// expecting exactly one path
|
||||||
|
TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
|
||||||
|
connection = ConnectionFactory.createConnection(job);
|
||||||
|
m_mutator = connection.getBufferedMutator(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
public void close(Reporter reporter) throws IOException {
|
public void close(Reporter reporter) throws IOException {
|
||||||
this.m_mutator.close();
|
this.m_mutator.close();
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
connection = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void write(ImmutableBytesWritable key, Put value) throws IOException {
|
public void write(ImmutableBytesWritable key, Put value) throws IOException {
|
||||||
|
@ -90,14 +101,7 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
|
||||||
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
|
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
|
||||||
Progressable progress)
|
Progressable progress)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// expecting exactly one path
|
return new TableRecordWriter(job);
|
||||||
TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
|
|
||||||
BufferedMutator mutator = null;
|
|
||||||
// Connection is not closed. Dies with JVM. No possibility for cleanup.
|
|
||||||
Connection connection = ConnectionFactory.createConnection(job);
|
|
||||||
mutator = connection.getBufferedMutator(tableName);
|
|
||||||
// Clear write buffer on fail is true by default so no need to reset it.
|
|
||||||
return new TableRecordWriter(mutator);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue