ARTEMIS-2151 JMS Selectors broken in some cases

Create Test Case
Fix OpenWire so selectors are translated
Fix GroupID to call groupId method
This commit is contained in:
Michael André Pearce 2018-10-29 05:46:55 +00:00 committed by Clebert Suconic
parent ad7e7c78b9
commit faa6ffa3b4
5 changed files with 172 additions and 1 deletions

View File

@ -48,6 +48,7 @@ public class SelectorTranslator {
filterString = SelectorTranslator.parse(filterString, "JMSTimestamp", "AMQTimestamp");
filterString = SelectorTranslator.parse(filterString, "JMSMessageID", "AMQUserID");
filterString = SelectorTranslator.parse(filterString, "JMSExpiration", "AMQExpiration");
filterString = SelectorTranslator.parse(filterString, "JMSXGroupID", "AMQGroupID");
return filterString;

View File

@ -67,6 +67,11 @@ public final class FilterConstants {
*/
public static final SimpleString ACTIVEMQ_ADDRESS = new SimpleString("AMQAddress");
/**
* Name of the ActiveMQ Artemis Message group id header.
*/
public static final SimpleString ACTIVEMQ_GROUP_ID = new SimpleString("AMQGroupID");
/**
* All ActiveMQ Artemis headers are prepended by this prefix.
*/

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
@ -117,7 +118,7 @@ public class AMQConsumer {
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
SimpleString selector = info.getSelector() == null ? null : new SimpleString(SelectorTranslator.convertToActiveMQFilterString(info.getSelector()));
boolean preAck = false;
if (info.isNoLocal()) {
if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {

View File

@ -176,6 +176,8 @@ public class FilterImpl implements Filter {
return msg.getEncodeSize();
} else if (FilterConstants.ACTIVEMQ_ADDRESS.equals(fieldName)) {
return msg.getAddress();
} else if (FilterConstants.ACTIVEMQ_GROUP_ID.equals(fieldName)) {
return msg.getGroupID();
} else {
return null;
}

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Test;
public class JMSSelectorTest extends JMSClientTestSupport {
private static final String NORMAL_QUEUE_NAME = "NORMAL";
private ConnectionSupplier AMQPConnection = () -> createConnection();
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection();
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setPersistenceEnabled(false);
server.getAddressSettingsRepository().addMatch(NORMAL_QUEUE_NAME, new AddressSettings());
}
@Override
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
super.createAddressAndQueues(server);
//Add Standard Queue
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(NORMAL_QUEUE_NAME), RoutingType.ANYCAST));
server.createQueue(SimpleString.toSimpleString(NORMAL_QUEUE_NAME), RoutingType.ANYCAST, SimpleString.toSimpleString(NORMAL_QUEUE_NAME), null, true, false, -1, false, true);
}
@Test
public void testJMSSelectorsAMQPProducerAMQPConsumer() throws Exception {
testJMSSelectors(AMQPConnection, AMQPConnection);
}
@Test
public void testJMSSelectorsCoreProducerCoreConsumer() throws Exception {
testJMSSelectors(CoreConnection, CoreConnection);
}
@Test
public void testJMSSelectorsCoreProducerAMQPConsumer() throws Exception {
testJMSSelectors(CoreConnection, AMQPConnection);
}
@Test
public void testJMSSelectorsAMQPProducerCoreConsumer() throws Exception {
testJMSSelectors(AMQPConnection, CoreConnection);
}
@Test
public void testJMSSelectorsOpenWireProducerOpenWireConsumer() throws Exception {
testJMSSelectors(OpenWireConnection, OpenWireConnection);
}
@Test
public void testJMSSelectorsCoreProducerOpenWireConsumer() throws Exception {
testJMSSelectors(CoreConnection, OpenWireConnection);
}
@Test
public void testJMSSelectorsOpenWireProducerCoreConsumer() throws Exception {
testJMSSelectors(OpenWireConnection, CoreConnection);
}
@Test
public void testJMSSelectorsAMQPProducerOpenWireConsumer() throws Exception {
testJMSSelectors(AMQPConnection, OpenWireConnection);
}
@Test
public void testJMSSelectorsOpenWireProducerAMQPConsumer() throws Exception {
testJMSSelectors(OpenWireConnection, AMQPConnection);
}
public void testJMSSelectors(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setStringProperty("color", "blue"), "color = 'blue'");
testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setJMSCorrelationID("correlation"), "JMSCorrelationID = 'correlation'");
testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, null, "JMSPriority = 1", Message.DEFAULT_DELIVERY_MODE, 1, Message.DEFAULT_TIME_TO_LIVE);
testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setStringProperty("JMSXGroupID", "groupA"), "JMSXGroupID = 'groupA'");
}
public void testJMSSelector(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier, String queueName, MessageSetter setValue, String selector) throws Exception {
testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, queueName, setValue, selector, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
}
public void testJMSSelector(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier, String queueName, MessageSetter setValue, String selector, int deliveryMode, int priority, long timeToLive) throws Exception {
sendMessage(producerConnectionSupplier, queueName, setValue, deliveryMode, priority, timeToLive);
receiveLVQ(consumerConnectionSupplier, queueName, selector);
}
private void receiveLVQ(ConnectionSupplier consumerConnectionSupplier, String queueName, String selector) throws JMSException {
try (Connection consumerConnection = consumerConnectionSupplier.createConnection()) {
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = consumerSession.createQueue(queueName);
MessageConsumer consumer = consumerSession.createConsumer(consumerQueue, selector);
TextMessage msg = (TextMessage) consumer.receive(1000);
assertNotNull(msg);
assertEquals("how are you", msg.getText());
assertNull(consumer.receive(1000));
consumer.close();
}
}
private void sendMessage(ConnectionSupplier producerConnectionSupplier, String queueName, MessageSetter setValue, int deliveryMode, int priority, long timeToLive) throws JMSException {
try (Connection producerConnection = producerConnectionSupplier.createConnection()) {
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue1 = producerSession.createQueue(queueName);
MessageProducer p = producerSession.createProducer(null);
TextMessage message1 = producerSession.createTextMessage();
message1.setText("hello");
p.send(queue1, message1);
TextMessage message2 = producerSession.createTextMessage();
if (setValue != null) {
setValue.accept(message2);
}
message2.setText("how are you");
p.send(queue1, message2, deliveryMode, priority, timeToLive);
}
}
public interface MessageSetter {
void accept(javax.jms.Message message) throws JMSException;
}
}