updated to test durable subscriptions get propagated

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@379767 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-02-22 13:28:04 +00:00
parent 6f82a857c7
commit 9c4223615d
4 changed files with 119 additions and 98 deletions

View File

@ -329,7 +329,7 @@ public class BrokerService implements Service {
addShutdownHook(); addShutdownHook();
if (deleteAllMessagesOnStartup) { if (deleteAllMessagesOnStartup) {
getPersistenceAdapter().deleteAllMessages(); deleteAllMessages();
} }
if (isUseJmx()) { if (isUseJmx()) {
@ -688,6 +688,14 @@ public class BrokerService implements Service {
public void setPlugins(BrokerPlugin[] plugins) { public void setPlugins(BrokerPlugin[] plugins) {
this.plugins = plugins; this.plugins = plugins;
} }
/**
* Delete all messages from the persistent store
* @throws IOException
*/
public void deleteAllMessages() throws IOException{
getPersistenceAdapter().deleteAllMessages();
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------

View File

@ -1,18 +1,15 @@
/** /**
* *
* Copyright 2005-2006 The Apache Software Foundation * Copyright 2005-2006 The Apache Software Foundation
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* you may not use this file except in compliance with the License. * the License. You may obtain a copy of the License at
* You may obtain a copy of the License at *
*
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
@ -31,102 +28,115 @@ import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
public class SimpleNetworkTest extends TestCase{
public class SimpleNetworkTest extends TestCase { protected static final int MESSAGE_COUNT=10;
protected static final int MESSAGE_COUNT = 10;
protected AbstractApplicationContext context; protected AbstractApplicationContext context;
protected Connection localConnection; protected Connection localConnection;
protected Connection remoteConnection; protected Connection remoteConnection;
protected BrokerService localBroker; protected BrokerService localBroker;
protected BrokerService remoteBroker; protected BrokerService remoteBroker;
protected Session localSession;
protected Session remoteSession;
protected ActiveMQTopic included;
protected ActiveMQTopic excluded;
protected String consumerName="durableSubs";
protected void setUp() throws Exception {
super.setUp();
Resource resource = new ClassPathResource("org/apache/activemq/network/localBroker.xml");
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
localBroker = factory.getBroker();
resource = new ClassPathResource("org/apache/activemq/network/remoteBroker.xml");
factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
remoteBroker = factory.getBroker();
localBroker.start();
remoteBroker.start();
URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
localConnection = fac.createConnection();
localConnection.start();
URI remoteURI = remoteBroker.getVmConnectorURI();
fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.start();
}
protected void tearDown() throws Exception {
localConnection.close();
remoteConnection.close();
localBroker.stop();
remoteBroker.stop();
super.tearDown();
}
public void testFiltering() throws Exception{ public void testFiltering() throws Exception{
ActiveMQTopic included = new ActiveMQTopic("include.test.bar"); MessageConsumer includedConsumer=remoteSession.createConsumer(included);
ActiveMQTopic excluded = new ActiveMQTopic("exclude.test.bar"); MessageConsumer excludedConsumer=remoteSession.createConsumer(excluded);
Session localSession = localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageProducer includedProducer=localSession.createProducer(included);
Session remoteSession = remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageProducer excludedProducer=localSession.createProducer(excluded);
MessageConsumer includedConsumer = remoteSession.createConsumer(included);
MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded);
MessageProducer includedProducer = localSession.createProducer(included);
MessageProducer excludedProducer = localSession.createProducer(excluded);
Thread.sleep(1000);
Message test = localSession.createTextMessage("test");
includedProducer.send(test);
excludedProducer.send(test);
assertNull(excludedConsumer.receive(500));
assertNotNull(includedConsumer.receive(500));
}
public void testConduitBridge() throws Exception{
ActiveMQTopic included = new ActiveMQTopic("include.test.bar");
Session localSession = localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Session remoteSession = remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer1 = remoteSession.createConsumer(included);
MessageConsumer consumer2 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Thread.sleep(1000); Thread.sleep(1000);
Message test=localSession.createTextMessage("test");
includedProducer.send(test);
int count = 10; excludedProducer.send(test);
for (int i = 0; i < count; i++){ assertNull(excludedConsumer.receive(500));
Message test = localSession.createTextMessage("test-" + i); assertNotNull(includedConsumer.receive(500));
}
public void testConduitBridge() throws Exception{
MessageConsumer consumer1=remoteSession.createConsumer(included);
MessageConsumer consumer2=remoteSession.createConsumer(included);
MessageProducer producer=localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Thread.sleep(1000);
for(int i=0;i<MESSAGE_COUNT;i++){
Message test=localSession.createTextMessage("test-"+i);
producer.send(test); producer.send(test);
assertNotNull(consumer1.receive(500)); assertNotNull(consumer1.receive(500));
assertNotNull(consumer2.receive(500)); assertNotNull(consumer2.receive(500));
} }
// ensure no more messages received
//ensure no more messages received
assertNull(consumer1.receive(500)); assertNull(consumer1.receive(500));
assertNull(consumer2.receive(500)); assertNull(consumer2.receive(500));
} }
public void testDurableStoreAndForward() throws Exception{
// create a remote durable consumer
MessageConsumer remoteConsumer=remoteSession.createDurableSubscriber(included,consumerName);
Thread.sleep(1000);
// now close everything down and restart
doTearDown();
doSetUp();
MessageProducer producer=localSession.createProducer(included);
for(int i=0;i<MESSAGE_COUNT;i++){
Message test=localSession.createTextMessage("test-"+i);
producer.send(test);
}
Thread.sleep(1000);
// close everything down and restart
doTearDown();
doSetUp();
remoteConsumer=remoteSession.createDurableSubscriber(included,consumerName);
for(int i=0;i<MESSAGE_COUNT;i++){
Message test=localSession.createTextMessage("test-"+i);
assertNotNull(remoteConsumer.receive(500));
}
}
protected void setUp() throws Exception{
super.setUp();
doSetUp();
}
protected void tearDown() throws Exception{
localBroker.deleteAllMessages();
remoteBroker.deleteAllMessages();
doTearDown();
super.tearDown();
}
protected void doTearDown() throws Exception{
localConnection.close();
remoteConnection.close();
localBroker.stop();
remoteBroker.stop();
}
protected void doSetUp() throws Exception{
Resource resource=new ClassPathResource("org/apache/activemq/network/localBroker.xml");
BrokerFactoryBean factory=new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
localBroker=factory.getBroker();
resource=new ClassPathResource("org/apache/activemq/network/remoteBroker.xml");
factory=new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
remoteBroker=factory.getBroker();
localBroker.start();
remoteBroker.start();
URI localURI=localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac=new ActiveMQConnectionFactory(localURI);
localConnection=fac.createConnection();
localConnection.setClientID("local");
localConnection.start();
URI remoteURI=remoteBroker.getVmConnectorURI();
fac=new ActiveMQConnectionFactory(remoteURI);
remoteConnection=fac.createConnection();
remoteConnection.setClientID("remote");
remoteConnection.start();
included=new ActiveMQTopic("include.test.bar");
excluded=new ActiveMQTopic("exclude.test.bar");
localSession=localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
remoteSession=remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
}
} }

View File

@ -16,13 +16,16 @@
--> -->
<beans xmlns="http://activemq.org/config/1.0"> <beans xmlns="http://activemq.org/config/1.0">
<broker brokerName="localBroker" persistent="false" useShutdownHook="false" deleteAllMessagesOnStartup="true"> <broker brokerName="localBroker" persistent="true" useShutdownHook="false">
<transportConnectors> <transportConnectors>
<transportConnector uri="tcp://localhost:61616"/> <transportConnector uri="tcp://localhost:61616"/>
</transportConnectors> </transportConnectors>
<networkConnectors> <networkConnectors>
<networkConnector uri="static://(tcp://localhost:61617)"> <networkConnector uri="static://(tcp://localhost:61617)">
dynamicOnly = false
conduitSubscriptions = true
decreaseNetworkConsumerPriority = false
<excludedDestinations> <excludedDestinations>
<queue physicalName="exclude.test.foo"/> <queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/> <topic physicalName="exclude.test.bar"/>

View File

@ -16,7 +16,7 @@
--> -->
<beans xmlns="http://activemq.org/config/1.0"> <beans xmlns="http://activemq.org/config/1.0">
<broker brokerName="remoteBroker" persistent="false" useShutdownHook="false" deleteAllMessagesOnStartup="true"> <broker brokerName="remoteBroker" persistent="true" useShutdownHook="false">
<transportConnectors> <transportConnectors>
<transportConnector uri="tcp://localhost:61617"/> <transportConnector uri="tcp://localhost:61617"/>
</transportConnectors> </transportConnectors>