From 548735f8b63ee07399ebd84b858f0fa808c4102d Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Wed, 8 Apr 2015 13:27:55 +0100 Subject: [PATCH] Add Auto JMS queue creation for OpenWire --- .../protocol/openwire/OpenWireConnection.java | 17 ++++++ .../core/protocol/openwire/OpenWireUtil.java | 13 +++-- .../protocol/openwire/amq/AMQProducer.java | 6 ++- .../protocol/openwire/amq/AMQSession.java | 5 ++ .../openwire/SimpleOpenWireTest.java | 53 ++++++++++++++++--- 5 files changed, 82 insertions(+), 12 deletions(-) diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java index db14e0a80a..4130d6e37b 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java @@ -74,6 +74,7 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.core.server.QueueQueryResult; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.ConnectionState; @@ -1402,6 +1403,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor if (producerExchange.canDispatch(messageSend)) { + if (messageSend.getDestination().isQueue()) + { + SimpleString queueName = OpenWireUtil.toCoreAddress(messageSend.getDestination()); + autoCreateQueueIfPossible(queueName, session); + } + SendingResult result = session.send(producerExchange, messageSend, sendProducerAck); if (result.isBlockNextSend()) { @@ -1451,6 +1458,15 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor return resp; } + public void autoCreateQueueIfPossible(SimpleString queueName, AMQSession session) throws Exception + { + QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName); + if (result.isAutoCreateJmsQueues() && !result.isExists()) + { + session.getCoreServer().createQueue(queueName, queueName, null, false, false, true); + } + } + private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { AMQProducerBrokerExchange result = producerExchanges.get(id); @@ -1785,4 +1801,5 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { return this.state.getContext(); } + } diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java index e0b9872ac8..6163dfe306 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java @@ -58,12 +58,15 @@ public class OpenWireUtil */ public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception { - AMQServerSession coreSession = amqSession.getCoreSession(); - SimpleString physicalName = OpenWireUtil.toCoreAddress(destination); - BindingQueryResult result = coreSession.executeBindingQuery(physicalName); - if (!result.isExists()) + if (destination.isQueue()) { - throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName); + AMQServerSession coreSession = amqSession.getCoreSession(); + SimpleString physicalName = OpenWireUtil.toCoreAddress(destination); + BindingQueryResult result = coreSession.executeBindingQuery(physicalName); + if (!result.isExists() && !result.isAutoCreateJmsQueues()) + { + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName); + } } } diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java index 06f7da7943..89cfd85888 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java @@ -32,6 +32,10 @@ public class AMQProducer public void init() throws Exception { - OpenWireUtil.validateDestination(info.getDestination(), amqSession); + // If the destination is specified check that it exists. + if (info.getDestination() != null) + { + OpenWireUtil.validateDestination(info.getDestination(), amqSession); + } } } diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java index 1d7740e681..54dc8cbc5a 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java @@ -140,6 +140,11 @@ public class AMQSession implements SessionCallback for (ActiveMQDestination d : dests) { + if (d.isQueue()) + { + SimpleString queueName = OpenWireUtil.toCoreAddress(d); + connection.autoCreateQueueIfPossible(queueName, this); + } AMQConsumer consumer = new AMQConsumer(this, d, info); consumer.init(); consumers.put(consumer.getNativeId(), consumer); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java index 2adc74ba3e..6b88682f01 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java @@ -31,6 +31,7 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.core.settings.impl.AddressSettings; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -230,9 +231,14 @@ public class SimpleOpenWireTest extends BasicOpenWireTest @Test public void testInvalidDestinationExceptionWhenNoQueueExistsOnCreateProducer() throws Exception { + AddressSettings addressSetting = new AddressSettings(); + addressSetting.setAutoCreateJmsQueues(false); + + server.getAddressSettingsRepository().addMatch("jms.queue.foo", addressSetting); + connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue("fake.queue"); + Queue queue = session.createQueue("foo"); thrown.expect(InvalidDestinationException.class); thrown.expect(JMSException.class); @@ -241,15 +247,50 @@ public class SimpleOpenWireTest extends BasicOpenWireTest } @Test - public void testInvalidDestinationExceptionWhenNoTopicExistsOnCreateProducer() throws Exception + public void testAutoDestinationCreationOnProducerSend() throws JMSException { + AddressSettings addressSetting = new AddressSettings(); + addressSetting.setAutoCreateJmsQueues(true); + + String address = "foo"; + server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting); + connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic("fake.queue"); - thrown.expect(InvalidDestinationException.class); - session.createProducer(destination); - session.close(); + TextMessage message = session.createTextMessage("bar"); + Queue queue = new ActiveMQQueue(address); + + MessageProducer producer = session.createProducer(null); + producer.send(queue, message); + + MessageConsumer consumer = session.createConsumer(queue); + TextMessage message1 = (TextMessage) consumer.receive(1000); + assertTrue(message1.getText().equals(message.getText())); + } + + @Test + public void testAutoDestinationCreationOnConsumer() throws JMSException + { + AddressSettings addressSetting = new AddressSettings(); + addressSetting.setAutoCreateJmsQueues(true); + + String address = "foo"; + server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting); + + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + TextMessage message = session.createTextMessage("bar"); + Queue queue = new ActiveMQQueue(address); + + MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer = session.createProducer(null); + producer.send(queue, message); + + TextMessage message1 = (TextMessage) consumer.receive(1000); + assertTrue(message1.getText().equals(message.getText())); } /**