From 3326ab91769493ba2bc9370bee73e87a2993c059 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 1 Jun 2011 16:08:37 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3347 - restart network connectors git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1130203 13f79535-47bb-0310-9956-ffa450edef68 --- .../network/DiscoveryNetworkConnector.java | 1 + .../activemq/network/NetworkConnector.java | 4 + .../apache/activemq/util/ServiceSupport.java | 1 + .../activemq/network/NetworkRestartTest.java | 149 ++++++++++++++++++ .../apache/activemq/network/localBroker.xml | 3 +- 5 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index 4dbbb49231..40173f4f74 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -195,6 +195,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco stopper.onException(this, e); } } + bridges.clear(); try { this.discoveryAgent.stop(); } catch (Exception e) { diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java index 68ba4600c3..96efb0dadb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -217,6 +217,10 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem LOG.info("Network Connector " + this + " Stopped"); } + public boolean isStarted() { + return serviceSupport.isStarted(); + } + public ObjectName getObjectName() { return objectName; } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java index 50a781181e..821810fb85 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java @@ -49,6 +49,7 @@ public abstract class ServiceSupport implements Service { public void start() throws Exception { if (started.compareAndSet(false, true)) { boolean success = false; + stopped.set(false); try { doStart(); success = true; diff --git a/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java b/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java new file mode 100644 index 0000000000..fa3b408552 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/NetworkRestartTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; + +import javax.jms.*; + +public class NetworkRestartTest extends TestCase { + + private static final Logger LOG = LoggerFactory.getLogger(NetworkRestartTest.class); + + protected Connection localConnection; + protected Connection remoteConnection; + protected BrokerService localBroker; + protected BrokerService remoteBroker; + protected Session localSession; + protected Session remoteSession; + + protected ActiveMQQueue included=new ActiveMQQueue("include.test.foo"); + + + public void testConnectorRestart() throws Exception { + MessageConsumer remoteConsumer = remoteSession.createConsumer(included); + MessageProducer localProducer = localSession.createProducer(included); + + localProducer.send(localSession.createTextMessage("before")); + Message before = remoteConsumer.receive(1000); + assertNotNull(before); + assertEquals("before", ((TextMessage)before).getText()); + + // restart connector + + NetworkConnector connector = localBroker.getNetworkConnectorByName("networkConnector"); + + LOG.info("Stopping connector"); + connector.stop(); + + Thread.sleep(5000); + LOG.info("Starting connector"); + connector.start(); + + Thread.sleep(5000); + + + localProducer.send(localSession.createTextMessage("after")); + Message after = remoteConsumer.receive(1000); + assertNotNull(after); + assertEquals("after", ((TextMessage)after).getText()); + + } + + + protected void setUp() throws Exception { + super.setUp(); + doSetUp(); + } + + protected void tearDown() throws Exception { + localBroker.deleteAllMessages(); + remoteBroker.deleteAllMessages(); + doTearDown(); + super.tearDown(); + } + + protected void doTearDown() throws Exception { + localConnection.close(); + remoteConnection.close(); + localBroker.stop(); + localBroker.waitUntilStopped(); + remoteBroker.stop(); + remoteBroker.waitUntilStopped(); + } + + protected void doSetUp() throws Exception { + + remoteBroker = createRemoteBroker(); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + localBroker = createLocalBroker(); + localBroker.start(); + localBroker.waitUntilStarted(); + + String localURI = "tcp://localhost:61616"; + String remoteURI = "tcp://localhost:61617"; + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); + localConnection = fac.createConnection(); + localConnection.setClientID("local"); + localConnection.start(); + + fac = new ActiveMQConnectionFactory(remoteURI); + fac.setWatchTopicAdvisories(false); + remoteConnection = fac.createConnection(); + remoteConnection.setClientID("remote"); + remoteConnection.start(); + + localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected String getRemoteBrokerURI() { + return "org/apache/activemq/network/remoteBroker.xml"; + } + + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/localBroker.xml"; + } + + protected BrokerService createBroker(String uri) throws Exception { + Resource resource = new ClassPathResource(uri); + BrokerFactoryBean factory = new BrokerFactoryBean(resource); + resource = new ClassPathResource(uri); + factory = new BrokerFactoryBean(resource); + factory.afterPropertiesSet(); + BrokerService result = factory.getBroker(); + return result; + } + + protected BrokerService createLocalBroker() throws Exception { + return createBroker(getLocalBrokerURI()); + } + + protected BrokerService createRemoteBroker() throws Exception { + return createBroker(getRemoteBrokerURI()); + } + +} diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml index 6689c590b1..9bf75f4038 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml @@ -28,7 +28,8 @@ + decreaseNetworkConsumerPriority = "false" + name="networkConnector">