AMQ-7021 - add unsynchronised accessors to destination map for usage with rw lock from abstract region; allow concurrent read of the destination map

(cherry picked from commit 0b76d3a0ea)
This commit is contained in:
gtully 2018-07-26 10:42:10 +01:00 committed by Timothy Bish
parent 2a2e01df6a
commit c0a6f47a47
4 changed files with 172 additions and 23 deletions

View File

@ -162,7 +162,7 @@ public abstract class AbstractRegion implements Region {
addSubscriptionsForDestination(context, dest);
destinations.put(destination, dest);
updateRegionDestCounts(destination, 1);
destinationMap.put(destination, dest);
destinationMap.unsynchronizedPut(destination, dest);
}
if (dest == null) {
throw new DestinationDoesNotExistException(destination.getQualifiedName());
@ -217,7 +217,7 @@ public abstract class AbstractRegion implements Region {
// If a destination isn't specified, then just count up
// non-advisory destinations (ie count all destinations)
int destinationSize = (int) (entry.getDestination() != null ?
destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
destinationMap.unsynchronizedGet(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
if (destinationSize >= entry.getMaxDestinations()) {
if (entry.getDestination() != null) {
throw new IllegalStateException(
@ -296,7 +296,7 @@ public abstract class AbstractRegion implements Region {
dest.removeSubscription(context, sub, 0l);
}
}
destinationMap.remove(destination, dest);
destinationMap.unsynchronizedRemove(destination, dest);
dispose(context, dest);
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
if (destinationInterceptor != null) {
@ -321,7 +321,7 @@ public abstract class AbstractRegion implements Region {
public Set<Destination> getDestinations(ActiveMQDestination destination) {
destinationsLock.readLock().lock();
try{
return destinationMap.get(destination);
return destinationMap.unsynchronizedGet(destination);
} finally {
destinationsLock.readLock().unlock();
}
@ -387,7 +387,7 @@ public abstract class AbstractRegion implements Region {
List<Destination> addList = new ArrayList<Destination>();
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
addList.add(dest);
}
// ensure sub visible to any new dest addSubscriptionsForDestination
@ -467,7 +467,7 @@ public abstract class AbstractRegion implements Region {
List<Destination> removeList = new ArrayList<Destination>();
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
removeList.add(dest);
}
} finally {
@ -552,15 +552,7 @@ public abstract class AbstractRegion implements Region {
// Try to auto create the destination... re-invoke broker
// from the
// top so that the proper security checks are performed.
context.getBroker().addDestination(context, destination, createTemporary);
dest = addDestination(context, destination, false);
// We should now have the dest created.
destinationsLock.readLock().lock();
try {
dest = destinations.get(destination);
} finally {
destinationsLock.readLock().unlock();
}
dest = context.getBroker().addDestination(context, destination, createTemporary);
}
if (dest == null) {
@ -644,7 +636,7 @@ public abstract class AbstractRegion implements Region {
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
dest.addProducer(context, info);
}
} finally {
@ -665,7 +657,7 @@ public abstract class AbstractRegion implements Region {
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
for (Destination dest : (Set<Destination>) destinationMap.unsynchronizedGet(info.getDestination())) {
dest.removeProducer(context, info);
}
} finally {

View File

@ -59,7 +59,7 @@ public class MappedQueueFilter extends DestinationFilter {
final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
final ActiveMQDestination newDestination = sub.getActiveMQDestination();
final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
BaseDestination regionDest = null;
for (Destination virtualDest : virtualDests) {
if (virtualDest.getActiveMQDestination().isTopic() &&
@ -75,6 +75,9 @@ public class MappedQueueFilter extends DestinationFilter {
final Message copy = message.copy();
copy.setOriginalDestination(message.getDestination());
copy.setDestination(newDestination);
if (regionDest == null) {
regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
}
copy.setRegionDestination(regionDest);
sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
}

View File

@ -23,7 +23,6 @@ import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.activemq.command.ActiveMQDestination;
@ -60,13 +59,20 @@ public class DestinationMap {
* matching values.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public synchronized Set get(ActiveMQDestination key) {
public Set get(ActiveMQDestination key) {
synchronized (this) {
return unsynchronizedGet(key);
}
}
@SuppressWarnings({"rawtypes", "unchecked"})
public Set unsynchronizedGet(ActiveMQDestination key) {
if (key.isComposite()) {
ActiveMQDestination[] destinations = key.getCompositeDestinations();
Set answer = new HashSet(destinations.length);
for (int i = 0; i < destinations.length; i++) {
ActiveMQDestination childDestination = destinations[i];
Object value = get(childDestination);
Object value = unsynchronizedGet(childDestination);
if (value instanceof Set) {
answer.addAll((Set) value);
} else if (value != null) {
@ -78,7 +84,13 @@ public class DestinationMap {
return findWildcardMatches(key);
}
public synchronized void put(ActiveMQDestination key, Object value) {
public void put(ActiveMQDestination key, Object value) {
synchronized (this) {
unsynchronizedPut(key, value);
}
}
public void unsynchronizedPut(ActiveMQDestination key, Object value) {
if (key.isComposite()) {
ActiveMQDestination[] destinations = key.getCompositeDestinations();
for (int i = 0; i < destinations.length; i++) {
@ -95,7 +107,13 @@ public class DestinationMap {
/**
* Removes the value from the associated destination
*/
public synchronized void remove(ActiveMQDestination key, Object value) {
public void remove(ActiveMQDestination key, Object value) {
synchronized (this) {
unsynchronizedRemove(key, value);
}
}
public void unsynchronizedRemove(ActiveMQDestination key, Object value) {
if (key.isComposite()) {
ActiveMQDestination[] destinations = key.getCompositeDestinations();
for (int i = 0; i < destinations.length; i++) {

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class VirtualTopicDestinationMapAccessTest {
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDestinationMapAccessTest.class);
BrokerService brokerService;
ConnectionFactory connectionFactory;
@Before
public void createBroker() throws Exception {
createBroker(true);
}
public void createBroker(boolean delete) throws Exception {
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(delete);
brokerService.setAdvisorySupport(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
zeroPrefetch.setAll(0);
activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
connectionFactory = activeMQConnectionFactory;
}
@After
public void stopBroker() throws Exception {
brokerService.stop();
}
@Test
@Ignore("perf test that needs manual comparator")
public void testX() throws Exception {
final int numConnections = 200;
final int numDestinations = 10000;
final AtomicInteger numConsumers = new AtomicInteger(numDestinations);
final AtomicInteger numProducers = new AtomicInteger(numDestinations);
ExecutorService executorService = Executors.newFixedThreadPool(numConnections);
// precreate dests to accentuate read access
for (int i=0; i<numDestinations; i++ ) {
brokerService.getRegionBroker().addDestination(
brokerService.getAdminConnectionContext(),
new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i),
false);
brokerService.getRegionBroker().addDestination(
brokerService.getAdminConnectionContext(), new ActiveMQTopic("VirtualTopic.TEST-" + i), false);
}
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
int opsCount = 0;
Connection connection1 = connectionFactory.createConnection();
connection1.start();
Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
do {
boolean consumerOrProducer = opsCount++ % 2 == 0;
int i = consumerOrProducer ? numConsumers.decrementAndGet() : numProducers.decrementAndGet();
if (i > 0) {
if (consumerOrProducer) {
session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST-" + i));
} else {
producer.send(new ActiveMQTopic("VirtualTopic.TEST-" + i), new ActiveMQMessage());
}
}
} while (numConsumers.get() > 0 || numProducers.get() > 0);
connection1.close();
} catch (Exception e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < numConnections; i++) {
executorService.execute(runnable);
}
long start = System.currentTimeMillis();
LOG.info("Starting timer: " + start);
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.MINUTES);
LOG.info("Done, duration: " + (System.currentTimeMillis() - start));
}
}