This closes #3031
This commit is contained in:
commit
33e841cd7a
|
@ -264,7 +264,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
SimpleString queueName,
|
||||
SimpleString filter) throws Exception {
|
||||
try {
|
||||
serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false);
|
||||
serverSession.createSharedQueue(address, queueName, routingType, filter, true, -1, false, false, false);
|
||||
} catch (ActiveMQSecurityException se) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
|
||||
}
|
||||
|
@ -275,7 +275,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
SimpleString queueName,
|
||||
SimpleString filter) throws Exception {
|
||||
try {
|
||||
serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true);
|
||||
serverSession.createSharedQueue(address, queueName, routingType, filter, false, -1, false, false, false);
|
||||
} catch (ActiveMQSecurityException se) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
|
|||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
|
@ -29,6 +28,8 @@ import javax.jms.Topic;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
@ -58,7 +59,11 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
|
|||
return "AMQP,OPENWIRE,CORE";
|
||||
}
|
||||
|
||||
private void testSharedConsumer(Connection connection1, Connection connection2) throws JMSException {
|
||||
private void testSharedConsumer(Connection connection1, Connection connection2) throws Exception {
|
||||
testSharedConsumer(connection1, connection2, false);
|
||||
}
|
||||
|
||||
private void testSharedConsumer(Connection connection1, Connection connection2, boolean amqpQueueName) throws Exception {
|
||||
try {
|
||||
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -89,6 +94,13 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
|
|||
}
|
||||
assertNotNull("Should have received a message by now.", received);
|
||||
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
|
||||
|
||||
String consumerQueueName = "nonDurable.SharedConsumer";
|
||||
if (amqpQueueName) {
|
||||
consumerQueueName = "SharedConsumer:shared-volatile:global";
|
||||
}
|
||||
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(consumerQueueName));
|
||||
assertTrue(queueBinding.getQueue().isTemporary());
|
||||
} finally {
|
||||
connection1.close();
|
||||
connection2.close();
|
||||
|
@ -100,7 +112,7 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
|
|||
Connection connection = createConnection(); //AMQP
|
||||
Connection connection2 = createConnection(); //AMQP
|
||||
|
||||
testSharedConsumer(connection, connection2);
|
||||
testSharedConsumer(connection, connection2, !amqpUseCoreSubscriptionNaming);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
|
|
Loading…
Reference in New Issue