This closes #2656
This commit is contained in:
commit
bede6032c9
|
@ -282,7 +282,8 @@ public final class BindingsImpl implements Bindings {
|
|||
private void route(final Message message,
|
||||
final RoutingContext context,
|
||||
final boolean groupRouting) throws Exception {
|
||||
boolean reusableContext = context.isReusable(message, version.get());
|
||||
int currentVersion = version.get();
|
||||
boolean reusableContext = context.isReusable(message, currentVersion);
|
||||
|
||||
if (!reusableContext) {
|
||||
context.clear();
|
||||
|
@ -310,13 +311,18 @@ public final class BindingsImpl implements Bindings {
|
|||
|
||||
boolean routed = false;
|
||||
|
||||
boolean hasExclusives = false;
|
||||
|
||||
for (Binding binding : exclusiveBindings) {
|
||||
if (!hasExclusives) {
|
||||
context.clear().setReusable(false);
|
||||
hasExclusives = true;
|
||||
}
|
||||
|
||||
if (binding.getFilter() == null || binding.getFilter().match(message)) {
|
||||
binding.getBindable().route(message, context);
|
||||
routed = true;
|
||||
}
|
||||
context.setReusable(false);
|
||||
}
|
||||
if (!routed) {
|
||||
// Remove the ids now, in order to avoid double check
|
||||
|
@ -332,6 +338,7 @@ public final class BindingsImpl implements Bindings {
|
|||
context.clear().setReusable(false);
|
||||
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
|
||||
} else if (CompositeAddress.isFullyQualified(message.getAddress())) {
|
||||
context.clear().setReusable(false);
|
||||
Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
|
||||
if (theBinding != null) {
|
||||
theBinding.route(message, context);
|
||||
|
@ -340,21 +347,17 @@ public final class BindingsImpl implements Bindings {
|
|||
// in a optimization, we are reusing the previous context if everything is right for it
|
||||
// so the simpleRouting will only happen if needed
|
||||
if (!reusableContext) {
|
||||
simpleRouting(message, context);
|
||||
simpleRouting(message, context, currentVersion);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void simpleRouting(Message message, RoutingContext context) throws Exception {
|
||||
private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Routing message " + message + " on binding=" + this);
|
||||
logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context);
|
||||
}
|
||||
|
||||
// We check at the version before we started routing,
|
||||
// this is because if something changed in between we want to check the correct version
|
||||
int currentVersion = version.get();
|
||||
|
||||
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
|
||||
SimpleString routingName = entry.getKey();
|
||||
|
||||
|
|
|
@ -25,9 +25,12 @@ import org.apache.activemq.artemis.core.server.Bindable;
|
|||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class LocalQueueBinding implements QueueBinding {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(LocalQueueBinding.class);
|
||||
|
||||
private final SimpleString address;
|
||||
|
||||
private final Queue queue;
|
||||
|
@ -119,13 +122,23 @@ public class LocalQueueBinding implements QueueBinding {
|
|||
@Override
|
||||
public void route(final Message message, final RoutingContext context) throws Exception {
|
||||
if (isMatchRoutingType(context)) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("adding routing " + queue.getID() + " on message " + message);
|
||||
}
|
||||
queue.route(message, context);
|
||||
} else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("routing " + queue.getID() + " is ignored as routing type did not match");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void routeWithAck(Message message, RoutingContext context) throws Exception {
|
||||
if (isMatchRoutingType(context)) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Message " + message + " routed with ack on queue " + queue.getID());
|
||||
}
|
||||
queue.routeWithAck(message, context);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -934,7 +934,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Message after routed=" + message);
|
||||
logger.trace("Message after routed=" + message + "\n" + context.toString());
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
|
@ -83,5 +83,4 @@ public interface RoutingContext {
|
|||
boolean isReusable(Message message, int version);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.impl;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -30,9 +32,12 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
|
|||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public final class RoutingContextImpl implements RoutingContext {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(RoutingContextImpl.class);
|
||||
|
||||
// The pair here is Durable and NonDurable
|
||||
private final Map<SimpleString, RouteContextList> map = new HashMap<>();
|
||||
|
||||
|
@ -128,6 +133,27 @@ public final class RoutingContextImpl implements RoutingContext {
|
|||
queueCount++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringWriter stringWriter = new StringWriter();
|
||||
PrintWriter printWriter = new PrintWriter(stringWriter);
|
||||
printWriter.println("RoutingContextImpl(Address=" + this.address + ", routingType=" + this.routingType + ", PreviousAddress=" + previousAddress + " previousRoute:" + previousRoutingType + ", reusable=" + this.reusable + ", version=" + version + ")");
|
||||
for (Map.Entry<SimpleString, RouteContextList> entry : map.entrySet()) {
|
||||
printWriter.println("..................................................");
|
||||
printWriter.println("***** durable queues " + entry.getKey() + ":");
|
||||
for (Queue queue : entry.getValue().getDurableQueues()) {
|
||||
printWriter.println("- queueID=" + queue.getID() + " address:" + queue.getAddress() + " name:" + queue.getName() + " filter:" + queue.getFilter());
|
||||
}
|
||||
printWriter.println("***** non durable for " + entry.getKey() + ":");
|
||||
for (Queue queue : entry.getValue().getNonDurableQueues()) {
|
||||
printWriter.println("- queueID=" + queue.getID() + " address:" + queue.getAddress() + " name:" + queue.getName() + " filter:" + queue.getFilter());
|
||||
}
|
||||
}
|
||||
printWriter.println("..................................................");
|
||||
|
||||
return stringWriter.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processReferences(final List<MessageReference> refs, final boolean direct) {
|
||||
internalprocessReferences(refs, direct);
|
||||
|
@ -163,11 +189,17 @@ public final class RoutingContextImpl implements RoutingContext {
|
|||
|
||||
@Override
|
||||
public void setAddress(SimpleString address) {
|
||||
if (this.address == null || !this.address.equals(address)) {
|
||||
this.clear();
|
||||
}
|
||||
this.address = address;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRoutingType(RoutingType routingType) {
|
||||
if (this.routingType == null || this.routingType != routingType) {
|
||||
this.clear();
|
||||
}
|
||||
this.routingType = routingType;
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ public abstract class SingleServerTestBase extends ActiveMQTestBase {
|
|||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
server = createServer(false, createDefaultInVMConfig());
|
||||
server = createServer();
|
||||
server.start();
|
||||
|
||||
locator = createLocator();
|
||||
|
@ -49,6 +49,10 @@ public abstract class SingleServerTestBase extends ActiveMQTestBase {
|
|||
session = addClientSession(sf.createSession(false, true, true));
|
||||
}
|
||||
|
||||
protected ActiveMQServer createServer() throws Exception {
|
||||
return createServer(false, createDefaultInVMConfig());
|
||||
}
|
||||
|
||||
protected ServerLocator createLocator() {
|
||||
return createInVMNonHALocator();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,197 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.SingleServerTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class MixRoutingTest extends SingleServerTestBase {
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
private static final long CONNECTION_TTL = 2000;
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
// Constructors --------------------------------------------------
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
@Override
|
||||
protected ActiveMQServer createServer() throws Exception {
|
||||
return createServer(false, createDefaultNettyConfig());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMix() throws Exception {
|
||||
SimpleString queueName = SimpleString.toSimpleString(getName());
|
||||
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
|
||||
Queue queue = session.createQueue(queueName.toString());
|
||||
|
||||
MessageProducer prodTemp = session.createProducer(temporaryQueue);
|
||||
MessageProducer prodQueue = session.createProducer(queue);
|
||||
|
||||
final int NMESSAGES = 100;
|
||||
|
||||
for (int i = 0; i < NMESSAGES; i++) {
|
||||
TextMessage tmpMessage = session.createTextMessage("tmp");
|
||||
tmpMessage.setIntProperty("i", i);
|
||||
TextMessage permanent = session.createTextMessage("permanent");
|
||||
permanent.setIntProperty("i", i);
|
||||
prodQueue.send(permanent);
|
||||
prodTemp.send(tmpMessage);
|
||||
}
|
||||
|
||||
MessageConsumer consumerTemp = session.createConsumer(temporaryQueue);
|
||||
MessageConsumer consumerQueue = session.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
for (int i = 0; i < NMESSAGES; i++) {
|
||||
TextMessage tmpMessage = (TextMessage) consumerTemp.receive(5000);
|
||||
TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
|
||||
Assert.assertNotNull(tmpMessage);
|
||||
Assert.assertNotNull(permanent);
|
||||
Assert.assertEquals("tmp", tmpMessage.getText());
|
||||
Assert.assertEquals("permanent", permanent.getText());
|
||||
Assert.assertEquals(i, tmpMessage.getIntProperty("i"));
|
||||
Assert.assertEquals(i, permanent.getIntProperty("i"));
|
||||
}
|
||||
|
||||
Assert.assertNull(consumerQueue.receiveNoWait());
|
||||
Assert.assertNull(consumerTemp.receiveNoWait());
|
||||
connection.close();
|
||||
factory.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMix2() throws Exception {
|
||||
SimpleString queueName = SimpleString.toSimpleString(getName());
|
||||
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Queue queue = session.createQueue(queueName.toString());
|
||||
|
||||
MessageProducer prodQueue = session.createProducer(queue);
|
||||
|
||||
final int NMESSAGES = 100;
|
||||
|
||||
for (int i = 0; i < NMESSAGES; i++) {
|
||||
TextMessage permanent = session.createTextMessage("permanent");
|
||||
permanent.setIntProperty("i", i);
|
||||
prodQueue.send(permanent);
|
||||
}
|
||||
|
||||
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
|
||||
MessageProducer prodTemp = session.createProducer(temporaryQueue);
|
||||
|
||||
for (int i = 0; i < NMESSAGES; i++) {
|
||||
TextMessage tmpMessage = session.createTextMessage("tmp");
|
||||
tmpMessage.setIntProperty("i", i);
|
||||
prodTemp.send(tmpMessage);
|
||||
}
|
||||
|
||||
MessageConsumer consumerTemp = session.createConsumer(temporaryQueue);
|
||||
MessageConsumer consumerQueue = session.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
for (int i = 0; i < NMESSAGES; i++) {
|
||||
TextMessage tmpMessage = (TextMessage) consumerTemp.receive(5000);
|
||||
TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
|
||||
Assert.assertNotNull(tmpMessage);
|
||||
Assert.assertNotNull(permanent);
|
||||
Assert.assertEquals("tmp", tmpMessage.getText());
|
||||
Assert.assertEquals("permanent", permanent.getText());
|
||||
Assert.assertEquals(i, tmpMessage.getIntProperty("i"));
|
||||
Assert.assertEquals(i, permanent.getIntProperty("i"));
|
||||
}
|
||||
|
||||
Assert.assertNull(consumerQueue.receiveNoWait());
|
||||
Assert.assertNull(consumerTemp.receiveNoWait());
|
||||
connection.close();
|
||||
factory.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMixWithTopics() throws Exception {
|
||||
SimpleString queueName = SimpleString.toSimpleString(getName());
|
||||
SimpleString topicName = SimpleString.toSimpleString("topic" + getName());
|
||||
AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
|
||||
server.addAddressInfo(info);
|
||||
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Queue queue = session.createQueue(queueName.toString());
|
||||
Topic topic = session.createTopic(topicName.toString());
|
||||
|
||||
MessageProducer prodQueue = session.createProducer(queue);
|
||||
MessageProducer prodTopic = session.createProducer(topic);
|
||||
|
||||
final int NMESSAGES = 10;
|
||||
|
||||
for (int i = 0; i < NMESSAGES; i++) {
|
||||
TextMessage topicMessage = session.createTextMessage("topic");
|
||||
topicMessage.setIntProperty("i", i);
|
||||
TextMessage permanent = session.createTextMessage("permanent");
|
||||
permanent.setIntProperty("i", i);
|
||||
prodQueue.send(permanent);
|
||||
prodTopic.send(topicMessage);
|
||||
}
|
||||
|
||||
MessageConsumer consumerQueue = session.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
for (int i = 0; i < NMESSAGES; i++) {
|
||||
TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
|
||||
Assert.assertNotNull(permanent);
|
||||
Assert.assertEquals("permanent", permanent.getText());
|
||||
Assert.assertEquals(i, permanent.getIntProperty("i"));
|
||||
}
|
||||
|
||||
Assert.assertNull(consumerQueue.receiveNoWait());
|
||||
connection.close();
|
||||
factory.close();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue