Use of sync block was causing unneeded serialization of calls to add and remove producers and consumers when the inactivity purge wasn't running.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1141741 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-06-30 22:22:04 +00:00
parent fc4a6497ea
commit ead8df6ed2
1 changed files with 29 additions and 9 deletions

View File

@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidClientIDException; import javax.jms.InvalidClientIDException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
@ -105,6 +106,8 @@ public class RegionBroker extends EmptyBroker {
private final Scheduler scheduler; private final Scheduler scheduler;
private final ThreadPoolExecutor executor; private final ThreadPoolExecutor executor;
private boolean allowTempAutoCreationOnSend; private boolean allowTempAutoCreationOnSend;
private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
private final Runnable purgeInactiveDestinationsTask = new Runnable() { private final Runnable purgeInactiveDestinationsTask = new Runnable() {
public void run() { public void run() {
purgeInactiveDestinations(); purgeInactiveDestinations();
@ -388,9 +391,9 @@ public class RegionBroker extends EmptyBroker {
@Override @Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
synchronized (purgeInactiveDestinationsTask) { if (destination != null) {
if (destination != null) { inactiveDestinationsPurgeLock.readLock().lock();
try {
// This seems to cause the destination to be added but without // This seems to cause the destination to be added but without
// advisories firing... // advisories firing...
context.getBroker().addDestination(context, destination, true); context.getBroker().addDestination(context, destination, true);
@ -408,6 +411,8 @@ public class RegionBroker extends EmptyBroker {
tempTopicRegion.addProducer(context, info); tempTopicRegion.addProducer(context, info);
break; break;
} }
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
} }
} }
} }
@ -415,8 +420,9 @@ public class RegionBroker extends EmptyBroker {
@Override @Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
synchronized (purgeInactiveDestinationsTask) { if (destination != null) {
if (destination != null) { inactiveDestinationsPurgeLock.readLock().lock();
try {
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeProducer(context, info); queueRegion.removeProducer(context, info);
@ -431,6 +437,8 @@ public class RegionBroker extends EmptyBroker {
tempTopicRegion.removeProducer(context, info); tempTopicRegion.removeProducer(context, info);
break; break;
} }
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
} }
} }
} }
@ -441,7 +449,8 @@ public class RegionBroker extends EmptyBroker {
if (destinationInterceptor != null) { if (destinationInterceptor != null) {
destinationInterceptor.create(this, context, destination); destinationInterceptor.create(this, context, destination);
} }
synchronized (purgeInactiveDestinationsTask) { inactiveDestinationsPurgeLock.readLock().lock();
try {
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.addConsumer(context, info); return queueRegion.addConsumer(context, info);
@ -458,13 +467,16 @@ public class RegionBroker extends EmptyBroker {
default: default:
throw createUnknownDestinationTypeException(destination); throw createUnknownDestinationTypeException(destination);
} }
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
} }
} }
@Override @Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
synchronized (purgeInactiveDestinationsTask) { inactiveDestinationsPurgeLock.readLock().lock();
try {
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
@ -482,13 +494,18 @@ public class RegionBroker extends EmptyBroker {
default: default:
throw createUnknownDestinationTypeException(destination); throw createUnknownDestinationTypeException(destination);
} }
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
} }
} }
@Override @Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
synchronized (purgeInactiveDestinationsTask) { inactiveDestinationsPurgeLock.readLock().lock();
try {
topicRegion.removeSubscription(context, info); topicRegion.removeSubscription(context, info);
} finally {
inactiveDestinationsPurgeLock.readLock().unlock();
} }
} }
@ -934,7 +951,8 @@ public class RegionBroker extends EmptyBroker {
} }
protected void purgeInactiveDestinations() { protected void purgeInactiveDestinations() {
synchronized (purgeInactiveDestinationsTask) { inactiveDestinationsPurgeLock.writeLock().lock();
try {
List<BaseDestination> list = new ArrayList<BaseDestination>(); List<BaseDestination> list = new ArrayList<BaseDestination>();
Map<ActiveMQDestination, Destination> map = getDestinationMap(); Map<ActiveMQDestination, Destination> map = getDestinationMap();
if (isAllowTempAutoCreationOnSend()) { if (isAllowTempAutoCreationOnSend()) {
@ -968,6 +986,8 @@ public class RegionBroker extends EmptyBroker {
} }
} }
} }
} finally {
inactiveDestinationsPurgeLock.writeLock().unlock();
} }
} }