lock dispatching (again) whilst adding a consumer

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@618689 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-02-05 16:20:11 +00:00
parent ecc87ea672
commit f81d0d59d3
2 changed files with 86 additions and 83 deletions

View File

@ -128,10 +128,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
public void add(MessageReference node) throws Exception { public void add(MessageReference node) throws Exception {
synchronized (pendingLock) { synchronized (pendingLock) {
enqueueCounter++; enqueueCounter++;
pending.addMessageLast(node); pending.addMessageLast(node);
dispatchPending();
} }
dispatchPending();
} }
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {

View File

@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -83,6 +84,7 @@ public class Queue extends BaseDestination implements Task {
private final Object sendLock = new Object(); private final Object sendLock = new Object();
private final TaskRunner taskRunner; private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final ReentrantLock dispatchLock = new ReentrantLock();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() { public void run() {
wakeup(); wakeup();
@ -98,7 +100,6 @@ public class Queue extends BaseDestination implements Task {
} else { } else {
this.messages = new StoreQueueCursor(broker,this); this.messages = new StoreQueueCursor(broker,this);
} }
this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName()); this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName());
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
} }
@ -172,61 +173,67 @@ public class Queue extends BaseDestination implements Task {
return true; return true;
} }
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
sub.add(context, this); dispatchLock.lock();
destinationStatistics.getConsumers().increment(); try {
MessageEvaluationContext msgContext = new MessageEvaluationContext(); sub.add(context, this);
destinationStatistics.getConsumers().increment();
MessageEvaluationContext msgContext = new MessageEvaluationContext();
// needs to be synchronized - so no contention with dispatching // needs to be synchronized - so no contention with dispatching
synchronized (consumers) { synchronized (consumers) {
consumers.add(sub); consumers.add(sub);
if (sub.getConsumerInfo().isExclusive()) { if (sub.getConsumerInfo().isExclusive()) {
LockOwner owner = (LockOwner) sub; LockOwner owner = (LockOwner) sub;
if (exclusiveOwner == null) { if (exclusiveOwner == null) {
exclusiveOwner = owner;
} else {
// switch the owner if the priority is higher.
if (owner.getLockPriority() > exclusiveOwner
.getLockPriority()) {
exclusiveOwner = owner; exclusiveOwner = owner;
} else {
// switch the owner if the priority is higher.
if (owner.getLockPriority() > exclusiveOwner
.getLockPriority()) {
exclusiveOwner = owner;
}
} }
} }
} }
}
// we hold the lock on the dispatchValue - so lets build the paged in // we hold the lock on the dispatchValue - so lets build the paged
// list directly; // in
buildList(false); // list directly;
doPageIn(false);
// synchronize with dispatch method so that no new messages are sent // synchronize with dispatch method so that no new messages are sent
// while // while
// setting up a subscription. avoid out of order messages, // setting up a subscription. avoid out of order messages,
// duplicates // duplicates
// etc. // etc.
msgContext.setDestination(destination); msgContext.setDestination(destination);
synchronized (pagedInMessages) { synchronized (pagedInMessages) {
// Add all the matching messages in the queue to the // Add all the matching messages in the queue to the
// subscription. // subscription.
for (Iterator<MessageReference> i = pagedInMessages.values().iterator(); i for (Iterator<MessageReference> i = pagedInMessages.values()
.hasNext();) { .iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i.next(); QueueMessageReference node = (QueueMessageReference) i
if (node.isDropped() .next();
|| (!sub.getConsumerInfo().isBrowser() && node if (node.isDropped()
.getLockOwner() != null)) { || (!sub.getConsumerInfo().isBrowser() && node
continue; .getLockOwner() != null)) {
} continue;
try { }
msgContext.setMessageReference(node); try {
if (sub.matches(node, msgContext)) { msgContext.setMessageReference(node);
sub.add(node); if (sub.matches(node, msgContext)) {
sub.add(node);
}
} catch (IOException e) {
log.warn("Could not load message: " + e, e);
} }
} catch (IOException e) {
log.warn("Could not load message: " + e, e);
} }
} }
} finally {
dispatchLock.unlock();
} }
} }
public void removeSubscription(ConnectionContext context, Subscription sub) public void removeSubscription(ConnectionContext context, Subscription sub)
@ -956,54 +963,51 @@ public class Queue extends BaseDestination implements Task {
wakeup(); wakeup();
} }
final synchronized void wakeup() { final void wakeup() {
try { try {
taskRunner.wakeup(); taskRunner.wakeup();
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.warn("Task Runner failed to wakeup ", e); log.warn("Task Runner failed to wakeup ", e);
} }
} }
private List<MessageReference> doPageIn(boolean force) throws Exception {
private List<MessageReference> doPageIn(boolean force) throws Exception {
List<MessageReference> result = null;
result = buildList(force);
return result;
}
private List<MessageReference> buildList(boolean force) throws Exception {
final int toPageIn = getMaxPageSize() - pagedInMessages.size();
List<MessageReference> result = null; List<MessageReference> result = null;
if ((force || !consumers.isEmpty()) && toPageIn > 0) { dispatchLock.lock();
messages.setMaxBatchSize(toPageIn); try {
int count = 0; final int toPageIn = getMaxPageSize() - pagedInMessages.size();
result = new ArrayList<MessageReference>(toPageIn); if ((force || !consumers.isEmpty()) && toPageIn > 0) {
synchronized (messages) { messages.setMaxBatchSize(toPageIn);
try { int count = 0;
messages.reset(); result = new ArrayList<MessageReference>(toPageIn);
while (messages.hasNext() && count < toPageIn) { synchronized (messages) {
MessageReference node = messages.next(); try {
messages.remove(); messages.reset();
if (!broker.isExpired(node)) { while (messages.hasNext() && count < toPageIn) {
node = createMessageReference(node.getMessage()); MessageReference node = messages.next();
result.add(node); messages.remove();
count++; if (!broker.isExpired(node)) {
} else { node = createMessageReference(node.getMessage());
broker.messageExpired(createConnectionContext(), result.add(node);
node); count++;
destinationStatistics.getMessages().decrement(); } else {
broker.messageExpired(createConnectionContext(),
node);
destinationStatistics.getMessages().decrement();
}
} }
} finally {
messages.release();
}
}
synchronized (pagedInMessages) {
for(MessageReference ref:result) {
pagedInMessages.put(ref.getMessageId(), ref);
} }
} finally {
messages.release();
}
}
synchronized (pagedInMessages) {
for(MessageReference ref:result) {
pagedInMessages.put(ref.getMessageId(), ref);
} }
} }
}finally {
dispatchLock.unlock();
} }
return result; return result;
} }