From 548747c71dbe5defdad2af057abaffe5747eb8fa Mon Sep 17 00:00:00 2001 From: AntonRoskvist Date: Thu, 12 May 2022 19:56:19 +0200 Subject: [PATCH] ARTEMIS-3827 anon OpenWire producer may lose sent msg --- .../protocol/openwire/OpenWireConnection.java | 5 +- .../protocol/openwire/amq/AMQSession.java | 5 ++ .../amq/ProducerAutoCreateQueueTest.java | 53 +++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index f033d47159..8a1b89c3f7 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -368,7 +368,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se setLastCommand(command); response = command.visit(commandProcessorInstance); } catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn("Errors occurred during the buffering operation ", e); + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); if (responseRequired) { response = convertException(e); } @@ -1683,6 +1683,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } catch (Exception e) { if (tx != null) { tx.markAsRollbackOnly(new ActiveMQException(e.getMessage())); + } else if (e instanceof ActiveMQNonExistentQueueException && producerInfo.getDestination() == null) { + //Send exception for non transacted anonymous producers using an incorrect destination + sendException(e); } throw e; } finally { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 3c4b29f2e0..16a75d090d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; @@ -250,6 +251,10 @@ public class AMQSession implements SessionCallback { coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true).setFilterString(filter)); connection.addKnownDestination(queueName); } else { + if (server.getAddressInfo(queueName) == null) { + //Address does not exist and will not get autocreated + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName); + } hasQueue = false; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java index b9207af9d4..4fe4e8a340 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerAutoCreateQueueTest.java @@ -21,8 +21,11 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.command.ActiveMQDestination; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import javax.jms.Connection; import javax.jms.MessageConsumer; @@ -144,4 +147,54 @@ public class ProducerAutoCreateQueueTest extends BasicOpenWireTest { Wait.assertTrue(() -> server.locateQueue(new SimpleString("trash")) != null); Wait.assertTrue(() -> server.getAddressInfo(new SimpleString("trash")) != null); } + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test() + public void testSendFailsWithoutAutoCreate() throws Exception { + thrown.expect(javax.jms.JMSException.class); + + Connection connection = null; + try { + AddressSettings setting = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false); + server.getAddressSettingsRepository().addMatch("WRONG.#", setting); + + connection = factory.createConnection("admin", "password"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = ActiveMQDestination.createDestination("WRONG.QUEUE", ActiveMQDestination.QUEUE_TYPE); + + final MessageProducer producer = session.createProducer(null); + producer.send(destination, session.createTextMessage("foo")); + + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test() + public void testTransactedSendFailsWithoutAutoCreate() throws Exception { + thrown.expect(javax.jms.JMSException.class); + + Connection connection = null; + try { + AddressSettings setting = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false); + server.getAddressSettingsRepository().addMatch("WRONG.#", setting); + + connection = factory.createConnection("admin", "password"); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = ActiveMQDestination.createDestination("WRONG.QUEUE", ActiveMQDestination.QUEUE_TYPE); + + final MessageProducer producer = session.createProducer(null); + producer.send(destination, session.createTextMessage("foo")); + session.commit(); + + } finally { + if (connection != null) { + connection.close(); + } + } + } }