This closes #202 on auto-queue creation
This commit is contained in:
commit
238b2fe094
|
@ -74,6 +74,7 @@ import org.apache.activemq.command.ShutdownInfo;
|
|||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.TransactionInfo;
|
||||
import org.apache.activemq.command.WireFormatInfo;
|
||||
import org.apache.activemq.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.state.CommandVisitor;
|
||||
import org.apache.activemq.state.ConnectionState;
|
||||
|
@ -1402,6 +1403,12 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
|
||||
if (producerExchange.canDispatch(messageSend))
|
||||
{
|
||||
if (messageSend.getDestination().isQueue())
|
||||
{
|
||||
SimpleString queueName = OpenWireUtil.toCoreAddress(messageSend.getDestination());
|
||||
autoCreateQueueIfPossible(queueName, session);
|
||||
}
|
||||
|
||||
SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
|
||||
if (result.isBlockNextSend())
|
||||
{
|
||||
|
@ -1451,6 +1458,15 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
return resp;
|
||||
}
|
||||
|
||||
public void autoCreateQueueIfPossible(SimpleString queueName, AMQSession session) throws Exception
|
||||
{
|
||||
QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
|
||||
if (result.isAutoCreateJmsQueues() && !result.isExists())
|
||||
{
|
||||
session.getCoreServer().createQueue(queueName, queueName, null, false, false, true);
|
||||
}
|
||||
}
|
||||
|
||||
private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException
|
||||
{
|
||||
AMQProducerBrokerExchange result = producerExchanges.get(id);
|
||||
|
@ -1785,4 +1801,5 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
{
|
||||
return this.state.getContext();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -58,12 +58,15 @@ public class OpenWireUtil
|
|||
*/
|
||||
public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception
|
||||
{
|
||||
AMQServerSession coreSession = amqSession.getCoreSession();
|
||||
SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
|
||||
BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
|
||||
if (!result.isExists())
|
||||
if (destination.isQueue())
|
||||
{
|
||||
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
|
||||
AMQServerSession coreSession = amqSession.getCoreSession();
|
||||
SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
|
||||
BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
|
||||
if (!result.isExists() && !result.isAutoCreateJmsQueues())
|
||||
{
|
||||
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,10 @@ public class AMQProducer
|
|||
|
||||
public void init() throws Exception
|
||||
{
|
||||
OpenWireUtil.validateDestination(info.getDestination(), amqSession);
|
||||
// If the destination is specified check that it exists.
|
||||
if (info.getDestination() != null)
|
||||
{
|
||||
OpenWireUtil.validateDestination(info.getDestination(), amqSession);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -140,6 +140,11 @@ public class AMQSession implements SessionCallback
|
|||
|
||||
for (ActiveMQDestination d : dests)
|
||||
{
|
||||
if (d.isQueue())
|
||||
{
|
||||
SimpleString queueName = OpenWireUtil.toCoreAddress(d);
|
||||
connection.autoCreateQueueIfPossible(queueName, this);
|
||||
}
|
||||
AMQConsumer consumer = new AMQConsumer(this, d, info);
|
||||
consumer.init();
|
||||
consumers.put(consumer.getNativeId(), consumer);
|
||||
|
|
|
@ -31,6 +31,7 @@ import javax.jms.TextMessage;
|
|||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.core.settings.impl.AddressSettings;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -230,9 +231,14 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
|
|||
@Test
|
||||
public void testInvalidDestinationExceptionWhenNoQueueExistsOnCreateProducer() throws Exception
|
||||
{
|
||||
AddressSettings addressSetting = new AddressSettings();
|
||||
addressSetting.setAutoCreateJmsQueues(false);
|
||||
|
||||
server.getAddressSettingsRepository().addMatch("jms.queue.foo", addressSetting);
|
||||
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue("fake.queue");
|
||||
Queue queue = session.createQueue("foo");
|
||||
|
||||
thrown.expect(InvalidDestinationException.class);
|
||||
thrown.expect(JMSException.class);
|
||||
|
@ -241,15 +247,50 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidDestinationExceptionWhenNoTopicExistsOnCreateProducer() throws Exception
|
||||
public void testAutoDestinationCreationOnProducerSend() throws JMSException
|
||||
{
|
||||
AddressSettings addressSetting = new AddressSettings();
|
||||
addressSetting.setAutoCreateJmsQueues(true);
|
||||
|
||||
String address = "foo";
|
||||
server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting);
|
||||
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createTopic("fake.queue");
|
||||
|
||||
thrown.expect(InvalidDestinationException.class);
|
||||
session.createProducer(destination);
|
||||
session.close();
|
||||
TextMessage message = session.createTextMessage("bar");
|
||||
Queue queue = new ActiveMQQueue(address);
|
||||
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
producer.send(queue, message);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
TextMessage message1 = (TextMessage) consumer.receive(1000);
|
||||
assertTrue(message1.getText().equals(message.getText()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoDestinationCreationOnConsumer() throws JMSException
|
||||
{
|
||||
AddressSettings addressSetting = new AddressSettings();
|
||||
addressSetting.setAutoCreateJmsQueues(true);
|
||||
|
||||
String address = "foo";
|
||||
server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting);
|
||||
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
TextMessage message = session.createTextMessage("bar");
|
||||
Queue queue = new ActiveMQQueue(address);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
producer.send(queue, message);
|
||||
|
||||
TextMessage message1 = (TextMessage) consumer.receive(1000);
|
||||
assertTrue(message1.getText().equals(message.getText()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue