diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 2cc00d3993..8c18a1562b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -44,7 +44,6 @@ import io.netty.channel.Channel; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.config.Configuration; @@ -1903,8 +1902,8 @@ public interface ActiveMQServerLogger extends BasicLogger { void undeployAddress(SimpleString addressName); @LogMessage(level = Logger.Level.INFO) - @Message(id = 224077, value = "Undeploying {0} queue {1}", format = Message.Format.MESSAGE_FORMAT) - void undeployQueue(RoutingType routingType, SimpleString queueName); + @Message(id = 224077, value = "Undeploying queue {0}", format = Message.Format.MESSAGE_FORMAT) + void undeployQueue(SimpleString queueName); @LogMessage(level = Logger.Level.WARN) @Message(id = 224078, value = "The size of duplicate cache detection () appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer () {1}.", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 715f984ad2..498f4e9def 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2601,9 +2601,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { }, 0, dumpInfoInterval, TimeUnit.MILLISECONDS); } - // Undeploy any addresses and queues not in config - undeployAddressesAndQueueNotInConfiguration(); - // Deploy the rest of the stuff // Deploy predefined addresses @@ -2612,6 +2609,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { // Deploy any predefined queues deployQueuesFromConfiguration(); + // Undeploy any addresses and queues not in config + undeployAddressesAndQueueNotInConfiguration(); + // We need to call this here, this gives any dependent server a chance to deploy its own addresses // this needs to be done before clustering is fully activated callActivateCallbacks(); @@ -2695,31 +2695,28 @@ public class ActiveMQServerImpl implements ActiveMQServer { private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception { Set addressesInConfig = configuration.getAddressConfigurations().stream() - .map(CoreAddressConfiguration::getName) - .collect(Collectors.toSet()); + .map(CoreAddressConfiguration::getName) + .collect(Collectors.toSet()); - Set queuesInConfig = new HashSet<>(); - for (CoreAddressConfiguration cac : configuration.getAddressConfigurations()) { - for (CoreQueueConfiguration cqc : cac.getQueueConfigurations()) { - // combine the routing-type and queue name as the unique identifier as it's possible to change the routing-type without changing the name - queuesInConfig.add(cqc.getRoutingType().toString() + cqc.getName()); - } - } + Set queuesInConfig = configuration.getAddressConfigurations().stream() + .map(CoreAddressConfiguration::getQueueConfigurations) + .flatMap(List::stream).map(CoreQueueConfiguration::getName) + .collect(Collectors.toSet()); for (SimpleString addressName : listAddressNames()) { AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString()); if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) { for (Queue queue : listQueues(addressName)) { - ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName()); + ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); queue.deleteQueue(true); } ActiveMQServerLogger.LOGGER.undeployAddress(addressName); removeAddressInfo(addressName, null); } else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) { for (Queue queue : listConfiguredQueues(addressName)) { - if (!queuesInConfig.contains(queue.getRoutingType().toString() + queue.getName().toString())) { - ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName()); + if (!queuesInConfig.contains(queue.getName().toString())) { + ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); queue.deleteQueue(true); } } @@ -2747,15 +2744,38 @@ public class ActiveMQServerImpl implements ActiveMQServer { for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) { try { ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString()); - AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes()); - addOrUpdateAddressInfo(info); + SimpleString address = SimpleString.toSimpleString(config.getName()); + + AddressInfo tobe = new AddressInfo(address, config.getRoutingTypes()); + + //During this stage until all queues re-configured we combine the current (if exists) with to-be routing types to allow changes in queues + AddressInfo current = getAddressInfo(address); + AddressInfo merged = new AddressInfo(address, tobe.getRoutingType()); + if (current != null) { + merged.getRoutingTypes().addAll(current.getRoutingTypes()); + } + addOrUpdateAddressInfo(merged); + deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations()); + + //Now all queues updated we apply the actual address info expected tobe. + addOrUpdateAddressInfo(tobe); } catch (Exception e) { ActiveMQServerLogger.LOGGER.problemDeployingAddress(config.getName(), e.getMessage()); } } } + private AddressInfo mergedRoutingTypes(SimpleString address, AddressInfo... addressInfos) { + EnumSet mergedRoutingTypes = EnumSet.noneOf(RoutingType.class); + for (AddressInfo addressInfo : addressInfos) { + if (addressInfo != null) { + mergedRoutingTypes.addAll(addressInfo.getRoutingTypes()); + } + } + return new AddressInfo(address, mergedRoutingTypes); + } + private void deployQueuesFromListCoreQueueConfiguration(List queues) throws Exception { for (CoreQueueConfiguration config : queues) { try { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java index 1d8e7b93f3..ead96d5353 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java @@ -17,12 +17,6 @@ package org.apache.activemq.artemis.tests.integration.jms; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; @@ -32,6 +26,16 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.QueueBinding; @@ -289,9 +293,14 @@ public class RedeployTest extends ActiveMQTestBase { embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); try { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616"); + try (JMSContext context = connectionFactory.createContext()) { + context.createProducer().send(context.createQueue("myAddress"), "hello"); + } + latch.await(10, TimeUnit.SECONDS); Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress")); - Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType()); + Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType()); Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000); @@ -300,7 +309,14 @@ public class RedeployTest extends ActiveMQTestBase { latch.await(10, TimeUnit.SECONDS); Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress")); - Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType()); + Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType()); + + //Ensures the queue isnt detroyed by checking message sent before change is consumable after (e.g. no message loss) + try (JMSContext context = connectionFactory.createContext()) { + Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive(); + assertEquals("hello", ((TextMessage) message).getText()); + } + } finally { embeddedActiveMQ.stop(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java index 3a2264f127..fefc735d9b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java @@ -20,6 +20,10 @@ package org.apache.activemq.artemis.tests.integration.persistence; import java.util.ArrayList; import java.util.List; +import javax.jms.ConnectionFactory; +import javax.jms.JMSContext; +import javax.jms.Message; +import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; @@ -27,7 +31,7 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Test; @@ -37,21 +41,8 @@ public class ConfigChangeTest extends ActiveMQTestBase { @Test public void testChangeQueueRoutingTypeOnRestart() throws Exception { - internalTestChangeQueueRoutingTypeOnRestart(false); - } - - @Test - public void testChangeQueueRoutingTypeOnRestartNegative() throws Exception { - internalTestChangeQueueRoutingTypeOnRestart(true); - } - - public void internalTestChangeQueueRoutingTypeOnRestart(boolean negative) throws Exception { - // if negative == true then the queue's routing type should *not* change - Configuration configuration = createDefaultInVMConfig(); - configuration.addAddressesSetting("#", new AddressSettings() - .setConfigDeleteQueues(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE) - .setConfigDeleteAddresses(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE)); + configuration.addAddressesSetting("#", new AddressSettings()); List addressConfigurations = new ArrayList(); CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration() @@ -65,6 +56,14 @@ public class ConfigChangeTest extends ActiveMQTestBase { configuration.setAddressConfigurations(addressConfigurations); server = createServer(true, configuration); server.start(); + + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0"); + try (JMSContext context = connectionFactory.createContext()) { + context.createProducer().send(context.createQueue("myAddress"), "hello"); + } + + server.stop(); addressConfiguration = new CoreAddressConfiguration() @@ -77,10 +76,16 @@ public class ConfigChangeTest extends ActiveMQTestBase { addressConfigurations.clear(); addressConfigurations.add(addressConfiguration); configuration.setAddressConfigurations(addressConfigurations); - server.start(); - assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType()); - assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType()); + assertEquals(RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType()); + assertEquals(RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType()); + + //Ensures the queue isnt detroyed by checking message sent before change is consumable after (e.g. no message loss) + try (JMSContext context = connectionFactory.createContext()) { + Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive(); + assertEquals("hello", ((TextMessage) message).getText()); + } + server.stop(); } } diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml index e5bbe4fe96..a0313593b4 100644 --- a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml +++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml @@ -23,17 +23,21 @@ under the License. xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> - - - FORCE - - + false + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576 + + +
- + - +
diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml index 61ae86a9c8..6737a05e45 100644 --- a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml +++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml @@ -23,17 +23,20 @@ under the License. xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> - - - FORCE - - + false + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576 + +
- + - +