From 2ffd7498a835be85b08e5ca9966a26a2d4ac0cd3 Mon Sep 17 00:00:00 2001 From: gtully Date: Mon, 14 Dec 2015 15:39:37 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6086 - start exception can prevent stop from exiting fully, need to be more selective in creation on stop. (cherry picked from commit 35df815fb86d201e63b4f0f2d53bee3bae5c0752) --- .../apache/activemq/broker/BrokerService.java | 13 ++-- .../StartAndConcurrentStopBrokerTest.java | 75 +++++++++++++++++-- 2 files changed, 77 insertions(+), 11 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 62af182c40..d028078054 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -785,9 +785,7 @@ public class BrokerService implements Service { return; } - if (started.get()) { - setStartException(new BrokerStoppedException("Stop invoked")); - } + setStartException(new BrokerStoppedException("Stop invoked")); MDC.put("activemq.broker", brokerName); if (systemExitOnShutdown) { @@ -836,7 +834,7 @@ public class BrokerService implements Service { stopper.stop(getPersistenceAdapter()); persistenceAdapter = null; if (isUseJmx()) { - stopper.stop(getManagementContext()); + stopper.stop(managementContext); managementContext = null; } // Clear SelectorParser cache to free memory @@ -1229,8 +1227,7 @@ public class BrokerService implements Service { } public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException { - if (persistenceAdapter == null) { - checkStartException(); + if (persistenceAdapter == null && !hasStartException()) { persistenceAdapter = createPersistenceAdapter(); configureService(persistenceAdapter); this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); @@ -1330,6 +1327,10 @@ public class BrokerService implements Service { } } + synchronized private boolean hasStartException() { + return startException != null; + } + synchronized private void setStartException(Throwable t) { startException = t; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java index b2ad1cdd67..4d194caeb9 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java @@ -23,6 +23,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.management.Attribute; import javax.management.AttributeList; import javax.management.AttributeNotFoundException; @@ -51,6 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -61,7 +64,11 @@ public class StartAndConcurrentStopBrokerTest { @Test(timeout = 30000) public void testConcurrentStop() throws Exception { + final AtomicReference error = new AtomicReference(); final CountDownLatch gotBrokerMbean = new CountDownLatch(1); + final CountDownLatch gotPaMBean = new CountDownLatch(1); + final AtomicBoolean checkPaMBean = new AtomicBoolean(false); + final HashMap mbeans = new HashMap(); final MBeanServer mBeanServer = new MBeanServer() { @Override @@ -87,7 +94,7 @@ public class StartAndConcurrentStopBrokerTest { @Override public ObjectInstance registerMBean(Object object, ObjectName name) throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { if (mbeans.containsKey(name)) { - throw new InstanceAlreadyExistsException("Got one already"); + throw new InstanceAlreadyExistsException("Got one already: " + name); } LOG.info("register:" + name); @@ -95,8 +102,16 @@ public class StartAndConcurrentStopBrokerTest { if (name.compareTo(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost")) == 0) { gotBrokerMbean.countDown(); } + + if (checkPaMBean.get()) { + if (new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,service=PersistenceAdapter,instanceName=*").apply(name)) { + gotPaMBean.countDown(); + } + } + } catch (Exception e) { e.printStackTrace(); + error.set(e); } mbeans.put(name, object); return new ObjectInstance(name, object.getClass().getName()); @@ -261,6 +276,7 @@ public class StartAndConcurrentStopBrokerTest { final BrokerService broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); ExecutorService executor = Executors.newFixedThreadPool(4); executor.execute(new Runnable() { @@ -272,6 +288,7 @@ public class StartAndConcurrentStopBrokerTest { } catch (BrokerStoppedException expected) { } catch (Exception e) { e.printStackTrace(); + error.set(e); } } }); @@ -285,6 +302,7 @@ public class StartAndConcurrentStopBrokerTest { broker.stop(); } catch (Exception e) { e.printStackTrace(); + error.set(e); } } }); @@ -292,10 +310,57 @@ public class StartAndConcurrentStopBrokerTest { executor.shutdown(); assertTrue("stop tasks done", executor.awaitTermination(20, TimeUnit.SECONDS)); - BrokerService second = new BrokerService(); - second.getManagementContext().setMBeanServer(mBeanServer); - second.start(); - second.stop(); + BrokerService sanityBroker = new BrokerService(); + sanityBroker.getManagementContext().setMBeanServer(mBeanServer); + sanityBroker.start(); + sanityBroker.stop(); + + assertNull("No error", error.get()); + + // again, after Persistence adapter mbean + final BrokerService brokerTwo = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + + checkPaMBean.set(true); + executor = Executors.newFixedThreadPool(4); + executor.execute(new Runnable() { + @Override + public void run() { + try { + brokerTwo.getManagementContext().setMBeanServer(mBeanServer); + brokerTwo.start(); + } catch (BrokerStoppedException expected) { + } catch (Exception e) { + e.printStackTrace(); + error.set(e); + } + } + }); + + executor.execute(new Runnable() { + @Override + public void run() { + try { + assertTrue("broker has registered persistence adapter mbean", gotPaMBean.await(10, TimeUnit.SECONDS)); + brokerTwo.stop(); + } catch (Exception e) { + e.printStackTrace(); + error.set(e); + } + } + }); + + executor.shutdown(); + assertTrue("stop tasks done", executor.awaitTermination(20, TimeUnit.SECONDS)); + + assertTrue("broker has registered persistence adapter mbean", gotPaMBean.await(0, TimeUnit.SECONDS)); + + sanityBroker = new BrokerService(); + sanityBroker.getManagementContext().setMBeanServer(mBeanServer); + sanityBroker.start(); + sanityBroker.stop(); + + assertNull("No error", error.get()); }