diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 50cd32411d..a18e7939ae 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -162,7 +162,7 @@ public abstract class AbstractRegion implements Region { addSubscriptionsForDestination(context, dest); destinations.put(destination, dest); updateRegionDestCounts(destination, 1); - destinationMap.put(destination, dest); + destinationMap.unsynchronizedPut(destination, dest); } if (dest == null) { throw new DestinationDoesNotExistException(destination.getQualifiedName()); @@ -217,7 +217,7 @@ public abstract class AbstractRegion implements Region { // If a destination isn't specified, then just count up // non-advisory destinations (ie count all destinations) int destinationSize = (int) (entry.getDestination() != null ? - destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount()); + destinationMap.unsynchronizedGet(entry.getDestination()).size() : regionStatistics.getDestinations().getCount()); if (destinationSize >= entry.getMaxDestinations()) { if (entry.getDestination() != null) { throw new IllegalStateException( @@ -296,7 +296,7 @@ public abstract class AbstractRegion implements Region { dest.removeSubscription(context, sub, 0l); } } - destinationMap.remove(destination, dest); + destinationMap.unsynchronizedRemove(destination, dest); dispose(context, dest); DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); if (destinationInterceptor != null) { @@ -321,7 +321,7 @@ public abstract class AbstractRegion implements Region { public Set getDestinations(ActiveMQDestination destination) { destinationsLock.readLock().lock(); try{ - return destinationMap.get(destination); + return destinationMap.unsynchronizedGet(destination); } finally { destinationsLock.readLock().unlock(); } @@ -387,7 +387,7 @@ public abstract class AbstractRegion implements Region { List addList = new ArrayList(); destinationsLock.readLock().lock(); try { - for (Destination dest : (Set) destinationMap.get(info.getDestination())) { + for (Destination dest : (Set) destinationMap.unsynchronizedGet(info.getDestination())) { addList.add(dest); } // ensure sub visible to any new dest addSubscriptionsForDestination @@ -467,7 +467,7 @@ public abstract class AbstractRegion implements Region { List removeList = new ArrayList(); destinationsLock.readLock().lock(); try { - for (Destination dest : (Set) destinationMap.get(info.getDestination())) { + for (Destination dest : (Set) destinationMap.unsynchronizedGet(info.getDestination())) { removeList.add(dest); } } finally { @@ -552,15 +552,7 @@ public abstract class AbstractRegion implements Region { // Try to auto create the destination... re-invoke broker // from the // top so that the proper security checks are performed. - context.getBroker().addDestination(context, destination, createTemporary); - dest = addDestination(context, destination, false); - // We should now have the dest created. - destinationsLock.readLock().lock(); - try { - dest = destinations.get(destination); - } finally { - destinationsLock.readLock().unlock(); - } + dest = context.getBroker().addDestination(context, destination, createTemporary); } if (dest == null) { @@ -644,7 +636,7 @@ public abstract class AbstractRegion implements Region { public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { destinationsLock.readLock().lock(); try { - for (Destination dest : (Set) destinationMap.get(info.getDestination())) { + for (Destination dest : (Set) destinationMap.unsynchronizedGet(info.getDestination())) { dest.addProducer(context, info); } } finally { @@ -665,7 +657,7 @@ public abstract class AbstractRegion implements Region { public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { destinationsLock.readLock().lock(); try { - for (Destination dest : (Set) destinationMap.get(info.getDestination())) { + for (Destination dest : (Set) destinationMap.unsynchronizedGet(info.getDestination())) { dest.removeProducer(context, info); } } finally { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java index 490bf7bde9..2baa33a398 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java @@ -59,7 +59,7 @@ public class MappedQueueFilter extends DestinationFilter { final Set virtualDests = regionBroker.getDestinations(virtualDestination); final ActiveMQDestination newDestination = sub.getActiveMQDestination(); - final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); + BaseDestination regionDest = null; for (Destination virtualDest : virtualDests) { if (virtualDest.getActiveMQDestination().isTopic() && @@ -75,6 +75,9 @@ public class MappedQueueFilter extends DestinationFilter { final Message copy = message.copy(); copy.setOriginalDestination(message.getDestination()); copy.setDestination(newDestination); + if (regionDest == null) { + regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); + } copy.setRegionDestination(regionDest); sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy); } diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java index 624b10fa7e..e16d80e521 100644 --- a/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java +++ b/activemq-client/src/main/java/org/apache/activemq/filter/DestinationMap.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.activemq.command.ActiveMQDestination; @@ -60,13 +59,20 @@ public class DestinationMap { * matching values. */ @SuppressWarnings({"rawtypes", "unchecked"}) - public synchronized Set get(ActiveMQDestination key) { + public Set get(ActiveMQDestination key) { + synchronized (this) { + return unsynchronizedGet(key); + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public Set unsynchronizedGet(ActiveMQDestination key) { if (key.isComposite()) { ActiveMQDestination[] destinations = key.getCompositeDestinations(); Set answer = new HashSet(destinations.length); for (int i = 0; i < destinations.length; i++) { ActiveMQDestination childDestination = destinations[i]; - Object value = get(childDestination); + Object value = unsynchronizedGet(childDestination); if (value instanceof Set) { answer.addAll((Set) value); } else if (value != null) { @@ -78,7 +84,13 @@ public class DestinationMap { return findWildcardMatches(key); } - public synchronized void put(ActiveMQDestination key, Object value) { + public void put(ActiveMQDestination key, Object value) { + synchronized (this) { + unsynchronizedPut(key, value); + } + } + + public void unsynchronizedPut(ActiveMQDestination key, Object value) { if (key.isComposite()) { ActiveMQDestination[] destinations = key.getCompositeDestinations(); for (int i = 0; i < destinations.length; i++) { @@ -95,7 +107,13 @@ public class DestinationMap { /** * Removes the value from the associated destination */ - public synchronized void remove(ActiveMQDestination key, Object value) { + public void remove(ActiveMQDestination key, Object value) { + synchronized (this) { + unsynchronizedRemove(key, value); + } + } + + public void unsynchronizedRemove(ActiveMQDestination key, Object value) { if (key.isComposite()) { ActiveMQDestination[] destinations = key.getCompositeDestinations(); for (int i = 0; i < destinations.length; i++) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java new file mode 100644 index 0000000000..c2f2b0edcc --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDestinationMapAccessTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class VirtualTopicDestinationMapAccessTest { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDestinationMapAccessTest.class); + + BrokerService brokerService; + ConnectionFactory connectionFactory; + + @Before + public void createBroker() throws Exception { + createBroker(true); + } + + public void createBroker(boolean delete) throws Exception { + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(delete); + brokerService.setAdvisorySupport(false); + brokerService.start(); + + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy(); + zeroPrefetch.setAll(0); + activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch); + connectionFactory = activeMQConnectionFactory; + } + + @After + public void stopBroker() throws Exception { + brokerService.stop(); + } + + @Test + @Ignore("perf test that needs manual comparator") + public void testX() throws Exception { + + final int numConnections = 200; + final int numDestinations = 10000; + final AtomicInteger numConsumers = new AtomicInteger(numDestinations); + final AtomicInteger numProducers = new AtomicInteger(numDestinations); + + ExecutorService executorService = Executors.newFixedThreadPool(numConnections); + + // precreate dests to accentuate read access + for (int i=0; i 0) { + if (consumerOrProducer) { + session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i)); + } else { + producer.send(new ActiveMQTopic("VirtualTopic.TEST-" + i), new ActiveMQMessage()); + } + } + } while (numConsumers.get() > 0 || numProducers.get() > 0); + connection1.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + for (int i = 0; i < numConnections; i++) { + executorService.execute(runnable); + } + + long start = System.currentTimeMillis(); + LOG.info("Starting timer: " + start); + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.MINUTES); + LOG.info("Done, duration: " + (System.currentTimeMillis() - start)); + + } +}