HBASE-16117 Fix Connection leak in mapred.TableOutputFormat

This commit is contained in:
Jonathan M Hsieh 2016-06-26 17:05:49 -07:00
parent 42106b89b1
commit e1d130946b
7 changed files with 148 additions and 41 deletions

View File

@ -436,7 +436,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
protected String clusterId = null;
protected void retrieveClusterId() {
protected void retrieveClusterId() throws IOException {
if (clusterId != null) {
return;
}

View File

@ -2077,16 +2077,17 @@ public class HBaseAdmin implements Admin {
// We set it to make it fail as soon as possible if HBase is not available
copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
copyOfConf.setInt("zookeeper.recovery.retry", 0);
try (ClusterConnection connection =
(ClusterConnection)ConnectionFactory.createConnection(copyOfConf)) {
// Check ZK first.
// If the connection exists, we may have a connection to ZK that does not work anymore
ZooKeeperKeepAliveConnection zkw = null;
try {
try (ClusterConnection connection =
(ClusterConnection) ConnectionFactory.createConnection(copyOfConf);
ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) connection).
getKeepAliveZooKeeperWatcher();) {
// This is NASTY. FIX!!!! Dependent on internal implementation! TODO
zkw = ((ConnectionImplementation)connection).
getKeepAliveZooKeeperWatcher();
zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
connection.isMasterRunning();
} catch (IOException e) {
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
} catch (InterruptedException e) {
@ -2094,12 +2095,6 @@ public class HBaseAdmin implements Admin {
new InterruptedIOException("Can't connect to ZooKeeper").initCause(e);
} catch (KeeperException e) {
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
} finally {
if (zkw != null) {
zkw.close();
}
}
connection.isMasterRunning();
}
}

View File

@ -32,7 +32,7 @@ interface Registry {
/**
* @param connection
*/
void init(Connection connection);
void init(Connection connection) throws IOException;
/**
* @return Meta region location
@ -43,7 +43,7 @@ interface Registry {
/**
* @return Cluster id.
*/
String getClusterId();
String getClusterId() throws IOException;
/**
* @return Count of 'running' regionservers

View File

@ -92,7 +92,7 @@ class ZooKeeperRegistry implements Registry {
private String clusterId = null;
@Override
public String getClusterId() {
public String getClusterId() throws IOException {
if (this.clusterId != null) return this.clusterId;
// No synchronized here, worse case we will retrieve it twice, that's
// not an issue.
@ -105,8 +105,10 @@ class ZooKeeperRegistry implements Registry {
}
} catch (KeeperException e) {
LOG.warn("Can't retrieve clusterId from ZooKeeper", e);
throw new IOException("ZooKeeperException ", e);
} catch (IOException e) {
LOG.warn("Can't retrieve clusterId from ZooKeeper", e);
throw e;
} finally {
if (zkw != null) zkw.close();
}

View File

@ -53,27 +53,31 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
*/
protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
private BufferedMutator m_mutator;
private Connection connection;
/**
* Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the
* lifecycle of {@code conn}.
*/
public TableRecordWriter(final BufferedMutator mutator) throws IOException {
this.m_mutator = mutator;
}
private Connection conn;
/**
* Instantiate a TableRecordWriter with a BufferedMutator for batch writing.
*/
public TableRecordWriter(JobConf job) throws IOException {
// expecting exactly one path
TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
connection = ConnectionFactory.createConnection(job);
m_mutator = connection.getBufferedMutator(tableName);
try {
this.conn = ConnectionFactory.createConnection(job);
this.m_mutator = conn.getBufferedMutator(tableName);
} finally {
if (this.m_mutator == null) {
conn.close();
conn = null;
}
}
}
public void close(Reporter reporter) throws IOException {
if (this.m_mutator != null) {
this.m_mutator.close();
if (connection != null) {
connection.close();
connection = null;
}
if (conn != null) {
this.conn.close();
}
}
@ -101,6 +105,7 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
Progressable progress)
throws IOException {
// Clear write buffer on fail is true by default so no need to reset it.
return new TableRecordWriter(job);
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcClientImpl;
@ -106,9 +107,9 @@ public class TestClientTimeouts {
// run some admin commands
HBaseAdmin.checkHBaseAvailable(conf);
admin.setBalancerRunning(false, false);
} catch (MasterNotRunningException ex) {
} catch (ZooKeeperConnectionException ex) {
// Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
// a MasterNotRunningException. It's a bug if we get other exceptions.
// a ZooKeeperConnectionException. It's a bug if we get other exceptions.
lastFailed = true;
} finally {
if(admin != null) {

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import static org.junit.Assert.fail;
/**
* Spark creates many instances of TableOutputFormat within a single process. We need to make
* sure we can have many instances and not leak connections.
*
* This test creates a few TableOutputFormats and shouldn't fail due to ZK connection exhaustion.
*/
@Category(MediumTests.class)
public class TestTableOutputFormatConnectionExhaust {
private static final Log LOG =
LogFactory.getLog(TestTableOutputFormatConnectionExhaust.class);
private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
static final String TABLE = "TestTableOutputFormatConnectionExhaust";
static final String FAMILY = "family";
@BeforeClass
public static void beforeClass() throws Exception {
// Default in ZookeeperMiniCluster is 1000, setting artificially low to trigger exhaustion.
// need min of 7 to properly start the default mini HBase cluster
UTIL.getConfiguration().setInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 10);
UTIL.startMiniCluster();
}
@AfterClass
public static void afterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
@Before
public void before() throws IOException {
LOG.info("before");
UTIL.ensureSomeRegionServersAvailable(1);
LOG.info("before done");
}
/**
* Open and close a TableOutputFormat. The closing the RecordWriter should release HBase
* Connection (ZK) resources, and will throw exception if they are exhausted.
*/
static void openCloseTableOutputFormat(int iter) throws IOException {
LOG.info("Instantiating TableOutputFormat connection " + iter);
JobConf conf = new JobConf();
conf.addResource(UTIL.getConfiguration());
conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE);
TableMapReduceUtil.initTableMapJob(TABLE, FAMILY, TableMap.class,
ImmutableBytesWritable.class, ImmutableBytesWritable.class, conf);
TableOutputFormat tof = new TableOutputFormat();
RecordWriter rw = tof.getRecordWriter(null, conf, TABLE, null);
rw.close(null);
}
@Test
public void testConnectionExhaustion() throws IOException {
int MAX_INSTANCES = 5; // fails on iteration 3 if zk connections leak
for (int i = 0; i < MAX_INSTANCES; i++) {
final int iter = i;
try {
openCloseTableOutputFormat(iter);
} catch (Exception e) {
LOG.error("Exception encountered", e);
fail("Failed on iteration " + i);
}
}
}
}