diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java index ac325be8ea..69881989ae 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -63,6 +63,7 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ private String brokerXmlConfig; private BrokerService broker; private ActiveMQConnectionFactory connectionFactory; + private Thread brokerStartThread; /** * @@ -77,12 +78,28 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException { this.bootstrapContext = bootstrapContext; if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) { + brokerStartThread = new Thread("Starting ActiveMQ Broker") { + public void run () { + try { + synchronized( ActiveMQResourceAdapter.this ) { + broker = BrokerFactory.createBroker(new URI(brokerXmlConfig)); + } + broker.start(); + } catch (Throwable e) { + LOG.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage()); + LOG.debug("Reason for: "+e.getMessage(), e); + } + } + }; + brokerStartThread.setDaemon(true); + brokerStartThread.start(); + + // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it.. try { - broker = BrokerFactory.createBroker(new URI(brokerXmlConfig)); - broker.start(); - } catch (Throwable e) { - throw new ResourceAdapterInternalException("Failed to startup an embedded broker: " + brokerXmlConfig + ", due to: " + e, e); - } + brokerStartThread.join(1000*5); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } @@ -176,10 +193,17 @@ public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializ ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next(); endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec()); } - if (broker != null) { - ServiceSupport.dispose(broker); - broker = null; + + synchronized( this ) { + if (broker != null) { + if( brokerStartThread.isAlive() ) { + brokerStartThread.interrupt(); + } + ServiceSupport.dispose(broker); + broker = null; + } } + this.bootstrapContext = null; }