diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 0dced0294d..57813797fe 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -329,7 +329,7 @@ public class BrokerService implements Service { addShutdownHook(); if (deleteAllMessagesOnStartup) { - getPersistenceAdapter().deleteAllMessages(); + deleteAllMessages(); } if (isUseJmx()) { @@ -688,6 +688,14 @@ public class BrokerService implements Service { public void setPlugins(BrokerPlugin[] plugins) { this.plugins = plugins; } + + /** + * Delete all messages from the persistent store + * @throws IOException + */ + public void deleteAllMessages() throws IOException{ + getPersistenceAdapter().deleteAllMessages(); + } // Implementation methods // ------------------------------------------------------------------------- diff --git a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index 4f6f0e5691..a8a60f50fb 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -1,18 +1,15 @@ /** - * + * * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.network; @@ -31,102 +28,115 @@ import org.apache.activemq.xbean.BrokerFactoryBean; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; - -public class SimpleNetworkTest extends TestCase { - - protected static final int MESSAGE_COUNT = 10; +public class SimpleNetworkTest extends TestCase{ + protected static final int MESSAGE_COUNT=10; protected AbstractApplicationContext context; protected Connection localConnection; protected Connection remoteConnection; protected BrokerService localBroker; protected BrokerService remoteBroker; - - + protected Session localSession; + protected Session remoteSession; + protected ActiveMQTopic included; + protected ActiveMQTopic excluded; + protected String consumerName="durableSubs"; - protected void setUp() throws Exception { - - super.setUp(); - Resource resource = new ClassPathResource("org/apache/activemq/network/localBroker.xml"); - BrokerFactoryBean factory = new BrokerFactoryBean(resource); - factory.afterPropertiesSet(); - localBroker = factory.getBroker(); - - resource = new ClassPathResource("org/apache/activemq/network/remoteBroker.xml"); - factory = new BrokerFactoryBean(resource); - factory.afterPropertiesSet(); - remoteBroker = factory.getBroker(); - - localBroker.start(); - remoteBroker.start(); - - URI localURI = localBroker.getVmConnectorURI(); - ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); - localConnection = fac.createConnection(); - localConnection.start(); - - URI remoteURI = remoteBroker.getVmConnectorURI(); - fac = new ActiveMQConnectionFactory(remoteURI); - remoteConnection = fac.createConnection(); - remoteConnection.start(); - - } - - - protected void tearDown() throws Exception { - localConnection.close(); - remoteConnection.close(); - localBroker.stop(); - remoteBroker.stop(); - super.tearDown(); - } - - public void testFiltering() throws Exception{ - ActiveMQTopic included = new ActiveMQTopic("include.test.bar"); - ActiveMQTopic excluded = new ActiveMQTopic("exclude.test.bar"); - Session localSession = localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); - Session remoteSession = remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); - MessageConsumer includedConsumer = remoteSession.createConsumer(included); - MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded); - MessageProducer includedProducer = localSession.createProducer(included); - MessageProducer excludedProducer = localSession.createProducer(excluded); - Thread.sleep(1000); - - Message test = localSession.createTextMessage("test"); - includedProducer.send(test); - excludedProducer.send(test); - - assertNull(excludedConsumer.receive(500)); - assertNotNull(includedConsumer.receive(500)); - } - - public void testConduitBridge() throws Exception{ - ActiveMQTopic included = new ActiveMQTopic("include.test.bar"); - - Session localSession = localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); - Session remoteSession = remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer1 = remoteSession.createConsumer(included); - MessageConsumer consumer2 = remoteSession.createConsumer(included); - MessageProducer producer = localSession.createProducer(included); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - + MessageConsumer includedConsumer=remoteSession.createConsumer(included); + MessageConsumer excludedConsumer=remoteSession.createConsumer(excluded); + MessageProducer includedProducer=localSession.createProducer(included); + MessageProducer excludedProducer=localSession.createProducer(excluded); Thread.sleep(1000); - - - int count = 10; - for (int i = 0; i < count; i++){ - Message test = localSession.createTextMessage("test-" + i); + Message test=localSession.createTextMessage("test"); + includedProducer.send(test); + excludedProducer.send(test); + assertNull(excludedConsumer.receive(500)); + assertNotNull(includedConsumer.receive(500)); + } + + public void testConduitBridge() throws Exception{ + MessageConsumer consumer1=remoteSession.createConsumer(included); + MessageConsumer consumer2=remoteSession.createConsumer(included); + MessageProducer producer=localSession.createProducer(included); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + Thread.sleep(1000); + for(int i=0;i - + + dynamicOnly = false + conduitSubscriptions = true + decreaseNetworkConsumerPriority = false diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml index 08ab241632..b337304a0e 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml @@ -16,7 +16,7 @@ --> - +