mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-03-06 17:30:11 +00:00
This closes #2400
This commit is contained in:
commit
46588c8bd4
@ -48,6 +48,7 @@ public class SelectorTranslator {
|
|||||||
filterString = SelectorTranslator.parse(filterString, "JMSTimestamp", "AMQTimestamp");
|
filterString = SelectorTranslator.parse(filterString, "JMSTimestamp", "AMQTimestamp");
|
||||||
filterString = SelectorTranslator.parse(filterString, "JMSMessageID", "AMQUserID");
|
filterString = SelectorTranslator.parse(filterString, "JMSMessageID", "AMQUserID");
|
||||||
filterString = SelectorTranslator.parse(filterString, "JMSExpiration", "AMQExpiration");
|
filterString = SelectorTranslator.parse(filterString, "JMSExpiration", "AMQExpiration");
|
||||||
|
filterString = SelectorTranslator.parse(filterString, "JMSXGroupID", "AMQGroupID");
|
||||||
|
|
||||||
return filterString;
|
return filterString;
|
||||||
|
|
||||||
|
@ -67,6 +67,11 @@ public final class FilterConstants {
|
|||||||
*/
|
*/
|
||||||
public static final SimpleString ACTIVEMQ_ADDRESS = new SimpleString("AMQAddress");
|
public static final SimpleString ACTIVEMQ_ADDRESS = new SimpleString("AMQAddress");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Name of the ActiveMQ Artemis Message group id header.
|
||||||
|
*/
|
||||||
|
public static final SimpleString ACTIVEMQ_GROUP_ID = new SimpleString("AMQGroupID");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* All ActiveMQ Artemis headers are prepended by this prefix.
|
* All ActiveMQ Artemis headers are prepended by this prefix.
|
||||||
*/
|
*/
|
||||||
|
@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
|
|||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||||
|
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
||||||
import org.apache.activemq.command.ConsumerControl;
|
import org.apache.activemq.command.ConsumerControl;
|
||||||
import org.apache.activemq.command.ConsumerId;
|
import org.apache.activemq.command.ConsumerId;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
@ -117,7 +118,7 @@ public class AMQConsumer {
|
|||||||
|
|
||||||
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
|
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
|
||||||
|
|
||||||
SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
|
SimpleString selector = info.getSelector() == null ? null : new SimpleString(SelectorTranslator.convertToActiveMQFilterString(info.getSelector()));
|
||||||
boolean preAck = false;
|
boolean preAck = false;
|
||||||
if (info.isNoLocal()) {
|
if (info.isNoLocal()) {
|
||||||
if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {
|
if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {
|
||||||
|
@ -176,6 +176,8 @@ public class FilterImpl implements Filter {
|
|||||||
return msg.getEncodeSize();
|
return msg.getEncodeSize();
|
||||||
} else if (FilterConstants.ACTIVEMQ_ADDRESS.equals(fieldName)) {
|
} else if (FilterConstants.ACTIVEMQ_ADDRESS.equals(fieldName)) {
|
||||||
return msg.getAddress();
|
return msg.getAddress();
|
||||||
|
} else if (FilterConstants.ACTIVEMQ_GROUP_ID.equals(fieldName)) {
|
||||||
|
return msg.getGroupID();
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,162 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class JMSSelectorTest extends JMSClientTestSupport {
|
||||||
|
|
||||||
|
private static final String NORMAL_QUEUE_NAME = "NORMAL";
|
||||||
|
|
||||||
|
private ConnectionSupplier AMQPConnection = () -> createConnection();
|
||||||
|
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
|
||||||
|
private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getConfiguredProtocols() {
|
||||||
|
return "AMQP,OPENWIRE,CORE";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void addConfiguration(ActiveMQServer server) {
|
||||||
|
server.getConfiguration().setPersistenceEnabled(false);
|
||||||
|
server.getAddressSettingsRepository().addMatch(NORMAL_QUEUE_NAME, new AddressSettings());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
|
||||||
|
super.createAddressAndQueues(server);
|
||||||
|
|
||||||
|
//Add Standard Queue
|
||||||
|
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(NORMAL_QUEUE_NAME), RoutingType.ANYCAST));
|
||||||
|
server.createQueue(SimpleString.toSimpleString(NORMAL_QUEUE_NAME), RoutingType.ANYCAST, SimpleString.toSimpleString(NORMAL_QUEUE_NAME), null, true, false, -1, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJMSSelectorsAMQPProducerAMQPConsumer() throws Exception {
|
||||||
|
testJMSSelectors(AMQPConnection, AMQPConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJMSSelectorsCoreProducerCoreConsumer() throws Exception {
|
||||||
|
testJMSSelectors(CoreConnection, CoreConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJMSSelectorsCoreProducerAMQPConsumer() throws Exception {
|
||||||
|
testJMSSelectors(CoreConnection, AMQPConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJMSSelectorsAMQPProducerCoreConsumer() throws Exception {
|
||||||
|
testJMSSelectors(AMQPConnection, CoreConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJMSSelectorsOpenWireProducerOpenWireConsumer() throws Exception {
|
||||||
|
testJMSSelectors(OpenWireConnection, OpenWireConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJMSSelectorsCoreProducerOpenWireConsumer() throws Exception {
|
||||||
|
testJMSSelectors(CoreConnection, OpenWireConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJMSSelectorsOpenWireProducerCoreConsumer() throws Exception {
|
||||||
|
testJMSSelectors(OpenWireConnection, CoreConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJMSSelectorsAMQPProducerOpenWireConsumer() throws Exception {
|
||||||
|
testJMSSelectors(AMQPConnection, OpenWireConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJMSSelectorsOpenWireProducerAMQPConsumer() throws Exception {
|
||||||
|
testJMSSelectors(OpenWireConnection, AMQPConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testJMSSelectors(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
|
||||||
|
testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setStringProperty("color", "blue"), "color = 'blue'");
|
||||||
|
testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setJMSCorrelationID("correlation"), "JMSCorrelationID = 'correlation'");
|
||||||
|
testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, null, "JMSPriority = 1", Message.DEFAULT_DELIVERY_MODE, 1, Message.DEFAULT_TIME_TO_LIVE);
|
||||||
|
testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setStringProperty("JMSXGroupID", "groupA"), "JMSXGroupID = 'groupA'");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testJMSSelector(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier, String queueName, MessageSetter setValue, String selector) throws Exception {
|
||||||
|
testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, queueName, setValue, selector, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testJMSSelector(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier, String queueName, MessageSetter setValue, String selector, int deliveryMode, int priority, long timeToLive) throws Exception {
|
||||||
|
|
||||||
|
sendMessage(producerConnectionSupplier, queueName, setValue, deliveryMode, priority, timeToLive);
|
||||||
|
|
||||||
|
receiveLVQ(consumerConnectionSupplier, queueName, selector);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void receiveLVQ(ConnectionSupplier consumerConnectionSupplier, String queueName, String selector) throws JMSException {
|
||||||
|
try (Connection consumerConnection = consumerConnectionSupplier.createConnection()) {
|
||||||
|
|
||||||
|
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue consumerQueue = consumerSession.createQueue(queueName);
|
||||||
|
MessageConsumer consumer = consumerSession.createConsumer(consumerQueue, selector);
|
||||||
|
TextMessage msg = (TextMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals("how are you", msg.getText());
|
||||||
|
assertNull(consumer.receive(1000));
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendMessage(ConnectionSupplier producerConnectionSupplier, String queueName, MessageSetter setValue, int deliveryMode, int priority, long timeToLive) throws JMSException {
|
||||||
|
try (Connection producerConnection = producerConnectionSupplier.createConnection()) {
|
||||||
|
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue1 = producerSession.createQueue(queueName);
|
||||||
|
MessageProducer p = producerSession.createProducer(null);
|
||||||
|
|
||||||
|
TextMessage message1 = producerSession.createTextMessage();
|
||||||
|
message1.setText("hello");
|
||||||
|
p.send(queue1, message1);
|
||||||
|
|
||||||
|
TextMessage message2 = producerSession.createTextMessage();
|
||||||
|
if (setValue != null) {
|
||||||
|
setValue.accept(message2);
|
||||||
|
}
|
||||||
|
message2.setText("how are you");
|
||||||
|
p.send(queue1, message2, deliveryMode, priority, timeToLive);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public interface MessageSetter {
|
||||||
|
|
||||||
|
void accept(javax.jms.Message message) throws JMSException;
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user