mirror of https://github.com/apache/activemq.git
If multiple concurrent threads were creating vm://localhost connection, it was possible multiple "localhost" broker would be started.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@388082 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
47f2bb7cc2
commit
61fef743e8
|
@ -16,7 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.broker;
|
||||
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -29,22 +29,30 @@ public class BrokerRegistry {
|
|||
public static BrokerRegistry getInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
ConcurrentHashMap brokers = new ConcurrentHashMap();
|
||||
|
||||
private BrokerRegistry() {
|
||||
}
|
||||
|
||||
private final Object mutex = new Object();
|
||||
private final HashMap brokers = new HashMap();
|
||||
|
||||
public BrokerService lookup(String brokerName) {
|
||||
return (BrokerService)brokers.get(brokerName);
|
||||
synchronized(mutex) {
|
||||
return (BrokerService)brokers.get(brokerName);
|
||||
}
|
||||
}
|
||||
|
||||
public void bind(String brokerName, BrokerService broker) {
|
||||
brokers.put(brokerName, broker);
|
||||
synchronized(mutex) {
|
||||
brokers.put(brokerName, broker);
|
||||
}
|
||||
}
|
||||
|
||||
public void unbind(String brokerName) {
|
||||
brokers.remove(brokerName);
|
||||
synchronized(mutex) {
|
||||
brokers.remove(brokerName);
|
||||
}
|
||||
}
|
||||
|
||||
public Object getRegistryMutext() {
|
||||
return mutex;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -86,29 +86,38 @@ public class VMTransportFactory extends TransportFactory{
|
|||
VMTransportServer server=(VMTransportServer) servers.get(host);
|
||||
// validate the broker is still active
|
||||
if(!validateBroker(host)||server==null){
|
||||
BrokerService broker=BrokerRegistry.getInstance().lookup(host);
|
||||
if(broker==null){
|
||||
try{
|
||||
if(brokerFactoryHandler!=null){
|
||||
broker=brokerFactoryHandler.createBroker(brokerURI);
|
||||
}else{
|
||||
broker=BrokerFactory.createBroker(brokerURI);
|
||||
BrokerService broker=null;
|
||||
// Synchronize on the registry so that multiple concurrent threads
|
||||
// doing this do not think that the broker has not been created and cause multiple
|
||||
// brokers to be started.
|
||||
synchronized( BrokerRegistry.getInstance().getRegistryMutext() ) {
|
||||
broker=BrokerRegistry.getInstance().lookup(host);
|
||||
if(broker==null){
|
||||
try{
|
||||
if(brokerFactoryHandler!=null){
|
||||
broker=brokerFactoryHandler.createBroker(brokerURI);
|
||||
}else{
|
||||
broker=BrokerFactory.createBroker(brokerURI);
|
||||
}
|
||||
broker.start();
|
||||
}catch(URISyntaxException e){
|
||||
throw IOExceptionSupport.create(e);
|
||||
}
|
||||
broker.start();
|
||||
}catch(URISyntaxException e){
|
||||
throw IOExceptionSupport.create(e);
|
||||
brokers.put(host,broker);
|
||||
}
|
||||
brokers.put(host,broker);
|
||||
|
||||
server=(VMTransportServer) servers.get(host);
|
||||
if(server==null){
|
||||
server=(VMTransportServer) bind(location,true);
|
||||
TransportConnector connector=new TransportConnector(broker.getBroker(),server);
|
||||
connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() );
|
||||
connector.start();
|
||||
connectors.put(host,connector);
|
||||
}
|
||||
|
||||
}
|
||||
server=(VMTransportServer) servers.get(host);
|
||||
if(server==null){
|
||||
server=(VMTransportServer) bind(location,true);
|
||||
TransportConnector connector=new TransportConnector(broker.getBroker(),server);
|
||||
connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() );
|
||||
connector.start();
|
||||
connectors.put(host,connector);
|
||||
}
|
||||
}else{}
|
||||
}
|
||||
|
||||
VMTransport vmtransport=server.connect();
|
||||
IntrospectionSupport.setProperties(vmtransport,options);
|
||||
Transport transport=vmtransport;
|
||||
|
|
Loading…
Reference in New Issue