ARTEMIS-2286 AMQP to Core Conversion doesn't map routing type always

Add test that exhibits the issue when sending AMQP (non JMS) to Artemis that one mapping to Core JMS the destination is not resolving as the RoutingType can be missing.
Add fix.
This commit is contained in:
Michael André Pearce 2019-03-27 15:56:46 +00:00 committed by Clebert Suconic
parent 8b3eeafed0
commit d9b3d0fe4c
5 changed files with 79 additions and 5 deletions

View File

@ -28,6 +28,7 @@ import javax.jms.TopicSubscriber;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@ -220,6 +221,9 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE ||
coreMessage.getType() == ActiveMQObjectMessage.TYPE;
if (coreMessage.getRoutingType() == null) {
coreMessage.setRoutingType(destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST);
}
if (session.isEnable1xPrefixes()) {
jmsMsg = ActiveMQCompatibleMessage.createMessage(coreMessage, needSession ? coreSession : null, options);
} else {

View File

@ -1097,13 +1097,13 @@ public class AMQPMessage extends RefCountMessage {
Object routingType = getMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE);
if (routingType != null) {
return RoutingType.getType((byte) routingType);
return RoutingType.getType(((Number) routingType).byteValue());
} else {
routingType = getMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
if (routingType != null) {
if (AMQPMessageSupport.QUEUE_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_QUEUE_TYPE == (byte) routingType) {
if (AMQPMessageSupport.QUEUE_TYPE == ((Number) routingType).byteValue() || AMQPMessageSupport.TEMP_QUEUE_TYPE == ((Number) routingType).byteValue()) {
return RoutingType.ANYCAST;
} else if (AMQPMessageSupport.TOPIC_TYPE == (byte) routingType || AMQPMessageSupport.TEMP_TOPIC_TYPE == (byte) routingType) {
} else if (AMQPMessageSupport.TOPIC_TYPE == ((Number) routingType).byteValue() || AMQPMessageSupport.TEMP_TOPIC_TYPE == ((Number) routingType).byteValue()) {
return RoutingType.MULTICAST;
}
} else {

View File

@ -224,6 +224,7 @@ public class AmqpCoreConverter {
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
result.getInnerMessage().setAddress(message.getAddressSimpleString());
result.getInnerMessage().setRoutingType(message.getRoutingType());
result.encode();
return result.getInnerMessage();

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@ -31,6 +32,11 @@ import org.junit.Test;
public class AmqpMessageRoutingTest extends JMSClientTestSupport {
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Override
protected boolean isAutoCreateQueues() {
return false;
@ -159,4 +165,67 @@ public class AmqpMessageRoutingTest extends JMSClientTestSupport {
connection.close();
}
}
@Test(timeout = 60000)
public void testAMQPRouteMessageToJMSOpenWire() throws Throwable {
testAMQPRouteMessageToJMS(createOpenWireConnection());
}
@Test(timeout = 60000)
public void testAMQPRouteMessageToJMSAMQP() throws Throwable {
testAMQPRouteMessageToJMS(createConnection());
}
@Test(timeout = 60000)
public void testAMQPRouteMessageToJMSCore() throws Throwable {
testAMQPRouteMessageToJMS(createCoreConnection());
}
private void testAMQPRouteMessageToJMS(Connection connection) throws Exception {
final String addressA = "addressA";
ActiveMQServerControl serverControl = server.getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, addressA, RoutingType.ANYCAST.toString());
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic topic = session.createTopic(addressA);
javax.jms.Queue queue = session.createQueue(addressA);
MessageConsumer queueConsumer = session.createConsumer(queue);
MessageConsumer topicConsumer = session.createConsumer(topic);
sendMessages(addressA, 1, RoutingType.MULTICAST);
Message topicMessage = topicConsumer.receive(1000);
assertNotNull(topicMessage);
assertEquals(addressA, ((javax.jms.Topic) topicMessage.getJMSDestination()).getTopicName());
assertNull(queueConsumer.receiveNoWait());
sendMessages(addressA, 1, RoutingType.ANYCAST);
Message queueMessage = queueConsumer.receive(1000);
assertNotNull(queueMessage);
assertEquals(addressA, ((javax.jms.Queue) queueMessage.getJMSDestination()).getQueueName());
assertNull(topicConsumer.receiveNoWait());
sendMessages(addressA, 1, null);
Message queueMessage2 = queueConsumer.receive(1000);
assertNotNull(queueMessage2);
assertEquals(addressA, ((javax.jms.Queue) queueMessage2.getJMSDestination()).getQueueName());
Message topicMessage2 = topicConsumer.receive(1000);
assertNotNull(topicMessage2);
assertEquals(addressA, ((javax.jms.Topic) topicMessage2.getJMSDestination()).getTopicName());
} finally {
connection.close();
}
}
}

View File

@ -471,8 +471,8 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message);
consumerConnection.start();
Session consumerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = session.createQueue(getQueueName());
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = consumerSession.createQueue(getQueueName());
MessageConsumer messageConsumer = consumerSession.createConsumer(consumerQueue);
TextMessage received = (TextMessage) messageConsumer.receive(5000);
Assert.assertNotNull(received);