diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 56097aee16..9cd3fa744c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -168,7 +168,13 @@ public interface Message { // only on core } - RoutingType getRouteType(); + default RoutingType getRoutingType() { + return null; + } + + default Message setRoutingType(RoutingType routingType) { + return this; + } default SimpleString getLastValueProperty() { return null; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index f0a8715fdc..8f24cc05bc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -154,13 +154,23 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } @Override - public RoutingType getRouteType() { + public RoutingType getRoutingType() { if (containsProperty(Message.HDR_ROUTING_TYPE)) { return RoutingType.getType(getByteProperty(Message.HDR_ROUTING_TYPE)); } return null; } + @Override + public Message setRoutingType(RoutingType routingType) { + if (routingType == null) { + removeProperty(Message.HDR_ROUTING_TYPE); + } else { + putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType()); + } + return this; + } + @Override public CoreMessage setReplyTo(SimpleString address) { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index 286bc57538..64c8f16fc7 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -399,13 +399,10 @@ public class ActiveMQMessage implements javax.jms.Message { if (dest == null) { SimpleString address = message.getAddressSimpleString(); String prefix = ""; - if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) { - RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)); - if (routingType.equals(RoutingType.ANYCAST)) { - prefix = QUEUE_QUALIFIED_PREFIX; - } else if (routingType.equals(RoutingType.MULTICAST)) { - prefix = TOPIC_QUALIFIED_PREFIX; - } + if (RoutingType.ANYCAST.equals(message.getRoutingType())) { + prefix = QUEUE_QUALIFIED_PREFIX; + } else if (RoutingType.MULTICAST.equals(message.getRoutingType())) { + prefix = TOPIC_QUALIFIED_PREFIX; } dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString()); diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index 74dd39acc0..3121a88333 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -494,8 +494,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage(); coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID); - byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType(); - coreMessage.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, routingType); + coreMessage.setRoutingType(destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST); try { /** diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index d02eaced38..522ae1663a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -250,22 +250,24 @@ public class AMQPMessage extends RefCountMessage { } @Override - public RoutingType getRouteType() { + public RoutingType getRoutingType() { + Object routingType = getSymbol(AMQPMessageSupport.ROUTING_TYPE); - /* TODO-now How to use this properly - switch (((Byte)type).byteValue()) { - case AMQPMessageSupport.QUEUE_TYPE: - case AMQPMessageSupport.TEMP_QUEUE_TYPE: - return RoutingType.ANYCAST; + if (routingType != null) { + return RoutingType.getType((byte) routingType); + } else { + return null; + } + } - case AMQPMessageSupport.TOPIC_TYPE: - case AMQPMessageSupport.TEMP_TOPIC_TYPE: - return RoutingType.MULTICAST; - default: - return null; - } */ - - return null; + @Override + public org.apache.activemq.artemis.api.core.Message setRoutingType(RoutingType routingType) { + parseHeaders(); + if (routingType == null) { + removeSymbol(AMQPMessageSupport.ROUTING_TYPE); + } + setSymbol(AMQPMessageSupport.ROUTING_TYPE, routingType.getType()); + return this; } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java index 0dd54dbe70..da2f4e0f96 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java @@ -64,6 +64,11 @@ public final class AMQPMessageSupport { */ public static final Symbol JMS_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time"); + /** + * Attribute used to mark the Application defined delivery time assigned to the message + */ + public static final Symbol ROUTING_TYPE = Symbol.getSymbol("x-opt-routing-type"); + /** * Value mapping for JMS_MSG_TYPE which indicates the message is a generic JMS Message * which has no body. diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java index 3bd95f4aa5..d28eda4b7e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java @@ -25,7 +25,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessageListener; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.Persister; @@ -42,11 +41,6 @@ public class OpenwireMessage implements Message { } - @Override - public RoutingType getRouteType() { - return null; - } - @Override public SimpleString getReplyTo() { return null; diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index b5d2c86724..7a8ed3bf26 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -329,9 +329,9 @@ public class AMQSession implements SessionCallback { if (actualDestinations[i].isQueue()) { checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary()); - coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType()); + coreMsg.setRoutingType(RoutingType.ANYCAST); } else { - coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); + coreMsg.setRoutingType(RoutingType.MULTICAST); } PagingStore store = server.getPagingManager().getPageStore(address); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 3f68c6fc7d..c5fc8f1ad5 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -180,7 +180,7 @@ public abstract class VersionedStompFrameHandler { CoreMessage message = connection.createServerMessage(); if (routingType != null) { - message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType()); + message.setRoutingType(routingType); } message.setTimestamp(timestamp); message.setAddress(SimpleString.toSimpleString(destination)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index c73fd80a6c..124a43dfbd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -108,13 +108,13 @@ public class DivertImpl implements Divert { switch (routingType) { case ANYCAST: - copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType()); + copy.setRoutingType(RoutingType.ANYCAST); break; case MULTICAST: - copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); + copy.setRoutingType(RoutingType.MULTICAST); break; case STRIP: - copy.removeProperty(Message.HDR_ROUTING_TYPE); + copy.setRoutingType(null); break; case PASS: break; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 97a42490d6..ae4c16ed7a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1619,7 +1619,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { RoutingStatus result = RoutingStatus.OK; - RoutingType routingType = msg.getRouteType(); + RoutingType routingType = msg.getRoutingType(); /* TODO-now: How to address here with AMQP? if (originalAddress != null) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index dd48b58ffe..5cea833d0d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -283,11 +283,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { class FakeMessage extends RefCountMessage { - @Override - public RoutingType getRouteType() { - return null; - } - @Override public SimpleString getReplyTo() { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 10f06b209b..47576591ab 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -35,7 +35,9 @@ import javax.jms.JMSException; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; +import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -208,7 +210,33 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { } @Test(timeout = 60000) - public void testAnycastMessageRoutingExclusivity() throws Exception { + public void testQueueReceiverReadMessageWithDivert() throws Exception { + final String forwardingAddress = getTestName() + "Divert"; + final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress); + server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false); + server.getActiveMQServerControl().createDivert("name", "routingName", getTestName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString()); + sendMessages(getTestName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(forwardingAddress); + + Queue queueView = getProxyToQueue(forwardingAddress); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + assertNotNull(receiver.receive(5, TimeUnit.SECONDS)); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testAnycastMessageRoutingExclusivityUsingPrefix() throws Exception { final String addressA = "addressA"; final String queueA = "queueA"; final String queueB = "queueB"; @@ -226,8 +254,27 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); } + @Test(timeout = 60000) + public void testAnycastMessageRoutingExclusivityUsingProperty() throws Exception { + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + + ActiveMQServerControl serverControl = server.getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + + sendMessages(addressA, 1, RoutingType.ANYCAST); + + assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); + } + @Test - public void testMulticastMessageRoutingExclusivity() throws Exception { + public void testMulticastMessageRoutingExclusivityUsingPrefix() throws Exception { final String addressA = "addressA"; final String queueA = "queueA"; final String queueB = "queueB"; @@ -245,6 +292,25 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); } + @Test + public void testMulticastMessageRoutingExclusivityUsingProperty() throws Exception { + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + + ActiveMQServerControl serverControl = server.getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + + sendMessages(addressA, 1, RoutingType.MULTICAST); + + assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount()); + assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + } + @Test(timeout = 60000) public void testMessageDurableFalse() throws Exception { sendMessages(getTestName(), 1, false); @@ -1107,6 +1173,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { } public void sendMessages(String destinationName, int count) throws Exception { + sendMessages(destinationName, count, null); + } + + public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); try { @@ -1116,6 +1186,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { for (int i = 0; i < count; ++i) { AmqpMessage message = new AmqpMessage(); message.setMessageId("MessageID:" + i); + if (routingType != null) { + message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType()); + } sender.send(message); } } finally { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 31e26e38e8..604b630811 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -341,11 +341,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { final long id; - @Override - public RoutingType getRouteType() { - return null; - } - @Override public SimpleString getReplyTo() { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RoutingTest.java index 9e225422b9..8de6958d49 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RoutingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RoutingTest.java @@ -236,7 +236,7 @@ public class RoutingTest extends ActiveMQTestBase { sendSession.createQueue(addressA, RoutingType.MULTICAST, queueC); ClientProducer p = sendSession.createProducer(addressA); ClientMessage message = sendSession.createMessage(false); - message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType()); + message.setRoutingType(RoutingType.ANYCAST); p.send(message); sendSession.close(); assertEquals(1, server.locateQueue(queueA).getMessageCount() + server.locateQueue(queueB).getMessageCount()); @@ -255,7 +255,7 @@ public class RoutingTest extends ActiveMQTestBase { sendSession.createQueue(addressA, RoutingType.MULTICAST, queueC); ClientProducer p = sendSession.createProducer(addressA); ClientMessage message = sendSession.createMessage(false); - message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); + message.setRoutingType(RoutingType.MULTICAST); p.send(message); sendSession.close(); assertEquals(0, server.locateQueue(queueA).getMessageCount()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java index 69a360ea9b..53116011f3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java @@ -165,7 +165,7 @@ public class DivertTest extends ActiveMQTestBase { for (int i = 0; i < numMessages; i++) { ClientMessage message = session.createMessage(false); - message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); + message.setRoutingType(RoutingType.MULTICAST); message.putIntProperty(propKey, i); @@ -238,7 +238,7 @@ public class DivertTest extends ActiveMQTestBase { for (int i = 0; i < numMessages; i++) { ClientMessage message = session.createMessage(false); - message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); + message.setRoutingType(RoutingType.MULTICAST); message.putIntProperty(propKey, i);