diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 1ea9309661..80116edb5d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -112,6 +112,11 @@ public interface Message { */ SimpleString HDR_VALIDATED_USER = new SimpleString("_AMQ_VALIDATED_USER"); + /** + * The Routing Type for this message. Ensures that this message is only routed to queues with matching routing type. + */ + SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE"); + byte DEFAULT_TYPE = 0; byte OBJECT_TYPE = 2; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java index 40b9cb5cfe..c8bf86e670 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java @@ -64,7 +64,6 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage { return buff.toString(); } - @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeSimpleString(address); diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index 47d9ff2273..aa4754b733 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -491,6 +492,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage(); coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID); + byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType(); + coreMessage.putByteProperty(MessageImpl.HDR_ROUTING_TYPE, routingType); + try { /** * Using a completionListener requires wrapping using a {@link CompletionListenerWrapper}, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index 30e3768040..d02f0f0668 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -18,12 +18,13 @@ package org.apache.activemq.artemis.core.postoffice.impl; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.Bindable; -import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; public class LocalQueueBinding implements QueueBinding { @@ -117,12 +118,23 @@ public class LocalQueueBinding implements QueueBinding { @Override public void route(final ServerMessage message, final RoutingContext context) throws Exception { - queue.route(message, context); + if (isMatchRoutingType(message)) { + queue.route(message, context); + } } @Override public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception { - queue.routeWithAck(message, context); + if (isMatchRoutingType(message)) { + queue.routeWithAck(message, context); + } + } + + private boolean isMatchRoutingType(ServerMessage message) { + if (message.containsProperty(MessageInternal.HDR_ROUTING_TYPE)) { + return message.getByteProperty(MessageInternal.HDR_ROUTING_TYPE) == queue.getRoutingType().getType(); + } + return true; } public boolean isQueueBinding() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index dc73680d8d..2fc34093aa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -655,6 +655,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding final RoutingContext context, final boolean direct, boolean rejectDuplicates) throws Exception { + RoutingStatus result = RoutingStatus.OK; // Sanity check if (message.getRefCount() > 0) { @@ -663,6 +664,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding SimpleString address = message.getAddress(); + if (address.toString().equals("testQueue")) { + System.out.println("f"); + } + setPagingStore(message); AtomicBoolean startedTX = new AtomicBoolean(false); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index f6a0ebd426..d6e626cef1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1542,7 +1542,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public void createSharedQueue(final SimpleString address, RoutingType routingType, final SimpleString name, final SimpleString filterString, + public void createSharedQueue(final SimpleString address, + RoutingType routingType, + final SimpleString name, + final SimpleString filterString, final SimpleString user, boolean durable) throws Exception { //force the old contract about address @@ -1558,7 +1561,19 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, false); + final Queue queue = createQueue(address, + name, + routingType, + filterString, + user, + durable, + !durable, + true, + !durable, + false, + Queue.MAX_CONSUMERS_UNLIMITED, + false, + true); if (!queue.getAddress().equals(address)) { throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name); diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java index d001f5bcca..c5fb96456a 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java @@ -25,19 +25,29 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.tests.message.SimpleJMSMessage; import org.apache.activemq.artemis.jms.tests.message.SimpleJMSTextMessage; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; import org.junit.Test; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + public class MessageProducerTest extends JMSTestCase { @Test @@ -695,6 +705,42 @@ public class MessageProducerTest extends JMSTestCase { ProxyAssertSupport.assertTrue(listener.exception instanceof javax.jms.IllegalStateException); } + + @Test + public void testSendToQueueOnlyWhenTopicWithSameAddress() throws Exception { + SimpleString addr = SimpleString.toSimpleString("testAddr"); + + Set supportedRoutingTypes = new HashSet<>(); + supportedRoutingTypes.add(RoutingType.ANYCAST); + supportedRoutingTypes.add(RoutingType.MULTICAST); + + servers.get(0).getActiveMQServer().createAddressInfo(new AddressInfo(addr, supportedRoutingTypes)); + servers.get(0).getActiveMQServer().createQueue(addr, RoutingType.ANYCAST, addr, null, false, false); + + Connection pconn = createConnection(); + pconn.start(); + + Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = ps.createQueue(addr.toString()); + Topic topic = ps.createTopic(addr.toString()); + + MessageConsumer queueConsumer = ps.createConsumer(queue); + MessageConsumer topicConsumer = ps.createConsumer(topic); + + MessageProducer queueProducer = ps.createProducer(queue); + queueProducer.send(ps.createMessage()); + + assertNotNull(queueConsumer.receive(1000)); + assertNull(topicConsumer.receive(1000)); + + MessageProducer topicProducer = ps.createProducer(topic); + topicProducer.send(ps.createMessage()); + + assertNull(queueConsumer.receive(1000)); + assertNotNull(topicConsumer.receive(1000)); + } + // Package protected --------------------------------------------- // Protected -----------------------------------------------------