diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java index dac8e57ca6..86c808c095 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java @@ -28,6 +28,7 @@ import javax.jms.TopicSubscriber; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -220,6 +221,9 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE || coreMessage.getType() == ActiveMQObjectMessage.TYPE; + if (coreMessage.getRoutingType() == null) { + coreMessage.setRoutingType(destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST); + } if (session.isEnable1xPrefixes()) { jmsMsg = ActiveMQCompatibleMessage.createMessage(coreMessage, needSession ? coreSession : null, options); } else { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 66e1d24a7f..a122d7b098 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -1097,13 +1097,13 @@ public class AMQPMessage extends RefCountMessage { Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE); if (routingType != null) { - return RoutingType.getType((byte) routingType); + return RoutingType.getType(((Number) routingType).byteValue()); } else { routingType = getMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION); if (routingType != null) { - if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) { + if (AMQPMessageSupport.QUEUE_TYPE == ((Number) routingType).byteValue() || AMQPMessageSupport.TEMP_QUEUE_TYPE == ((Number) routingType).byteValue()) { return RoutingType.ANYCAST; - } else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) { + } else if (AMQPMessageSupport.TOPIC_TYPE == ((Number) routingType).byteValue() || AMQPMessageSupport.TEMP_TOPIC_TYPE == ((Number) routingType).byteValue()) { return RoutingType.MULTICAST; } } else { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index 17ec6a2ac0..8e854abc08 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -224,6 +224,7 @@ public class AmqpCoreConverter { result.getInnerMessage().setDurable(message.isDurable()); result.getInnerMessage().setPriority(message.getPriority()); result.getInnerMessage().setAddress(message.getAddressSimpleString()); + result.getInnerMessage().setRoutingType(message.getRoutingType()); result.encode(); return result.getInnerMessage(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java index fcce0abc12..33a7371e82 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.amqp; import javax.jms.Connection; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -31,6 +32,11 @@ import org.junit.Test; public class AmqpMessageRoutingTest extends JMSClientTestSupport { + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + @Override protected boolean isAutoCreateQueues() { return false; @@ -159,4 +165,67 @@ public class AmqpMessageRoutingTest extends JMSClientTestSupport { connection.close(); } } + + + @Test(timeout = 60000) + public void testAMQPRouteMessageToJMSOpenWire() throws Throwable { + testAMQPRouteMessageToJMS(createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testAMQPRouteMessageToJMSAMQP() throws Throwable { + testAMQPRouteMessageToJMS(createConnection()); + } + + @Test(timeout = 60000) + public void testAMQPRouteMessageToJMSCore() throws Throwable { + testAMQPRouteMessageToJMS(createCoreConnection()); + } + + private void testAMQPRouteMessageToJMS(Connection connection) throws Exception { + final String addressA = "addressA"; + + ActiveMQServerControl serverControl = server.getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, addressA, RoutingType.ANYCAST.toString()); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Topic topic = session.createTopic(addressA); + javax.jms.Queue queue = session.createQueue(addressA); + + MessageConsumer queueConsumer = session.createConsumer(queue); + MessageConsumer topicConsumer = session.createConsumer(topic); + + sendMessages(addressA, 1, RoutingType.MULTICAST); + + Message topicMessage = topicConsumer.receive(1000); + assertNotNull(topicMessage); + assertEquals(addressA, ((javax.jms.Topic) topicMessage.getJMSDestination()).getTopicName()); + + assertNull(queueConsumer.receiveNoWait()); + + + sendMessages(addressA, 1, RoutingType.ANYCAST); + + Message queueMessage = queueConsumer.receive(1000); + assertNotNull(queueMessage); + assertEquals(addressA, ((javax.jms.Queue) queueMessage.getJMSDestination()).getQueueName()); + + assertNull(topicConsumer.receiveNoWait()); + + + sendMessages(addressA, 1, null); + Message queueMessage2 = queueConsumer.receive(1000); + assertNotNull(queueMessage2); + assertEquals(addressA, ((javax.jms.Queue) queueMessage2.getJMSDestination()).getQueueName()); + + Message topicMessage2 = topicConsumer.receive(1000); + assertNotNull(topicMessage2); + assertEquals(addressA, ((javax.jms.Topic) topicMessage2.getJMSDestination()).getTopicName()); + + } finally { + connection.close(); + } + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java index c373e29fd5..2a62f05d0a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java @@ -471,8 +471,8 @@ public class JMSMessageTypesTest extends JMSClientTestSupport { producer.send(message); consumerConnection.start(); - Session consumerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue consumerQueue = session.createQueue(getQueueName()); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = consumerSession.createQueue(getQueueName()); MessageConsumer messageConsumer = consumerSession.createConsumer(consumerQueue); TextMessage received = (TextMessage) messageConsumer.receive(5000); Assert.assertNotNull(received);