mirror of https://github.com/apache/activemq.git
moved the virtual topic test cases to a better package and added a test case that demonstrates how to configure the virtual topic policy to enable all topics to be virtual topics together with changing the consumer queue prefix
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@426087 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
876d9cf64e
commit
370734c821
|
@ -38,6 +38,7 @@ import org.apache.activemq.broker.region.DestinationInterceptor;
|
|||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.virtual.*;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.BrokerId;
|
||||
import org.apache.activemq.memory.UsageManager;
|
||||
import org.apache.activemq.network.ConnectionFilter;
|
||||
|
@ -128,6 +129,7 @@ public class BrokerService implements Service, Serializable {
|
|||
private boolean useVirtualTopics=true;
|
||||
private BrokerId brokerId;
|
||||
private DestinationInterceptor[] destinationInterceptors;
|
||||
private ActiveMQDestination[] destinations;
|
||||
|
||||
/**
|
||||
* Adds a new transport connector for the given bind address
|
||||
|
@ -355,6 +357,8 @@ public class BrokerService implements Service, Serializable {
|
|||
|
||||
BrokerRegistry.getInstance().bind(getBrokerName(), this);
|
||||
|
||||
startDestinations();
|
||||
|
||||
addShutdownHook();
|
||||
if (deleteAllMessagesOnStartup) {
|
||||
deleteAllMessages();
|
||||
|
@ -833,11 +837,22 @@ public class BrokerService implements Service, Serializable {
|
|||
/**
|
||||
* Sets whether or not
|
||||
* <a href="http://incubator.apache.org/activemq/virtual-destinations.html">Virtual Topics</a>
|
||||
* should be supported.
|
||||
* should be supported by defaut if they have not been explicitly configured.
|
||||
*/
|
||||
public void setUseVirtualTopics(boolean useVirtualTopics) {
|
||||
this.useVirtualTopics = useVirtualTopics;
|
||||
}
|
||||
|
||||
public DestinationInterceptor[] getDestinationInterceptors() {
|
||||
return destinationInterceptors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the destination interceptors to use
|
||||
*/
|
||||
public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
|
||||
this.destinationInterceptors = destinationInterceptors;
|
||||
}
|
||||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
|
@ -1169,7 +1184,23 @@ public class BrokerService implements Service, Serializable {
|
|||
System.err.println("Failed to shut down: " + e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Starts any configured destinations on startup
|
||||
*
|
||||
*/
|
||||
protected void startDestinations() throws Exception {
|
||||
if (destinations != null) {
|
||||
ConnectionContext context = new ConnectionContext();
|
||||
context.setBroker(getBroker());
|
||||
|
||||
for (int i = 0; i < destinations.length; i++) {
|
||||
ActiveMQDestination destination = destinations[i];
|
||||
getBroker().addDestination(context, destination);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start all transport and network connections, proxies and bridges
|
||||
* @throws Exception
|
||||
|
|
|
@ -32,6 +32,8 @@ import java.util.Set;
|
|||
* href="http://incubator.apache.org/activemq/virtual-destinations.html">Virtual
|
||||
* Topics</a>.
|
||||
*
|
||||
* @org.apache.xbean.XBean
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class VirtualDestinationInterceptor implements DestinationInterceptor {
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.virtual;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.context.support.AbstractApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Neil Clayton
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class MultipleTestsWithSpringFactoryBeanTest extends TestCase {
|
||||
|
||||
protected static final Log log = LogFactory.getLog(MultipleTestsWithSpringFactoryBeanTest.class);
|
||||
|
||||
protected AbstractApplicationContext context;
|
||||
protected BrokerService service;
|
||||
private Connection connection;
|
||||
|
||||
public void test1() throws Exception {
|
||||
}
|
||||
|
||||
public void test2() throws Exception {
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
log.info("### starting up the test case: " + getName());
|
||||
|
||||
super.setUp();
|
||||
context = new ClassPathXmlApplicationContext("org/apache/activemq/xbean/spring2.xml");
|
||||
service = (BrokerService) context.getBean("broker");
|
||||
|
||||
// already started
|
||||
service.start();
|
||||
|
||||
connection = createConnectionFactory().createConnection();
|
||||
connection.start();
|
||||
log.info("### started up the test case: " + getName());
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
connection.close();
|
||||
|
||||
// stopped as part of the context
|
||||
service.stop();
|
||||
context.close();
|
||||
super.tearDown();
|
||||
|
||||
log.info("### closed down the test case: " + getName());
|
||||
}
|
||||
|
||||
protected ConnectionFactory createConnectionFactory() {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
factory.setBrokerURL("vm://localhost");
|
||||
return factory;
|
||||
}
|
||||
}
|
|
@ -13,7 +13,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
package org.apache.activemq.broker.virtual;
|
||||
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
|
@ -42,7 +42,7 @@ public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
|
|||
ConsumerBean messageList = new ConsumerBean();
|
||||
messageList.setVerbose(true);
|
||||
|
||||
String queueAName = "Consumer.A.VirtualTopic.TEST";
|
||||
String queueAName = getVirtualTopicConsumerName();
|
||||
// create consumer 'cluster'
|
||||
ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
|
||||
ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
|
||||
|
@ -55,7 +55,7 @@ public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
|
|||
c2.setMessageListener(messageList);
|
||||
|
||||
// create topic producer
|
||||
MessageProducer producer = session.createProducer(new ActiveMQTopic("VirtualTopic.TEST"));
|
||||
MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName()));
|
||||
assertNotNull(producer);
|
||||
|
||||
int total = 10;
|
||||
|
@ -67,6 +67,16 @@ public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
|
|||
}
|
||||
|
||||
|
||||
protected String getVirtualTopicName() {
|
||||
return "VirtualTopic.TEST";
|
||||
}
|
||||
|
||||
|
||||
protected String getVirtualTopicConsumerName() {
|
||||
return "Consumer.A.VirtualTopic.TEST";
|
||||
}
|
||||
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.virtual;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.xbean.XBeanBrokerFactory;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
/**
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class VirtualTopicPubSubUsingXBeanTest extends VirtualTopicPubSubTest {
|
||||
|
||||
protected String getVirtualTopicConsumerName() {
|
||||
return "VirtualTopicConsumers.ConsumerNumberOne.FOO";
|
||||
}
|
||||
|
||||
protected String getVirtualTopicName() {
|
||||
return "FOO";
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
XBeanBrokerFactory factory = new XBeanBrokerFactory();
|
||||
BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
|
||||
|
||||
// lets disable persistence as we are a test
|
||||
answer.setPersistent(false);
|
||||
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected String getBrokerConfigUri() {
|
||||
return "org/apache/activemq/broker/virtual/global-virtual-topics.xml";
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Copyright 2005-2006 The Apache Software Foundation
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- this file can only be parsed using the xbean-spring library -->
|
||||
<!-- START SNIPPET: xbean -->
|
||||
<beans>
|
||||
|
||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||
|
||||
<broker xmlns="http://activemq.org/config/1.0">
|
||||
<destinationInterceptors>
|
||||
<virtualDestinationInterceptor>
|
||||
<virtualDestinations>
|
||||
<virtualTopic name=">" prefix="VirtualTopicConsumers.*."/>
|
||||
</virtualDestinations>
|
||||
</virtualDestinationInterceptor>
|
||||
</destinationInterceptors>
|
||||
|
||||
</broker>
|
||||
|
||||
</beans>
|
||||
<!-- END SNIPPET: xbean -->
|
Loading…
Reference in New Issue