From 4a16c1ff2769d042c90f204a71382d5817412a8a Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Wed, 8 May 2013 15:34:00 +0000 Subject: [PATCH] Implements AMQ-4526: ActiveMQ should automatically restart if a Locker looses it's lock. * Adds a new broker config option 'restartAllowed'. Set it to false to revert to the preserve behavior. * Adds a new 'restart' JMX operation on the broker * The default IO exception handler will trigger a broker restart instead of a broker stop. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1480325 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 34 ++++ .../broker/LockableServiceSupport.java | 3 + .../activemq/broker/jmx/BrokerView.java | 6 + .../activemq/broker/jmx/BrokerViewMBean.java | 7 + .../util/DefaultIOExceptionHandler.java | 3 + .../console/command/StartCommand.java | 145 +++++------------- 6 files changed, 89 insertions(+), 109 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index f1763a899d..98e67665b1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -236,6 +236,9 @@ public class BrokerService implements Service { private Date startDate; private boolean slave = true; + private boolean restartAllowed = true; + private boolean restartRequested = false; + static { try { @@ -2846,4 +2849,35 @@ public class BrokerService implements Service { public boolean isStopping() { return this.stopping.get(); } + + /** + * @return true if the broker allowed to restart on shutdown. + */ + public boolean isRestartAllowed() { + return restartAllowed; + } + + /** + * Sets if the broker allowed to restart on shutdown. + * @return + */ + public void setRestartAllowed(boolean restartAllowed) { + this.restartAllowed = restartAllowed; + } + + /** + * A lifecycle manager of the BrokerService should + * inspect this property after a broker shutdown has occurred + * to find out if the broker needs to be re-created and started + * again. + * + * @return true if the broker wants to be restarted after it shuts down. + */ + public boolean isRestartRequested() { + return restartRequested; + } + + public void requestRestart() { + this.restartRequested = true; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java index faa9e9f082..050264cc44 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java @@ -133,6 +133,9 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L // we can no longer keep the lock so lets fail LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master"); try { + if( brokerService.isRestartAllowed() ) { + brokerService.requestRestart(); + } brokerService.stop(); } catch (Exception e) { LOG.warn("Failure occurred while stopping broker"); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index e969845ba3..4605a3ef01 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -108,6 +108,12 @@ public class BrokerView implements BrokerViewMBean { brokerService.stop(); } + @Override + public void restart() throws Exception { + brokerService.requestRestart(); + brokerService.stop(); + } + @Override public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java index 3d50ed41a3..2c1af7d3fd 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java @@ -140,6 +140,13 @@ public interface BrokerViewMBean extends Service { */ @MBeanInfo("Stop the broker and all its components.") void stop() throws Exception; + + /** + * Restart the broker and all it's components. + */ + @MBeanInfo("Restart the broker and all its components.") + void restart() throws Exception; + @MBeanInfo("Poll for queues matching queueName are empty before stopping") void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception; diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java index 6db7c65d7c..0cb64e9d33 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java +++ b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java @@ -128,6 +128,9 @@ import org.slf4j.LoggerFactory; new Thread("Stopping the broker due to IO exception") { public void run() { try { + if( broker.isRestartAllowed() ) { + broker.requestRestart(); + } broker.stop(); } catch (Exception e) { LOG.warn("Failure occurred while stopping broker", e); diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java index 0fe42e624f..e2b5768329 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/StartCommand.java @@ -18,11 +18,8 @@ package org.apache.activemq.console.command; import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CountDownLatch; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; @@ -58,9 +55,6 @@ public class StartCommand extends AbstractCommand { "" }; - private URI configURI; - private List brokers = new ArrayList(5); - @Override public String getName() { return "start"; @@ -77,124 +71,57 @@ public class StartCommand extends AbstractCommand { * @param brokerURIs */ protected void runTask(List brokerURIs) throws Exception { - try { - // If no config uri, use default setting - if (brokerURIs.isEmpty()) { - setConfigUri(new URI(DEFAULT_CONFIG_URI)); - startBroker(getConfigUri()); + URI configURI; - // Set configuration data, if available, which in this case - // would be the config URI - } else { - String strConfigURI; - - while (!brokerURIs.isEmpty()) { - strConfigURI = (String)brokerURIs.remove(0); - - try { - setConfigUri(new URI(strConfigURI)); - } catch (URISyntaxException e) { - context.printException(e); - return; - } - - startBroker(getConfigUri()); + while( true ) { + final BrokerService broker; + try { + // If no config uri, use default setting + if (brokerURIs.isEmpty()) { + configURI = new URI(DEFAULT_CONFIG_URI); + } else { + configURI = new URI(brokerURIs.get(0)); } + + System.out.println("Loading message broker from: " + configURI); + broker = BrokerFactory.createBroker(configURI); + broker.start(); + + } catch (Exception e) { + context.printException(new RuntimeException("Failed to execute start task. Reason: " + e, e)); + throw e; } - // Prevent the main thread from exiting unless it is terminated - // elsewhere - } catch (Exception e) { - context.printException(new RuntimeException("Failed to execute start task. Reason: " + e, e)); - throw new Exception(e); - } - - // The broker start up fine. If this unblocks it's cause they were stopped - // and this would occur because of an internal error (like the DB going offline) - waitForShutdown(); - } + if (!broker.waitUntilStarted()) { + throw new Exception(broker.getStartException()); + } - /** - * Create and run a broker specified by the given configuration URI - * - * @param configURI - * @throws Exception - */ - public void startBroker(URI configURI) throws Exception { - System.out.println("Loading message broker from: " + configURI); - BrokerService broker = BrokerFactory.createBroker(configURI); - brokers.add(broker); - broker.start(); - if (!broker.waitUntilStarted()) { - throw new Exception(broker.getStartException()); - } - } - - /** - * Wait for a shutdown invocation elsewhere - * - * @throws Exception - */ - protected void waitForShutdown() throws Exception { - final boolean[] shutdown = new boolean[] { - false - }; - - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - for (Iterator i = brokers.iterator(); i.hasNext();) { + // The broker started up fine. Now lets wait for it to stop... + final CountDownLatch shutdownLatch = new CountDownLatch(1); + final Thread jvmShutdownHook = new Thread() { + public void run() { try { - BrokerService broker = i.next(); broker.stop(); } catch (Exception e) { } } - } - }); - - final AtomicInteger brokerCounter = new AtomicInteger(brokers.size()); - for (BrokerService bs : brokers) { - bs.addShutdownHook(new Runnable() { + }; + + Runtime.getRuntime().addShutdownHook(jvmShutdownHook); + broker.addShutdownHook(new Runnable() { public void run() { - // When the last broker lets us know he is closed.... - if( brokerCounter.decrementAndGet() == 0 ) { - synchronized (shutdown) { - shutdown[0] = true; - shutdown.notify(); - } - } + shutdownLatch.countDown(); } }); - } - // Wait for any shutdown event - synchronized (shutdown) { - while (!shutdown[0]) { - try { - shutdown.wait(); - } catch (InterruptedException e) { - } + // The broker has stopped.. + shutdownLatch.await(); + Runtime.getRuntime().removeShutdownHook(jvmShutdownHook); + if( !broker.isRestartRequested() ) { + break; } + System.out.println("Restarting broker"); } - - } - - /** - * Sets the current configuration URI used by the start task - * - * @param uri - */ - public void setConfigUri(URI uri) { - configURI = uri; - } - - /** - * Gets the current configuration URI used by the start task - * - * @return current configuration URI - */ - public URI getConfigUri() { - return configURI; } /**