https://issues.apache.org/jira/browse/AMQ-3696 - start broker asynchronously since hanging in start() method leads to problems with stopping slaves in osgi

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1362950 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-07-18 13:43:09 +00:00
parent bf62dc88c1
commit 7f89b33aa4
2 changed files with 98 additions and 48 deletions

View File

@ -115,6 +115,9 @@ public class StartCommand extends AbstractCommand {
BrokerService broker = BrokerFactory.createBroker(configURI);
brokers.add(broker);
broker.start();
if (!broker.waitUntilStarted()) {
throw broker.getStartException();
}
}
/**

View File

@ -221,6 +221,10 @@ public class BrokerService implements Service {
private int offlineDurableSubscriberTaskSchedule = 300000;
private DestinationFilter virtualConsumerDestinationFilter;
private final Object persistenceAdapterLock = new Object();
private boolean persistenceAdapterStarted = false;
private Exception startException = null;
static {
String localHostName = "localhost";
try {
@ -517,54 +521,8 @@ public class BrokerService implements Service {
startManagementContext();
}
getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
getPersistenceAdapter().setBrokerName(getBrokerName());
LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
if (deleteAllMessagesOnStartup) {
deleteAllMessages();
}
getPersistenceAdapter().start();
slave = false;
startDestinations();
addShutdownHook();
getBroker().start();
if (isUseJmx()) {
if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
// try to restart management context
// typical for slaves that use the same ports as master
managementContext.stop();
startManagementContext();
}
ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
managedBroker.setContextBroker(broker);
adminView.setBroker(managedBroker);
}
BrokerRegistry.getInstance().bind(getBrokerName(), this);
// see if there is a MasterBroker service and if so, configure
// it and start it.
for (Service service : services) {
if (service instanceof MasterConnector) {
configureService(service);
service.start();
}
}
if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) {
startAllConnectors();
}
if (!stopped.get()) {
if (isUseJmx() && masterConnector != null) {
registerFTConnectorMBean(masterConnector);
}
}
if (brokerId == null) {
brokerId = broker.getBrokerId();
}
if (ioExceptionHandler == null) {
setIoExceptionHandler(new DefaultIOExceptionHandler());
}
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
getBroker().brokerServiceStarted();
checkSystemUsageLimits();
startPersistenceAdapter();
startBroker();
startedLatch.countDown();
} catch (Exception e) {
LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
@ -581,6 +539,88 @@ public class BrokerService implements Service {
}
}
private void startPersistenceAdapter() throws Exception {
new Thread() {
@Override
public void run() {
try {
getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
getPersistenceAdapter().setBrokerName(getBrokerName());
LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
if (deleteAllMessagesOnStartup) {
deleteAllMessages();
}
getPersistenceAdapter().start();
} catch (Exception e) {
startException = e;
} finally {
synchronized (persistenceAdapterLock) {
persistenceAdapterLock.notifyAll();
}
}
}
}.start();
}
private void startBroker() throws Exception {
new Thread() {
@Override
public void run() {
try {
synchronized (persistenceAdapterLock) {
persistenceAdapterLock.wait();
}
if (startException != null) {
return;
}
slave = false;
startDestinations();
addShutdownHook();
getBroker().start();
if (isUseJmx()) {
if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
// try to restart management context
// typical for slaves that use the same ports as master
managementContext.stop();
startManagementContext();
}
ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
managedBroker.setContextBroker(broker);
adminView.setBroker(managedBroker);
}
BrokerRegistry.getInstance().bind(getBrokerName(), BrokerService.this);
// see if there is a MasterBroker service and if so, configure
// it and start it.
for (Service service : services) {
if (service instanceof MasterConnector) {
configureService(service);
service.start();
}
}
if (!isSlave() && (masterConnector == null || isShutdownOnMasterFailure() == false)) {
startAllConnectors();
}
if (!stopped.get()) {
if (isUseJmx() && masterConnector != null) {
registerFTConnectorMBean(masterConnector);
}
}
if (brokerId == null) {
brokerId = broker.getBrokerId();
}
if (ioExceptionHandler == null) {
setIoExceptionHandler(new DefaultIOExceptionHandler());
}
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
getBroker().brokerServiceStarted();
checkSystemUsageLimits();
} catch (Exception e) {
startException = e;
}
}
}.start();
}
/**
*
* @throws Exception
@ -783,6 +823,9 @@ public class BrokerService implements Service {
boolean waitSucceeded = false;
while (isStarted() && !stopped.get() && !waitSucceeded) {
try {
if (startException != null) {
return waitSucceeded;
}
waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
@ -2718,4 +2761,8 @@ public class BrokerService implements Service {
return isUseVirtualTopics() && destination.isQueue() &&
getVirtualTopicConsumerDestinationFilter().matches(destination);
}
public Exception getStartException() {
return startException;
}
}