From b471392872cf15c2a29f2b3c4e9a9ca4a9586e49 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 11 Nov 2022 07:24:15 -0500 Subject: [PATCH] ARTEMIS-4089 Check on AutoCreation during routing (cherry picked from commit 4f79eb42f53071e3606a4848ec72dd164a88e350) --- .../amqp/broker/AMQPSessionCallback.java | 48 +-- .../core/postoffice/impl/PostOfficeImpl.java | 94 ++++-- .../artemis/core/server/RoutingContext.java | 4 + .../artemis/core/server/ServerSession.java | 2 + .../core/server/impl/RoutingContextImpl.java | 30 +- .../core/server/impl/ServerSessionImpl.java | 57 +++- .../amqp/AmqpExpiredMessageTest.java | 9 + .../integration/client/DeleteAddressTest.java | 291 ++++++++++++++++++ .../integration/client/LargeMessageTest.java | 3 +- .../management/AddressControlTest.java | 1 + 10 files changed, 465 insertions(+), 74 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeleteAddressTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index efa25c89cb..637cdf10f2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; @@ -35,7 +34,6 @@ import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.server.AddressQueryResult; @@ -47,7 +45,6 @@ import org.apache.activemq.artemis.core.server.ServerProducer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; @@ -332,45 +329,8 @@ public class AMQPSessionCallback implements SessionCallback { } - public boolean checkAddressAndAutocreateIfPossible(SimpleString address, RoutingType routingType) throws Exception { - boolean result = false; - SimpleString unPrefixedAddress = serverSession.removePrefix(address); - AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(unPrefixedAddress.toString()); - - if (routingType == RoutingType.MULTICAST) { - if (manager.getServer().getAddressInfo(unPrefixedAddress) == null) { - if (addressSettings.isAutoCreateAddresses()) { - try { - serverSession.createAddress(address, routingType, true); - } catch (ActiveMQAddressExistsException e) { - // The address may have been created by another thread in the mean time. Catch and do nothing. - } - result = true; - } - } else { - result = true; - } - } else if (routingType == RoutingType.ANYCAST) { - if (manager.getServer().locateQueue(unPrefixedAddress) == null) { - Bindings bindings = manager.getServer().getPostOffice().lookupBindingsForAddress(address); - if (bindings != null) { - // this means the address has another queue with a different name, which is fine, we just ignore it on this case - result = true; - } else if (addressSettings.isAutoCreateQueues()) { - try { - serverSession.createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true)); - } catch (ActiveMQQueueExistsException e) { - // The queue may have been created by another thread in the mean time. Catch and do nothing. - } - result = true; - } - } else { - result = true; - } - } - - return result; + return serverSession.checkAutoCreate(address, routingType); } public AddressQueryResult addressQuery(SimpleString addressName, @@ -506,7 +466,11 @@ public class AMQPSessionCallback implements SessionCallback { //here check queue-autocreation if (!checkAddressAndAutocreateIfPossible(address, routingType)) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); + ActiveMQException e = ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); + if (transaction != null) { + transaction.markAsRollbackOnly(e); + } + throw e; } OperationContext oldcontext = recoverContext(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 5949ad8448..b9c60f1fb3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -38,6 +38,7 @@ import java.util.stream.Stream; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQShutdownException; import org.apache.activemq.artemis.api.core.Message; @@ -1154,7 +1155,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } message.clearInternalProperties(); Bindings bindings; - final AddressInfo addressInfo = addressManager.getAddressInfo(address); + final AddressInfo addressInfo = checkAddress(context, address); + + final RoutingStatus status; if (bindingMove != null) { context.clear(); context.setReusable(false); @@ -1162,18 +1165,28 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (addressInfo != null) { addressInfo.incrementRoutedMessageCount(); } - } else if ((bindings = addressManager.getBindingsForRoutingAddress(address)) != null) { - bindings.route(message, context); - if (addressInfo != null) { - addressInfo.incrementRoutedMessageCount(); - } + status = RoutingStatus.OK; } else { - context.setReusable(false); - if (addressInfo != null) { - addressInfo.incrementUnRoutedMessageCount(); + bindings = simpleRoute(address, context, message, addressInfo); + if (logger.isDebugEnabled()) { + if (bindings != null) { + logger.debug("PostOffice::simpleRoute returned bindings with size = {}", bindings.getBindings().size()); + } else { + logger.debug("PostOffice::simpleRoute null as bindings"); + } + } + if (bindings == null) { + context.setReusable(false); + context.clear(); + if (addressInfo != null) { + addressInfo.incrementUnRoutedMessageCount(); + } + // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS) + logger.debug("Couldn't find any bindings for address={} on message={}", address, message); + status = RoutingStatus.NO_BINDINGS; + } else { + status = RoutingStatus.OK; } - // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS) - logger.debug("Couldn't find any bindings for address={} on message={}", address, message); } if (server.hasBrokerMessagePlugins()) { @@ -1182,14 +1195,20 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding logger.trace("Message after routed={}\n{}", message, context); + final RoutingStatus finalStatus; try { - final RoutingStatus status; - if (context.getQueueCount() == 0) { - status = maybeSendToDLA(message, context, address, sendToDLA); + if ( status == RoutingStatus.NO_BINDINGS) { + finalStatus = maybeSendToDLA(message, context, address, sendToDLA); } else { - status = RoutingStatus.OK; + finalStatus = status; try { - processRoute(message, context, direct); + if (context.getQueueCount() > 0) { + processRoute(message, context, direct); + } else { + if (message.isLargeMessage()) { + ((LargeServerMessage) message).deleteFile(); + } + } } catch (ActiveMQAddressFullException e) { if (startedTX) { context.getTransaction().rollback(); @@ -1203,9 +1222,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding context.getTransaction().commit(); } if (server.hasBrokerMessagePlugins()) { - server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, status)); + server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalStatus)); } - return status; + return finalStatus; } catch (Exception e) { if (server.hasBrokerMessagePlugins()) { server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e)); @@ -1214,6 +1233,45 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + private AddressInfo checkAddress(RoutingContext context, SimpleString address) throws Exception { + AddressInfo addressInfo = addressManager.getAddressInfo(address); + if (addressInfo == null && context.getServerSession() != null) { + if (context.getServerSession().checkAutoCreate(address, context.getRoutingType())) { + addressInfo = addressManager.getAddressInfo(address); + } else { + ActiveMQException ex = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); + if (context.getTransaction() != null) { + context.getTransaction().markAsRollbackOnly(ex); + } + throw ex; + } + } + return addressInfo; + } + + Bindings simpleRoute(SimpleString address, RoutingContext context, Message message, AddressInfo addressInfo) throws Exception { + Bindings bindings = addressManager.getBindingsForRoutingAddress(address); + if (bindings == null && context.getServerSession() != null) { + if (!context.getServerSession().checkAutoCreate(address, context.getRoutingType())) { + ActiveMQException e = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); + Transaction tx = context.getTransaction(); + if (tx != null) { + tx.markAsRollbackOnly(e); + } + throw e; + } + bindings = addressManager.getBindingsForRoutingAddress(address); + } + if (bindings != null) { + bindings.route(message, context); + if (addressInfo != null) { + addressInfo.incrementRoutedMessageCount(); + } + } + return bindings; + } + + private RoutingStatus maybeSendToDLA(final Message message, final RoutingContext context, final SimpleString address, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index f6cff3d1be..d95c7ae9d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -103,5 +103,9 @@ public interface RoutingContext { MessageLoadBalancingType getLoadBalancingType(); + RoutingContext setServerSession(ServerSession session); + + ServerSession getServerSession(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index eff638f1e0..f238a4925d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -110,6 +110,8 @@ public interface ServerSession extends SecurityAuth { void addCloseable(Closeable closeable); + boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception; + ServerConsumer createConsumer(long consumerID, SimpleString queueName, SimpleString filterString, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index c047171154..1220190d12 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -30,6 +29,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.mirror.MirrorController; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -65,10 +65,10 @@ public class RoutingContextImpl implements RoutingContext { boolean mirrorDisabled = false; - private final Executor executor; - private boolean duplicateDetection = true; + private ServerSession serverSession; + @Override public boolean isDuplicateDetection() { return duplicateDetection; @@ -81,12 +81,7 @@ public class RoutingContextImpl implements RoutingContext { } public RoutingContextImpl(final Transaction transaction) { - this(transaction, null); - } - - public RoutingContextImpl(final Transaction transaction, Executor executor) { this.transaction = transaction; - this.executor = executor; } @Override @@ -121,7 +116,7 @@ public class RoutingContextImpl implements RoutingContext { } @Override - public RoutingContext setReusable(boolean reusable) { + public RoutingContextImpl setReusable(boolean reusable) { if (this.reusable != null && !this.reusable.booleanValue()) { // cannot set to Reusable once it was set to false return this; @@ -131,7 +126,7 @@ public class RoutingContextImpl implements RoutingContext { return this; } @Override - public RoutingContext setReusable(boolean reusable, int previousBindings) { + public RoutingContextImpl setReusable(boolean reusable, int previousBindings) { this.version = previousBindings; this.previousAddress = address; this.previousRoutingType = routingType; @@ -144,7 +139,7 @@ public class RoutingContextImpl implements RoutingContext { } @Override - public RoutingContext clear() { + public RoutingContextImpl clear() { map.clear(); queueCount = 0; @@ -252,7 +247,7 @@ public class RoutingContextImpl implements RoutingContext { } @Override - public RoutingContext setRoutingType(RoutingType routingType) { + public RoutingContextImpl setRoutingType(RoutingType routingType) { if (this.routingType == null && routingType != null || this.routingType != routingType) { this.clear(); } @@ -313,6 +308,17 @@ public class RoutingContextImpl implements RoutingContext { return getContextListing(address).getDurableQueues(); } + @Override + public RoutingContextImpl setServerSession(ServerSession session) { + this.serverSession = session; + return this; + } + + @Override + public ServerSession getServerSession() { + return serverSession; + } + @Override public int getQueueCount() { return queueCount; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 67201d3c7e..ab77a89f61 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.core.server.impl; +import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.json.JsonArrayBuilder; import org.apache.activemq.artemis.json.JsonObjectBuilder; import java.security.cert.X509Certificate; @@ -169,7 +172,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { private final SimpleString managementAddress; - protected final RoutingContext routingContext = new RoutingContextImpl(null); + protected final RoutingContext routingContext = new RoutingContextImpl(null).setServerSession(this); protected final SessionCallback callback; @@ -1737,6 +1740,55 @@ public class ServerSessionImpl implements ServerSession, FailureListener { return tx; } + + @Override + public boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception { + boolean result; + SimpleString unPrefixedAddress = removePrefix(address); + AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(unPrefixedAddress.toString()); + + if (routingType == RoutingType.MULTICAST) { + if (server.getAddressInfo(unPrefixedAddress) == null) { + if (addressSettings.isAutoCreateAddresses()) { + try { + createAddress(address, routingType, true); + } catch (ActiveMQAddressExistsException e) { + // The address may have been created by another thread in the mean time. Catch and do nothing. + } + result = true; + } else { + result = false; + } + } else { + result = true; + } + } else if (routingType == RoutingType.ANYCAST) { + if (server.locateQueue(unPrefixedAddress) == null) { + Bindings bindings = server.getPostOffice().lookupBindingsForAddress(address); + if (bindings != null) { + // this means the address has another queue with a different name, which is fine, we just ignore it on this case + result = true; + } else if (addressSettings.isAutoCreateQueues()) { + try { + createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true)); + } catch (ActiveMQQueueExistsException e) { + // The queue may have been created by another thread in the mean time. Catch and do nothing. + } + result = true; + } else { + result = false; + } + } else { + result = true; + } + } else { + result = true; + } + + return result; + } + + @Override public RoutingStatus send(final Message message, final boolean direct) throws Exception { return send(message, direct, false); @@ -2218,6 +2270,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { result = postOffice.route(msg, routingContext, direct); + logger.debug("Routing result for {} = {}", msg, result); + Pair value = targetAddressInfos.get(msg.getAddressSimpleString()); if (value == null) { @@ -2231,6 +2285,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { routingContext.clear(); } } + return result; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index f98ea8067c..71f397aad5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -23,6 +23,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -49,9 +50,13 @@ import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AmqpExpiredMessageTest extends AmqpClientTestSupport { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + @Test(timeout = 60000) public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws Exception { AmqpClient client = createAmqpClient(); @@ -568,7 +573,11 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { message.setText("Test-Message"); message.setDeliveryAnnotation("shouldDisappear", 1); message.setMessageAnnotation("x-opt-routing-type", (byte) 1); + + logger.debug("*******************************************************************************************************************************"); + logger.debug("message being sent {}", message); sender.send(message); + logger.debug("*******************************************************************************************************************************"); Queue forward = getProxyToQueue(FORWARDING_ADDRESS); assertTrue("Message not diverted", Wait.waitFor(() -> forward.getMessageCount() > 0, 7000, 500)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeleteAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeleteAddressTest.java new file mode 100644 index 0000000000..dece936305 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeleteAddressTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; + +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DeleteAddressTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + ActiveMQServer server; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + private void localServer(boolean autoCreate) throws Exception { + server = createServer(false, true); + + AddressSettings settings = new AddressSettings().setAutoDeleteAddresses(autoCreate).setAutoCreateAddresses(autoCreate).setAutoCreateQueues(autoCreate).setAutoDeleteQueues(autoCreate).setDeadLetterAddress(SimpleString.toSimpleString("DLQ")).setSendToDLAOnNoRoute(true); + server.start(); + server.createQueue(new QueueConfiguration("DLQ").setRoutingType(RoutingType.ANYCAST)); + server.getAddressSettingsRepository().addMatch(getName() + "*", settings); + } + + @Test + public void testQueueNoAutoCreateCore() throws Exception { + internalQueueTest("CORE", false); + } + + @Test + public void testQueueNoAutoCreateAMQP() throws Exception { + internalQueueTest("AMQP", false); + } + + @Test + public void testQueueNoAutoCreateOpenWire() throws Exception { + internalQueueTest("OPENWIRE", false); + } + + + @Test + public void testQueueAutoCreateCore() throws Exception { + internalQueueTest("CORE", true); + } + + @Test + public void testDeletoAutoCreateAMQP() throws Exception { + internalQueueTest("AMQP", true); + } + + @Test + public void testQueueAutoCreateOpenWire() throws Exception { + internalQueueTest("OPENWIRE", true); + } + + public void internalQueueTest(String protocol, boolean autocreate) throws Exception { + localServer(autocreate); + + String ADDRESS_NAME = getName() + protocol; + + if (!autocreate) { + server.addAddressInfo(new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(ADDRESS_NAME).setRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + } + + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(ADDRESS_NAME); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("hello")); + session.commit(); + connection.start(); + + try (MessageConsumer consumer = session.createConsumer(queue)) { + logger.debug("Sending hello message"); + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("hello", message.getText()); + } + + session.commit(); + + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(ADDRESS_NAME); + Wait.assertEquals(0, serverQueue::getConsumerCount); + + server.destroyQueue(SimpleString.toSimpleString(ADDRESS_NAME)); + + boolean exception = false; + try { + logger.debug("Sending good bye message"); + producer.send(session.createTextMessage("good bye")); + session.commit(); + logger.debug("Exception was not captured, sent went fine"); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + exception = true; + } + + if (!autocreate) { + Assert.assertTrue(exception); + } + + if (autocreate) { + logger.debug("creating consumer"); + try (MessageConsumer consumer = session.createConsumer(queue)) { + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("good bye", message.getText()); + } + } else { + exception = false; + logger.debug("Creating consumer, where an exception is expected"); + try (MessageConsumer consumer = session.createConsumer(queue)) { + } catch (Exception e) { + logger.debug("Received exception after createConsumer"); + exception = true; + } + Assert.assertTrue(exception); + } + } + + org.apache.activemq.artemis.core.server.Queue dlqServerQueue = server.locateQueue("DLQ"); + Assert.assertEquals(0, dlqServerQueue.getMessageCount()); + } + + @Test + public void testTopicNoAutoCreateCore() throws Exception { + internalMulticastTest("CORE", false); + } + + @Test + public void testTopicAutoCreateCore() throws Exception { + internalMulticastTest("CORE", true); + } + + @Test + public void testTopicNoAutoCreateAMQP() throws Exception { + internalMulticastTest("AMQP", false); + } + + @Test + public void testTopicAutoCreateAMQP() throws Exception { + internalMulticastTest("AMQP", true); + } + + @Test + public void testTopicNoAutoCreateOPENWIRE() throws Exception { + internalMulticastTest("OPENWIRE", false); + } + + @Test + public void testTopicAutoCreateOPENWIRE() throws Exception { + internalMulticastTest("OPENWIRE", true); + } + + public void internalMulticastTest(String protocol, boolean autocreate) throws Exception { + localServer(autocreate); + + String ADDRESS_NAME = getName() + protocol + "_Topic"; + final String dlqText = "This should be in DLQ " + RandomUtil.randomString(); + + if (!autocreate) { + server.addAddressInfo(new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST)); + } + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + connection.setClientID("client"); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic destination = session.createTopic(ADDRESS_NAME); + + TopicSubscriber consumer = session.createDurableSubscriber(destination, "subs1"); + + MessageProducer producer = session.createProducer(destination); + producer.send(session.createTextMessage("hello")); + session.commit(); + connection.start(); + + logger.debug("Sending hello message"); + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals("hello", message.getText()); + + consumer.close(); + + session.commit(); + + Bindings bindings = server.getPostOffice().lookupBindingsForAddress(SimpleString.toSimpleString(ADDRESS_NAME)); + for (Binding b : bindings.getBindings()) { + if (b instanceof LocalQueueBinding) { + Wait.assertEquals(0, () -> ((LocalQueueBinding)b).getQueue().getConsumerCount()); + server.destroyQueue(b.getUniqueName()); + } + } + + producer.send(session.createTextMessage(dlqText)); + session.commit(); + + server.removeAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME), null); + + try { + logger.debug("Sending good bye message"); + producer.send(session.createTextMessage("good bye")); + logger.debug("Exception was not captured, sent went fine"); + if (!autocreate) { + session.commit(); + Assert.fail("Exception was expected"); + } else { + session.rollback(); + } + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + + logger.debug("creating consumer"); + try (TopicSubscriber newSubs = session.createDurableSubscriber(destination, "second")) { + if (!autocreate) { + Assert.fail("exception was expected"); + } + } catch (Exception expected) { + logger.debug(expected.getMessage(), expected); + } + + org.apache.activemq.artemis.core.server.Queue dlqServerQueue = server.locateQueue("DLQ"); + Assert.assertEquals(1, dlqServerQueue.getMessageCount()); + } + + try (Connection connection = factory.createConnection()) { + connection.setClientID("client"); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageConsumer dlqConsumer = session.createConsumer(session.createQueue("DLQ")); + TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000); + Assert.assertNotNull(dlqMessage); + Assert.assertEquals(dlqText, dlqMessage.getText()); + Assert.assertNull(dlqConsumer.receiveNoWait()); + } + + + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index b1e96713c5..5914d62b3a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -272,6 +272,7 @@ public class LargeMessageTest extends LargeMessageTestBase { Message clientFile = createLargeClientMessageStreaming(session, messageSize, true); + logger.debug("****** Send message"); producer.send(clientFile); session.commit(); @@ -292,7 +293,7 @@ public class LargeMessageTest extends LargeMessageTestBase { msg1.getBodyBuffer().readByte(); Assert.fail("Exception was expected"); } catch (final Exception ignored) { - // empty on purpose + logger.debug(ignored.getMessage(), ignored); } session.close(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index 222061123b..576d5499ba 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -636,6 +636,7 @@ public class AddressControlTest extends ManagementTestBase { session.createAddress(address, RoutingType.ANYCAST, false); AddressControl addressControl = createManagementControl(address); + Assert.assertNotNull(addressControl); assertEquals(0, addressControl.getMessageCount()); ClientProducer producer = session.createProducer(address.toString());