ARTEMIS-4089 Check on AutoCreation during routing

(cherry picked from commit 4f79eb42f53071e3606a4848ec72dd164a88e350)
This commit is contained in:
Clebert Suconic 2022-11-11 07:24:15 -05:00
parent 64918caa10
commit b471392872
10 changed files with 465 additions and 74 deletions

View File

@ -20,7 +20,6 @@ import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@ -35,7 +34,6 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
@ -47,7 +45,6 @@ import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@ -332,45 +329,8 @@ public class AMQPSessionCallback implements SessionCallback {
}
public boolean checkAddressAndAutocreateIfPossible(SimpleString address, RoutingType routingType) throws Exception {
boolean result = false;
SimpleString unPrefixedAddress = serverSession.removePrefix(address);
AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(unPrefixedAddress.toString());
if (routingType == RoutingType.MULTICAST) {
if (manager.getServer().getAddressInfo(unPrefixedAddress) == null) {
if (addressSettings.isAutoCreateAddresses()) {
try {
serverSession.createAddress(address, routingType, true);
} catch (ActiveMQAddressExistsException e) {
// The address may have been created by another thread in the mean time. Catch and do nothing.
}
result = true;
}
} else {
result = true;
}
} else if (routingType == RoutingType.ANYCAST) {
if (manager.getServer().locateQueue(unPrefixedAddress) == null) {
Bindings bindings = manager.getServer().getPostOffice().lookupBindingsForAddress(address);
if (bindings != null) {
// this means the address has another queue with a different name, which is fine, we just ignore it on this case
result = true;
} else if (addressSettings.isAutoCreateQueues()) {
try {
serverSession.createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true));
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
result = true;
}
} else {
result = true;
}
}
return result;
return serverSession.checkAutoCreate(address, routingType);
}
public AddressQueryResult addressQuery(SimpleString addressName,
@ -506,7 +466,11 @@ public class AMQPSessionCallback implements SessionCallback {
//here check queue-autocreation
if (!checkAddressAndAutocreateIfPossible(address, routingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
ActiveMQException e = ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
if (transaction != null) {
transaction.markAsRollbackOnly(e);
}
throw e;
}
OperationContext oldcontext = recoverContext();

View File

@ -38,6 +38,7 @@ import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.Message;
@ -1154,7 +1155,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
message.clearInternalProperties();
Bindings bindings;
final AddressInfo addressInfo = addressManager.getAddressInfo(address);
final AddressInfo addressInfo = checkAddress(context, address);
final RoutingStatus status;
if (bindingMove != null) {
context.clear();
context.setReusable(false);
@ -1162,18 +1165,28 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount();
}
} else if ((bindings = addressManager.getBindingsForRoutingAddress(address)) != null) {
bindings.route(message, context);
if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount();
}
status = RoutingStatus.OK;
} else {
context.setReusable(false);
if (addressInfo != null) {
addressInfo.incrementUnRoutedMessageCount();
bindings = simpleRoute(address, context, message, addressInfo);
if (logger.isDebugEnabled()) {
if (bindings != null) {
logger.debug("PostOffice::simpleRoute returned bindings with size = {}", bindings.getBindings().size());
} else {
logger.debug("PostOffice::simpleRoute null as bindings");
}
}
if (bindings == null) {
context.setReusable(false);
context.clear();
if (addressInfo != null) {
addressInfo.incrementUnRoutedMessageCount();
}
// this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
logger.debug("Couldn't find any bindings for address={} on message={}", address, message);
status = RoutingStatus.NO_BINDINGS;
} else {
status = RoutingStatus.OK;
}
// this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
logger.debug("Couldn't find any bindings for address={} on message={}", address, message);
}
if (server.hasBrokerMessagePlugins()) {
@ -1182,14 +1195,20 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
logger.trace("Message after routed={}\n{}", message, context);
final RoutingStatus finalStatus;
try {
final RoutingStatus status;
if (context.getQueueCount() == 0) {
status = maybeSendToDLA(message, context, address, sendToDLA);
if ( status == RoutingStatus.NO_BINDINGS) {
finalStatus = maybeSendToDLA(message, context, address, sendToDLA);
} else {
status = RoutingStatus.OK;
finalStatus = status;
try {
processRoute(message, context, direct);
if (context.getQueueCount() > 0) {
processRoute(message, context, direct);
} else {
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
}
}
} catch (ActiveMQAddressFullException e) {
if (startedTX) {
context.getTransaction().rollback();
@ -1203,9 +1222,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
context.getTransaction().commit();
}
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, status));
server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalStatus));
}
return status;
return finalStatus;
} catch (Exception e) {
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
@ -1214,6 +1233,45 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
}
private AddressInfo checkAddress(RoutingContext context, SimpleString address) throws Exception {
AddressInfo addressInfo = addressManager.getAddressInfo(address);
if (addressInfo == null && context.getServerSession() != null) {
if (context.getServerSession().checkAutoCreate(address, context.getRoutingType())) {
addressInfo = addressManager.getAddressInfo(address);
} else {
ActiveMQException ex = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
if (context.getTransaction() != null) {
context.getTransaction().markAsRollbackOnly(ex);
}
throw ex;
}
}
return addressInfo;
}
Bindings simpleRoute(SimpleString address, RoutingContext context, Message message, AddressInfo addressInfo) throws Exception {
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
if (bindings == null && context.getServerSession() != null) {
if (!context.getServerSession().checkAutoCreate(address, context.getRoutingType())) {
ActiveMQException e = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
Transaction tx = context.getTransaction();
if (tx != null) {
tx.markAsRollbackOnly(e);
}
throw e;
}
bindings = addressManager.getBindingsForRoutingAddress(address);
}
if (bindings != null) {
bindings.route(message, context);
if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount();
}
}
return bindings;
}
private RoutingStatus maybeSendToDLA(final Message message,
final RoutingContext context,
final SimpleString address,

View File

@ -103,5 +103,9 @@ public interface RoutingContext {
MessageLoadBalancingType getLoadBalancingType();
RoutingContext setServerSession(ServerSession session);
ServerSession getServerSession();
}

View File

@ -110,6 +110,8 @@ public interface ServerSession extends SecurityAuth {
void addCloseable(Closeable closeable);
boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception;
ServerConsumer createConsumer(long consumerID,
SimpleString queueName,
SimpleString filterString,

View File

@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -30,6 +29,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -65,10 +65,10 @@ public class RoutingContextImpl implements RoutingContext {
boolean mirrorDisabled = false;
private final Executor executor;
private boolean duplicateDetection = true;
private ServerSession serverSession;
@Override
public boolean isDuplicateDetection() {
return duplicateDetection;
@ -81,12 +81,7 @@ public class RoutingContextImpl implements RoutingContext {
}
public RoutingContextImpl(final Transaction transaction) {
this(transaction, null);
}
public RoutingContextImpl(final Transaction transaction, Executor executor) {
this.transaction = transaction;
this.executor = executor;
}
@Override
@ -121,7 +116,7 @@ public class RoutingContextImpl implements RoutingContext {
}
@Override
public RoutingContext setReusable(boolean reusable) {
public RoutingContextImpl setReusable(boolean reusable) {
if (this.reusable != null && !this.reusable.booleanValue()) {
// cannot set to Reusable once it was set to false
return this;
@ -131,7 +126,7 @@ public class RoutingContextImpl implements RoutingContext {
return this;
}
@Override
public RoutingContext setReusable(boolean reusable, int previousBindings) {
public RoutingContextImpl setReusable(boolean reusable, int previousBindings) {
this.version = previousBindings;
this.previousAddress = address;
this.previousRoutingType = routingType;
@ -144,7 +139,7 @@ public class RoutingContextImpl implements RoutingContext {
}
@Override
public RoutingContext clear() {
public RoutingContextImpl clear() {
map.clear();
queueCount = 0;
@ -252,7 +247,7 @@ public class RoutingContextImpl implements RoutingContext {
}
@Override
public RoutingContext setRoutingType(RoutingType routingType) {
public RoutingContextImpl setRoutingType(RoutingType routingType) {
if (this.routingType == null && routingType != null || this.routingType != routingType) {
this.clear();
}
@ -313,6 +308,17 @@ public class RoutingContextImpl implements RoutingContext {
return getContextListing(address).getDurableQueues();
}
@Override
public RoutingContextImpl setServerSession(ServerSession session) {
this.serverSession = session;
return this;
}
@Override
public ServerSession getServerSession() {
return serverSession;
}
@Override
public int getQueueCount() {
return queueCount;

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
import java.security.cert.X509Certificate;
@ -169,7 +172,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private final SimpleString managementAddress;
protected final RoutingContext routingContext = new RoutingContextImpl(null);
protected final RoutingContext routingContext = new RoutingContextImpl(null).setServerSession(this);
protected final SessionCallback callback;
@ -1737,6 +1740,55 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return tx;
}
@Override
public boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception {
boolean result;
SimpleString unPrefixedAddress = removePrefix(address);
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(unPrefixedAddress.toString());
if (routingType == RoutingType.MULTICAST) {
if (server.getAddressInfo(unPrefixedAddress) == null) {
if (addressSettings.isAutoCreateAddresses()) {
try {
createAddress(address, routingType, true);
} catch (ActiveMQAddressExistsException e) {
// The address may have been created by another thread in the mean time. Catch and do nothing.
}
result = true;
} else {
result = false;
}
} else {
result = true;
}
} else if (routingType == RoutingType.ANYCAST) {
if (server.locateQueue(unPrefixedAddress) == null) {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(address);
if (bindings != null) {
// this means the address has another queue with a different name, which is fine, we just ignore it on this case
result = true;
} else if (addressSettings.isAutoCreateQueues()) {
try {
createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true));
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
result = true;
} else {
result = false;
}
} else {
result = true;
}
} else {
result = true;
}
return result;
}
@Override
public RoutingStatus send(final Message message, final boolean direct) throws Exception {
return send(message, direct, false);
@ -2218,6 +2270,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
result = postOffice.route(msg, routingContext, direct);
logger.debug("Routing result for {} = {}", msg, result);
Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
if (value == null) {
@ -2231,6 +2285,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
routingContext.clear();
}
}
return result;
}

View File

@ -23,6 +23,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -49,9 +50,13 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Test(timeout = 60000)
public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws Exception {
AmqpClient client = createAmqpClient();
@ -568,7 +573,11 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
message.setText("Test-Message");
message.setDeliveryAnnotation("shouldDisappear", 1);
message.setMessageAnnotation("x-opt-routing-type", (byte) 1);
logger.debug("*******************************************************************************************************************************");
logger.debug("message being sent {}", message);
sender.send(message);
logger.debug("*******************************************************************************************************************************");
Queue forward = getProxyToQueue(FORWARDING_ADDRESS);
assertTrue("Message not diverted", Wait.waitFor(() -> forward.getMessageCount() > 0, 7000, 500));

View File

@ -0,0 +1,291 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DeleteAddressTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ActiveMQServer server;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
private void localServer(boolean autoCreate) throws Exception {
server = createServer(false, true);
AddressSettings settings = new AddressSettings().setAutoDeleteAddresses(autoCreate).setAutoCreateAddresses(autoCreate).setAutoCreateQueues(autoCreate).setAutoDeleteQueues(autoCreate).setDeadLetterAddress(SimpleString.toSimpleString("DLQ")).setSendToDLAOnNoRoute(true);
server.start();
server.createQueue(new QueueConfiguration("DLQ").setRoutingType(RoutingType.ANYCAST));
server.getAddressSettingsRepository().addMatch(getName() + "*", settings);
}
@Test
public void testQueueNoAutoCreateCore() throws Exception {
internalQueueTest("CORE", false);
}
@Test
public void testQueueNoAutoCreateAMQP() throws Exception {
internalQueueTest("AMQP", false);
}
@Test
public void testQueueNoAutoCreateOpenWire() throws Exception {
internalQueueTest("OPENWIRE", false);
}
@Test
public void testQueueAutoCreateCore() throws Exception {
internalQueueTest("CORE", true);
}
@Test
public void testDeletoAutoCreateAMQP() throws Exception {
internalQueueTest("AMQP", true);
}
@Test
public void testQueueAutoCreateOpenWire() throws Exception {
internalQueueTest("OPENWIRE", true);
}
public void internalQueueTest(String protocol, boolean autocreate) throws Exception {
localServer(autocreate);
String ADDRESS_NAME = getName() + protocol;
if (!autocreate) {
server.addAddressInfo(new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(ADDRESS_NAME).setRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
}
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ADDRESS_NAME);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello"));
session.commit();
connection.start();
try (MessageConsumer consumer = session.createConsumer(queue)) {
logger.debug("Sending hello message");
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("hello", message.getText());
}
session.commit();
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(ADDRESS_NAME);
Wait.assertEquals(0, serverQueue::getConsumerCount);
server.destroyQueue(SimpleString.toSimpleString(ADDRESS_NAME));
boolean exception = false;
try {
logger.debug("Sending good bye message");
producer.send(session.createTextMessage("good bye"));
session.commit();
logger.debug("Exception was not captured, sent went fine");
} catch (Exception e) {
logger.debug(e.getMessage(), e);
exception = true;
}
if (!autocreate) {
Assert.assertTrue(exception);
}
if (autocreate) {
logger.debug("creating consumer");
try (MessageConsumer consumer = session.createConsumer(queue)) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("good bye", message.getText());
}
} else {
exception = false;
logger.debug("Creating consumer, where an exception is expected");
try (MessageConsumer consumer = session.createConsumer(queue)) {
} catch (Exception e) {
logger.debug("Received exception after createConsumer");
exception = true;
}
Assert.assertTrue(exception);
}
}
org.apache.activemq.artemis.core.server.Queue dlqServerQueue = server.locateQueue("DLQ");
Assert.assertEquals(0, dlqServerQueue.getMessageCount());
}
@Test
public void testTopicNoAutoCreateCore() throws Exception {
internalMulticastTest("CORE", false);
}
@Test
public void testTopicAutoCreateCore() throws Exception {
internalMulticastTest("CORE", true);
}
@Test
public void testTopicNoAutoCreateAMQP() throws Exception {
internalMulticastTest("AMQP", false);
}
@Test
public void testTopicAutoCreateAMQP() throws Exception {
internalMulticastTest("AMQP", true);
}
@Test
public void testTopicNoAutoCreateOPENWIRE() throws Exception {
internalMulticastTest("OPENWIRE", false);
}
@Test
public void testTopicAutoCreateOPENWIRE() throws Exception {
internalMulticastTest("OPENWIRE", true);
}
public void internalMulticastTest(String protocol, boolean autocreate) throws Exception {
localServer(autocreate);
String ADDRESS_NAME = getName() + protocol + "_Topic";
final String dlqText = "This should be in DLQ " + RandomUtil.randomString();
if (!autocreate) {
server.addAddressInfo(new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST));
}
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
connection.setClientID("client");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic destination = session.createTopic(ADDRESS_NAME);
TopicSubscriber consumer = session.createDurableSubscriber(destination, "subs1");
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage("hello"));
session.commit();
connection.start();
logger.debug("Sending hello message");
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("hello", message.getText());
consumer.close();
session.commit();
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(SimpleString.toSimpleString(ADDRESS_NAME));
for (Binding b : bindings.getBindings()) {
if (b instanceof LocalQueueBinding) {
Wait.assertEquals(0, () -> ((LocalQueueBinding)b).getQueue().getConsumerCount());
server.destroyQueue(b.getUniqueName());
}
}
producer.send(session.createTextMessage(dlqText));
session.commit();
server.removeAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME), null);
try {
logger.debug("Sending good bye message");
producer.send(session.createTextMessage("good bye"));
logger.debug("Exception was not captured, sent went fine");
if (!autocreate) {
session.commit();
Assert.fail("Exception was expected");
} else {
session.rollback();
}
} catch (Exception e) {
logger.debug(e.getMessage(), e);
}
logger.debug("creating consumer");
try (TopicSubscriber newSubs = session.createDurableSubscriber(destination, "second")) {
if (!autocreate) {
Assert.fail("exception was expected");
}
} catch (Exception expected) {
logger.debug(expected.getMessage(), expected);
}
org.apache.activemq.artemis.core.server.Queue dlqServerQueue = server.locateQueue("DLQ");
Assert.assertEquals(1, dlqServerQueue.getMessageCount());
}
try (Connection connection = factory.createConnection()) {
connection.setClientID("client");
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer dlqConsumer = session.createConsumer(session.createQueue("DLQ"));
TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
Assert.assertNotNull(dlqMessage);
Assert.assertEquals(dlqText, dlqMessage.getText());
Assert.assertNull(dlqConsumer.receiveNoWait());
}
}
}

View File

@ -272,6 +272,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
logger.debug("****** Send message");
producer.send(clientFile);
session.commit();
@ -292,7 +293,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
msg1.getBodyBuffer().readByte();
Assert.fail("Exception was expected");
} catch (final Exception ignored) {
// empty on purpose
logger.debug(ignored.getMessage(), ignored);
}
session.close();

View File

@ -636,6 +636,7 @@ public class AddressControlTest extends ManagementTestBase {
session.createAddress(address, RoutingType.ANYCAST, false);
AddressControl addressControl = createManagementControl(address);
Assert.assertNotNull(addressControl);
assertEquals(0, addressControl.getMessageCount());
ClientProducer producer = session.createProducer(address.toString());