From b5bf5afde7b767683e71956b49cc6ec8f94c10b8 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 14 Feb 2018 10:15:01 -0500 Subject: [PATCH 1/2] NO-JIRA Test fixes - LargeServerMessageImpl.finalize is eventually causing deadlocks - CoreMessage needs to check properties before decoding - PagingTest tweaks - ServerLocatorImpl can deadlock eventually, avoiding a lock and using actors - ActiveMQServerImpl.finalize is also evil and can cause deadlocks on the testsuite - MqttClusterRemoteSubscribeTest needs to setup the Address now on the setup --- .../core/client/impl/ServerLocatorImpl.java | 34 ++++++++++++++++--- .../core/message/impl/CoreMessage.java | 3 +- .../impl/journal/LargeServerMessageImpl.java | 6 ---- .../core/server/impl/ActiveMQServerImpl.java | 11 ------ .../MqttClusterRemoteSubscribeTest.java | 1 + .../tests/integration/paging/PagingTest.java | 14 ++------ 6 files changed, 35 insertions(+), 34 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 02c17c6ad5..978cc39bc8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -67,6 +67,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.actors.Actor; import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores; import org.jboss.logging.Logger; @@ -199,6 +200,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private Executor startExecutor; + private Actor updateArrayActor; + private AfterConnectInternalListener afterConnectListener; private String groupID; @@ -251,6 +254,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory); } + + this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray); } @Override @@ -534,6 +539,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private TransportConfiguration selectConnector() { Pair[] usedTopology; + flushTopology(); + synchronized (topologyArrayGuard) { usedTopology = topologyArray; } @@ -743,6 +750,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery initialise(); + flushTopology(); + if (this.getNumInitialConnectors() == 0 && discoveryGroup != null) { // Wait for an initial broadcast to give us at least one node in the cluster long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout(); @@ -812,6 +821,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return factory; } + public void flushTopology() { + if (updateArrayActor != null) { + updateArrayActor.flush(10, TimeUnit.SECONDS); + } + } + @Override public boolean isHA() { return ha; @@ -1426,14 +1441,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery topology.removeMember(eventTime, nodeID); if (clusterConnection) { - updateArraysAndPairs(); + updateArraysAndPairs(eventTime); } else { if (topology.isEmpty()) { // Resetting the topology to its original condition as it was brand new receivedTopology = false; topologyArray = null; } else { - updateArraysAndPairs(); + updateArraysAndPairs(eventTime); if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) { // Resetting the topology to its original condition as it was brand new @@ -1472,7 +1487,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } - updateArraysAndPairs(); + updateArraysAndPairs(uniqueEventID); if (last) { receivedTopology = true; @@ -1496,7 +1511,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } @SuppressWarnings("unchecked") - private void updateArraysAndPairs() { + private void updateArraysAndPairs(long time) { + if (updateArrayActor == null) { + // if for some reason we don't have an actor, just go straight + internalUpdateArray(time); + } else { + updateArrayActor.act(time); + } + } + + private void internalUpdateArray(long time) { synchronized (topologyArrayGuard) { Collection membersCopy = topology.getMembers(); @@ -1506,7 +1530,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery for (TopologyMemberImpl pair : membersCopy) { Pair transportConfigs = pair.getConnector(); topologyArrayLocal[count++] = new Pair<>(protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getA()), - protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB())); + protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB())); } this.topologyArray = topologyArrayLocal; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 9119a0d74f..2c570b9a24 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -369,6 +369,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Message copy() { + checkProperties(); checkEncode(); return new CoreMessage(this); } @@ -936,8 +937,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage putObjectProperty(final SimpleString key, final Object value) throws ActiveMQPropertyConversionException { - messageChanged(); checkProperties(); + messageChanged(); TypedProperties.setObjectProperty(key, value, properties); return this; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 287b261816..9a2e285784 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -362,12 +362,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe } } - @Override - protected void finalize() throws Throwable { - releaseResources(); - super.finalize(); - } - // Private ------------------------------------------------------- public synchronized void validateFile() throws ActiveMQException { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index dad930005c..2f6cb7f745 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -679,17 +679,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - @Override - protected final void finalize() throws Throwable { - if (state != SERVER_STATE.STOPPED) { - ActiveMQServerLogger.LOGGER.serverFinalisedWIthoutBeingSTopped(); - - stop(); - } - - super.finalize(); - } - @Override public void setState(SERVER_STATE state) { this.state = state; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java index 630cdf55a6..8caba17d14 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java @@ -435,6 +435,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { coreAddressConfiguration.setName(TOPIC); CoreQueueConfiguration coreQueueConfiguration = new CoreQueueConfiguration(); coreQueueConfiguration.setName(TOPIC); + coreQueueConfiguration.setAddress(TOPIC); coreQueueConfiguration.setRoutingType(RoutingType.ANYCAST); coreAddressConfiguration.addQueueConfiguration(coreQueueConfiguration); return coreAddressConfiguration; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 3de9203068..bc09fa1540 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -835,7 +835,7 @@ public class PagingTest extends ActiveMQTestBase { ClientMessage message = session.createMessage(true); if (i < 1000) { - message.setExpiration(System.currentTimeMillis() + 1000); + message.setExpiration(System.currentTimeMillis() + 100); } message.putIntProperty("tst-count", i); @@ -852,12 +852,7 @@ public class PagingTest extends ActiveMQTestBase { session.commit(); producer.close(); - for (long timeout = System.currentTimeMillis() + 60000; timeout > System.currentTimeMillis() && getMessageCount(qEXP) < 1000; ) { - System.out.println("count = " + getMessageCount(qEXP)); - Thread.sleep(100); - } - - assertEquals(1000, getMessageCount(qEXP)); + Wait.assertEquals(1000, qEXP::getMessageCount); session.start(); @@ -874,10 +869,7 @@ public class PagingTest extends ActiveMQTestBase { assertNull(consumer.receiveImmediate()); - for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(queue1) != 0; ) { - Thread.sleep(100); - } - assertEquals(0, getMessageCount(queue1)); + Wait.assertEquals(0, queue1::getMessageCount); consumer.close(); From 5480d7bc5b5dc9ab14995c89a1df8868ada720ce Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 13 Feb 2018 14:40:37 -0500 Subject: [PATCH 2/2] NO-JIRA Fixing Deadlock on MQTTConnection Found during a testsuite run --- .../core/protocol/mqtt/MQTTConnection.java | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index eb3b4b11fc..012356b553 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -47,9 +47,9 @@ public class MQTTConnection implements RemotingConnection { private String clientID; - private final List failureListeners = Collections.synchronizedList(new ArrayList()); + private final List failureListeners = new CopyOnWriteArrayList<>(); - private final List closeListeners = Collections.synchronizedList(new ArrayList()); + private final List closeListeners = new CopyOnWriteArrayList<>(); public MQTTConnection(Connection transportConnection) throws Exception { this.transportConnection = transportConnection; @@ -100,15 +100,14 @@ public class MQTTConnection implements RemotingConnection { @Override public List removeCloseListeners() { - synchronized (closeListeners) { - List deletedCloseListeners = new ArrayList<>(closeListeners); - closeListeners.clear(); - return deletedCloseListeners; - } + List deletedCloseListeners = copyCloseListeners(); + closeListeners.clear(); + return deletedCloseListeners; } @Override public void setCloseListeners(List listeners) { + closeListeners.clear(); closeListeners.addAll(listeners); } @@ -119,19 +118,15 @@ public class MQTTConnection implements RemotingConnection { @Override public List removeFailureListeners() { - synchronized (failureListeners) { - List deletedFailureListeners = new ArrayList<>(failureListeners); - failureListeners.clear(); - return deletedFailureListeners; - } + List deletedFailureListeners = copyFailureListeners(); + failureListeners.clear(); + return deletedFailureListeners; } @Override public void setFailureListeners(List listeners) { - synchronized (failureListeners) { - failureListeners.clear(); - failureListeners.addAll(listeners); - } + failureListeners.clear(); + failureListeners.addAll(listeners); } @Override @@ -141,13 +136,20 @@ public class MQTTConnection implements RemotingConnection { @Override public void fail(ActiveMQException me) { - synchronized (failureListeners) { - for (FailureListener listener : failureListeners) { - listener.connectionFailed(me, false); - } + List copy = copyFailureListeners(); + for (FailureListener listener : copy) { + listener.connectionFailed(me, false); } } + private List copyFailureListeners() { + return new ArrayList<>(failureListeners); + } + + private List copyCloseListeners() { + return new ArrayList<>(closeListeners); + } + @Override public void fail(ActiveMQException me, String scaleDownTargetNodeID) { synchronized (failureListeners) {