Implements AMQ-4526: ActiveMQ should automatically restart if a Locker looses it's lock.

* Adds a new broker config option 'restartAllowed'.  Set it to false to revert to the preserve behavior.
* Adds a new 'restart' JMX operation on the broker
* The default IO exception handler will trigger a broker restart instead of a broker stop.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1480325 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-08 15:34:00 +00:00
parent b5f6bc3c90
commit 4a16c1ff27
6 changed files with 89 additions and 109 deletions

View File

@ -236,6 +236,9 @@ public class BrokerService implements Service {
private Date startDate;
private boolean slave = true;
private boolean restartAllowed = true;
private boolean restartRequested = false;
static {
try {
@ -2846,4 +2849,35 @@ public class BrokerService implements Service {
public boolean isStopping() {
return this.stopping.get();
}
/**
* @return true if the broker allowed to restart on shutdown.
*/
public boolean isRestartAllowed() {
return restartAllowed;
}
/**
* Sets if the broker allowed to restart on shutdown.
* @return
*/
public void setRestartAllowed(boolean restartAllowed) {
this.restartAllowed = restartAllowed;
}
/**
* A lifecycle manager of the BrokerService should
* inspect this property after a broker shutdown has occurred
* to find out if the broker needs to be re-created and started
* again.
*
* @return true if the broker wants to be restarted after it shuts down.
*/
public boolean isRestartRequested() {
return restartRequested;
}
public void requestRestart() {
this.restartRequested = true;
}
}

View File

@ -133,6 +133,9 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
// we can no longer keep the lock so lets fail
LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
try {
if( brokerService.isRestartAllowed() ) {
brokerService.requestRestart();
}
brokerService.stop();
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker");

View File

@ -108,6 +108,12 @@ public class BrokerView implements BrokerViewMBean {
brokerService.stop();
}
@Override
public void restart() throws Exception {
brokerService.requestRestart();
brokerService.stop();
}
@Override
public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
throws Exception {

View File

@ -140,6 +140,13 @@ public interface BrokerViewMBean extends Service {
*/
@MBeanInfo("Stop the broker and all its components.")
void stop() throws Exception;
/**
* Restart the broker and all it's components.
*/
@MBeanInfo("Restart the broker and all its components.")
void restart() throws Exception;
@MBeanInfo("Poll for queues matching queueName are empty before stopping")
void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception;

View File

@ -128,6 +128,9 @@ import org.slf4j.LoggerFactory;
new Thread("Stopping the broker due to IO exception") {
public void run() {
try {
if( broker.isRestartAllowed() ) {
broker.requestRestart();
}
broker.stop();
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker", e);

View File

@ -18,11 +18,8 @@
package org.apache.activemq.console.command;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
@ -58,9 +55,6 @@ public class StartCommand extends AbstractCommand {
""
};
private URI configURI;
private List<BrokerService> brokers = new ArrayList<BrokerService>(5);
@Override
public String getName() {
return "start";
@ -77,124 +71,57 @@ public class StartCommand extends AbstractCommand {
* @param brokerURIs
*/
protected void runTask(List<String> brokerURIs) throws Exception {
try {
// If no config uri, use default setting
if (brokerURIs.isEmpty()) {
setConfigUri(new URI(DEFAULT_CONFIG_URI));
startBroker(getConfigUri());
URI configURI;
// Set configuration data, if available, which in this case
// would be the config URI
} else {
String strConfigURI;
while (!brokerURIs.isEmpty()) {
strConfigURI = (String)brokerURIs.remove(0);
try {
setConfigUri(new URI(strConfigURI));
} catch (URISyntaxException e) {
context.printException(e);
return;
}
startBroker(getConfigUri());
while( true ) {
final BrokerService broker;
try {
// If no config uri, use default setting
if (brokerURIs.isEmpty()) {
configURI = new URI(DEFAULT_CONFIG_URI);
} else {
configURI = new URI(brokerURIs.get(0));
}
System.out.println("Loading message broker from: " + configURI);
broker = BrokerFactory.createBroker(configURI);
broker.start();
} catch (Exception e) {
context.printException(new RuntimeException("Failed to execute start task. Reason: " + e, e));
throw e;
}
// Prevent the main thread from exiting unless it is terminated
// elsewhere
} catch (Exception e) {
context.printException(new RuntimeException("Failed to execute start task. Reason: " + e, e));
throw new Exception(e);
}
// The broker start up fine. If this unblocks it's cause they were stopped
// and this would occur because of an internal error (like the DB going offline)
waitForShutdown();
}
if (!broker.waitUntilStarted()) {
throw new Exception(broker.getStartException());
}
/**
* Create and run a broker specified by the given configuration URI
*
* @param configURI
* @throws Exception
*/
public void startBroker(URI configURI) throws Exception {
System.out.println("Loading message broker from: " + configURI);
BrokerService broker = BrokerFactory.createBroker(configURI);
brokers.add(broker);
broker.start();
if (!broker.waitUntilStarted()) {
throw new Exception(broker.getStartException());
}
}
/**
* Wait for a shutdown invocation elsewhere
*
* @throws Exception
*/
protected void waitForShutdown() throws Exception {
final boolean[] shutdown = new boolean[] {
false
};
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
for (Iterator<BrokerService> i = brokers.iterator(); i.hasNext();) {
// The broker started up fine. Now lets wait for it to stop...
final CountDownLatch shutdownLatch = new CountDownLatch(1);
final Thread jvmShutdownHook = new Thread() {
public void run() {
try {
BrokerService broker = i.next();
broker.stop();
} catch (Exception e) {
}
}
}
});
final AtomicInteger brokerCounter = new AtomicInteger(brokers.size());
for (BrokerService bs : brokers) {
bs.addShutdownHook(new Runnable() {
};
Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
broker.addShutdownHook(new Runnable() {
public void run() {
// When the last broker lets us know he is closed....
if( brokerCounter.decrementAndGet() == 0 ) {
synchronized (shutdown) {
shutdown[0] = true;
shutdown.notify();
}
}
shutdownLatch.countDown();
}
});
}
// Wait for any shutdown event
synchronized (shutdown) {
while (!shutdown[0]) {
try {
shutdown.wait();
} catch (InterruptedException e) {
}
// The broker has stopped..
shutdownLatch.await();
Runtime.getRuntime().removeShutdownHook(jvmShutdownHook);
if( !broker.isRestartRequested() ) {
break;
}
System.out.println("Restarting broker");
}
}
/**
* Sets the current configuration URI used by the start task
*
* @param uri
*/
public void setConfigUri(URI uri) {
configURI = uri;
}
/**
* Gets the current configuration URI used by the start task
*
* @return current configuration URI
*/
public URI getConfigUri() {
return configURI;
}
/**