diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index bb6502605d..f5d6f0ad23 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1269,7 +1269,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding Bindings simpleRoute(SimpleString address, RoutingContext context, Message message, AddressInfo addressInfo) throws Exception { Bindings bindings = addressManager.getBindingsForRoutingAddress(address); - if (bindings == null && context.getServerSession() != null) { + if ((bindings == null || !bindings.contains(LocalQueueBinding.class)) && context.getServerSession() != null) { AutoCreateResult autoCreateResult = context.getServerSession().checkAutoCreate(new QueueConfiguration(address).setRoutingType(context.getRoutingType())); if (autoCreateResult == AutoCreateResult.NOT_FOUND) { ActiveMQException e = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AutoDeleteClusteredDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AutoDeleteClusteredDestinationTest.java new file mode 100644 index 0000000000..8c6898cffa --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AutoDeleteClusteredDestinationTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.cluster.distribution; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class AutoDeleteClusteredDestinationTest extends ClusterTestBase { + + @Parameterized.Parameter(0) + public MessageLoadBalancingType loadBalancingType = MessageLoadBalancingType.OFF; + + @Parameterized.Parameters(name = "loadBalancingType = {0}") + public static Iterable extends Object> loadBalancingType() { + return Arrays.asList(new Object[][]{ + {MessageLoadBalancingType.OFF}, + {MessageLoadBalancingType.STRICT}, + {MessageLoadBalancingType.ON_DEMAND}, + {MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION}}); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + } + + @Test + public void testConnectionLoadBalancingAnonRun() throws Exception { + final String queueName = "queue"; + final String url0 = "tcp://localhost:61616?useTopologyForLoadBalancing=false"; + final String url1 = "tcp://localhost:61617?useTopologyForLoadBalancing=false"; + final SimpleString simpleName = SimpleString.toSimpleString(queueName); + final int messageCount = 10; + final int TIMEOUT = 5000; + + CountDownLatch latch = new CountDownLatch(messageCount); + + MessageListener listener0 = message -> { + latch.countDown(); + }; + + MessageListener listener1 = message -> { + latch.countDown(); + }; + + setupClusterConnection("cluster0", queueName, loadBalancingType, 1, isNetty(), 0, 1); + setupClusterConnection("cluster1", queueName, loadBalancingType, 1, isNetty(), 1, 0); + + startServers(0, 1); + waitForServerToStart(servers[0]); + waitForServerToStart(servers[1]); + + AddressSettings settings = new AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true); + + servers[0].getAddressSettingsRepository().addMatch(queueName, settings); + servers[1].getAddressSettingsRepository().addMatch(queueName, settings); + + try ( + ActiveMQConnectionFactory connectionFactory0 = new ActiveMQConnectionFactory(url0); + ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory(url1); + Connection connection0 = connectionFactory0.createConnection(); + Connection connection1 = connectionFactory1.createConnection(); + Session consumerSession0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consumerSession1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session producerSession0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + + Queue queue = consumerSession0.createQueue(queueName); + MessageProducer producer0 = producerSession0.createProducer(queue); + MessageConsumer consumer0 = consumerSession0.createConsumer(queue); + MessageConsumer consumer1 = consumerSession1.createConsumer(queue); + + consumer0.setMessageListener(listener0); + consumer1.setMessageListener(listener1); + connection0.start(); + connection1.start(); + + for (int i = 0; i < messageCount; i++) { + producer0.send(producerSession0.createTextMessage("Message")); + + if (i == 2) { + QueueImpl serverQueue = (QueueImpl) servers[0].locateQueue(simpleName); + Wait.assertTrue(() -> serverQueue.getMessageCount() == 0, TIMEOUT, 100); + consumer0.close(); + //Trigger an auto-delete to not have to wait + QueueManagerImpl.performAutoDeleteQueue(servers[0], serverQueue); + Wait.assertTrue(() -> servers[0].getPostOffice().getAddressInfo(simpleName).getBindingRemovedTimestamp() != -1, TIMEOUT, 100); + } + + if (i == 6) { + consumer0 = consumerSession0.createConsumer(queue); + consumer0.setMessageListener(listener0); + QueueImpl serverQueue = (QueueImpl) servers[0].locateQueue(simpleName); + Wait.assertTrue(() -> serverQueue.getConsumerCount() == 1, TIMEOUT, 100); + } + } + + Assert.assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS)); + + } + } + + protected boolean isNetty() { + return true; + } + + @Override + protected boolean isFileStorage() { + return false; + } + +}