Start up the embeded broker in an async thread so that it does not block the AppServer startup if a slave broker is started.


git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@609016 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-01-04 21:38:44 +00:00
parent 1a2e583595
commit 43d58b57b7
1 changed files with 32 additions and 8 deletions

View File

@ -63,6 +63,7 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ
private String brokerXmlConfig;
private BrokerService broker;
private ActiveMQConnectionFactory connectionFactory;
private Thread brokerStartThread;
/**
*
@ -77,12 +78,28 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ
public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
this.bootstrapContext = bootstrapContext;
if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
brokerStartThread = new Thread("Starting ActiveMQ Broker") {
public void run () {
try {
synchronized( ActiveMQResourceAdapter.this ) {
broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
}
broker.start();
} catch (Throwable e) {
LOG.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
LOG.debug("Reason for: "+e.getMessage(), e);
}
}
};
brokerStartThread.setDaemon(true);
brokerStartThread.start();
// Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it..
try {
broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
broker.start();
} catch (Throwable e) {
throw new ResourceAdapterInternalException("Failed to startup an embedded broker: " + brokerXmlConfig + ", due to: " + e, e);
}
brokerStartThread.join(1000*5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@ -176,10 +193,17 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ
ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
}
if (broker != null) {
ServiceSupport.dispose(broker);
broker = null;
synchronized( this ) {
if (broker != null) {
if( brokerStartThread.isAlive() ) {
brokerStartThread.interrupt();
}
ServiceSupport.dispose(broker);
broker = null;
}
}
this.bootstrapContext = null;
}