diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 02c17c6ad5..978cc39bc8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -67,6 +67,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.actors.Actor; import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores; import org.jboss.logging.Logger; @@ -199,6 +200,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private Executor startExecutor; + private Actor updateArrayActor; + private AfterConnectInternalListener afterConnectListener; private String groupID; @@ -251,6 +254,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory); } + + this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray); } @Override @@ -534,6 +539,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private TransportConfiguration selectConnector() { Pair[] usedTopology; + flushTopology(); + synchronized (topologyArrayGuard) { usedTopology = topologyArray; } @@ -743,6 +750,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery initialise(); + flushTopology(); + if (this.getNumInitialConnectors() == 0 && discoveryGroup != null) { // Wait for an initial broadcast to give us at least one node in the cluster long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout(); @@ -812,6 +821,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return factory; } + public void flushTopology() { + if (updateArrayActor != null) { + updateArrayActor.flush(10, TimeUnit.SECONDS); + } + } + @Override public boolean isHA() { return ha; @@ -1426,14 +1441,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery topology.removeMember(eventTime, nodeID); if (clusterConnection) { - updateArraysAndPairs(); + updateArraysAndPairs(eventTime); } else { if (topology.isEmpty()) { // Resetting the topology to its original condition as it was brand new receivedTopology = false; topologyArray = null; } else { - updateArraysAndPairs(); + updateArraysAndPairs(eventTime); if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) { // Resetting the topology to its original condition as it was brand new @@ -1472,7 +1487,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } - updateArraysAndPairs(); + updateArraysAndPairs(uniqueEventID); if (last) { receivedTopology = true; @@ -1496,7 +1511,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } @SuppressWarnings("unchecked") - private void updateArraysAndPairs() { + private void updateArraysAndPairs(long time) { + if (updateArrayActor == null) { + // if for some reason we don't have an actor, just go straight + internalUpdateArray(time); + } else { + updateArrayActor.act(time); + } + } + + private void internalUpdateArray(long time) { synchronized (topologyArrayGuard) { Collection membersCopy = topology.getMembers(); @@ -1506,7 +1530,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery for (TopologyMemberImpl pair : membersCopy) { Pair transportConfigs = pair.getConnector(); topologyArrayLocal[count++] = new Pair<>(protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getA()), - protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB())); + protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB())); } this.topologyArray = topologyArrayLocal; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 9119a0d74f..2c570b9a24 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -369,6 +369,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Message copy() { + checkProperties(); checkEncode(); return new CoreMessage(this); } @@ -936,8 +937,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage putObjectProperty(final SimpleString key, final Object value) throws ActiveMQPropertyConversionException { - messageChanged(); checkProperties(); + messageChanged(); TypedProperties.setObjectProperty(key, value, properties); return this; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 287b261816..9a2e285784 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -362,12 +362,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe } } - @Override - protected void finalize() throws Throwable { - releaseResources(); - super.finalize(); - } - // Private ------------------------------------------------------- public synchronized void validateFile() throws ActiveMQException { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index dad930005c..2f6cb7f745 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -679,17 +679,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - @Override - protected final void finalize() throws Throwable { - if (state != SERVER_STATE.STOPPED) { - ActiveMQServerLogger.LOGGER.serverFinalisedWIthoutBeingSTopped(); - - stop(); - } - - super.finalize(); - } - @Override public void setState(SERVER_STATE state) { this.state = state; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java index 630cdf55a6..8caba17d14 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java @@ -435,6 +435,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase { coreAddressConfiguration.setName(TOPIC); CoreQueueConfiguration coreQueueConfiguration = new CoreQueueConfiguration(); coreQueueConfiguration.setName(TOPIC); + coreQueueConfiguration.setAddress(TOPIC); coreQueueConfiguration.setRoutingType(RoutingType.ANYCAST); coreAddressConfiguration.addQueueConfiguration(coreQueueConfiguration); return coreAddressConfiguration; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 3de9203068..bc09fa1540 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -835,7 +835,7 @@ public class PagingTest extends ActiveMQTestBase { ClientMessage message = session.createMessage(true); if (i < 1000) { - message.setExpiration(System.currentTimeMillis() + 1000); + message.setExpiration(System.currentTimeMillis() + 100); } message.putIntProperty("tst-count", i); @@ -852,12 +852,7 @@ public class PagingTest extends ActiveMQTestBase { session.commit(); producer.close(); - for (long timeout = System.currentTimeMillis() + 60000; timeout > System.currentTimeMillis() && getMessageCount(qEXP) < 1000; ) { - System.out.println("count = " + getMessageCount(qEXP)); - Thread.sleep(100); - } - - assertEquals(1000, getMessageCount(qEXP)); + Wait.assertEquals(1000, qEXP::getMessageCount); session.start(); @@ -874,10 +869,7 @@ public class PagingTest extends ActiveMQTestBase { assertNull(consumer.receiveImmediate()); - for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(queue1) != 0; ) { - Thread.sleep(100); - } - assertEquals(0, getMessageCount(queue1)); + Wait.assertEquals(0, queue1::getMessageCount); consumer.close();