mirror of https://github.com/apache/activemq.git
AMQ-7021 - add unsynchronised accessors to destination map for usage with rw lock from abstract region; allow concurrent read of the destination map
This commit is contained in:
parent
28819aea4a
commit
0b76d3a0ea
|
@ -162,7 +162,7 @@ public abstract class AbstractRegion implements Region {
|
|||
addSubscriptionsForDestination(context, dest);
|
||||
destinations.put(destination, dest);
|
||||
updateRegionDestCounts(destination, 1);
|
||||
destinationMap.put(destination, dest);
|
||||
destinationMap.unsynchronizedPut(destination, dest);
|
||||
}
|
||||
if (dest == null) {
|
||||
throw new DestinationDoesNotExistException(destination.getQualifiedName());
|
||||
|
@ -217,7 +217,7 @@ public abstract class AbstractRegion implements Region {
|
|||
// If a destination isn't specified, then just count up
|
||||
// non-advisory destinations (ie count all destinations)
|
||||
int destinationSize = (int) (entry.getDestination() != null ?
|
||||
destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
|
||||
destinationMap.unsynchronizedGet(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
|
||||
if (destinationSize >= entry.getMaxDestinations()) {
|
||||
if (entry.getDestination() != null) {
|
||||
throw new IllegalStateException(
|
||||
|
@ -296,7 +296,7 @@ public abstract class AbstractRegion implements Region {
|
|||
dest.removeSubscription(context, sub, 0l);
|
||||
}
|
||||
}
|
||||
destinationMap.remove(destination, dest);
|
||||
destinationMap.unsynchronizedRemove(destination, dest);
|
||||
dispose(context, dest);
|
||||
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
|
||||
if (destinationInterceptor != null) {
|
||||
|
@ -321,7 +321,7 @@ public abstract class AbstractRegion implements Region {
|
|||
public Set<Destination> getDestinations(ActiveMQDestination destination) {
|
||||
destinationsLock.readLock().lock();
|
||||
try{
|
||||
return destinationMap.get(destination);
|
||||
return destinationMap.unsynchronizedGet(destination);
|
||||
} finally {
|
||||
destinationsLock.readLock().unlock();
|
||||
}
|
||||
|
@ -387,7 +387,7 @@ public abstract class AbstractRegion implements Region {
|
|||
List<Destination> addList = new ArrayList<Destination>();
|
||||
destinationsLock.readLock().lock();
|
||||
try {
|
||||
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
|
||||
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
|
||||
addList.add(dest);
|
||||
}
|
||||
// ensure sub visible to any new dest addSubscriptionsForDestination
|
||||
|
@ -467,7 +467,7 @@ public abstract class AbstractRegion implements Region {
|
|||
List<Destination> removeList = new ArrayList<Destination>();
|
||||
destinationsLock.readLock().lock();
|
||||
try {
|
||||
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
|
||||
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
|
||||
removeList.add(dest);
|
||||
}
|
||||
} finally {
|
||||
|
@ -552,15 +552,7 @@ public abstract class AbstractRegion implements Region {
|
|||
// Try to auto create the destination... re-invoke broker
|
||||
// from the
|
||||
// top so that the proper security checks are performed.
|
||||
context.getBroker().addDestination(context, destination, createTemporary);
|
||||
dest = addDestination(context, destination, false);
|
||||
// We should now have the dest created.
|
||||
destinationsLock.readLock().lock();
|
||||
try {
|
||||
dest = destinations.get(destination);
|
||||
} finally {
|
||||
destinationsLock.readLock().unlock();
|
||||
}
|
||||
dest = context.getBroker().addDestination(context, destination, createTemporary);
|
||||
}
|
||||
|
||||
if (dest == null) {
|
||||
|
@ -644,7 +636,7 @@ public abstract class AbstractRegion implements Region {
|
|||
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
||||
destinationsLock.readLock().lock();
|
||||
try {
|
||||
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
|
||||
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
|
||||
dest.addProducer(context, info);
|
||||
}
|
||||
} finally {
|
||||
|
@ -665,7 +657,7 @@ public abstract class AbstractRegion implements Region {
|
|||
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
||||
destinationsLock.readLock().lock();
|
||||
try {
|
||||
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
|
||||
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
|
||||
dest.removeProducer(context, info);
|
||||
}
|
||||
} finally {
|
||||
|
|
|
@ -59,7 +59,7 @@ public class MappedQueueFilter extends DestinationFilter {
|
|||
final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
|
||||
|
||||
final ActiveMQDestination newDestination = sub.getActiveMQDestination();
|
||||
final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
|
||||
BaseDestination regionDest = null;
|
||||
|
||||
for (Destination virtualDest : virtualDests) {
|
||||
if (virtualDest.getActiveMQDestination().isTopic() &&
|
||||
|
@ -75,6 +75,9 @@ public class MappedQueueFilter extends DestinationFilter {
|
|||
final Message copy = message.copy();
|
||||
copy.setOriginalDestination(message.getDestination());
|
||||
copy.setDestination(newDestination);
|
||||
if (regionDest == null) {
|
||||
regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
|
||||
}
|
||||
copy.setRegionDestination(regionDest);
|
||||
sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
|
@ -60,13 +59,20 @@ public class DestinationMap {
|
|||
* matching values.
|
||||
*/
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public synchronized Set get(ActiveMQDestination key) {
|
||||
public Set get(ActiveMQDestination key) {
|
||||
synchronized (this) {
|
||||
return unsynchronizedGet(key);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public Set unsynchronizedGet(ActiveMQDestination key) {
|
||||
if (key.isComposite()) {
|
||||
ActiveMQDestination[] destinations = key.getCompositeDestinations();
|
||||
Set answer = new HashSet(destinations.length);
|
||||
for (int i = 0; i < destinations.length; i++) {
|
||||
ActiveMQDestination childDestination = destinations[i];
|
||||
Object value = get(childDestination);
|
||||
Object value = unsynchronizedGet(childDestination);
|
||||
if (value instanceof Set) {
|
||||
answer.addAll((Set) value);
|
||||
} else if (value != null) {
|
||||
|
@ -78,7 +84,13 @@ public class DestinationMap {
|
|||
return findWildcardMatches(key);
|
||||
}
|
||||
|
||||
public synchronized void put(ActiveMQDestination key, Object value) {
|
||||
public void put(ActiveMQDestination key, Object value) {
|
||||
synchronized (this) {
|
||||
unsynchronizedPut(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
public void unsynchronizedPut(ActiveMQDestination key, Object value) {
|
||||
if (key.isComposite()) {
|
||||
ActiveMQDestination[] destinations = key.getCompositeDestinations();
|
||||
for (int i = 0; i < destinations.length; i++) {
|
||||
|
@ -95,7 +107,13 @@ public class DestinationMap {
|
|||
/**
|
||||
* Removes the value from the associated destination
|
||||
*/
|
||||
public synchronized void remove(ActiveMQDestination key, Object value) {
|
||||
public void remove(ActiveMQDestination key, Object value) {
|
||||
synchronized (this) {
|
||||
unsynchronizedRemove(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
public void unsynchronizedRemove(ActiveMQDestination key, Object value) {
|
||||
if (key.isComposite()) {
|
||||
ActiveMQDestination[] destinations = key.getCompositeDestinations();
|
||||
for (int i = 0; i < destinations.length; i++) {
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.virtual;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQPrefetchPolicy;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class VirtualTopicDestinationMapAccessTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDestinationMapAccessTest.class);
|
||||
|
||||
BrokerService brokerService;
|
||||
ConnectionFactory connectionFactory;
|
||||
|
||||
@Before
|
||||
public void createBroker() throws Exception {
|
||||
createBroker(true);
|
||||
}
|
||||
|
||||
public void createBroker(boolean delete) throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
brokerService.setDeleteAllMessagesOnStartup(delete);
|
||||
brokerService.setAdvisorySupport(false);
|
||||
brokerService.start();
|
||||
|
||||
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
|
||||
ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
|
||||
zeroPrefetch.setAll(0);
|
||||
activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
|
||||
connectionFactory = activeMQConnectionFactory;
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
brokerService.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("perf test that needs manual comparator")
|
||||
public void testX() throws Exception {
|
||||
|
||||
final int numConnections = 200;
|
||||
final int numDestinations = 10000;
|
||||
final AtomicInteger numConsumers = new AtomicInteger(numDestinations);
|
||||
final AtomicInteger numProducers = new AtomicInteger(numDestinations);
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(numConnections);
|
||||
|
||||
// precreate dests to accentuate read access
|
||||
for (int i=0; i<numDestinations; i++ ) {
|
||||
brokerService.getRegionBroker().addDestination(
|
||||
brokerService.getAdminConnectionContext(),
|
||||
new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i),
|
||||
false);
|
||||
brokerService.getRegionBroker().addDestination(
|
||||
brokerService.getAdminConnectionContext(), new ActiveMQTopic("VirtualTopic.TEST-" + i), false);
|
||||
|
||||
}
|
||||
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
try {
|
||||
int opsCount = 0;
|
||||
|
||||
Connection connection1 = connectionFactory.createConnection();
|
||||
connection1.start();
|
||||
|
||||
Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
|
||||
do {
|
||||
boolean consumerOrProducer = opsCount++ % 2 == 0;
|
||||
int i = consumerOrProducer ? numConsumers.decrementAndGet() : numProducers.decrementAndGet();
|
||||
if (i > 0) {
|
||||
if (consumerOrProducer) {
|
||||
session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i));
|
||||
} else {
|
||||
producer.send(new ActiveMQTopic("VirtualTopic.TEST-" + i), new ActiveMQMessage());
|
||||
}
|
||||
}
|
||||
} while (numConsumers.get() > 0 || numProducers.get() > 0);
|
||||
connection1.close();
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < numConnections; i++) {
|
||||
executorService.execute(runnable);
|
||||
}
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
LOG.info("Starting timer: " + start);
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(5, TimeUnit.MINUTES);
|
||||
LOG.info("Done, duration: " + (System.currentTimeMillis() - start));
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue