ARTEMIS-2408 Too many opened FDs after server stops

Wait netty event loop group shutdown to avoid too many opened FDs after
server stops, when netty configuration is used. Clear server
activateCallbacks to avoid reactivation of previous nodeManager and
consequent FD leaks on restart. Fix LargeServerMessageImpl.copy to avoid
FD leaks when a large message expiry or it is sent to DLA. Terminate
HawtDispatcher global queue to avoid pipes and eventpolls leaks after a
MQTT test.

cherry-picking commit 9617058ba0649af4eea15ce8793f86de827c4b7f
NO-JIRA adding check for open FD on the testsuite

cherry-picking commit 0facb7ddf4d3baa14a3add4290684aff7fd46053
NO-JIRA addressing connections leaks on integration tests
This commit is contained in:
brusdev 2019-07-16 15:41:29 +02:00 committed by Clebert Suconic
parent 1f0c35e1af
commit 29bbb94b5e
13 changed files with 446 additions and 38 deletions

View File

@ -16,11 +16,27 @@
*/
package org.apache.activemq.artemis.core.server;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public interface ActiveMQComponent {
void start() throws Exception;
void stop() throws Exception;
default Future<?> asyncStop() {
CompletableFuture<?> future = new CompletableFuture<>();
try {
stop();
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
return future;
}
boolean isStarted();
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
/**
* Utility adapted from: org.apache.activemq.util.Wait
*/
public class Wait {
public static final long MAX_WAIT_MILLIS = 30 * 1000;
public static final int SLEEP_MILLIS = 1000;
public interface Condition {
boolean isSatisfied() throws Exception;
}
public interface LongCondition {
long getCount() throws Exception;
}
public interface IntCondition {
int getCount() throws Exception;
}
public static boolean waitFor(Condition condition) throws Exception {
return waitFor(condition, MAX_WAIT_MILLIS);
}
public static void assertEquals(long size, LongCondition condition) throws Exception {
assertEquals(size, condition, MAX_WAIT_MILLIS);
}
public static void assertEquals(long size, LongCondition condition, long timeout) throws Exception {
assertEquals(size, condition, timeout, SLEEP_MILLIS);
}
public static void assertEquals(Long size, LongCondition condition, long timeout, long sleepMillis) throws Exception {
boolean result = waitFor(() -> condition.getCount() == size, timeout, sleepMillis);
if (!result) {
Assert.fail(size + " != " + condition.getCount());
}
}
public static void assertEquals(int size, IntCondition condition) throws Exception {
assertEquals(size, condition, MAX_WAIT_MILLIS);
}
public static void assertEquals(int size, IntCondition condition, long timeout) throws Exception {
assertEquals(size, condition, timeout, SLEEP_MILLIS);
}
public static void assertEquals(int size, IntCondition condition, long timeout, long sleepMillis) throws Exception {
boolean result = waitFor(() -> condition.getCount() == size, timeout, sleepMillis);
if (!result) {
Assert.fail(size + " != " + condition.getCount());
}
}
public static void assertTrue(Condition condition) {
assertTrue("Condition wasn't met", condition);
}
public static void assertFalse(Condition condition) throws Exception {
assertTrue(() -> !condition.isSatisfied());
}
public static void assertFalse(String failureMessage, Condition condition) {
assertTrue(failureMessage, () -> !condition.isSatisfied());
}
public static void assertTrue(String failureMessage, Condition condition) {
assertTrue(failureMessage, condition, MAX_WAIT_MILLIS);
}
public static void assertTrue(String failureMessage, Condition condition, final long duration) {
assertTrue(failureMessage, condition, duration, SLEEP_MILLIS);
}
public static void assertTrue(Condition condition, final long duration, final long sleep) throws Exception {
assertTrue("condition not met", condition, duration, sleep);
}
public static void assertTrue(String failureMessage, Condition condition, final long duration, final long sleep) {
boolean result = waitFor(condition, duration, sleep);
if (!result) {
Assert.fail(failureMessage);
}
}
public static boolean waitFor(final Condition condition, final long duration) throws Exception {
return waitFor(condition, duration, SLEEP_MILLIS);
}
public static boolean waitFor(final Condition condition,
final long durationMillis,
final long sleepMillis) {
try {
final long expiry = System.currentTimeMillis() + durationMillis;
boolean conditionSatisified = condition.isSatisfied();
while (!conditionSatisified && System.currentTimeMillis() < expiry) {
if (sleepMillis == 0) {
Thread.yield();
} else {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
}
conditionSatisified = condition.isSatisfied();
}
return conditionSatisified;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}

View File

@ -376,7 +376,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
long oldPosition = file.position();
file.open();
if (!file.isOpen()) {
file.open();
}
file.position(0);
for (;;) {

View File

@ -37,6 +37,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -221,6 +222,8 @@ public class NettyAcceptor extends AbstractAcceptor {
final Executor failureExecutor;
private Future<?> asyncStopFuture = null;
public NettyAcceptor(final String name,
final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
@ -645,6 +648,13 @@ public class NettyAcceptor extends AbstractAcceptor {
return this.configuration;
}
@Override
public java.util.concurrent.Future<?> asyncStop() {
stop();
return asyncStopFuture;
}
@Override
public synchronized void stop() {
if (channelClazz == null) {
@ -685,7 +695,7 @@ public class NettyAcceptor extends AbstractAcceptor {
// Shutdown the EventLoopGroup if no new task was added for 100ms or if
// 3000ms elapsed.
eventLoopGroup.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS);
asyncStopFuture = eventLoopGroup.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS);
eventLoopGroup = null;
channelClazz = null;

View File

@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -407,15 +408,27 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
conn.disconnect(criticalError);
}
Map<Acceptor, Future<?>> acceptorFutures = new HashMap<>();
for (Acceptor acceptor : acceptors.values()) {
try {
acceptor.stop();
acceptorFutures.put(acceptor, acceptor.asyncStop());
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
}
}
for (Map.Entry<Acceptor, Future<?>> acceptorFuture : acceptorFutures.entrySet()) {
if (acceptorFuture.getValue() != null) {
try {
acceptorFuture.getValue().get();
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptorFuture.getKey().getName());
}
}
}
acceptors.clear();
acceptorFutures.clear();
connections.clear();
connectionCountLatch.setCount(0);

View File

@ -1269,6 +1269,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.analyzer = null;
}
activateCallbacks.clear();
if (identity != null) {
ActiveMQServerLogger.LOGGER.serverStopped("identity=" + identity + ",version=" + getVersion().getFullVersion(), tempNodeID, getUptime());
} else {

View File

@ -171,6 +171,9 @@ public abstract class ActiveMQTestBase extends Assert {
@ClassRule
public static ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
@Rule
public NoProcessFilesBehind noProcessFilesBehind = new NoProcessFilesBehind(-1, 1000);
/** We should not under any circunstance create data outside of ./target
* if you have a test failing because because of this rule for any reason,
* even if you use afterClass events, move the test to ./target and always cleanup after
@ -276,6 +279,7 @@ public abstract class ActiveMQTestBase extends Assert {
@After
public void tearDown() throws Exception {
noProcessFilesBehind.tearDown();
closeAllSessionFactories();
closeAllServerLocatorsFactories();

View File

@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.util;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import com.sun.management.UnixOperatingSystemMXBean;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
/**
* This is useful to make sure you won't have leaking threads between tests
*/
public class NoProcessFilesBehind extends TestWatcher {
private static Logger log = Logger.getLogger(NoProcessFilesBehind.class);
/**
* -1 on maxVariance means no check
*/
public NoProcessFilesBehind(int maxVariance, long maxFiles) {
this.maxVariance = maxVariance;
this.maxFiles = maxFiles;
}
long fdBefore;
int maxVariance;
long maxFiles;
List<String> openFilesBefore;
List<String> openFilesAfter;
static OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
public static long getOpenFD() {
if (os instanceof UnixOperatingSystemMXBean) {
return ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
} else {
return 0;
}
}
@Override
protected void starting(Description description) {
LibaioContext.isLoaded();
fdBefore = getOpenFD();
openFilesBefore = getOpenFiles(true);
}
public static List<String> getOpenFiles(boolean filtered) {
ArrayList<String> openFiles = new ArrayList<>();
try {
String outputLine;
int processId = getProcessId();
Process child = Runtime.getRuntime().exec("lsof -a -p " + processId + " -d ^txt,^mem,^cwd,^rtd,^DEL", new String[] {});
try (BufferedReader processInput = new BufferedReader(new InputStreamReader(child.getInputStream()))) {
processInput.readLine();
while ((outputLine = processInput.readLine()) != null) {
if (!filtered || (!outputLine.endsWith(".jar") && !outputLine.endsWith(".so") && !outputLine.contains("type=STREAM")))
openFiles.add(outputLine);
}
}
} catch (Exception ignore) {
}
return openFiles;
}
private static List<String> getDiffFiles(List<String> xOpenFiles, List<String> yOpenFiles) {
ArrayList<String> diffOpenFiles = new ArrayList<>();
for (String xOpenFile : xOpenFiles) {
boolean found = false;
Iterator<String> yOpenFilesIterator = yOpenFiles.iterator();
String xOpenFileS = xOpenFile.replaceAll("\\s+", "\\\\s+");
while (yOpenFilesIterator.hasNext() && !found) {
String yOpenFileS = yOpenFilesIterator.next().replaceAll("\\s+", "\\\\s+");
found = yOpenFileS.equals(xOpenFileS);
}
if (!found) {
diffOpenFiles.add(xOpenFile);
}
}
return diffOpenFiles;
}
private static int getProcessId() throws ReflectiveOperationException {
java.lang.management.RuntimeMXBean runtime = java.lang.management.ManagementFactory.getRuntimeMXBean();
java.lang.reflect.Field jvmField = runtime.getClass().getDeclaredField("jvm");
jvmField.setAccessible(true);
sun.management.VMManagement jvm = (sun.management.VMManagement) jvmField.get(runtime);
java.lang.reflect.Method getProcessIdMethod = jvm.getClass().getDeclaredMethod("getProcessId");
getProcessIdMethod.setAccessible(true);
return (Integer) getProcessIdMethod.invoke(jvm);
}
public void tearDown() {
openFilesAfter = getOpenFiles(true);
}
@Override
protected void failed(Throwable e, Description description) {
}
@Override
protected void succeeded(Description description) {
}
List<String> getVariance() {
long fdAfter = getOpenFD();
long variance = fdAfter - fdBefore;
if (variance > 0) {
List<String> currOpenFiles = getOpenFiles(true);
List<String> diffOpenFiles = getDiffFiles(currOpenFiles, openFilesBefore);
List<String> skippingOpenFiles = getDiffFiles(currOpenFiles, openFilesAfter);
List<String> leavingOpenFiles = getDiffFiles(diffOpenFiles, skippingOpenFiles);
return leavingOpenFiles;
} else {
return new ArrayList<>();
}
}
/**
* Override to tear down your specific external resource.
*/
@Override
protected void finished(Description description) {
long fdAfter = getOpenFD();
List<String> variance = getVariance();
if (variance.size() > 0) {
log.warn("test " + description.toString() + " is leaving " + variance.size() + " files open with a total number of files open = " + fdAfter);
System.err.println("test " + description.toString() + " is leaving " + variance.size() + " files open with a total number of files open = " + fdAfter);
for (String openFile : variance) {
System.err.println(openFile);
}
}
if (maxVariance > 0) {
VarianceCondition varianceCondition = new VarianceCondition();
Wait.assertTrue("The test " + description.toString() + " is leaving " + varianceCondition.getVarianceSize() + " files open, which is more than " + maxVariance + " max open", varianceCondition, 5000, 0);
}
Wait.assertTrue("Too many open files", () -> getOpenFD() < maxFiles, 5000, 0);
}
class VarianceCondition implements Wait.Condition {
private List<String> variance = null;
public long getVarianceSize() {
if (variance != null) {
return variance.size();
} else {
return 0;
}
}
@Override
public boolean isSatisfied() throws Exception {
variance = getVariance();
return variance.size() < maxVariance;
}
}
}

View File

@ -437,51 +437,51 @@ public class ConsumerTest extends ActiveMQTestBase {
ConnectionFactory factorySend = createFactory(protocolSender);
ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend : createFactory(protocolConsumer);
StringBuilder bufferLarge = new StringBuilder();
while (bufferLarge.length() < 100 * 1024) {
bufferLarge.append(" ");
}
final String bufferLargeContent = bufferLarge.toString();
Connection connection = factorySend.createConnection();
try (Connection connection = factorySend.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
try (MessageProducer producer = session.createProducer(queue)) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage msg = session.createTextMessage("hello");
msg.setIntProperty("mycount", 0);
producer.send(msg);
TextMessage msg = session.createTextMessage("hello");
msg.setIntProperty("mycount", 0);
producer.send(msg);
StringBuffer bufferLarge = new StringBuffer();
while (bufferLarge.length() < 100 * 1024) {
bufferLarge.append(" ");
msg = session.createTextMessage(bufferLargeContent);
msg.setIntProperty("mycount", 1);
producer.send(msg);
}
}
msg = session.createTextMessage(bufferLarge.toString());
msg.setIntProperty("mycount", 1);
producer.send(msg);
connection = factoryConsume.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(QUEUE.toString());
}
try (Connection connection = factoryConsume.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
MessageConsumer consumer = session.createConsumer(queue);
try (MessageConsumer consumer = session.createConsumer(queue)) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(0, message.getIntProperty("mycount"));
Assert.assertEquals("hello", message.getText());
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(0, message.getIntProperty("mycount"));
Assert.assertEquals("hello", message.getText());
message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(1, message.getIntProperty("mycount"));
Assert.assertEquals(bufferLarge.toString(), message.getText());
message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(1, message.getIntProperty("mycount"));
Assert.assertEquals(bufferLargeContent, message.getText());
Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
} finally {
connection.close();
Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
}
}
}
}

View File

@ -248,6 +248,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
pd.send(sn0.createTextMessage("hello " + i));
}
cn0.close();
// Messages should stay in node 1 and note get redistributed.
assertEquals(NUMBER_OF_MESSAGES, servers[0].locateQueue(queueName).getMessageCount());
assertEquals(0, servers[1].locateQueue(queueName).getMessageCount());
@ -371,6 +373,10 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
receiveMessages(connection[1], consumer[1], NUMBER_OF_MESSAGES / 2, true);
for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
connection[node].close();
}
}
@Test
@ -451,6 +457,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
connection[1].close();
// this wil be after redistribution
receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
connection[0].close();
}

View File

@ -57,6 +57,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.fusesource.hawtdispatch.DispatchPriority;
import org.fusesource.hawtdispatch.internal.DispatcherConfig;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
@ -75,6 +77,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
static {
DispatcherConfig.getDefaultDispatcher().getThreadQueues(DispatchPriority.DEFAULT);
}
protected int port = 1883;
protected ConnectionFactory cf;
protected LinkedList<Throwable> exceptions = new LinkedList<>();

View File

@ -105,6 +105,7 @@ public class JMSConsumer2Test extends BasicOpenWireTest {
// await possible exceptions
Thread.sleep(1000);
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
executor.shutdown();
}
@Test

View File

@ -58,5 +58,6 @@ public class QueuePeristPauseTest extends ActiveMQTestBase {
Assert.assertFalse(queue.isPaused());
}
server.stop();
}
}