diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index c3bf31a721..f5bb2a3682 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -282,7 +282,8 @@ public final class BindingsImpl implements Bindings { private void route(final Message message, final RoutingContext context, final boolean groupRouting) throws Exception { - boolean reusableContext = context.isReusable(message, version.get()); + int currentVersion = version.get(); + boolean reusableContext = context.isReusable(message, currentVersion); if (!reusableContext) { context.clear(); @@ -310,13 +311,18 @@ public final class BindingsImpl implements Bindings { boolean routed = false; + boolean hasExclusives = false; + for (Binding binding : exclusiveBindings) { + if (!hasExclusives) { + context.clear().setReusable(false); + hasExclusives = true; + } if (binding.getFilter() == null || binding.getFilter().match(message)) { binding.getBindable().route(message, context); routed = true; } - context.setReusable(false); } if (!routed) { // Remove the ids now, in order to avoid double check @@ -332,6 +338,7 @@ public final class BindingsImpl implements Bindings { context.clear().setReusable(false); routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0); } else if (CompositeAddress.isFullyQualified(message.getAddress())) { + context.clear().setReusable(false); Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString())); if (theBinding != null) { theBinding.route(message, context); @@ -340,21 +347,17 @@ public final class BindingsImpl implements Bindings { // in a optimization, we are reusing the previous context if everything is right for it // so the simpleRouting will only happen if needed if (!reusableContext) { - simpleRouting(message, context); + simpleRouting(message, context, currentVersion); } } } } - private void simpleRouting(Message message, RoutingContext context) throws Exception { + private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception { if (logger.isTraceEnabled()) { - logger.trace("Routing message " + message + " on binding=" + this); + logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context); } - // We check at the version before we started routing, - // this is because if something changed in between we want to check the correct version - int currentVersion = version.get(); - for (Map.Entry> entry : routingNameBindingMap.entrySet()) { SimpleString routingName = entry.getKey(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index 79ab4d3480..ed5eb3b00d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -25,9 +25,12 @@ import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; +import org.jboss.logging.Logger; public class LocalQueueBinding implements QueueBinding { + private static final Logger logger = Logger.getLogger(LocalQueueBinding.class); + private final SimpleString address; private final Queue queue; @@ -119,13 +122,23 @@ public class LocalQueueBinding implements QueueBinding { @Override public void route(final Message message, final RoutingContext context) throws Exception { if (isMatchRoutingType(context)) { + if (logger.isTraceEnabled()) { + logger.trace("adding routing " + queue.getID() + " on message " + message); + } queue.route(message, context); + } else { + if (logger.isTraceEnabled()) { + logger.trace("routing " + queue.getID() + " is ignored as routing type did not match"); + } } } @Override public void routeWithAck(Message message, RoutingContext context) throws Exception { if (isMatchRoutingType(context)) { + if (logger.isTraceEnabled()) { + logger.trace("Message " + message + " routed with ack on queue " + queue.getID()); + } queue.routeWithAck(message, context); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index b0d8063645..5764a28c3a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -934,7 +934,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } if (logger.isTraceEnabled()) { - logger.trace("Message after routed=" + message); + logger.trace("Message after routed=" + message + "\n" + context.toString()); } try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index bfde7afa6a..82091f5ee7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -83,5 +83,4 @@ public interface RoutingContext { boolean isReusable(Message message, int version); - } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index 2c9276304c..c63f524c56 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.server.impl; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -30,9 +32,12 @@ import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.jboss.logging.Logger; public final class RoutingContextImpl implements RoutingContext { + private static final Logger logger = Logger.getLogger(RoutingContextImpl.class); + // The pair here is Durable and NonDurable private final Map map = new HashMap<>(); @@ -128,6 +133,27 @@ public final class RoutingContextImpl implements RoutingContext { queueCount++; } + @Override + public String toString() { + StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter); + printWriter.println("RoutingContextImpl(Address=" + this.address + ", routingType=" + this.routingType + ", PreviousAddress=" + previousAddress + " previousRoute:" + previousRoutingType + ", reusable=" + this.reusable + ", version=" + version + ")"); + for (Map.Entry entry : map.entrySet()) { + printWriter.println(".................................................."); + printWriter.println("***** durable queues " + entry.getKey() + ":"); + for (Queue queue : entry.getValue().getDurableQueues()) { + printWriter.println("- queueID=" + queue.getID() + " address:" + queue.getAddress() + " name:" + queue.getName() + " filter:" + queue.getFilter()); + } + printWriter.println("***** non durable for " + entry.getKey() + ":"); + for (Queue queue : entry.getValue().getNonDurableQueues()) { + printWriter.println("- queueID=" + queue.getID() + " address:" + queue.getAddress() + " name:" + queue.getName() + " filter:" + queue.getFilter()); + } + } + printWriter.println(".................................................."); + + return stringWriter.toString(); + } + @Override public void processReferences(final List refs, final boolean direct) { internalprocessReferences(refs, direct); @@ -163,11 +189,17 @@ public final class RoutingContextImpl implements RoutingContext { @Override public void setAddress(SimpleString address) { + if (this.address == null || !this.address.equals(address)) { + this.clear(); + } this.address = address; } @Override public void setRoutingType(RoutingType routingType) { + if (this.routingType == null || this.routingType != routingType) { + this.clear(); + } this.routingType = routingType; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java index df2bff5e62..373d91cf11 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java @@ -41,7 +41,7 @@ public abstract class SingleServerTestBase extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); - server = createServer(false, createDefaultInVMConfig()); + server = createServer(); server.start(); locator = createLocator(); @@ -49,6 +49,10 @@ public abstract class SingleServerTestBase extends ActiveMQTestBase { session = addClientSession(sf.createSession(false, true, true)); } + protected ActiveMQServer createServer() throws Exception { + return createServer(false, createDefaultInVMConfig()); + } + protected ServerLocator createLocator() { return createInVMNonHALocator(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MixRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MixRoutingTest.java new file mode 100644 index 0000000000..8e12033c7c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MixRoutingTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.util.SingleServerTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class MixRoutingTest extends SingleServerTestBase { + // Constants ----------------------------------------------------- + + private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + + private static final long CONNECTION_TTL = 2000; + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(false, createDefaultNettyConfig()); + } + + @Test + public void testMix() throws Exception { + SimpleString queueName = SimpleString.toSimpleString(getName()); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + TemporaryQueue temporaryQueue = session.createTemporaryQueue(); + Queue queue = session.createQueue(queueName.toString()); + + MessageProducer prodTemp = session.createProducer(temporaryQueue); + MessageProducer prodQueue = session.createProducer(queue); + + final int NMESSAGES = 100; + + for (int i = 0; i < NMESSAGES; i++) { + TextMessage tmpMessage = session.createTextMessage("tmp"); + tmpMessage.setIntProperty("i", i); + TextMessage permanent = session.createTextMessage("permanent"); + permanent.setIntProperty("i", i); + prodQueue.send(permanent); + prodTemp.send(tmpMessage); + } + + MessageConsumer consumerTemp = session.createConsumer(temporaryQueue); + MessageConsumer consumerQueue = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < NMESSAGES; i++) { + TextMessage tmpMessage = (TextMessage) consumerTemp.receive(5000); + TextMessage permanent = (TextMessage) consumerQueue.receive(5000); + Assert.assertNotNull(tmpMessage); + Assert.assertNotNull(permanent); + Assert.assertEquals("tmp", tmpMessage.getText()); + Assert.assertEquals("permanent", permanent.getText()); + Assert.assertEquals(i, tmpMessage.getIntProperty("i")); + Assert.assertEquals(i, permanent.getIntProperty("i")); + } + + Assert.assertNull(consumerQueue.receiveNoWait()); + Assert.assertNull(consumerTemp.receiveNoWait()); + connection.close(); + factory.close(); + } + + @Test + public void testMix2() throws Exception { + SimpleString queueName = SimpleString.toSimpleString(getName()); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(queueName.toString()); + + MessageProducer prodQueue = session.createProducer(queue); + + final int NMESSAGES = 100; + + for (int i = 0; i < NMESSAGES; i++) { + TextMessage permanent = session.createTextMessage("permanent"); + permanent.setIntProperty("i", i); + prodQueue.send(permanent); + } + + TemporaryQueue temporaryQueue = session.createTemporaryQueue(); + MessageProducer prodTemp = session.createProducer(temporaryQueue); + + for (int i = 0; i < NMESSAGES; i++) { + TextMessage tmpMessage = session.createTextMessage("tmp"); + tmpMessage.setIntProperty("i", i); + prodTemp.send(tmpMessage); + } + + MessageConsumer consumerTemp = session.createConsumer(temporaryQueue); + MessageConsumer consumerQueue = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < NMESSAGES; i++) { + TextMessage tmpMessage = (TextMessage) consumerTemp.receive(5000); + TextMessage permanent = (TextMessage) consumerQueue.receive(5000); + Assert.assertNotNull(tmpMessage); + Assert.assertNotNull(permanent); + Assert.assertEquals("tmp", tmpMessage.getText()); + Assert.assertEquals("permanent", permanent.getText()); + Assert.assertEquals(i, tmpMessage.getIntProperty("i")); + Assert.assertEquals(i, permanent.getIntProperty("i")); + } + + Assert.assertNull(consumerQueue.receiveNoWait()); + Assert.assertNull(consumerTemp.receiveNoWait()); + connection.close(); + factory.close(); + } + + @Test + public void testMixWithTopics() throws Exception { + SimpleString queueName = SimpleString.toSimpleString(getName()); + SimpleString topicName = SimpleString.toSimpleString("topic" + getName()); + AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST); + server.addAddressInfo(info); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(queueName.toString()); + Topic topic = session.createTopic(topicName.toString()); + + MessageProducer prodQueue = session.createProducer(queue); + MessageProducer prodTopic = session.createProducer(topic); + + final int NMESSAGES = 10; + + for (int i = 0; i < NMESSAGES; i++) { + TextMessage topicMessage = session.createTextMessage("topic"); + topicMessage.setIntProperty("i", i); + TextMessage permanent = session.createTextMessage("permanent"); + permanent.setIntProperty("i", i); + prodQueue.send(permanent); + prodTopic.send(topicMessage); + } + + MessageConsumer consumerQueue = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < NMESSAGES; i++) { + TextMessage permanent = (TextMessage) consumerQueue.receive(5000); + Assert.assertNotNull(permanent); + Assert.assertEquals("permanent", permanent.getText()); + Assert.assertEquals(i, permanent.getIntProperty("i")); + } + + Assert.assertNull(consumerQueue.receiveNoWait()); + connection.close(); + factory.close(); + } + +}