From 5daeb53cc418e8f233aadf53aaf4cb720d50b951 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 17 Aug 2011 09:25:45 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3455: Broker may deadlock when creating queues under load with wildcard consumers - remove unnecessary class sync that results in lock order acquisition problem - related to https://issues.apache.org/jira/browse/AMQ-3197 sync on create. Additional test with concurrent producers/consumers on wildcards reproduces git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1158591 13f79535-47bb-0310-9956-ffa450edef68 --- .../VirtualDestinationInterceptor.java | 6 +- .../ConcurrentDestinationCreationTest.java | 152 ++++++++++++++++++ 2 files changed, 155 insertions(+), 3 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentDestinationCreationTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java index 85e605a328..fa7099f4c1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java @@ -43,10 +43,10 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor { private DestinationMap destinationMap = new DestinationMap(); private VirtualDestination[] virtualDestinations; - public synchronized Destination intercept(Destination destination) { - Set virtualDestinations = destinationMap.get(destination.getActiveMQDestination()); + public Destination intercept(Destination destination) { + Set matchingDestinations = destinationMap.get(destination.getActiveMQDestination()); List destinations = new ArrayList(); - for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) { + for (Iterator iter = matchingDestinations.iterator(); iter.hasNext();) { VirtualDestination virtualDestination = (VirtualDestination)iter.next(); Destination newDestination = virtualDestination.intercept(destination); destinations.add(newDestination); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentDestinationCreationTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentDestinationCreationTest.java new file mode 100644 index 0000000000..eb9e5d4144 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentDestinationCreationTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConcurrentDestinationCreationTest extends org.apache.activemq.TestSupport { + private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDestinationCreationTest.class); + BrokerService broker; + + @Override + protected void setUp() throws Exception { + broker = createBroker(); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + broker.stop(); + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + "?jms.watchTopicAdvisories=false&jms.closeTimeout=35000"); + } + + BrokerService createBroker() throws Exception { + BrokerService service = new BrokerService(); + service.setDeleteAllMessagesOnStartup(true); + service.setAdvisorySupport(false); + service.setTransportConnectorURIs(new String[]{"tcp://localhost:0"}); + service.setPersistent(false); + service.setUseJmx(false); + service.start(); + return service; + } + + public void testSendRateWithActivatingConsumers() throws Exception { + + final Vector exceptions = new Vector(); + final int jobs = 50; + final int destinationCount = 10; + final CountDownLatch allDone = new CountDownLatch(jobs); + ExecutorService executor = java.util.concurrent.Executors.newCachedThreadPool(); + for (int i = 0; i < jobs; i++) { + if (i %2 == 0 && i")); + consumer.receiveNoWait(); + } + connection.close(); + allDone.countDown(); + LOG.info("Consumers done!"); + } catch (Exception ignored) { + LOG.error("unexpected ", ignored); + exceptions.add(ignored); + } + } + }); + } + } + LOG.info("Waiting for completion"); + executor.shutdown(); + boolean success = allDone.await(30, TimeUnit.SECONDS); + if (!success) { + dumpAllThreads("hung"); + + ThreadMXBean bean = ManagementFactory.getThreadMXBean(); + LOG.info("Supports dead lock detection: " + bean.isSynchronizerUsageSupported()); + long[] threadIds = bean.findDeadlockedThreads(); + if (threadIds != null) { + System.err.println("Dead locked threads...."); + ThreadInfo[] infos = bean.getThreadInfo(threadIds); + + for (ThreadInfo info : infos) { + StackTraceElement[] stack = info.getStackTrace(); + System.err.println(" " + info + ", stack size::" + stack.length); + for (StackTraceElement stackEntry : stack) { + System.err.println(" " + stackEntry); + } + } + } + } + assertTrue("Finished on time", success); + assertTrue("No unexpected exceptions", exceptions.isEmpty()); + } +}