mirror of https://github.com/apache/activemq.git
added test cases for composite destinations (which are kinds of virtual destinations) - see http://incubator.apache.org/activemq/virtual-destinations.html
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@426094 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f1db92fb07
commit
4ae7e1cede
|
@ -27,11 +27,12 @@ import java.util.Collection;
|
||||||
public abstract class CompositeDestination implements VirtualDestination {
|
public abstract class CompositeDestination implements VirtualDestination {
|
||||||
|
|
||||||
private String name;
|
private String name;
|
||||||
private Collection forwardDestinations;
|
private Collection forwardTo;
|
||||||
private boolean forwardOnly = true;
|
private boolean forwardOnly = true;
|
||||||
|
private boolean copyMessage = true;
|
||||||
|
|
||||||
public Destination intercept(Destination destination) {
|
public Destination intercept(Destination destination) {
|
||||||
return new CompositeDestinationInterceptor(destination, getForwardDestinations(), isForwardOnly());
|
return new CompositeDestinationInterceptor(destination, getForwardTo(), isForwardOnly(), isCopyMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getName() {
|
public String getName() {
|
||||||
|
@ -45,15 +46,15 @@ public abstract class CompositeDestination implements VirtualDestination {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Collection getForwardDestinations() {
|
public Collection getForwardTo() {
|
||||||
return forwardDestinations;
|
return forwardTo;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the list of destinations to forward to
|
* Sets the list of destinations to forward to
|
||||||
*/
|
*/
|
||||||
public void setForwardDestinations(Collection forwardDestinations) {
|
public void setForwardTo(Collection forwardDestinations) {
|
||||||
this.forwardDestinations = forwardDestinations;
|
this.forwardTo = forwardDestinations;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isForwardOnly() {
|
public boolean isForwardOnly() {
|
||||||
|
@ -68,4 +69,18 @@ public abstract class CompositeDestination implements VirtualDestination {
|
||||||
public void setForwardOnly(boolean forwardOnly) {
|
public void setForwardOnly(boolean forwardOnly) {
|
||||||
this.forwardOnly = forwardOnly;
|
this.forwardOnly = forwardOnly;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isCopyMessage() {
|
||||||
|
return copyMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets whether a copy of the message will be sent to each destination.
|
||||||
|
* Defaults to true so that the forward destination is set as the
|
||||||
|
* destination of the message
|
||||||
|
*/
|
||||||
|
public void setCopyMessage(boolean copyMessage) {
|
||||||
|
this.copyMessage = copyMessage;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,16 +35,24 @@ public class CompositeDestinationInterceptor extends DestinationFilter {
|
||||||
|
|
||||||
private Collection forwardDestinations;
|
private Collection forwardDestinations;
|
||||||
private boolean forwardOnly;
|
private boolean forwardOnly;
|
||||||
|
private boolean copyMessage;
|
||||||
|
|
||||||
public CompositeDestinationInterceptor(Destination next, Collection forwardDestinations, boolean forwardOnly) {
|
public CompositeDestinationInterceptor(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean copyMessage) {
|
||||||
super(next);
|
super(next);
|
||||||
this.forwardDestinations = forwardDestinations;
|
this.forwardDestinations = forwardDestinations;
|
||||||
this.forwardOnly = forwardOnly;
|
this.forwardOnly = forwardOnly;
|
||||||
|
this.copyMessage = copyMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void send(ConnectionContext context, Message message) throws Exception {
|
public void send(ConnectionContext context, Message message) throws Exception {
|
||||||
for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {
|
for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {
|
||||||
ActiveMQDestination destination = (ActiveMQDestination) iter.next();
|
ActiveMQDestination destination = (ActiveMQDestination) iter.next();
|
||||||
|
|
||||||
|
if (copyMessage) {
|
||||||
|
message = message.copy();
|
||||||
|
message.setDestination(destination);
|
||||||
|
}
|
||||||
|
|
||||||
send(context, message, destination);
|
send(context, message, destination);
|
||||||
}
|
}
|
||||||
if (!forwardOnly) {
|
if (!forwardOnly) {
|
||||||
|
|
|
@ -35,7 +35,7 @@ import org.springframework.jms.core.JmsTemplate;
|
||||||
*
|
*
|
||||||
* @version $Revision: 1.1 $
|
* @version $Revision: 1.1 $
|
||||||
*/
|
*/
|
||||||
public class EmbeddedBrokerTestSupport extends TestCase {
|
public abstract class EmbeddedBrokerTestSupport extends TestCase {
|
||||||
|
|
||||||
protected static final Log log = LogFactory.getLog(EmbeddedBrokerTestSupport.class);
|
protected static final Log log = LogFactory.getLog(EmbeddedBrokerTestSupport.class);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,116 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.virtual;
|
||||||
|
|
||||||
|
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.spring.ConsumerBean;
|
||||||
|
import org.apache.activemq.xbean.XBeanBrokerFactory;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @version $Revision$
|
||||||
|
*/
|
||||||
|
public class CompositeQueueTest extends EmbeddedBrokerTestSupport {
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(CompositeQueueTest.class);
|
||||||
|
|
||||||
|
private Connection connection;
|
||||||
|
|
||||||
|
public void testVirtualTopicCreation() throws Exception {
|
||||||
|
if (connection == null) {
|
||||||
|
connection = createConnection();
|
||||||
|
}
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
ConsumerBean messageList1 = new ConsumerBean();
|
||||||
|
ConsumerBean messageList2 = new ConsumerBean();
|
||||||
|
messageList1.setVerbose(true);
|
||||||
|
messageList2.setVerbose(true);
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
Destination producerDestination = getProducerDestination();
|
||||||
|
Destination destination1 = getConsumer1Dsetination();
|
||||||
|
Destination destination2 = getConsumer2Dsetination();
|
||||||
|
|
||||||
|
log.info("Sending to: " + producerDestination);
|
||||||
|
log.info("Consuming from: " + destination1 + " and " + destination2);
|
||||||
|
|
||||||
|
MessageConsumer c1 = session.createConsumer(destination1);
|
||||||
|
MessageConsumer c2 = session.createConsumer(destination2);
|
||||||
|
|
||||||
|
c1.setMessageListener(messageList1);
|
||||||
|
c2.setMessageListener(messageList2);
|
||||||
|
|
||||||
|
// create topic producer
|
||||||
|
MessageProducer producer = session.createProducer(producerDestination);
|
||||||
|
assertNotNull(producer);
|
||||||
|
|
||||||
|
int total = 10;
|
||||||
|
for (int i = 0; i < total; i++) {
|
||||||
|
producer.send(session.createTextMessage("message: " + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
messageList1.assertMessagesArrived(total);
|
||||||
|
messageList2.assertMessagesArrived(total);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Destination getConsumer1Dsetination() {
|
||||||
|
return new ActiveMQQueue("FOO");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Destination getConsumer2Dsetination() {
|
||||||
|
return new ActiveMQTopic("BAR");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Destination getProducerDestination() {
|
||||||
|
return new ActiveMQQueue("MY.QUEUE");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
XBeanBrokerFactory factory = new XBeanBrokerFactory();
|
||||||
|
BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
|
||||||
|
|
||||||
|
// lets disable persistence as we are a test
|
||||||
|
answer.setPersistent(false);
|
||||||
|
|
||||||
|
return answer;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getBrokerConfigUri() {
|
||||||
|
return "org/apache/activemq/broker/virtual/composite-queue.xml";
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.virtual;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @version $Revision$
|
||||||
|
*/
|
||||||
|
public class CompositeTopicTest extends CompositeQueueTest {
|
||||||
|
|
||||||
|
protected Destination getConsumer1Dsetination() {
|
||||||
|
return new ActiveMQQueue("FOO");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Destination getConsumer2Dsetination() {
|
||||||
|
return new ActiveMQTopic("BAR");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Destination getProducerDestination() {
|
||||||
|
return new ActiveMQTopic("MY.TOPIC");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getBrokerConfigUri() {
|
||||||
|
return "org/apache/activemq/broker/virtual/composite-topic.xml";
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!--
|
||||||
|
Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<!-- this file can only be parsed using the xbean-spring library -->
|
||||||
|
<!-- START SNIPPET: xbean -->
|
||||||
|
<beans>
|
||||||
|
|
||||||
|
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||||
|
|
||||||
|
<broker xmlns="http://activemq.org/config/1.0">
|
||||||
|
<destinationInterceptors>
|
||||||
|
<virtualDestinationInterceptor>
|
||||||
|
<virtualDestinations>
|
||||||
|
<compositeQueue name="MY.QUEUE">
|
||||||
|
<forwardTo>
|
||||||
|
<queue physicalName="FOO"/>
|
||||||
|
<topic physicalName="BAR"/>
|
||||||
|
</forwardTo>
|
||||||
|
</compositeQueue>
|
||||||
|
</virtualDestinations>
|
||||||
|
</virtualDestinationInterceptor>
|
||||||
|
</destinationInterceptors>
|
||||||
|
|
||||||
|
</broker>
|
||||||
|
|
||||||
|
</beans>
|
||||||
|
<!-- END SNIPPET: xbean -->
|
|
@ -0,0 +1,41 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!--
|
||||||
|
Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<!-- this file can only be parsed using the xbean-spring library -->
|
||||||
|
<!-- START SNIPPET: xbean -->
|
||||||
|
<beans>
|
||||||
|
|
||||||
|
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||||
|
|
||||||
|
<broker xmlns="http://activemq.org/config/1.0">
|
||||||
|
<destinationInterceptors>
|
||||||
|
<virtualDestinationInterceptor>
|
||||||
|
<virtualDestinations>
|
||||||
|
<compositeTopic name="MY.TOPIC">
|
||||||
|
<forwardTo>
|
||||||
|
<queue physicalName="FOO"/>
|
||||||
|
<topic physicalName="BAR"/>
|
||||||
|
</forwardTo>
|
||||||
|
</compositeTopic>
|
||||||
|
</virtualDestinations>
|
||||||
|
</virtualDestinationInterceptor>
|
||||||
|
</destinationInterceptors>
|
||||||
|
|
||||||
|
</broker>
|
||||||
|
|
||||||
|
</beans>
|
||||||
|
<!-- END SNIPPET: xbean -->
|
Loading…
Reference in New Issue