https://issues.apache.org/jira/browse/AMQ-6086 - add some determinism to interleaved stop and start calls on broker service

This commit is contained in:
gtully 2015-12-11 16:22:31 +00:00
parent 93092f7ea0
commit da076f4a63
4 changed files with 333 additions and 12 deletions

View File

@ -587,6 +587,7 @@ public class BrokerService implements Service {
return;
}
setStartException(null);
stopping.set(false);
startDate = new Date();
MDC.put("activemq.broker", brokerName);
@ -642,7 +643,7 @@ public class BrokerService implements Service {
try {
doStartPersistenceAdapter();
} catch (Throwable e) {
startException = e;
setStartException(e);
} finally {
synchronized (persistenceAdapterStarted) {
persistenceAdapterStarted.set(true);
@ -704,7 +705,7 @@ public class BrokerService implements Service {
}
doStartBroker();
} catch (Throwable t) {
startException = t;
setStartException(t);
}
}
}.start();
@ -714,9 +715,7 @@ public class BrokerService implements Service {
}
private void doStartBroker() throws Exception {
if (startException != null) {
return;
}
checkStartException();
startDestinations();
addShutdownHook();
@ -786,6 +785,9 @@ public class BrokerService implements Service {
return;
}
if (started.get()) {
setStartException(new BrokerStoppedException("Stop invoked"));
}
MDC.put("activemq.broker", brokerName);
if (systemExitOnShutdown) {
@ -831,7 +833,7 @@ public class BrokerService implements Service {
tempDataStore = null;
}
try {
stopper.stop(persistenceAdapter);
stopper.stop(getPersistenceAdapter());
persistenceAdapter = null;
if (isUseJmx()) {
stopper.stop(getManagementContext());
@ -989,7 +991,7 @@ public class BrokerService implements Service {
long expiration = Math.max(0, timeout + System.currentTimeMillis());
while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
try {
if (startException != null) {
if (getStartException() != null) {
return waitSucceeded;
}
waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
@ -1006,6 +1008,7 @@ public class BrokerService implements Service {
*/
public Broker getBroker() throws Exception {
if (broker == null) {
checkStartException();
broker = createBroker();
}
return broker;
@ -1225,8 +1228,9 @@ public class BrokerService implements Service {
addService(this.producerSystemUsage);
}
public PersistenceAdapter getPersistenceAdapter() throws IOException {
public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException {
if (persistenceAdapter == null) {
checkStartException();
persistenceAdapter = createPersistenceAdapter();
configureService(persistenceAdapter);
this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
@ -1314,11 +1318,22 @@ public class BrokerService implements Service {
public ManagementContext getManagementContext() {
if (managementContext == null) {
checkStartException();
managementContext = new ManagementContext();
}
return managementContext;
}
synchronized private void checkStartException() {
if (startException != null) {
throw new BrokerStoppedException(startException);
}
}
synchronized private void setStartException(Throwable t) {
startException = t;
}
public void setManagementContext(ManagementContext managementContext) {
this.managementContext = managementContext;
}
@ -2688,6 +2703,7 @@ public class BrokerService implements Service {
}
protected void startVirtualConsumerDestinations() throws Exception {
checkStartException();
ConnectionContext adminConnectionContext = getAdminConnectionContext();
Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
@ -3063,7 +3079,7 @@ public class BrokerService implements Service {
getVirtualTopicConsumerDestinationFilter().matches(destination);
}
public Throwable getStartException() {
synchronized public Throwable getStartException() {
return startException;
}

View File

@ -111,7 +111,7 @@ public class LockFile {
/**
*/
public void unlock() {
synchronized public void unlock() {
if (DISABLE_FILE_LOCK) {
return;
}

View File

@ -23,10 +23,12 @@ import java.net.MalformedURLException;
import java.util.List;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.util.TestUtils;
import org.junit.Before;
import org.junit.Test;
@ -40,6 +42,7 @@ public class DuplexNetworkMBeanTest {
private int primaryBrokerPort;
private int secondaryBrokerPort;
private MBeanServer mBeanServer = new ManagementContext().getMBeanServer();
@Before
public void setUp() throws Exception {
@ -155,7 +158,7 @@ public class DuplexNetworkMBeanTest {
}
LOG.info("Query name: " + beanName);
mbeans = broker.getManagementContext().queryNames(beanName, null);
mbeans = mBeanServer.queryNames(beanName, null);
if (mbeans != null) {
count = mbeans.size();
} else {
@ -175,7 +178,7 @@ public class DuplexNetworkMBeanTest {
private void logAllMbeans(BrokerService broker) throws MalformedURLException {
try {
// trace all existing MBeans
Set<?> all = broker.getManagementContext().queryNames(null, null);
Set<?> all = mBeanServer.queryNames(null, null);
LOG.info("Total MBean count=" + all.size());
for (Object o : all) {
ObjectInstance bean = (ObjectInstance)o;

View File

@ -0,0 +1,302 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.ObjectInputStream;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
import javax.management.IntrospectionException;
import javax.management.InvalidAttributeValueException;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanInfo;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.NotCompliantMBeanException;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.OperationsException;
import javax.management.QueryExp;
import javax.management.ReflectionException;
import javax.management.loading.ClassLoaderRepository;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerStoppedException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertTrue;
public class StartAndConcurrentStopBrokerTest {
private static final Logger LOG = LoggerFactory.getLogger(StartAndConcurrentStopBrokerTest.class);
@Test(timeout = 30000)
public void testConcurrentStop() throws Exception {
final CountDownLatch gotBrokerMbean = new CountDownLatch(1);
final HashMap mbeans = new HashMap();
final MBeanServer mBeanServer = new MBeanServer() {
@Override
public ObjectInstance createMBean(String className, ObjectName name) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException {
return null;
}
@Override
public ObjectInstance createMBean(String className, ObjectName name, ObjectName loaderName) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException, InstanceNotFoundException {
return null;
}
@Override
public ObjectInstance createMBean(String className, ObjectName name, Object[] params, String[] signature) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException {
return null;
}
@Override
public ObjectInstance createMBean(String className, ObjectName name, ObjectName loaderName, Object[] params, String[] signature) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException, InstanceNotFoundException {
return null;
}
@Override
public ObjectInstance registerMBean(Object object, ObjectName name) throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
if (mbeans.containsKey(name)) {
throw new InstanceAlreadyExistsException("Got one already");
}
LOG.info("register:" + name);
try {
if (name.compareTo(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost")) == 0) {
gotBrokerMbean.countDown();
}
} catch (Exception e) {
e.printStackTrace();
}
mbeans.put(name, object);
return new ObjectInstance(name, object.getClass().getName());
}
@Override
public void unregisterMBean(ObjectName name) throws InstanceNotFoundException, MBeanRegistrationException {
mbeans.remove(name);
}
@Override
public ObjectInstance getObjectInstance(ObjectName name) throws InstanceNotFoundException {
return null;
}
@Override
public Set<ObjectInstance> queryMBeans(ObjectName name, QueryExp query) {
return null;
}
@Override
public Set<ObjectName> queryNames(ObjectName name, QueryExp query) {
return null;
}
@Override
public boolean isRegistered(ObjectName name) {
return mbeans.containsKey(name);
}
@Override
public Integer getMBeanCount() {
return null;
}
@Override
public Object getAttribute(ObjectName name, String attribute) throws MBeanException, AttributeNotFoundException, InstanceNotFoundException, ReflectionException {
return null;
}
@Override
public AttributeList getAttributes(ObjectName name, String[] attributes) throws InstanceNotFoundException, ReflectionException {
return null;
}
@Override
public void setAttribute(ObjectName name, Attribute attribute) throws InstanceNotFoundException, AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException {
}
@Override
public AttributeList setAttributes(ObjectName name, AttributeList attributes) throws InstanceNotFoundException, ReflectionException {
return null;
}
@Override
public Object invoke(ObjectName name, String operationName, Object[] params, String[] signature) throws InstanceNotFoundException, MBeanException, ReflectionException {
return null;
}
@Override
public String getDefaultDomain() {
return null;
}
@Override
public String[] getDomains() {
return new String[0];
}
@Override
public void addNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException {
}
@Override
public void addNotificationListener(ObjectName name, ObjectName listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException {
}
@Override
public void removeNotificationListener(ObjectName name, ObjectName listener) throws InstanceNotFoundException, ListenerNotFoundException {
}
@Override
public void removeNotificationListener(ObjectName name, ObjectName listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException, ListenerNotFoundException {
}
@Override
public void removeNotificationListener(ObjectName name, NotificationListener listener) throws InstanceNotFoundException, ListenerNotFoundException {
}
@Override
public void removeNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException, ListenerNotFoundException {
}
@Override
public MBeanInfo getMBeanInfo(ObjectName name) throws InstanceNotFoundException, IntrospectionException, ReflectionException {
return null;
}
@Override
public boolean isInstanceOf(ObjectName name, String className) throws InstanceNotFoundException {
return false;
}
@Override
public Object instantiate(String className) throws ReflectionException, MBeanException {
return null;
}
@Override
public Object instantiate(String className, ObjectName loaderName) throws ReflectionException, MBeanException, InstanceNotFoundException {
return null;
}
@Override
public Object instantiate(String className, Object[] params, String[] signature) throws ReflectionException, MBeanException {
return null;
}
@Override
public Object instantiate(String className, ObjectName loaderName, Object[] params, String[] signature) throws ReflectionException, MBeanException, InstanceNotFoundException {
return null;
}
@Override
public ObjectInputStream deserialize(ObjectName name, byte[] data) throws InstanceNotFoundException, OperationsException {
return null;
}
@Override
public ObjectInputStream deserialize(String className, byte[] data) throws OperationsException, ReflectionException {
return null;
}
@Override
public ObjectInputStream deserialize(String className, ObjectName loaderName, byte[] data) throws InstanceNotFoundException, OperationsException, ReflectionException {
return null;
}
@Override
public ClassLoader getClassLoaderFor(ObjectName mbeanName) throws InstanceNotFoundException {
return null;
}
@Override
public ClassLoader getClassLoader(ObjectName loaderName) throws InstanceNotFoundException {
return null;
}
@Override
public ClassLoaderRepository getClassLoaderRepository() {
return null;
}
};
final BrokerService broker = new BrokerService();
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.execute(new Runnable() {
@Override
public void run() {
try {
broker.getManagementContext().setMBeanServer(mBeanServer);
broker.start();
} catch (BrokerStoppedException expected) {
} catch (Exception e) {
e.printStackTrace();
}
}
});
executor.execute(new Runnable() {
@Override
public void run() {
try {
assertTrue("broker has registered mbean", gotBrokerMbean.await(10, TimeUnit.SECONDS));
broker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
executor.shutdown();
assertTrue("stop tasks done", executor.awaitTermination(20, TimeUnit.SECONDS));
BrokerService second = new BrokerService();
second.getManagementContext().setMBeanServer(mBeanServer);
second.start();
second.stop();
}
}