diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index 485cc911ba..c499a28d9d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -140,7 +140,7 @@ public class DivertImpl implements Divert { copy = message; } - postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()), false); + postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()).setServerSession(context.getServerSession()), false); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java index 355c3c4541..76a00aad29 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; @@ -51,6 +52,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl; import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -58,6 +60,7 @@ import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.command.ActiveMQTopic; import org.junit.Assert; import org.junit.Test; @@ -1775,4 +1778,143 @@ public class DivertTest extends ActiveMQTestBase { Assert.assertEquals("testAddress" + COUNT, message.getAddress()); Assert.assertEquals("testAddress" + (COUNT - 1), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS)); } + + @Test + public void testDivertToNewAddress() throws Exception { + final String queueName = "queue"; + final String dummyQueueName = "dummy"; + final String noDivertAutoCreateQName = "notAllowed"; + final String propKey = "newQueue"; + final String DIVERT = "myDivert"; + final int numMessages = 10; + + Transformer transformer = message -> message.setAddress(message.getStringProperty(propKey)); + + ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl(); + serviceRegistry.addDivertTransformer(DIVERT, transformer); + + AddressSettings autoCreateDestinationsAS = new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(true); + AddressSettings noAutoCreateDestinationsAS = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false); + + ActiveMQServer server = addServer(new ActiveMQServerImpl(createDefaultInVMConfig(), null, null, null, serviceRegistry)); + + server.getConfiguration().addAddressSetting("#", autoCreateDestinationsAS); + server.getConfiguration().addAddressSetting(noDivertAutoCreateQName, noAutoCreateDestinationsAS); + + server.start(); + + server.createQueue(new QueueConfiguration(queueName)); + server.deployDivert(new DivertConfiguration() + .setName(DIVERT) + .setAddress(queueName) + .setRoutingType(ComponentConfigurationRoutingType.ANYCAST) + .setForwardingAddress(dummyQueueName) + .setExclusive(true)); + + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession session = sf.createSession(false, true, true); + session.start(); + + ClientMessage message; + ClientProducer producer = session.createProducer(queueName); + + for (int i = 0; i < numMessages; i++) { + message = session.createMessage(true); + message.putStringProperty(propKey, queueName + "." + i); + producer.send(message); + } + + for (int i = 0; i < numMessages; i++) { + ClientConsumer consumer = session.createConsumer(queueName + "." + i); + message = consumer.receive(DivertTest.TIMEOUT); + Assert.assertNotNull(message); + message.acknowledge(); + consumer.close(); + } + + ClientMessage failMessage = session.createMessage(true); + + assertThrows(ActiveMQAddressDoesNotExistException.class, () -> { + failMessage.putStringProperty(propKey, noDivertAutoCreateQName); + producer.send(failMessage); + }); + + producer.close(); + + Assert.assertNull(server.locateQueue(noDivertAutoCreateQName)); + Assert.assertNull(server.locateQueue(dummyQueueName)); + + } + + @Test + public void testHandleAutoDeleteDestination() throws Exception { + final String testAddress = "testAddress"; + final String forwardAddress = "forwardAddress"; + + DivertConfiguration divertConf = new DivertConfiguration() + .setName("divert") + .setRoutingType(ComponentConfigurationRoutingType.ANYCAST) + .setExclusive(true) + .setAddress(testAddress) + .setForwardingAddress(forwardAddress); + + AddressSettings addressSettings = new AddressSettings() + .setAutoCreateAddresses(true) + .setAutoCreateQueues(true) + .setAutoDeleteAddresses(true) + .setAutoDeleteQueues(true); + + Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf).addAddressSetting("#", addressSettings); + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false)); + server.start(); + + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession session = sf.createSession(false, true, true); + + session.createQueue(new QueueConfiguration(testAddress).setAddress(testAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true)); + session.createQueue(new QueueConfiguration(forwardAddress).setAddress(forwardAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true)); + session.start(); + + ClientProducer producer = session.createProducer(testAddress); + ClientConsumer consumer = session.createConsumer(forwardAddress); + + final int numMessages = 5; + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session.createMessage(true); + message.setRoutingType(RoutingType.ANYCAST); + producer.send(message); + } + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = consumer.receive(DivertTest.TIMEOUT); + Assert.assertNotNull(message); + message.acknowledge(); + } + + Assert.assertNull(consumer.receiveImmediate()); + consumer.close(); + + //Trigger autoDelete instead of waiting + QueueManagerImpl.performAutoDeleteQueue(server, server.locateQueue(forwardAddress)); + Wait.assertTrue(() -> server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(forwardAddress)) + .getBindingRemovedTimestamp() != -1, DivertTest.TIMEOUT, 100); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session.createMessage(true); + producer.send(message); + } + + consumer = session.createConsumer(forwardAddress); + for (int i = 0; i < numMessages; i++) { + ClientMessage message = consumer.receive(DivertTest.TIMEOUT); + Assert.assertNotNull(message); + message.acknowledge(); + } + + Assert.assertNull(consumer.receiveImmediate()); + } + }