diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index 4cd016b905..a343681b60 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -197,9 +197,13 @@ public interface Broker extends Region, Service { public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable; /** - * * @return true if the broker is running as a slave */ public boolean isSlaveBroker(); + /** + * @return true if the broker has stopped + */ + public boolean isStopped(); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 38cac4f589..fabc48961b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -183,5 +183,9 @@ public class BrokerFilter implements Broker { public boolean isSlaveBroker(){ return next.isSlaveBroker(); } + + public boolean isStopped(){ + return next.isStopped(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 6c2346fe86..6c6d1d6c7f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -181,5 +181,9 @@ public class EmptyBroker implements Broker{ public boolean isSlaveBroker(){ return false; } + + public boolean isStopped(){ + return false; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index fe5ee6d2dd..18097982bc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -180,5 +180,9 @@ public class ErrorBroker implements Broker { throw new IllegalStateException(this.message); } + public boolean isStopped(){ + return true; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index d828dc67af..5946ec24c8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -193,5 +193,9 @@ public class MutableBrokerFilter implements Broker { public boolean isSlaveBroker(){ return getNext().isSlaveBroker(); } + + public boolean isStopped(){ + return getNext().isStopped(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 1d0d78b5f6..d5946eaca2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -67,6 +67,7 @@ public class RegionBroker implements Broker { private final Region tempQueueRegion; private final Region tempTopicRegion; private BrokerService brokerService; + private boolean stopped = false; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); @@ -130,6 +131,7 @@ public class RegionBroker implements Broker { } public void stop() throws Exception { + stopped = true; ServiceStopper ss = new ServiceStopper(); ss.stop(queueRegion); ss.stop(topicRegion); @@ -442,6 +444,10 @@ public class RegionBroker implements Broker { public boolean isSlaveBroker(){ return brokerService.isSlave(); } + + public boolean isStopped(){ + return stopped; + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java index e52d0e652f..feb43008fc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java @@ -88,8 +88,9 @@ public class VMTransportFactory extends TransportFactory { location = new URI("vm://"+host); } - VMTransportServer server = (VMTransportServer) servers.get(host); - if( server == null ) { + VMTransportServer server = (VMTransportServer) servers.get(host); + //validate the broker is still active + if( !validateBroker(host) || server == null ) { BrokerService broker = BrokerRegistry.getInstance().lookup(host); if (broker == null) { try { @@ -112,6 +113,8 @@ public class VMTransportFactory extends TransportFactory { connector.start(); connectors.put(host, connector); } + }else { + } VMTransport vmtransport = server.connect(); @@ -171,4 +174,25 @@ public class VMTransportFactory extends TransportFactory { this.brokerFactoryHandler = brokerFactoryHandler; } + + private boolean validateBroker(String host){ + boolean result=true; + if(brokers.containsKey(host)||servers.containsKey(host)||connectors.containsKey(host)){ + //check the broker is still in the BrokerRegistry + TransportConnector connector=(TransportConnector) connectors.get(host); + if(BrokerRegistry.getInstance().lookup(host)==null||(connector!=null&&connector.getBroker().isStopped())){ + result=false; + //clean-up + brokers.remove(host); + servers.remove(host); + if(connector!=null){ + connectors.remove(host); + if(connector!=null){ + ServiceSupport.dispose(connector); + } + } + } + } + return result; + } }