Switch to LinkedHashMap with R/W locking for concurrent add / remove
protection
This commit is contained in:
Timothy Bish 2014-08-29 15:52:23 -04:00
parent 60bdfc061c
commit b2afb8c969
1 changed files with 34 additions and 11 deletions

View File

@ -16,12 +16,14 @@
*/
package org.apache.activemq.advisory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
@ -70,7 +72,8 @@ public class AdvisoryBroker extends BrokerFilter {
protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
protected final Queue<ConsumerInfo> consumers = new ConcurrentLinkedQueue<ConsumerInfo>();
private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>();
protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
@ -103,7 +106,12 @@ public class AdvisoryBroker extends BrokerFilter {
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.offer(info);
consumersLock.writeLock().lock();
try {
consumers.put(info.getConsumerId(), info);
} finally {
consumersLock.writeLock().unlock();
}
fireConsumerAdvisory(context, info.getDestination(), topic, info);
} else {
// We need to replay all the previously collected state objects
@ -148,10 +156,15 @@ public class AdvisoryBroker extends BrokerFilter {
// Replay the consumers.
if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext(); ) {
ConsumerInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
consumersLock.readLock().lock();
try {
for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) {
ConsumerInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
}
} finally {
consumersLock.readLock().unlock();
}
}
@ -266,7 +279,12 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQDestination dest = info.getDestination();
if (!AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
consumers.remove(info);
consumersLock.writeLock().lock();
try {
consumers.remove(info.getConsumerId());
} finally {
consumersLock.writeLock().unlock();
}
if (!dest.isTemporary() || destinations.containsKey(dest)) {
fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
}
@ -623,8 +641,13 @@ public class AdvisoryBroker extends BrokerFilter {
return connections;
}
public Queue<ConsumerInfo> getAdvisoryConsumers() {
return consumers;
public Collection<ConsumerInfo> getAdvisoryConsumers() {
consumersLock.readLock().lock();
try {
return new ArrayList<ConsumerInfo>(consumers.values());
} finally {
consumersLock.readLock().unlock();
}
}
public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {