ARTEMIS-2328 Routing after empty addresses could lead to invalid messages

This commit is contained in:
Clebert Suconic 2019-05-01 00:29:31 -04:00
parent 39475c68e9
commit fa259ba66e
7 changed files with 260 additions and 12 deletions

View File

@ -282,7 +282,8 @@ public final class BindingsImpl implements Bindings {
private void route(final Message message,
final RoutingContext context,
final boolean groupRouting) throws Exception {
boolean reusableContext = context.isReusable(message, version.get());
int currentVersion = version.get();
boolean reusableContext = context.isReusable(message, currentVersion);
if (!reusableContext) {
context.clear();
@ -310,13 +311,18 @@ public final class BindingsImpl implements Bindings {
boolean routed = false;
boolean hasExclusives = false;
for (Binding binding : exclusiveBindings) {
if (!hasExclusives) {
context.clear().setReusable(false);
hasExclusives = true;
}
if (binding.getFilter() == null || binding.getFilter().match(message)) {
binding.getBindable().route(message, context);
routed = true;
}
context.setReusable(false);
}
if (!routed) {
// Remove the ids now, in order to avoid double check
@ -332,6 +338,7 @@ public final class BindingsImpl implements Bindings {
context.clear().setReusable(false);
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
} else if (CompositeAddress.isFullyQualified(message.getAddress())) {
context.clear().setReusable(false);
Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
if (theBinding != null) {
theBinding.route(message, context);
@ -340,21 +347,17 @@ public final class BindingsImpl implements Bindings {
// in a optimization, we are reusing the previous context if everything is right for it
// so the simpleRouting will only happen if needed
if (!reusableContext) {
simpleRouting(message, context);
simpleRouting(message, context, currentVersion);
}
}
}
}
private void simpleRouting(Message message, RoutingContext context) throws Exception {
private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Routing message " + message + " on binding=" + this);
logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context);
}
// We check at the version before we started routing,
// this is because if something changed in between we want to check the correct version
int currentVersion = version.get();
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
SimpleString routingName = entry.getKey();

View File

@ -25,9 +25,12 @@ import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.jboss.logging.Logger;
public class LocalQueueBinding implements QueueBinding {
private static final Logger logger = Logger.getLogger(LocalQueueBinding.class);
private final SimpleString address;
private final Queue queue;
@ -119,13 +122,23 @@ public class LocalQueueBinding implements QueueBinding {
@Override
public void route(final Message message, final RoutingContext context) throws Exception {
if (isMatchRoutingType(context)) {
if (logger.isTraceEnabled()) {
logger.trace("adding routing " + queue.getID() + " on message " + message);
}
queue.route(message, context);
} else {
if (logger.isTraceEnabled()) {
logger.trace("routing " + queue.getID() + " is ignored as routing type did not match");
}
}
}
@Override
public void routeWithAck(Message message, RoutingContext context) throws Exception {
if (isMatchRoutingType(context)) {
if (logger.isTraceEnabled()) {
logger.trace("Message " + message + " routed with ack on queue " + queue.getID());
}
queue.routeWithAck(message, context);
}
}

View File

@ -934,7 +934,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
if (logger.isTraceEnabled()) {
logger.trace("Message after routed=" + message);
logger.trace("Message after routed=" + message + "\n" + context.toString());
}
try {

View File

@ -83,5 +83,4 @@ public interface RoutingContext {
boolean isReusable(Message message, int version);
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -30,9 +32,12 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
public final class RoutingContextImpl implements RoutingContext {
private static final Logger logger = Logger.getLogger(RoutingContextImpl.class);
// The pair here is Durable and NonDurable
private final Map<SimpleString, RouteContextList> map = new HashMap<>();
@ -128,6 +133,27 @@ public final class RoutingContextImpl implements RoutingContext {
queueCount++;
}
@Override
public String toString() {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
printWriter.println("RoutingContextImpl(Address=" + this.address + ", routingType=" + this.routingType + ", PreviousAddress=" + previousAddress + " previousRoute:" + previousRoutingType + ", reusable=" + this.reusable + ", version=" + version + ")");
for (Map.Entry<SimpleString, RouteContextList> entry : map.entrySet()) {
printWriter.println("..................................................");
printWriter.println("***** durable queues " + entry.getKey() + ":");
for (Queue queue : entry.getValue().getDurableQueues()) {
printWriter.println("- queueID=" + queue.getID() + " address:" + queue.getAddress() + " name:" + queue.getName() + " filter:" + queue.getFilter());
}
printWriter.println("***** non durable for " + entry.getKey() + ":");
for (Queue queue : entry.getValue().getNonDurableQueues()) {
printWriter.println("- queueID=" + queue.getID() + " address:" + queue.getAddress() + " name:" + queue.getName() + " filter:" + queue.getFilter());
}
}
printWriter.println("..................................................");
return stringWriter.toString();
}
@Override
public void processReferences(final List<MessageReference> refs, final boolean direct) {
internalprocessReferences(refs, direct);
@ -163,11 +189,17 @@ public final class RoutingContextImpl implements RoutingContext {
@Override
public void setAddress(SimpleString address) {
if (this.address == null || !this.address.equals(address)) {
this.clear();
}
this.address = address;
}
@Override
public void setRoutingType(RoutingType routingType) {
if (this.routingType == null || this.routingType != routingType) {
this.clear();
}
this.routingType = routingType;
}

View File

@ -41,7 +41,7 @@ public abstract class SingleServerTestBase extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
server = createServer(false, createDefaultInVMConfig());
server = createServer();
server.start();
locator = createLocator();
@ -49,6 +49,10 @@ public abstract class SingleServerTestBase extends ActiveMQTestBase {
session = addClientSession(sf.createSession(false, true, true));
}
protected ActiveMQServer createServer() throws Exception {
return createServer(false, createDefaultInVMConfig());
}
protected ServerLocator createLocator() {
return createInVMNonHALocator();
}

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.SingleServerTestBase;
import org.junit.Assert;
import org.junit.Test;
public class MixRoutingTest extends SingleServerTestBase {
// Constants -----------------------------------------------------
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
private static final long CONNECTION_TTL = 2000;
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
@Override
protected ActiveMQServer createServer() throws Exception {
return createServer(false, createDefaultNettyConfig());
}
@Test
public void testMix() throws Exception {
SimpleString queueName = SimpleString.toSimpleString(getName());
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
Queue queue = session.createQueue(queueName.toString());
MessageProducer prodTemp = session.createProducer(temporaryQueue);
MessageProducer prodQueue = session.createProducer(queue);
final int NMESSAGES = 100;
for (int i = 0; i < NMESSAGES; i++) {
TextMessage tmpMessage = session.createTextMessage("tmp");
tmpMessage.setIntProperty("i", i);
TextMessage permanent = session.createTextMessage("permanent");
permanent.setIntProperty("i", i);
prodQueue.send(permanent);
prodTemp.send(tmpMessage);
}
MessageConsumer consumerTemp = session.createConsumer(temporaryQueue);
MessageConsumer consumerQueue = session.createConsumer(queue);
connection.start();
for (int i = 0; i < NMESSAGES; i++) {
TextMessage tmpMessage = (TextMessage) consumerTemp.receive(5000);
TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
Assert.assertNotNull(tmpMessage);
Assert.assertNotNull(permanent);
Assert.assertEquals("tmp", tmpMessage.getText());
Assert.assertEquals("permanent", permanent.getText());
Assert.assertEquals(i, tmpMessage.getIntProperty("i"));
Assert.assertEquals(i, permanent.getIntProperty("i"));
}
Assert.assertNull(consumerQueue.receiveNoWait());
Assert.assertNull(consumerTemp.receiveNoWait());
connection.close();
factory.close();
}
@Test
public void testMix2() throws Exception {
SimpleString queueName = SimpleString.toSimpleString(getName());
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName.toString());
MessageProducer prodQueue = session.createProducer(queue);
final int NMESSAGES = 100;
for (int i = 0; i < NMESSAGES; i++) {
TextMessage permanent = session.createTextMessage("permanent");
permanent.setIntProperty("i", i);
prodQueue.send(permanent);
}
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
MessageProducer prodTemp = session.createProducer(temporaryQueue);
for (int i = 0; i < NMESSAGES; i++) {
TextMessage tmpMessage = session.createTextMessage("tmp");
tmpMessage.setIntProperty("i", i);
prodTemp.send(tmpMessage);
}
MessageConsumer consumerTemp = session.createConsumer(temporaryQueue);
MessageConsumer consumerQueue = session.createConsumer(queue);
connection.start();
for (int i = 0; i < NMESSAGES; i++) {
TextMessage tmpMessage = (TextMessage) consumerTemp.receive(5000);
TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
Assert.assertNotNull(tmpMessage);
Assert.assertNotNull(permanent);
Assert.assertEquals("tmp", tmpMessage.getText());
Assert.assertEquals("permanent", permanent.getText());
Assert.assertEquals(i, tmpMessage.getIntProperty("i"));
Assert.assertEquals(i, permanent.getIntProperty("i"));
}
Assert.assertNull(consumerQueue.receiveNoWait());
Assert.assertNull(consumerTemp.receiveNoWait());
connection.close();
factory.close();
}
@Test
public void testMixWithTopics() throws Exception {
SimpleString queueName = SimpleString.toSimpleString(getName());
SimpleString topicName = SimpleString.toSimpleString("topic" + getName());
AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
server.addAddressInfo(info);
server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName.toString());
Topic topic = session.createTopic(topicName.toString());
MessageProducer prodQueue = session.createProducer(queue);
MessageProducer prodTopic = session.createProducer(topic);
final int NMESSAGES = 10;
for (int i = 0; i < NMESSAGES; i++) {
TextMessage topicMessage = session.createTextMessage("topic");
topicMessage.setIntProperty("i", i);
TextMessage permanent = session.createTextMessage("permanent");
permanent.setIntProperty("i", i);
prodQueue.send(permanent);
prodTopic.send(topicMessage);
}
MessageConsumer consumerQueue = session.createConsumer(queue);
connection.start();
for (int i = 0; i < NMESSAGES; i++) {
TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
Assert.assertNotNull(permanent);
Assert.assertEquals("permanent", permanent.getText());
Assert.assertEquals(i, permanent.getIntProperty("i"));
}
Assert.assertNull(consumerQueue.receiveNoWait());
connection.close();
factory.close();
}
}