Add Auto JMS queue creation for OpenWire

This commit is contained in:
Martyn Taylor 2015-04-08 13:27:55 +01:00
parent f8a25d4f7c
commit 548735f8b6
5 changed files with 82 additions and 12 deletions

View File

@ -74,6 +74,7 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.core.server.QueueQueryResult;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
@ -1402,6 +1403,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
if (producerExchange.canDispatch(messageSend))
{
if (messageSend.getDestination().isQueue())
{
SimpleString queueName = OpenWireUtil.toCoreAddress(messageSend.getDestination());
autoCreateQueueIfPossible(queueName, session);
}
SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
if (result.isBlockNextSend())
{
@ -1451,6 +1458,15 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
return resp;
}
public void autoCreateQueueIfPossible(SimpleString queueName, AMQSession session) throws Exception
{
QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
if (result.isAutoCreateJmsQueues() && !result.isExists())
{
session.getCoreServer().createQueue(queueName, queueName, null, false, false, true);
}
}
private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException
{
AMQProducerBrokerExchange result = producerExchanges.get(id);
@ -1785,4 +1801,5 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
{
return this.state.getContext();
}
}

View File

@ -57,15 +57,18 @@ public class OpenWireUtil
* @param amqSession
*/
public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception
{
if (destination.isQueue())
{
AMQServerSession coreSession = amqSession.getCoreSession();
SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
if (!result.isExists())
if (!result.isExists() && !result.isAutoCreateJmsQueues())
{
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
}
}
}
/*
*This util converts amq wildcards to compatible core wildcards

View File

@ -31,7 +31,11 @@ public class AMQProducer
}
public void init() throws Exception
{
// If the destination is specified check that it exists.
if (info.getDestination() != null)
{
OpenWireUtil.validateDestination(info.getDestination(), amqSession);
}
}
}

View File

@ -140,6 +140,11 @@ public class AMQSession implements SessionCallback
for (ActiveMQDestination d : dests)
{
if (d.isQueue())
{
SimpleString queueName = OpenWireUtil.toCoreAddress(d);
connection.autoCreateQueueIfPossible(queueName, this);
}
AMQConsumer consumer = new AMQConsumer(this, d, info);
consumer.init();
consumers.put(consumer.getNativeId(), consumer);

View File

@ -31,6 +31,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.core.settings.impl.AddressSettings;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -230,9 +231,14 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
@Test
public void testInvalidDestinationExceptionWhenNoQueueExistsOnCreateProducer() throws Exception
{
AddressSettings addressSetting = new AddressSettings();
addressSetting.setAutoCreateJmsQueues(false);
server.getAddressSettingsRepository().addMatch("jms.queue.foo", addressSetting);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("fake.queue");
Queue queue = session.createQueue("foo");
thrown.expect(InvalidDestinationException.class);
thrown.expect(JMSException.class);
@ -241,15 +247,50 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
}
@Test
public void testInvalidDestinationExceptionWhenNoTopicExistsOnCreateProducer() throws Exception
public void testAutoDestinationCreationOnProducerSend() throws JMSException
{
AddressSettings addressSetting = new AddressSettings();
addressSetting.setAutoCreateJmsQueues(true);
String address = "foo";
server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("fake.queue");
thrown.expect(InvalidDestinationException.class);
session.createProducer(destination);
session.close();
TextMessage message = session.createTextMessage("bar");
Queue queue = new ActiveMQQueue(address);
MessageProducer producer = session.createProducer(null);
producer.send(queue, message);
MessageConsumer consumer = session.createConsumer(queue);
TextMessage message1 = (TextMessage) consumer.receive(1000);
assertTrue(message1.getText().equals(message.getText()));
}
@Test
public void testAutoDestinationCreationOnConsumer() throws JMSException
{
AddressSettings addressSetting = new AddressSettings();
addressSetting.setAutoCreateJmsQueues(true);
String address = "foo";
server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TextMessage message = session.createTextMessage("bar");
Queue queue = new ActiveMQQueue(address);
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(null);
producer.send(queue, message);
TextMessage message1 = (TextMessage) consumer.receive(1000);
assertTrue(message1.getText().equals(message.getText()));
}
/**