https://issues.apache.org/jira/browse/AMQ-6086 - start exception can prevent stop from exiting fully, need to be more selective in creation on stop.

(cherry picked from commit 35df815fb8)
This commit is contained in:
gtully 2015-12-14 15:39:37 +00:00 committed by Christopher L. Shannon (cshannon)
parent f3aedc753c
commit 2ffd7498a8
2 changed files with 77 additions and 11 deletions

View File

@ -785,9 +785,7 @@ public class BrokerService implements Service {
return;
}
if (started.get()) {
setStartException(new BrokerStoppedException("Stop invoked"));
}
MDC.put("activemq.broker", brokerName);
if (systemExitOnShutdown) {
@ -836,7 +834,7 @@ public class BrokerService implements Service {
stopper.stop(getPersistenceAdapter());
persistenceAdapter = null;
if (isUseJmx()) {
stopper.stop(getManagementContext());
stopper.stop(managementContext);
managementContext = null;
}
// Clear SelectorParser cache to free memory
@ -1229,8 +1227,7 @@ public class BrokerService implements Service {
}
public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException {
if (persistenceAdapter == null) {
checkStartException();
if (persistenceAdapter == null && !hasStartException()) {
persistenceAdapter = createPersistenceAdapter();
configureService(persistenceAdapter);
this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
@ -1330,6 +1327,10 @@ public class BrokerService implements Service {
}
}
synchronized private boolean hasStartException() {
return startException != null;
}
synchronized private void setStartException(Throwable t) {
startException = t;
}

View File

@ -23,6 +23,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
@ -51,6 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -61,7 +64,11 @@ public class StartAndConcurrentStopBrokerTest {
@Test(timeout = 30000)
public void testConcurrentStop() throws Exception {
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
final CountDownLatch gotBrokerMbean = new CountDownLatch(1);
final CountDownLatch gotPaMBean = new CountDownLatch(1);
final AtomicBoolean checkPaMBean = new AtomicBoolean(false);
final HashMap mbeans = new HashMap();
final MBeanServer mBeanServer = new MBeanServer() {
@Override
@ -87,7 +94,7 @@ public class StartAndConcurrentStopBrokerTest {
@Override
public ObjectInstance registerMBean(Object object, ObjectName name) throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
if (mbeans.containsKey(name)) {
throw new InstanceAlreadyExistsException("Got one already");
throw new InstanceAlreadyExistsException("Got one already: " + name);
}
LOG.info("register:" + name);
@ -95,8 +102,16 @@ public class StartAndConcurrentStopBrokerTest {
if (name.compareTo(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost")) == 0) {
gotBrokerMbean.countDown();
}
if (checkPaMBean.get()) {
if (new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,service=PersistenceAdapter,instanceName=*").apply(name)) {
gotPaMBean.countDown();
}
}
} catch (Exception e) {
e.printStackTrace();
error.set(e);
}
mbeans.put(name, object);
return new ObjectInstance(name, object.getClass().getName());
@ -261,6 +276,7 @@ public class StartAndConcurrentStopBrokerTest {
final BrokerService broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.execute(new Runnable() {
@ -272,6 +288,7 @@ public class StartAndConcurrentStopBrokerTest {
} catch (BrokerStoppedException expected) {
} catch (Exception e) {
e.printStackTrace();
error.set(e);
}
}
});
@ -285,6 +302,7 @@ public class StartAndConcurrentStopBrokerTest {
broker.stop();
} catch (Exception e) {
e.printStackTrace();
error.set(e);
}
}
});
@ -292,10 +310,57 @@ public class StartAndConcurrentStopBrokerTest {
executor.shutdown();
assertTrue("stop tasks done", executor.awaitTermination(20, TimeUnit.SECONDS));
BrokerService second = new BrokerService();
second.getManagementContext().setMBeanServer(mBeanServer);
second.start();
second.stop();
BrokerService sanityBroker = new BrokerService();
sanityBroker.getManagementContext().setMBeanServer(mBeanServer);
sanityBroker.start();
sanityBroker.stop();
assertNull("No error", error.get());
// again, after Persistence adapter mbean
final BrokerService brokerTwo = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
checkPaMBean.set(true);
executor = Executors.newFixedThreadPool(4);
executor.execute(new Runnable() {
@Override
public void run() {
try {
brokerTwo.getManagementContext().setMBeanServer(mBeanServer);
brokerTwo.start();
} catch (BrokerStoppedException expected) {
} catch (Exception e) {
e.printStackTrace();
error.set(e);
}
}
});
executor.execute(new Runnable() {
@Override
public void run() {
try {
assertTrue("broker has registered persistence adapter mbean", gotPaMBean.await(10, TimeUnit.SECONDS));
brokerTwo.stop();
} catch (Exception e) {
e.printStackTrace();
error.set(e);
}
}
});
executor.shutdown();
assertTrue("stop tasks done", executor.awaitTermination(20, TimeUnit.SECONDS));
assertTrue("broker has registered persistence adapter mbean", gotPaMBean.await(0, TimeUnit.SECONDS));
sanityBroker = new BrokerService();
sanityBroker.getManagementContext().setMBeanServer(mBeanServer);
sanityBroker.start();
sanityBroker.stop();
assertNull("No error", error.get());
}