mirror of https://github.com/apache/activemq.git
check if a broker held by the TransportConnector is stopped before asigning it for re-use (i.e. VMTransportServer)
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@372910 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4e5012ca5d
commit
233b604642
|
@ -197,9 +197,13 @@ public interface Broker extends Region, Service {
|
|||
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable;
|
||||
|
||||
/**
|
||||
*
|
||||
* @return true if the broker is running as a slave
|
||||
*/
|
||||
public boolean isSlaveBroker();
|
||||
|
||||
/**
|
||||
* @return true if the broker has stopped
|
||||
*/
|
||||
public boolean isStopped();
|
||||
|
||||
}
|
||||
|
|
|
@ -184,4 +184,8 @@ public class BrokerFilter implements Broker {
|
|||
return next.isSlaveBroker();
|
||||
}
|
||||
|
||||
public boolean isStopped(){
|
||||
return next.isStopped();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -182,4 +182,8 @@ public class EmptyBroker implements Broker{
|
|||
return false;
|
||||
}
|
||||
|
||||
public boolean isStopped(){
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -180,5 +180,9 @@ public class ErrorBroker implements Broker {
|
|||
throw new IllegalStateException(this.message);
|
||||
}
|
||||
|
||||
public boolean isStopped(){
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -194,4 +194,8 @@ public class MutableBrokerFilter implements Broker {
|
|||
return getNext().isSlaveBroker();
|
||||
}
|
||||
|
||||
public boolean isStopped(){
|
||||
return getNext().isStopped();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -67,6 +67,7 @@ public class RegionBroker implements Broker {
|
|||
private final Region tempQueueRegion;
|
||||
private final Region tempTopicRegion;
|
||||
private BrokerService brokerService;
|
||||
private boolean stopped = false;
|
||||
|
||||
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
||||
|
||||
|
@ -130,6 +131,7 @@ public class RegionBroker implements Broker {
|
|||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
stopped = true;
|
||||
ServiceStopper ss = new ServiceStopper();
|
||||
ss.stop(queueRegion);
|
||||
ss.stop(topicRegion);
|
||||
|
@ -443,6 +445,10 @@ public class RegionBroker implements Broker {
|
|||
return brokerService.isSlave();
|
||||
}
|
||||
|
||||
public boolean isStopped(){
|
||||
return stopped;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -89,7 +89,8 @@ public class VMTransportFactory extends TransportFactory {
|
|||
}
|
||||
|
||||
VMTransportServer server = (VMTransportServer) servers.get(host);
|
||||
if( server == null ) {
|
||||
//validate the broker is still active
|
||||
if( !validateBroker(host) || server == null ) {
|
||||
BrokerService broker = BrokerRegistry.getInstance().lookup(host);
|
||||
if (broker == null) {
|
||||
try {
|
||||
|
@ -112,6 +113,8 @@ public class VMTransportFactory extends TransportFactory {
|
|||
connector.start();
|
||||
connectors.put(host, connector);
|
||||
}
|
||||
}else {
|
||||
|
||||
}
|
||||
|
||||
VMTransport vmtransport = server.connect();
|
||||
|
@ -171,4 +174,25 @@ public class VMTransportFactory extends TransportFactory {
|
|||
this.brokerFactoryHandler = brokerFactoryHandler;
|
||||
}
|
||||
|
||||
|
||||
private boolean validateBroker(String host){
|
||||
boolean result=true;
|
||||
if(brokers.containsKey(host)||servers.containsKey(host)||connectors.containsKey(host)){
|
||||
//check the broker is still in the BrokerRegistry
|
||||
TransportConnector connector=(TransportConnector) connectors.get(host);
|
||||
if(BrokerRegistry.getInstance().lookup(host)==null||(connector!=null&&connector.getBroker().isStopped())){
|
||||
result=false;
|
||||
//clean-up
|
||||
brokers.remove(host);
|
||||
servers.remove(host);
|
||||
if(connector!=null){
|
||||
connectors.remove(host);
|
||||
if(connector!=null){
|
||||
ServiceSupport.dispose(connector);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue