Send error on create producer when no dest exists

Returns an error to the client causing InvalidDestinationException to be
thrown when an ActiveMQ 5.x client attempts to create a producer with a
destination that does not exist.  (Over OpenWire Protocol).
This commit is contained in:
Martyn Taylor 2015-03-11 17:37:06 +00:00
parent 7c3e5d1c0e
commit 77921956c5
6 changed files with 81 additions and 7 deletions

View File

@ -31,12 +31,14 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException; import javax.jms.JMSSecurityException;
import javax.jms.ResourceAllocationException; import javax.jms.ResourceAllocationException;
import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.api.core.ActiveMQBuffers;
import org.apache.activemq.api.core.ActiveMQException; import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.api.core.ActiveMQSecurityException; import org.apache.activemq.api.core.ActiveMQSecurityException;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
@ -1242,8 +1244,27 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
@Override @Override
public Response processAddProducer(ProducerInfo info) throws Exception public Response processAddProducer(ProducerInfo info) throws Exception
{ {
protocolManager.addProducer(this, info); Response resp = null;
return null; try
{
protocolManager.addProducer(this, info);
}
catch (Exception e)
{
if (e instanceof ActiveMQSecurityException)
{
resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
}
else if (e instanceof ActiveMQNonExistentQueueException)
{
resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
}
else
{
resp = new ExceptionResponse(e);
}
}
return resp;
} }
@Override @Override

View File

@ -440,7 +440,7 @@ public class OpenWireProtocolManager implements ProtocolManager
return false; return false;
} }
public void addProducer(OpenWireConnection theConn, ProducerInfo info) public void addProducer(OpenWireConnection theConn, ProducerInfo info) throws Exception
{ {
SessionId sessionId = info.getProducerId().getParentId(); SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId(); ConnectionId connectionId = sessionId.getParentId();

View File

@ -20,6 +20,10 @@ package org.apache.activemq.core.protocol.openwire;
import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.api.core.ActiveMQBuffers;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.core.protocol.openwire.amq.AMQServerSession;
import org.apache.activemq.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.core.server.ActiveMQMessageBundle;
import org.apache.activemq.core.server.BindingQueryResult;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.SimpleString;
@ -47,6 +51,22 @@ public class OpenWireUtil
} }
} }
/**
* Checks to see if this destination exists. If it does not throw an invalid destination exception.
* @param destination
* @param amqSession
*/
public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception
{
AMQServerSession coreSession = amqSession.getCoreSession();
SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
if (!result.isExists())
{
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
}
}
/* /*
*This util converts amq wildcards to compatible core wildcards *This util converts amq wildcards to compatible core wildcards
*The conversion is like this: *The conversion is like this:

View File

@ -17,6 +17,7 @@
package org.apache.activemq.core.protocol.openwire.amq; package org.apache.activemq.core.protocol.openwire.amq;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.core.protocol.openwire.OpenWireUtil;
public class AMQProducer public class AMQProducer
{ {
@ -29,9 +30,8 @@ public class AMQProducer
this.info = info; this.info = info;
} }
public void init() public void init() throws Exception
{ {
//activemq doesn't have producer at server. OpenWireUtil.validateDestination(info.getDestination(), amqSession);
} }
} }

View File

@ -245,7 +245,7 @@ public class AMQSession implements SessionCallback
AMQConsumer consumer = consumers.remove(nativeId); AMQConsumer consumer = consumers.remove(nativeId);
} }
public void createProducer(ProducerInfo info) public void createProducer(ProducerInfo info) throws Exception
{ {
AMQProducer producer = new AMQProducer(this, info); AMQProducer producer = new AMQProducer(this, info);
producer.init(); producer.init();

View File

@ -18,6 +18,8 @@ package org.apache.activemq.tests.integration.openwire;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
@ -25,15 +27,21 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue; import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic; import javax.jms.TemporaryTopic;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
public class SimpleOpenWireTest extends BasicOpenWireTest public class SimpleOpenWireTest extends BasicOpenWireTest
{ {
@Rule
public ExpectedException thrown= ExpectedException.none();
@Override @Override
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
@ -220,6 +228,31 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
session.close(); session.close();
} }
@Test
public void testInvalidDestinationExceptionWhenNoQueueExistsOnCreateProducer() throws Exception
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("fake.queue");
thrown.expect(InvalidDestinationException.class);
thrown.expect(JMSException.class);
session.createProducer(queue);
session.close();
}
@Test
public void testInvalidDestinationExceptionWhenNoTopicExistsOnCreateProducer() throws Exception
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("fake.queue");
thrown.expect(InvalidDestinationException.class);
session.createProducer(destination);
session.close();
}
/** /**
* This is the example shipped with the distribution * This is the example shipped with the distribution
* @throws Exception * @throws Exception