diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java index 1f7ec86ee5..db14e0a80a 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java @@ -31,12 +31,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.jms.InvalidDestinationException; import javax.jms.JMSSecurityException; import javax.jms.ResourceAllocationException; import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.api.core.ActiveMQSecurityException; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -1242,8 +1244,27 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor @Override public Response processAddProducer(ProducerInfo info) throws Exception { - protocolManager.addProducer(this, info); - return null; + Response resp = null; + try + { + protocolManager.addProducer(this, info); + } + catch (Exception e) + { + if (e instanceof ActiveMQSecurityException) + { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else if (e instanceof ActiveMQNonExistentQueueException) + { + resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage())); + } + else + { + resp = new ExceptionResponse(e); + } + } + return resp; } @Override diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java index 363d092518..dfa4e4a8fa 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireProtocolManager.java @@ -440,7 +440,7 @@ public class OpenWireProtocolManager implements ProtocolManager return false; } - public void addProducer(OpenWireConnection theConn, ProducerInfo info) + public void addProducer(OpenWireConnection theConn, ProducerInfo info) throws Exception { SessionId sessionId = info.getProducerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java index 1824447b6d..e0b9872ac8 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireUtil.java @@ -20,6 +20,10 @@ package org.apache.activemq.core.protocol.openwire; import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.core.protocol.openwire.amq.AMQServerSession; +import org.apache.activemq.core.protocol.openwire.amq.AMQSession; +import org.apache.activemq.core.server.ActiveMQMessageBundle; +import org.apache.activemq.core.server.BindingQueryResult; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.api.core.SimpleString; @@ -47,6 +51,22 @@ public class OpenWireUtil } } + /** + * Checks to see if this destination exists. If it does not throw an invalid destination exception. + * @param destination + * @param amqSession + */ + public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception + { + AMQServerSession coreSession = amqSession.getCoreSession(); + SimpleString physicalName = OpenWireUtil.toCoreAddress(destination); + BindingQueryResult result = coreSession.executeBindingQuery(physicalName); + if (!result.isExists()) + { + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName); + } + } + /* *This util converts amq wildcards to compatible core wildcards *The conversion is like this: diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java index bdf486cb3e..06f7da7943 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQProducer.java @@ -17,6 +17,7 @@ package org.apache.activemq.core.protocol.openwire.amq; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.core.protocol.openwire.OpenWireUtil; public class AMQProducer { @@ -29,9 +30,8 @@ public class AMQProducer this.info = info; } - public void init() + public void init() throws Exception { - //activemq doesn't have producer at server. + OpenWireUtil.validateDestination(info.getDestination(), amqSession); } - } diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java index d7fdce5c8b..1d7740e681 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java @@ -245,7 +245,7 @@ public class AMQSession implements SessionCallback AMQConsumer consumer = consumers.remove(nativeId); } - public void createProducer(ProducerInfo info) + public void createProducer(ProducerInfo info) throws Exception { AMQProducer producer = new AMQProducer(this, info); producer.init(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java index a767bda789..87b962c792 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java @@ -18,6 +18,8 @@ package org.apache.activemq.tests.integration.openwire; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; @@ -25,15 +27,21 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; +import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; public class SimpleOpenWireTest extends BasicOpenWireTest { + @Rule + public ExpectedException thrown= ExpectedException.none(); + @Override @Before public void setUp() throws Exception @@ -220,6 +228,31 @@ public class SimpleOpenWireTest extends BasicOpenWireTest session.close(); } + @Test + public void testInvalidDestinationExceptionWhenNoQueueExistsOnCreateProducer() throws Exception + { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("fake.queue"); + + thrown.expect(InvalidDestinationException.class); + thrown.expect(JMSException.class); + session.createProducer(queue); + session.close(); + } + + @Test + public void testInvalidDestinationExceptionWhenNoTopicExistsOnCreateProducer() throws Exception + { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic("fake.queue"); + + thrown.expect(InvalidDestinationException.class); + session.createProducer(destination); + session.close(); + } + /** * This is the example shipped with the distribution * @throws Exception