diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index a89949085c..62af182c40 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -587,6 +587,7 @@ public class BrokerService implements Service { return; } + setStartException(null); stopping.set(false); startDate = new Date(); MDC.put("activemq.broker", brokerName); @@ -642,7 +643,7 @@ public class BrokerService implements Service { try { doStartPersistenceAdapter(); } catch (Throwable e) { - startException = e; + setStartException(e); } finally { synchronized (persistenceAdapterStarted) { persistenceAdapterStarted.set(true); @@ -704,7 +705,7 @@ public class BrokerService implements Service { } doStartBroker(); } catch (Throwable t) { - startException = t; + setStartException(t); } } }.start(); @@ -714,9 +715,7 @@ public class BrokerService implements Service { } private void doStartBroker() throws Exception { - if (startException != null) { - return; - } + checkStartException(); startDestinations(); addShutdownHook(); @@ -786,6 +785,9 @@ public class BrokerService implements Service { return; } + if (started.get()) { + setStartException(new BrokerStoppedException("Stop invoked")); + } MDC.put("activemq.broker", brokerName); if (systemExitOnShutdown) { @@ -831,7 +833,7 @@ public class BrokerService implements Service { tempDataStore = null; } try { - stopper.stop(persistenceAdapter); + stopper.stop(getPersistenceAdapter()); persistenceAdapter = null; if (isUseJmx()) { stopper.stop(getManagementContext()); @@ -989,7 +991,7 @@ public class BrokerService implements Service { long expiration = Math.max(0, timeout + System.currentTimeMillis()); while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) { try { - if (startException != null) { + if (getStartException() != null) { return waitSucceeded; } waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); @@ -1006,6 +1008,7 @@ public class BrokerService implements Service { */ public Broker getBroker() throws Exception { if (broker == null) { + checkStartException(); broker = createBroker(); } return broker; @@ -1225,8 +1228,9 @@ public class BrokerService implements Service { addService(this.producerSystemUsage); } - public PersistenceAdapter getPersistenceAdapter() throws IOException { + public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException { if (persistenceAdapter == null) { + checkStartException(); persistenceAdapter = createPersistenceAdapter(); configureService(persistenceAdapter); this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); @@ -1314,11 +1318,22 @@ public class BrokerService implements Service { public ManagementContext getManagementContext() { if (managementContext == null) { + checkStartException(); managementContext = new ManagementContext(); } return managementContext; } + synchronized private void checkStartException() { + if (startException != null) { + throw new BrokerStoppedException(startException); + } + } + + synchronized private void setStartException(Throwable t) { + startException = t; + } + public void setManagementContext(ManagementContext managementContext) { this.managementContext = managementContext; } @@ -2688,6 +2703,7 @@ public class BrokerService implements Service { } protected void startVirtualConsumerDestinations() throws Exception { + checkStartException(); ConnectionContext adminConnectionContext = getAdminConnectionContext(); Set destinations = destinationFactory.getDestinations(); DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); @@ -3063,7 +3079,7 @@ public class BrokerService implements Service { getVirtualTopicConsumerDestinationFilter().matches(destination); } - public Throwable getStartException() { + synchronized public Throwable getStartException() { return startException; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java b/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java index b454c4199a..2f89bc5b2b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java +++ b/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java @@ -111,7 +111,7 @@ public class LockFile { /** */ - public void unlock() { + synchronized public void unlock() { if (DISABLE_FILE_LOCK) { return; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java index 13a94cbfb0..bac271fdd5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java @@ -23,10 +23,12 @@ import java.net.MalformedURLException; import java.util.List; import java.util.Set; +import javax.management.MBeanServer; import javax.management.ObjectInstance; import javax.management.ObjectName; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.util.TestUtils; import org.junit.Before; import org.junit.Test; @@ -40,6 +42,7 @@ public class DuplexNetworkMBeanTest { private int primaryBrokerPort; private int secondaryBrokerPort; + private MBeanServer mBeanServer = new ManagementContext().getMBeanServer(); @Before public void setUp() throws Exception { @@ -155,7 +158,7 @@ public class DuplexNetworkMBeanTest { } LOG.info("Query name: " + beanName); - mbeans = broker.getManagementContext().queryNames(beanName, null); + mbeans = mBeanServer.queryNames(beanName, null); if (mbeans != null) { count = mbeans.size(); } else { @@ -175,7 +178,7 @@ public class DuplexNetworkMBeanTest { private void logAllMbeans(BrokerService broker) throws MalformedURLException { try { // trace all existing MBeans - Set all = broker.getManagementContext().queryNames(null, null); + Set all = mBeanServer.queryNames(null, null); LOG.info("Total MBean count=" + all.size()); for (Object o : all) { ObjectInstance bean = (ObjectInstance)o; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java new file mode 100755 index 0000000000..b2ad1cdd67 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.io.ObjectInputStream; +import java.util.HashMap; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.AttributeNotFoundException; +import javax.management.InstanceAlreadyExistsException; +import javax.management.InstanceNotFoundException; +import javax.management.IntrospectionException; +import javax.management.InvalidAttributeValueException; +import javax.management.ListenerNotFoundException; +import javax.management.MBeanException; +import javax.management.MBeanInfo; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.NotCompliantMBeanException; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.OperationsException; +import javax.management.QueryExp; +import javax.management.ReflectionException; +import javax.management.loading.ClassLoaderRepository; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerStoppedException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertTrue; + + +public class StartAndConcurrentStopBrokerTest { + private static final Logger LOG = LoggerFactory.getLogger(StartAndConcurrentStopBrokerTest.class); + + + @Test(timeout = 30000) + public void testConcurrentStop() throws Exception { + + final CountDownLatch gotBrokerMbean = new CountDownLatch(1); + final HashMap mbeans = new HashMap(); + final MBeanServer mBeanServer = new MBeanServer() { + @Override + public ObjectInstance createMBean(String className, ObjectName name) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException { + return null; + } + + @Override + public ObjectInstance createMBean(String className, ObjectName name, ObjectName loaderName) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException, InstanceNotFoundException { + return null; + } + + @Override + public ObjectInstance createMBean(String className, ObjectName name, Object[] params, String[] signature) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException { + return null; + } + + @Override + public ObjectInstance createMBean(String className, ObjectName name, ObjectName loaderName, Object[] params, String[] signature) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException, InstanceNotFoundException { + return null; + } + + @Override + public ObjectInstance registerMBean(Object object, ObjectName name) throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { + if (mbeans.containsKey(name)) { + throw new InstanceAlreadyExistsException("Got one already"); + } + LOG.info("register:" + name); + + try { + if (name.compareTo(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost")) == 0) { + gotBrokerMbean.countDown(); + } + } catch (Exception e) { + e.printStackTrace(); + } + mbeans.put(name, object); + return new ObjectInstance(name, object.getClass().getName()); + + } + + @Override + public void unregisterMBean(ObjectName name) throws InstanceNotFoundException, MBeanRegistrationException { + mbeans.remove(name); + } + + @Override + public ObjectInstance getObjectInstance(ObjectName name) throws InstanceNotFoundException { + return null; + } + + @Override + public Set queryMBeans(ObjectName name, QueryExp query) { + return null; + } + + @Override + public Set queryNames(ObjectName name, QueryExp query) { + return null; + } + + @Override + public boolean isRegistered(ObjectName name) { + return mbeans.containsKey(name); + } + + @Override + public Integer getMBeanCount() { + return null; + } + + @Override + public Object getAttribute(ObjectName name, String attribute) throws MBeanException, AttributeNotFoundException, InstanceNotFoundException, ReflectionException { + return null; + } + + @Override + public AttributeList getAttributes(ObjectName name, String[] attributes) throws InstanceNotFoundException, ReflectionException { + return null; + } + + @Override + public void setAttribute(ObjectName name, Attribute attribute) throws InstanceNotFoundException, AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException { + + } + + @Override + public AttributeList setAttributes(ObjectName name, AttributeList attributes) throws InstanceNotFoundException, ReflectionException { + return null; + } + + @Override + public Object invoke(ObjectName name, String operationName, Object[] params, String[] signature) throws InstanceNotFoundException, MBeanException, ReflectionException { + return null; + } + + @Override + public String getDefaultDomain() { + return null; + } + + @Override + public String[] getDomains() { + return new String[0]; + } + + @Override + public void addNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException { + + } + + @Override + public void addNotificationListener(ObjectName name, ObjectName listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException { + + } + + @Override + public void removeNotificationListener(ObjectName name, ObjectName listener) throws InstanceNotFoundException, ListenerNotFoundException { + + } + + @Override + public void removeNotificationListener(ObjectName name, ObjectName listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException, ListenerNotFoundException { + + } + + @Override + public void removeNotificationListener(ObjectName name, NotificationListener listener) throws InstanceNotFoundException, ListenerNotFoundException { + + } + + @Override + public void removeNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException, ListenerNotFoundException { + + } + + @Override + public MBeanInfo getMBeanInfo(ObjectName name) throws InstanceNotFoundException, IntrospectionException, ReflectionException { + return null; + } + + @Override + public boolean isInstanceOf(ObjectName name, String className) throws InstanceNotFoundException { + return false; + } + + @Override + public Object instantiate(String className) throws ReflectionException, MBeanException { + return null; + } + + @Override + public Object instantiate(String className, ObjectName loaderName) throws ReflectionException, MBeanException, InstanceNotFoundException { + return null; + } + + @Override + public Object instantiate(String className, Object[] params, String[] signature) throws ReflectionException, MBeanException { + return null; + } + + @Override + public Object instantiate(String className, ObjectName loaderName, Object[] params, String[] signature) throws ReflectionException, MBeanException, InstanceNotFoundException { + return null; + } + + @Override + public ObjectInputStream deserialize(ObjectName name, byte[] data) throws InstanceNotFoundException, OperationsException { + return null; + } + + @Override + public ObjectInputStream deserialize(String className, byte[] data) throws OperationsException, ReflectionException { + return null; + } + + @Override + public ObjectInputStream deserialize(String className, ObjectName loaderName, byte[] data) throws InstanceNotFoundException, OperationsException, ReflectionException { + return null; + } + + @Override + public ClassLoader getClassLoaderFor(ObjectName mbeanName) throws InstanceNotFoundException { + return null; + } + + @Override + public ClassLoader getClassLoader(ObjectName loaderName) throws InstanceNotFoundException { + return null; + } + + @Override + public ClassLoaderRepository getClassLoaderRepository() { + return null; + } + }; + + + final BrokerService broker = new BrokerService(); + + ExecutorService executor = Executors.newFixedThreadPool(4); + executor.execute(new Runnable() { + @Override + public void run() { + try { + broker.getManagementContext().setMBeanServer(mBeanServer); + broker.start(); + } catch (BrokerStoppedException expected) { + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + + executor.execute(new Runnable() { + @Override + public void run() { + try { + assertTrue("broker has registered mbean", gotBrokerMbean.await(10, TimeUnit.SECONDS)); + broker.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + executor.shutdown(); + assertTrue("stop tasks done", executor.awaitTermination(20, TimeUnit.SECONDS)); + + BrokerService second = new BrokerService(); + second.getManagementContext().setMBeanServer(mBeanServer); + second.start(); + second.stop(); + + } + +}