HBASE-22963 Netty ByteBuf leak in rpc client implementation (#577)

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2019-09-08 21:54:09 +08:00
parent 18fbef2d0b
commit a14eea82b4
3 changed files with 51 additions and 4 deletions

View File

@ -450,6 +450,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
+ connection.remoteId); + connection.remoteId);
connections.removeValue(remoteId, connection); connections.removeValue(remoteId, connection);
connection.shutdown(); connection.shutdown();
connection.cleanupConnection();
} }
} }
} }

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -81,6 +82,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
/** /**
* This class is for testing HBaseConnectionManager features * This class is for testing HBaseConnectionManager features
@ -112,6 +115,7 @@ public class TestConnectionImplementation {
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
ResourceLeakDetector.setLevel(Level.PARANOID);
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
// Up the handlers; this test needs more than usual. // Up the handlers; this test needs more than usual.
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
@ -126,6 +130,11 @@ public class TestConnectionImplementation {
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
} }
/**
 * Switches the balancer back on after every test, so that a test which turned it off via
 * {@code balancerSwitch(false, true)} does not affect the tests that run after it.
 */
@After
public void tearDown() throws IOException {
  // Name the two flags so the call site is readable.
  final boolean balancerOn = true;
  final boolean synchronous = true;
  TEST_UTIL.getAdmin().balancerSwitch(balancerOn, synchronous);
}
@Test @Test
public void testClusterConnection() throws IOException { public void testClusterConnection() throws IOException {
ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1, ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
@ -284,7 +293,7 @@ public class TestConnectionImplementation {
TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt); TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
TEST_UTIL.createTable(tableName, FAM_NAM).close(); TEST_UTIL.createTable(tableName, FAM_NAM).close();
boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true); TEST_UTIL.getAdmin().balancerSwitch(false, true);
Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
// We want to work on a separate connection. // We want to work on a separate connection.
@ -349,9 +358,9 @@ public class TestConnectionImplementation {
RpcClient rpcClient = conn.getRpcClient(); RpcClient rpcClient = conn.getRpcClient();
LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn); LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
for (int i = 0; i < 5000; i++) { for (int i = 0; i < 500; i++) {
rpcClient.cancelConnections(sn); rpcClient.cancelConnections(sn);
Thread.sleep(5); Thread.sleep(50);
} }
step.compareAndSet(1, 2); step.compareAndSet(1, 2);
@ -366,7 +375,6 @@ public class TestConnectionImplementation {
table.close(); table.close();
connection.close(); connection.close();
Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null); Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
} }
/** /**
@ -1080,4 +1088,40 @@ public class TestConnectionImplementation {
TEST_UTIL.deleteTable(tableName); TEST_UTIL.deleteTable(tableName);
} }
} }
/**
 * Creates a fresh table, relocates a region of it through the shared test connection and
 * asserts that the connection then exposes a non-null meta lookup pool.
 */
@Test
public void testMetaLookupThreadPoolCreated() throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  final byte[][] families = { Bytes.toBytes("foo") };
  // Drop any table with the same name that already exists before creating it again.
  if (TEST_UTIL.getAdmin().tableExists(tableName)) {
    TEST_UTIL.getAdmin().disableTable(tableName);
    TEST_UTIL.getAdmin().deleteTable(tableName);
  }
  try (Table table = TEST_UTIL.createTable(tableName, families)) {
    byte[] rowKey = Bytes.toBytes("test");
    ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
    // Relocating a region is expected to make the connection create its meta lookup pool.
    conn.relocateRegion(tableName, rowKey);
    ExecutorService metaLookupPool = conn.getCurrentMetaLookupPool();
    assertNotNull(metaLookupPool);
  }
}
/**
 * Issues one {@code Get} against a freshly created table, cancels the rpc client's connections
 * to the region server hosting the table's first region, and then waits around an explicit GC.
 * <p>
 * This test contains no assertion. It has to be verified by hand: check the test output and
 * confirm that netty did not print any resource leak report.
 */
@Test
public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
  TableName tableName = TableName.valueOf(name.getMethodName());
  TEST_UTIL.createTable(tableName, FAM_NAM).close();
  TEST_UTIL.getAdmin().balancerSwitch(false, true);
  try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    Table table = conn.getTable(tableName)) {
    // NOTE(review): presumably this read is what opens the rpc connection that is cancelled
    // below - confirm.
    Get probe = new Get(Bytes.toBytes("1"));
    table.get(probe);
    ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
    ((ConnectionImplementation) conn).getRpcClient().cancelConnections(serverName);
    // Pause, request a GC, and pause again before the connection is closed; any leak report
    // from netty would have to be spotted in the test output.
    Thread.sleep(1000);
    System.gc();
    Thread.sleep(1000);
  }
}
} }

View File

@ -1480,10 +1480,12 @@
<hbase-surefire.argLine>-enableassertions -Dhbase.build.id=${build.id} -Xmx${surefire.Xmx} <hbase-surefire.argLine>-enableassertions -Dhbase.build.id=${build.id} -Xmx${surefire.Xmx}
-Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
-Djava.awt.headless=true -Djdk.net.URLClassPath.disableClassPathURLCheck=true -Djava.awt.headless=true -Djdk.net.URLClassPath.disableClassPathURLCheck=true
-Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=paranoid
</hbase-surefire.argLine> </hbase-surefire.argLine>
<hbase-surefire.cygwin-argLine>-enableassertions -Xmx${surefire.cygwinXmx} <hbase-surefire.cygwin-argLine>-enableassertions -Xmx${surefire.cygwinXmx}
-Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
"-Djava.library.path=${hadoop.library.path};${java.library.path}" "-Djava.library.path=${hadoop.library.path};${java.library.path}"
-Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=paranoid
</hbase-surefire.cygwin-argLine> </hbase-surefire.cygwin-argLine>
<!-- Surefire argLine defaults to Linux, cygwin argLine is used in the os.windows profile --> <!-- Surefire argLine defaults to Linux, cygwin argLine is used in the os.windows profile -->
<argLine>${hbase-surefire.argLine}</argLine> <argLine>${hbase-surefire.argLine}</argLine>