https://issues.apache.org/jira/browse/AMQ-3696 - make start() sync because of the tests and embedded brokers and introduce startAsync() to be used by XBean

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1362979 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-07-18 15:25:25 +00:00
parent 3adc88fa94
commit ed58e9a05d
3 changed files with 104 additions and 79 deletions

View File

@ -116,7 +116,7 @@ public class StartCommand extends AbstractCommand {
brokers.add(broker);
broker.start();
if (!broker.waitUntilStarted()) {
throw broker.getStartException();
throw new Exception(broker.getStartException());
}
}

View File

@ -223,7 +223,7 @@ public class BrokerService implements Service {
private final Object persistenceAdapterLock = new Object();
private boolean persistenceAdapterStarted = false;
private Exception startException = null;
private Throwable startException = null;
static {
String localHostName = "localhost";
@ -496,11 +496,19 @@ public class BrokerService implements Service {
@PostConstruct
public void autoStart() throws Exception {
if(shouldAutostart()) {
start();
startAsync();
}
}
public void start() throws Exception {
doStart(false);
}
public void startAsync() throws Exception {
doStart(true);
}
public void doStart(boolean async) throws Exception {
if (stopped.get() || !started.compareAndSet(false, true)) {
// lets just ignore redundant start() calls
// as its way too easy to not be completely sure if start() has been
@ -521,8 +529,8 @@ public class BrokerService implements Service {
startManagementContext();
}
startPersistenceAdapter();
startBroker();
startPersistenceAdapter(async);
startBroker(async);
startedLatch.countDown();
} catch (Exception e) {
LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
@ -539,86 +547,103 @@ public class BrokerService implements Service {
}
}
private void startPersistenceAdapter() throws Exception {
new Thread() {
@Override
public void run() {
try {
getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
getPersistenceAdapter().setBrokerName(getBrokerName());
LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
if (deleteAllMessagesOnStartup) {
deleteAllMessages();
}
getPersistenceAdapter().start();
} catch (Exception e) {
startException = e;
} finally {
synchronized (persistenceAdapterLock) {
persistenceAdapterLock.notifyAll();
private void startPersistenceAdapter(boolean async) throws Exception {
if (async) {
new Thread("Persistence Adapter Starting Thread") {
@Override
public void run() {
try {
doStartPersistenceAdapter();
} catch (Throwable e) {
startException = e;
} finally {
synchronized (persistenceAdapterLock) {
persistenceAdapterLock.notifyAll();
}
}
}
}
}.start();
}.start();
} else {
doStartPersistenceAdapter();
}
}
private void startBroker() throws Exception {
new Thread() {
@Override
public void run() {
try {
synchronized (persistenceAdapterLock) {
persistenceAdapterLock.wait();
}
if (startException != null) {
return;
}
slave = false;
startDestinations();
addShutdownHook();
getBroker().start();
if (isUseJmx()) {
if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
// try to restart management context
// typical for slaves that use the same ports as master
managementContext.stop();
startManagementContext();
private void doStartPersistenceAdapter() throws Exception {
getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
getPersistenceAdapter().setBrokerName(getBrokerName());
LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
if (deleteAllMessagesOnStartup) {
deleteAllMessages();
}
getPersistenceAdapter().start();
}
private void startBroker(boolean async) throws Exception {
if (async) {
new Thread("Broker Starting Thread") {
@Override
public void run() {
try {
synchronized (persistenceAdapterLock) {
persistenceAdapterLock.wait();
}
ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
managedBroker.setContextBroker(broker);
adminView.setBroker(managedBroker);
doStartBroker();
} catch (Throwable t) {
startException = t;
}
BrokerRegistry.getInstance().bind(getBrokerName(), BrokerService.this);
// see if there is a MasterBroker service and if so, configure
// it and start it.
for (Service service : services) {
if (service instanceof MasterConnector) {
configureService(service);
service.start();
}
}
if (!isSlave() && (masterConnector == null || isShutdownOnMasterFailure() == false)) {
startAllConnectors();
}
if (!stopped.get()) {
if (isUseJmx() && masterConnector != null) {
registerFTConnectorMBean(masterConnector);
}
}
if (brokerId == null) {
brokerId = broker.getBrokerId();
}
if (ioExceptionHandler == null) {
setIoExceptionHandler(new DefaultIOExceptionHandler());
}
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
getBroker().brokerServiceStarted();
checkSystemUsageLimits();
} catch (Exception e) {
startException = e;
}
}.start();
} else {
doStartBroker();
}
}
private void doStartBroker() throws Exception {
if (startException != null) {
return;
}
slave = false;
startDestinations();
addShutdownHook();
getBroker().start();
if (isUseJmx()) {
if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
// try to restart management context
// typical for slaves that use the same ports as master
managementContext.stop();
startManagementContext();
}
}.start();
ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
managedBroker.setContextBroker(broker);
adminView.setBroker(managedBroker);
}
BrokerRegistry.getInstance().bind(getBrokerName(), BrokerService.this);
// see if there is a MasterBroker service and if so, configure
// it and start it.
for (Service service : services) {
if (service instanceof MasterConnector) {
configureService(service);
service.start();
}
}
if (!isSlave() && (masterConnector == null || isShutdownOnMasterFailure() == false)) {
startAllConnectors();
}
if (!stopped.get()) {
if (isUseJmx() && masterConnector != null) {
registerFTConnectorMBean(masterConnector);
}
}
if (brokerId == null) {
brokerId = broker.getBrokerId();
}
if (ioExceptionHandler == null) {
setIoExceptionHandler(new DefaultIOExceptionHandler());
}
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
getBroker().brokerServiceStarted();
checkSystemUsageLimits();
}
/**
@ -2762,7 +2787,7 @@ public class BrokerService implements Service {
getVirtualTopicConsumerDestinationFilter().matches(destination);
}
public Exception getStartException() {
public Throwable getStartException() {
return startException;
}
}

View File

@ -57,7 +57,7 @@ public class XBeanBrokerService extends BrokerService {
public void afterPropertiesSet() throws Exception {
ensureSystemUsageHasStore();
if (shouldAutostart()) {
start();
startAsync();
}
}