added support for AMQ-1073 to allow selectors to be used with virtual destinations

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@479694 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-11-27 17:16:33 +00:00
parent e78e72f957
commit 4f7aeecc1e
5 changed files with 218 additions and 5 deletions

View File

@ -22,6 +22,7 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.MessageEvaluationContext;
import java.util.Collection;
import java.util.Iterator;
@ -29,7 +30,7 @@ import java.util.Iterator;
/**
* Represents a composite {@link Destination} where send()s are replicated to
* each Destination instance.
*
*
* @version $Revision$
*/
public class CompositeDestinationInterceptor extends DestinationFilter {
@ -46,8 +47,29 @@ public class CompositeDestinationInterceptor extends DestinationFilter {
}
public void send(ConnectionContext context, Message message) throws Exception {
MessageEvaluationContext messageContext = null;
for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {
ActiveMQDestination destination = (ActiveMQDestination) iter.next();
ActiveMQDestination destination = null;
Object value = iter.next();
if (value instanceof FilteredDestination) {
FilteredDestination filteredDestination = (FilteredDestination) value;
if (messageContext == null) {
messageContext = new MessageEvaluationContext();
messageContext.setMessageReference(message);
}
messageContext.setDestination(filteredDestination.getDestination());
if (filteredDestination.matches(messageContext)) {
destination = filteredDestination.getDestination();
}
}
else if (value instanceof ActiveMQDestination) {
destination = (ActiveMQDestination) value;
}
if (destination == null) {
continue;
}
if (copyMessage) {
message = message.copy();

View File

@ -0,0 +1,95 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
/**
* Represents a destination which is filtered using some predicate such as a selector
* so that messages are only dispatched to the destination if they match the filter.
*
* @org.apache.xbean.XBean
*
* @version $Revision$
*/
public class FilteredDestination {
private ActiveMQDestination destination;
private String selector;
private BooleanExpression filter;
public boolean matches(MessageEvaluationContext context) throws JMSException {
BooleanExpression booleanExpression = getFilter();
if (booleanExpression == null) {
return false;
}
return booleanExpression.matches(context);
}
public ActiveMQDestination getDestination() {
return destination;
}
/**
* The destination to send messages to if they match the filter
*/
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
}
public String getSelector() {
return selector;
}
/**
* Sets the JMS selector used to filter messages before forwarding them to this destination
*/
public void setSelector(String selector) throws InvalidSelectorException {
this.selector = selector;
setFilter(new SelectorParser().parse(selector));
}
public BooleanExpression getFilter() {
return filter;
}
public void setFilter(BooleanExpression filter) {
this.filter = filter;
}
/**
* Sets the destination property to the given queue name
*/
public void setQueue(String queue) {
setDestination(ActiveMQDestination.createDestination(queue, ActiveMQDestination.QUEUE_TYPE));
}
/**
* Sets the destination property to the given topic name
*/
public void setTopic(String topic) {
setDestination(ActiveMQDestination.createDestination(topic, ActiveMQDestination.TOPIC_TYPE));
}
}

View File

@ -30,6 +30,8 @@ import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import java.net.URI;
@ -43,6 +45,9 @@ public class CompositeQueueTest extends EmbeddedBrokerTestSupport {
private Connection connection;
protected int total = 10;
public void testVirtualTopicCreation() throws Exception {
if (connection == null) {
connection = createConnection();
@ -73,15 +78,27 @@ public class CompositeQueueTest extends EmbeddedBrokerTestSupport {
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
int total = 10;
for (int i = 0; i < total; i++) {
producer.send(session.createTextMessage("message: " + i));
producer.send(createMessage(session, i));
}
assertMessagesArrived(messageList1, messageList2);
}
protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
messageList1.assertMessagesArrived(total);
messageList2.assertMessagesArrived(total);
}
protected TextMessage createMessage(Session session, int i) throws JMSException {
TextMessage textMessage = session.createTextMessage("message: " + i);
if (i % 2 == 1) {
textMessage.setStringProperty("odd", "yes");
}
textMessage.setIntProperty("i", i);
return textMessage;
}
protected Destination getConsumer1Dsetination() {
return new ActiveMQQueue("FOO");
}

View File

@ -0,0 +1,37 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import org.apache.activemq.spring.ConsumerBean;
/**
* @version $Revision$
*/
public class FilteredQueueTest extends CompositeQueueTest {
@Override
protected String getBrokerConfigUri() {
return "org/apache/activemq/broker/virtual/filtered-queue.xml";
}
@Override
protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
messageList1.assertMessagesArrived(total / 2);
messageList2.assertMessagesArrived(1);
}
}

View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: xbean -->
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
<broker xmlns="http://activemq.org/config/1.0">
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="MY.QUEUE">
<forwardTo>
<filteredDestination selector="odd = 'yes'" queue="FOO"/>
<filteredDestination selector="i = 5" topic="BAR"/>
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>
</beans>
<!-- END SNIPPET: xbean -->