make the locking in Queue more coarse-grained: replace the dispatch Valve and the doDispatchMutex with synchronized methods

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@577746 13f79535-47bb-0310-9956-ffa450edef68
Robert Davies 2007-09-20 13:49:32 +00:00
parent 417fcd7f19
commit 4d01443fa7
1 changed file with 103 additions and 110 deletions
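
In outline, the change removes the Valve that dispatch threads flowed through, plus the separate doDispatchMutex, and instead marks the mutating entry points (addSubscription, removeSubscription, buildList, doDispatch) as synchronized, so the queue's intrinsic lock serializes them. The sketch below is a minimal, self-contained illustration of the two styles; the class names are hypothetical, and a ReentrantReadWriteLock stands in for ActiveMQ's Valve, whose increment/decrement and turnOff/turnOn calls behave roughly like read- and write-lock acquisition.

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Fine-grained style (before): dispatchers pass through an open "valve";
// a subscription change closes it until the change is complete.
class FineGrainedQueue {
    private final ReentrantReadWriteLock valve = new ReentrantReadWriteLock();

    void addSubscription() {
        valve.writeLock().lock(); // roughly dispatchValve.turnOff()
        try {
            // mutate the consumer list, replay paged-in messages ...
        } finally {
            valve.writeLock().unlock(); // roughly dispatchValve.turnOn()
        }
    }

    void doDispatch() {
        valve.readLock().lock(); // roughly dispatchValve.increment()
        try {
            // dispatch paged-in messages to consumers ...
        } finally {
            valve.readLock().unlock(); // roughly dispatchValve.decrement()
        }
    }
}

// Coarse-grained style (after): one intrinsic lock serializes subscription
// changes, paging and dispatch, so the valve and the extra dispatch mutex
// are no longer needed.
class CoarseGrainedQueue {
    synchronized void addSubscription() { /* ... */ }
    synchronized void removeSubscription() { /* ... */ }
    private synchronized void buildList(boolean force) { /* ... */ }
    private synchronized void doDispatch() { /* ... */ }
}

The trade-off is less concurrency between dispatch threads in exchange for a simpler invariant: any thread inside one of these methods holds the queue monitor, so there is no valve state to manage in try/finally blocks.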


@@ -58,7 +58,6 @@ import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -79,7 +78,6 @@ public class Queue implements Destination, Task {
private final Log log;
private final ActiveMQDestination destination;
private final List<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
private final Valve dispatchValve = new Valve(true);
private final SystemUsage systemUsage;
private final MemoryUsage memoryUsage;
private final DestinationStatistics destinationStatistics = new DestinationStatistics();
@@ -97,7 +95,6 @@ public class Queue implements Destination, Task {
private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
private final Object exclusiveLockMutex = new Object();
private final Object doDispatchMutex = new Object();
private TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
@@ -203,13 +200,15 @@ public class Queue implements Destination, Task {
return true;
}
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
public synchronized void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
// needs to be synchronized - so there is no contention with dispatching
synchronized (consumers) {
consumers.add(sub);
if (sub.getConsumerInfo().isExclusive()) {
@@ -224,22 +223,26 @@
}
}
}
// page in messages
doPageIn();
// we already hold the queue lock - so lets build the paged-in
// list directly
buildList(false);
// synchronize with the dispatch method so that no new messages are sent
// while setting up a subscription - avoid out of order messages,
// duplicates etc.
dispatchValve.turnOff();
try {
msgContext.setDestination(destination);
synchronized (pagedInMessages) {
// Add all the matching messages in the queue to the
// subscription.
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference)i.next();
if (node.isDropped()) {
if (node.isDropped() || (!sub.getConsumerInfo().isBrowser() && node.getLockOwner()!=null)) {
continue;
}
try {
@@ -252,101 +255,94 @@
}
}
}
} finally {
dispatchValve.turnOn();
}
} finally {
msgContext.clear();
}
}
public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception {
destinationStatistics.getConsumers().decrement();
maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
// synchronize with the dispatch method so that no new messages are sent
// while removing a subscription.
dispatchValve.turnOff();
try {
synchronized (consumers) {
consumers.remove(sub);
if (sub.getConsumerInfo().isExclusive()) {
LockOwner owner = (LockOwner)sub;
// Did we lose the exclusive owner?
if (exclusiveOwner == owner) {
// Find the exclusive consumer with the highest Lock
// Priority.
exclusiveOwner = null;
for (Iterator<Subscription> iter = consumers.iterator(); iter.hasNext();) {
Subscription s = iter.next();
LockOwner so = (LockOwner)s;
if (s.getConsumerInfo().isExclusive() && (exclusiveOwner == null || so.getLockPriority() > exclusiveOwner.getLockPriority())) {
exclusiveOwner = so;
}
}
}
}
if (consumers.isEmpty()) {
messages.gc();
}
}
sub.remove(context, this);
boolean wasExclusiveOwner = false;
if (exclusiveOwner == sub) {
exclusiveOwner = null;
wasExclusiveOwner = true;
}
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId);
if (!sub.getConsumerInfo().isBrowser()) {
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
msgContext.setDestination(destination);
// lets copy the messages to dispatch to avoid deadlock
List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
synchronized (pagedInMessages) {
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference)i.next();
if (node.isDropped()) {
continue;
}
String groupID = node.getGroupID();
// Re-deliver all messages that the sub locked
if (node.getLockOwner() == sub || wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) {
messagesToDispatch.add(node);
}
}
}
// now lets dispatch from the copy of the collection to
// avoid deadlocks
for (Iterator<QueueMessageReference> iter = messagesToDispatch.iterator(); iter.hasNext();) {
QueueMessageReference node = iter.next();
node.incrementRedeliveryCounter();
node.unlock();
msgContext.setMessageReference(node);
dispatchPolicy.dispatch(node, msgContext, consumers);
}
} finally {
msgContext.clear();
}
}
} finally {
dispatchValve.turnOn();
}
}
public synchronized void removeSubscription(ConnectionContext context, Subscription sub) throws Exception {
destinationStatistics.getConsumers().decrement();
maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
// synchronize with the dispatch method so that no new messages are sent
// while removing a subscription.
synchronized (consumers) {
consumers.remove(sub);
if (sub.getConsumerInfo().isExclusive()) {
LockOwner owner = (LockOwner)sub;
// Did we lose the exclusive owner?
if (exclusiveOwner == owner) {
// Find the exclusive consumer with the highest Lock
// Priority.
exclusiveOwner = null;
for (Iterator<Subscription> iter = consumers.iterator(); iter.hasNext();) {
Subscription s = iter.next();
LockOwner so = (LockOwner)s;
if (s.getConsumerInfo().isExclusive() && (exclusiveOwner == null || so.getLockPriority() > exclusiveOwner.getLockPriority())) {
exclusiveOwner = so;
}
}
}
}
if (consumers.isEmpty()) {
messages.gc();
}
}
sub.remove(context, this);
boolean wasExclusiveOwner = false;
if (exclusiveOwner == sub) {
exclusiveOwner = null;
wasExclusiveOwner = true;
}
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(consumerId);
if (!sub.getConsumerInfo().isBrowser()) {
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
msgContext.setDestination(destination);
// lets copy the messages to dispatch to avoid deadlock
List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
synchronized (pagedInMessages) {
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference)i.next();
if (node.isDropped()) {
continue;
}
String groupID = node.getGroupID();
// Re-deliver all messages that the sub locked
if (node.getLockOwner() == sub || wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) {
messagesToDispatch.add(node);
}
}
}
// now lets dispatch from the copy of the collection to
// avoid deadlocks
for (Iterator<QueueMessageReference> iter = messagesToDispatch.iterator(); iter.hasNext();) {
QueueMessageReference node = iter.next();
node.incrementRedeliveryCounter();
node.unlock();
msgContext.setMessageReference(node);
dispatchPolicy.dispatch(node, msgContext, consumers);
}
} finally {
msgContext.clear();
}
}
}
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
@@ -998,18 +994,20 @@
pageInMessages(false);
}
private List<MessageReference> doPageIn() throws Exception {
return doPageIn(true);
private List<MessageReference> doPageIn(boolean force) throws Exception {
return buildList(force);
}
private List<MessageReference> doPageIn(boolean force) throws Exception {
private synchronized List<MessageReference> buildList(boolean force) throws Exception {
final int toPageIn = maximumPagedInMessages - pagedInMessages.size();
List<MessageReference> result = null;
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
try {
dispatchValve.increment();
int count = 0;
result = new ArrayList<MessageReference>(toPageIn);
synchronized (messages) {
@@ -1037,16 +1035,14 @@
}
} finally {
queueMsgConext.clear();
dispatchValve.decrement();
}
}
return result;
}
private void doDispatch(List<MessageReference> list) throws Exception {
private synchronized void doDispatch(List<MessageReference> list) throws Exception {
if (list != null && !list.isEmpty()) {
try {
dispatchValve.increment();
for (int i = 0; i < list.size(); i++) {
MessageReference node = list.get(i);
queueMsgConext.setDestination(destination);
@@ -1055,7 +1051,6 @@
}
} finally {
queueMsgConext.clear();
dispatchValve.decrement();
}
}
}
@@ -1065,9 +1060,7 @@
}
private void pageInMessages(boolean force) throws Exception {
synchronized (doDispatchMutex) {
doDispatch(doPageIn(force));
}
}
}
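
One consequence of dropping doDispatchMutex is worth noting, and the hypothetical sketch below (names mirror the patch, logic is stubbed out) makes it concrete: the queue monitor is released between the two synchronized calls, so the page-in/dispatch pair is no longer atomic as a unit, while each call on its own still excludes the synchronized subscription methods.

import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the new pageInMessages call path.
class PagingSketch {
    synchronized List<Object> doPageIn(boolean force) {
        return Collections.emptyList(); // stand-in for the synchronized buildList(force)
    }

    synchronized void doDispatch(List<Object> list) {
        // deliver to consumers ...
    }

    void pageInMessages(boolean force) {
        // The queue monitor is released between the two calls, so another
        // thread may run in between; each call on its own still cannot
        // overlap addSubscription/removeSubscription.
        doDispatch(doPageIn(force));
    }
}

That looser pairing appears to be the accepted trade-off for removing the valve and the dedicated mutex entirely.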