Use a read / write lock on the destination mutex as many of the access points are simply reads of the underlying maps or a quite copy to another Collection.  The only time a write is done is during addDestination and removeDestination.  

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1160894 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-08-23 21:51:10 +00:00
parent d36f2610de
commit 3efd4b0515
2 changed files with 103 additions and 49 deletions

View File

@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
@ -65,7 +67,7 @@ public abstract class AbstractRegion implements Region {
protected final RegionBroker broker;
protected boolean autoCreateDestinations = true;
protected final TaskRunnerFactory taskRunnerFactory;
protected final Object destinationsMutex = new Object();
protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock();
protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
protected boolean started;
@ -96,21 +98,27 @@ public abstract class AbstractRegion implements Region {
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.getBroker().addDestination(context, dest, false);
}
synchronized (destinationsMutex) {
destinationsLock.readLock().lock();
try{
for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
Destination dest = i.next();
dest.start();
}
} finally {
destinationsLock.readLock().unlock();
}
}
public void stop() throws Exception {
started = false;
synchronized (destinationsMutex) {
destinationsLock.readLock().lock();
try{
for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
Destination dest = i.next();
dest.stop();
}
} finally {
destinationsLock.readLock().unlock();
}
destinations.clear();
}
@ -118,7 +126,9 @@ public abstract class AbstractRegion implements Region {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
boolean createIfTemporary) throws Exception {
LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
synchronized (destinationsMutex) {
destinationsLock.writeLock().lock();
try {
Destination dest = destinations.get(destination);
if (dest == null) {
if (destination.isTemporary() == false || createIfTemporary) {
@ -150,6 +160,8 @@ public abstract class AbstractRegion implements Region {
}
}
return dest;
} finally {
destinationsLock.writeLock().unlock();
}
}
@ -197,7 +209,8 @@ public abstract class AbstractRegion implements Region {
LOG.debug("Removing destination: " + destination);
synchronized (destinationsMutex) {
destinationsLock.writeLock().lock();
try {
Destination dest = destinations.remove(destination);
if (dest != null) {
// timeout<0 or we timed out, we now force any remaining
@ -218,6 +231,8 @@ public abstract class AbstractRegion implements Region {
} else {
LOG.debug("Destination doesn't exist: " + dest);
}
} finally {
destinationsLock.writeLock().unlock();
}
}
@ -226,18 +241,26 @@ public abstract class AbstractRegion implements Region {
*
* @return a set of matching destination objects.
*/
@SuppressWarnings("unchecked")
public Set<Destination> getDestinations(ActiveMQDestination destination) {
synchronized (destinationsMutex) {
destinationsLock.readLock().lock();
try{
return destinationMap.get(destination);
} finally {
destinationsLock.readLock().unlock();
}
}
public Map<ActiveMQDestination, Destination> getDestinationMap() {
synchronized (destinationsMutex) {
destinationsLock.readLock().lock();
try{
return new HashMap<ActiveMQDestination, Destination>(destinations);
} finally {
destinationsLock.readLock().unlock();
}
}
@SuppressWarnings("unchecked")
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: "
+ info.getDestination());
@ -258,8 +281,7 @@ public abstract class AbstractRegion implements Region {
synchronized (addGuard) {
Subscription o = subscriptions.get(info.getConsumerId());
if (o != null) {
LOG
.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
return o;
}
@ -293,11 +315,13 @@ public abstract class AbstractRegion implements Region {
// Add the subscription to all the matching queues.
// But copy the matches first - to prevent deadlocks
List<Destination> addList = new ArrayList<Destination>();
synchronized (destinationsMutex) {
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next();
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
addList.add(dest);
}
} finally {
destinationsLock.readLock().unlock();
}
for (Destination dest : addList) {
@ -317,6 +341,7 @@ public abstract class AbstractRegion implements Region {
*
* @return Set of all stored destinations
*/
@SuppressWarnings("rawtypes")
public Set getDurableDestinations() {
return destinationFactory.getDestinations();
}
@ -326,12 +351,16 @@ public abstract class AbstractRegion implements Region {
*/
protected Set<ActiveMQDestination> getInactiveDestinations() {
Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
synchronized (destinationsMutex) {
destinationsLock.readLock().lock();
try {
inactiveDests.removeAll(destinations.keySet());
} finally {
destinationsLock.readLock().unlock();
}
return inactiveDests;
}
@SuppressWarnings("unchecked")
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: "
+ info.getDestination());
@ -342,12 +371,13 @@ public abstract class AbstractRegion implements Region {
// remove the subscription from all the matching queues.
List<Destination> removeList = new ArrayList<Destination>();
synchronized (destinationsMutex) {
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next();
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
removeList.add(dest);
}
} finally {
destinationsLock.readLock().unlock();
}
for (Destination dest : removeList) {
dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
@ -407,9 +437,14 @@ public abstract class AbstractRegion implements Region {
protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception {
Destination dest = null;
synchronized (destinationsMutex) {
destinationsLock.readLock().lock();
try {
dest = destinations.get(destination);
} finally {
destinationsLock.readLock().unlock();
}
if (dest == null) {
if (isAutoCreateDestinations()) {
// Try to auto create the destination... re-invoke broker
@ -423,10 +458,14 @@ public abstract class AbstractRegion implements Region {
// this error
}
// We should now have the dest created.
synchronized (destinationsMutex) {
destinationsLock.readLock().lock();
try {
dest = destinations.get(destination);
} finally {
destinationsLock.readLock().unlock();
}
}
if (dest == null) {
throw new JMSException("The destination " + destination + " does not exist.");
}
@ -454,9 +493,13 @@ public abstract class AbstractRegion implements Region {
protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification)
throws Exception {
Destination dest = null;
synchronized (destinationsMutex) {
destinationsLock.readLock().lock();
try {
dest = destinations.get(messageDispatchNotification.getDestination());
} finally {
destinationsLock.readLock().unlock();
}
if (dest != null) {
dest.processDispatchNotification(messageDispatchNotification);
} else {
@ -468,15 +511,17 @@ public abstract class AbstractRegion implements Region {
}
public void gc() {
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
Subscription sub = iter.next();
for (Subscription sub : subscriptions.values()) {
sub.gc();
}
synchronized (destinationsMutex) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
Destination dest = iter.next();
destinationsLock.readLock().lock();
try {
for (Destination dest : destinations.values()) {
dest.gc();
}
} finally {
destinationsLock.readLock().unlock();
}
}
@ -495,12 +540,15 @@ public abstract class AbstractRegion implements Region {
this.autoCreateDestinations = autoCreateDestinations;
}
@SuppressWarnings("unchecked")
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
synchronized (destinationsMutex) {
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next();
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
dest.addProducer(context, info);
}
} finally {
destinationsLock.readLock().unlock();
}
}
@ -512,12 +560,15 @@ public abstract class AbstractRegion implements Region {
* @throws Exception
* TODO
*/
@SuppressWarnings("unchecked")
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
synchronized (destinationsMutex) {
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next();
destinationsLock.readLock().lock();
try {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
dest.removeProducer(context, info);
}
} finally {
destinationsLock.readLock().unlock();
}
}

View File

@ -39,7 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*
*/
public class TopicRegion extends AbstractRegion {
private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
@ -60,7 +60,6 @@ public class TopicRegion extends AbstractRegion {
public void run() {
doCleanup();
}
};
this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
}
@ -118,15 +117,17 @@ public class TopicRegion extends AbstractRegion {
if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
// Remove the consumer first then add it.
durableSubscriptions.remove(key);
synchronized (destinationsMutex) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
Destination dest = iter.next();
destinationsLock.readLock().lock();
try {
for (Destination dest : destinations.values()) {
//Account for virtual destinations
if (dest instanceof Topic){
Topic topic = (Topic)dest;
topic.deleteSubscription(context, key);
}
}
} finally {
destinationsLock.readLock().unlock();
}
super.removeConsumer(context, sub.getConsumerInfo());
super.addConsumer(context, info);
@ -179,16 +180,19 @@ public class TopicRegion extends AbstractRegion {
throw new JMSException("Durable consumer is in use");
}
synchronized (destinationsMutex) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
Destination dest = iter.next();
//Account for virtual destinations
if (dest instanceof Topic){
Topic topic = (Topic)dest;
topic.deleteSubscription(context, key);
}
destinationsLock.readLock().lock();
try {
for (Destination dest : destinations.values()) {
//Account for virtual destinations
if (dest instanceof Topic){
Topic topic = (Topic)dest;
topic.deleteSubscription(context, key);
}
}
} finally {
destinationsLock.readLock().unlock();
}
if (subscriptions.get(sub.getConsumerInfo()) != null) {
super.removeConsumer(context, sub.getConsumerInfo());
} else {
@ -243,8 +247,7 @@ public class TopicRegion extends AbstractRegion {
// Now perhaps there other durable subscriptions (via wild card)
// that would match this destination..
durableSubscriptions.values();
for (Iterator<DurableTopicSubscription> iterator = durableSubscriptions.values().iterator(); iterator.hasNext();) {
DurableTopicSubscription sub = iterator.next();
for (DurableTopicSubscription sub : durableSubscriptions.values()) {
// Skip over subscriptions that we allready added..
if (dupChecker.contains(sub)) {
continue;
@ -284,16 +287,16 @@ public class TopicRegion extends AbstractRegion {
@Override
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
ActiveMQDestination destination = info.getDestination();
if (info.isDurable()) {
if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
throw new JMSException("Cannot create a durable subscription for an advisory Topic");
}
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
DurableTopicSubscription sub = durableSubscriptions.get(key);
if (sub == null) {
sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
if (destination != null && broker.getDestinationPolicy() != null) {