Split the internal messages in a Queue destination in two - messages accessed by the

PendingMessageCursor and work in progress or pagedInMessages - which are instances
of QueueMessageReference - hence keeping the semantics for Message Groups etc. and keeping
the majority of the 'fiddly bits' intact.



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@474475 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-11-13 20:06:49 +00:00
parent 55f4aad761
commit 6895d009d8
13 changed files with 499 additions and 347 deletions

View File

@ -58,7 +58,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
synchronized public void gc() { synchronized public void gc() {
} }
synchronized public void add(ConnectionContext context, Destination destination) throws Exception { public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context, destination); super.add(context, destination);
destinations.put(destination.getActiveMQDestination(), destination); destinations.put(destination.getActiveMQDestination(), destination);
if( active || keepDurableSubsActive ) { if( active || keepDurableSubsActive ) {
@ -68,7 +68,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
dispatchMatched(); dispatchMatched();
} }
synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception { public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
if( !active ) { if( !active ) {
this.active = true; this.active = true;
this.context = context; this.context = context;
@ -79,38 +79,43 @@ public class DurableTopicSubscription extends PrefetchSubscription {
topic.activate(context, this); topic.activate(context, this);
} }
} }
pending.start(); synchronized(pending) {
pending.start();
}
dispatchMatched(); dispatchMatched();
} }
} }
synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception { synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
active=false; active=false;
pending.stop(); synchronized(pending){
pending.stop();
}
if( !keepDurableSubsActive ) { if( !keepDurableSubsActive ) {
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next(); Topic topic = (Topic) iter.next();
topic.deactivate(context, this); topic.deactivate(context, this);
} }
} }
for (Iterator iter = dispatched.iterator(); iter.hasNext();) { synchronized(dispatched){
for(Iterator iter=dispatched.iterator();iter.hasNext();){
// Mark the dispatched messages as redelivered for next time. // Mark the dispatched messages as redelivered for next time.
MessageReference node = (MessageReference) iter.next(); MessageReference node=(MessageReference)iter.next();
Integer count = (Integer) redeliveredMessages.get(node.getMessageId()); Integer count=(Integer)redeliveredMessages.get(node.getMessageId());
if( count !=null ) { if(count!=null){
redeliveredMessages.put(node.getMessageId(), new Integer(count.intValue()+1)); redeliveredMessages.put(node.getMessageId(),new Integer(count.intValue()+1));
} else { }else{
redeliveredMessages.put(node.getMessageId(), new Integer(1)); redeliveredMessages.put(node.getMessageId(),new Integer(1));
}
if(keepDurableSubsActive){
synchronized(pending){
pending.addMessageFirst(node);
}
}else{
node.decrementReferenceCount();
}
iter.remove();
} }
if( keepDurableSubsActive ) {
synchronized(pending) {
pending.addMessageFirst(node);
}
} else {
node.decrementReferenceCount();
}
iter.remove();
} }
if( !keepDurableSubsActive ) { if( !keepDurableSubsActive ) {
@ -135,7 +140,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
return md; return md;
} }
synchronized public void add(MessageReference node) throws Exception { public void add(MessageReference node) throws Exception {
if( !active && !keepDurableSubsActive ) { if( !active && !keepDurableSubsActive ) {
return; return;
} }
@ -190,7 +195,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
/** /**
* Release any references that we are holding. * Release any references that we are holding.
*/ */
synchronized public void destroy() { public void destroy() {
synchronized(pending) { synchronized(pending) {
pending.reset(); pending.reset();
while(pending.hasNext()) { while(pending.hasNext()) {

View File

@ -77,7 +77,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
/** /**
* Allows a message to be pulled on demand by a client * Allows a message to be pulled on demand by a client
*/ */
synchronized public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
// The slave should not deliver pull messages. TODO: when the slave becomes a master, // The slave should not deliver pull messages. TODO: when the slave becomes a master,
// He should send a NULL message to all the consumers to 'wake them up' in case // He should send a NULL message to all the consumers to 'wake them up' in case
// they were waiting for a message. // they were waiting for a message.
@ -111,7 +111,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
* Occurs when a pull times out. If nothing has been dispatched * Occurs when a pull times out. If nothing has been dispatched
* since the timeout was setup, then send the NULL message. * since the timeout was setup, then send the NULL message.
*/ */
synchronized private void pullTimeout(long dispatchCounterBeforePull) { private void pullTimeout(long dispatchCounterBeforePull) {
if( dispatchCounterBeforePull == dispatchCounter ) { if( dispatchCounterBeforePull == dispatchCounter ) {
try { try {
add(QueueMessageReference.NULL_MESSAGE); add(QueueMessageReference.NULL_MESSAGE);
@ -122,15 +122,18 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
} }
synchronized public void add(MessageReference node) throws Exception{ public void add(MessageReference node) throws Exception{
enqueueCounter++; boolean pendingEmpty = false;
synchronized(pending){
if(!isFull() && pending.isEmpty() ){ pendingEmpty=pending.isEmpty();
enqueueCounter++;
}
if(!isFull()&&pendingEmpty){
dispatch(node); dispatch(node);
}else{ }else{
optimizePrefetch(); optimizePrefetch();
synchronized(pending){ synchronized(pending){
if( pending.isEmpty() ) { if(log.isDebugEnabled() && pending.isEmpty()){
log.debug("Prefetch limit."); log.debug("Prefetch limit.");
} }
pending.addMessageLast(node); pending.addMessageLast(node);
@ -138,7 +141,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
} }
synchronized public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
synchronized(pending){ synchronized(pending){
pending.reset(); pending.reset();
while(pending.hasNext()){ while(pending.hasNext()){
@ -154,107 +157,112 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
} }
synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{ public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
// Handle the standard acknowledgment case. // Handle the standard acknowledgment case.
if(ack.isStandardAck()){ synchronized(dispatched){
// Acknowledge all dispatched messages up till the message id of the acknowledgment. if(ack.isStandardAck()){
int index=0; // Acknowledge all dispatched messages up till the message id of the acknowledgment.
boolean inAckRange=false; int index=0;
for(Iterator iter=dispatched.iterator();iter.hasNext();){ boolean inAckRange=false;
final MessageReference node=(MessageReference) iter.next(); for(Iterator iter=dispatched.iterator();iter.hasNext();){
MessageId messageId=node.getMessageId(); final MessageReference node=(MessageReference)iter.next();
if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ MessageId messageId=node.getMessageId();
inAckRange=true; if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
} inAckRange=true;
if(inAckRange){ }
// Don't remove the nodes until we are committed. if(inAckRange){
if(!context.isInTransaction()){ // Don't remove the nodes until we are committed.
dequeueCounter++; if(!context.isInTransaction()){
node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); dequeueCounter++;
iter.remove(); node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
}else{ iter.remove();
// setup a Synchronization to remove nodes from the dispatched list. }else{
context.getTransaction().addSynchronization(new Synchronization(){ // setup a Synchronization to remove nodes from the dispatched list.
public void afterCommit() throws Exception{ context.getTransaction().addSynchronization(new Synchronization(){
synchronized(PrefetchSubscription.this){
dequeueCounter++;
dispatched.remove(node);
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
prefetchExtension--;
}
}
public void afterRollback() throws Exception { public void afterCommit() throws Exception{
super.afterRollback(); synchronized(PrefetchSubscription.this){
} dequeueCounter++;
}); dispatched.remove(node);
} node.getRegionDestination().getDestinationStatistics().getDequeues()
index++; .increment();
acknowledge(context,ack,node); prefetchExtension--;
if(ack.getLastMessageId().equals(messageId)){ }
if(context.isInTransaction()) { }
// extend prefetch window only if not a pulling consumer
if (getPrefetchSize() != 0) { public void afterRollback() throws Exception{
prefetchExtension=Math.max(prefetchExtension,index+1); super.afterRollback();
} }
});
} }
else { index++;
acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){
if(context.isInTransaction()){
// extend prefetch window only if not a pulling consumer
if(getPrefetchSize()!=0){
prefetchExtension=Math.max(prefetchExtension,index+1);
}
}else{
prefetchExtension=Math.max(0,prefetchExtension-(index+1));
}
dispatchMatched();
return;
}
}
}
//this only happens after a reconnect - get an ack which is not valid
log.info("Could not correlate acknowledgment with dispatched message: "+ack);
}else if(ack.isDeliveredAck()){
// Message was delivered but not acknowledged: update pre-fetch counters.
// Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0;
for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
final MessageReference node=(MessageReference)iter.next();
if(ack.getLastMessageId().equals(node.getMessageId())){
prefetchExtension=Math.max(prefetchExtension,index+1);
dispatchMatched();
return;
}
}
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
}else if(ack.isPoisonAck()){
// TODO: what if the message is already in a DLQ???
// Handle the poison ACK case: we need to send the message to a DLQ
if(ack.isInTransaction())
throw new JMSException("Poison ack cannot be transacted: "+ack);
// Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0;
boolean inAckRange=false;
for(Iterator iter=dispatched.iterator();iter.hasNext();){
final MessageReference node=(MessageReference)iter.next();
MessageId messageId=node.getMessageId();
if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
inAckRange=true;
}
if(inAckRange){
sendToDLQ(context,node);
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
iter.remove();
dequeueCounter++;
index++;
acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){
prefetchExtension=Math.max(0,prefetchExtension-(index+1)); prefetchExtension=Math.max(0,prefetchExtension-(index+1));
dispatchMatched();
return;
} }
dispatchMatched();
return;
} }
} }
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
} }
log.info("Could not correlate acknowledgment with dispatched message: "+ack); if(isSlaveBroker()){
}else if(ack.isDeliveredAck()){ throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack
// Message was delivered but not acknowledged: update pre-fetch counters. +") was not in the dispatch list: "+dispatched);
// Acknowledge all dispatched messages up till the message id of the acknowledgment. }else{
int index=0; log.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "+ack);
for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
final MessageReference node=(MessageReference) iter.next();
if(ack.getLastMessageId().equals(node.getMessageId())){
prefetchExtension=Math.max(prefetchExtension,index+1);
dispatchMatched();
return;
}
} }
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
}else if(ack.isPoisonAck()){
// TODO: what if the message is already in a DLQ???
// Handle the poison ACK case: we need to send the message to a DLQ
if(ack.isInTransaction())
throw new JMSException("Poison ack cannot be transacted: "+ack);
// Acknowledge all dispatched messages up till the message id of the acknowledgment.
int index=0;
boolean inAckRange=false;
for(Iterator iter=dispatched.iterator();iter.hasNext();){
final MessageReference node=(MessageReference) iter.next();
MessageId messageId=node.getMessageId();
if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
inAckRange=true;
}
if(inAckRange){
sendToDLQ(context, node);
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
iter.remove();
dequeueCounter++;
index++;
acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){
prefetchExtension=Math.max(0,prefetchExtension-(index+1));
dispatchMatched();
return;
}
}
}
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
}
if( isSlaveBroker() ) {
throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack+") was not in the dispatch list: "+dispatched);
} else {
log.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "+ack);
} }
} }
@ -306,8 +314,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
} }
synchronized public int getDispatchedQueueSize(){ public int getDispatchedQueueSize(){
return dispatched.size(); synchronized(dispatched){
return dispatched.size();
}
} }
synchronized public long getDequeueCounter(){ synchronized public long getDequeueCounter(){
@ -359,17 +369,19 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
protected void dispatchMatched() throws IOException{ protected void dispatchMatched() throws IOException{
if(!dispatching){ synchronized(pending){
dispatching=true; if(!dispatching){
try{ dispatching=true;
pending.reset(); try{
while(pending.hasNext()&&!isFull()){ pending.reset();
MessageReference node=pending.next(); while(pending.hasNext()&&!isFull()){
pending.remove(); MessageReference node=pending.next();
dispatch(node); pending.remove();
dispatch(node);
}
}finally{
dispatching=false;
} }
}finally{
dispatching=false;
} }
} }
} }
@ -379,38 +391,39 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(message==null){ if(message==null){
return false; return false;
} }
// Make sure we can dispatch a message. synchronized(dispatched){
if(canDispatch(node)&&!isSlaveBroker()){ // Make sure we can dispatch a message.
if(canDispatch(node)&&!isSlaveBroker()){
MessageDispatch md=createMessageDispatch(node,message); MessageDispatch md=createMessageDispatch(node,message);
// NULL messages don't count... they don't get Acked. // NULL messages don't count... they don't get Acked.
if( node != QueueMessageReference.NULL_MESSAGE ) { if(node!=QueueMessageReference.NULL_MESSAGE){
dispatchCounter++; dispatchCounter++;
dispatched.addLast(node); dispatched.addLast(node);
} else { }else{
prefetchExtension=Math.max(0,prefetchExtension-1); prefetchExtension=Math.max(0,prefetchExtension-1);
} }
if(info.isDispatchAsync()){
if(info.isDispatchAsync()){ md.setConsumer(new Runnable(){
md.setConsumer(new Runnable(){
public void run(){ public void run(){
// Since the message gets queued up in async dispatch, we don't want to // Since the message gets queued up in async dispatch, we don't want to
// decrease the reference count until it gets put on the wire. // decrease the reference count until it gets put on the wire.
onDispatch(node,message); onDispatch(node,message);
} }
}); });
context.getConnection().dispatchAsync(md); context.getConnection().dispatchAsync(md);
}else{
context.getConnection().dispatchSync(md);
onDispatch(node,message);
}
return true;
}else{ }else{
context.getConnection().dispatchSync(md); return false;
onDispatch(node,message);
} }
return true;
} else {
return false;
} }
} }
synchronized protected void onDispatch(final MessageReference node,final Message message){ protected void onDispatch(final MessageReference node,final Message message){
if(node.getRegionDestination()!=null){ if(node.getRegionDestination()!=null){
if( node != QueueMessageReference.NULL_MESSAGE ) { if( node != QueueMessageReference.NULL_MESSAGE ) {
node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); node.getRegionDestination().getDestinationStatistics().getDispatched().increment();

View File

@ -42,6 +42,8 @@ import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve; import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
@ -64,27 +66,32 @@ import java.util.List;
* *
* @version $Revision: 1.28 $ * @version $Revision: 1.28 $
*/ */
public class Queue implements Destination { public class Queue implements Destination, Task {
private final Log log; private final Log log;
protected final ActiveMQDestination destination; private final ActiveMQDestination destination;
protected final List consumers = new CopyOnWriteArrayList(); private final List consumers = new CopyOnWriteArrayList();
protected final Valve dispatchValve = new Valve(true); private final Valve dispatchValve = new Valve(true);
protected final UsageManager usageManager; private final UsageManager usageManager;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); private final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected PendingMessageCursor messages = new VMPendingMessageCursor(); private PendingMessageCursor messages = new VMPendingMessageCursor();
private final LinkedList pagedInMessages = new LinkedList();
private LockOwner exclusiveOwner; private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners; private MessageGroupMap messageGroupOwners;
protected long garbageSize = 0; private int garbageSize = 0;
protected long garbageSizeBeforeCollection = 1000; private int garbageSizeBeforeCollection = 1000;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
protected final MessageStore store; private final MessageStore store;
protected int highestSubscriptionPriority = Integer.MIN_VALUE; private int highestSubscriptionPriority = Integer.MIN_VALUE;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
private final Object exclusiveLockMutex = new Object();
private TaskRunner taskRunner;
public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception { TaskRunnerFactory taskFactory) throws Exception {
@ -92,6 +99,7 @@ public class Queue implements Destination {
this.usageManager = new UsageManager(memoryManager); this.usageManager = new UsageManager(memoryManager);
this.usageManager.setLimit(Long.MAX_VALUE); this.usageManager.setLimit(Long.MAX_VALUE);
this.store = store; this.store = store;
this.taskRunner = taskFactory.createTaskRunner(this, "Queue "+destination.getPhysicalName());
// Let the store know what usage manager we are using so that he can // Let the store know what usage manager we are using so that he can
// flush messages to disk // flush messages to disk
@ -106,43 +114,57 @@ public class Queue implements Destination {
} }
public void initialize() throws Exception { public void initialize() throws Exception{
if (store != null) { if(store!=null){
// Restore the persistent messages. // Restore the persistent messages.
store.recover(new MessageRecoveryListener() { messages.start();
public void recoverMessage(Message message) { if(messages.isRecoveryRequired()){
message.setRegionDestination(Queue.this); store.recover(new MessageRecoveryListener(){
MessageReference reference = createMessageReference(message);
synchronized (messages) { public void recoverMessage(Message message){
try{ message.setRegionDestination(Queue.this);
messages.addMessageLast(reference); synchronized(messages){
}catch(Exception e){ try{
log.fatal("Failed to add message to cursor",e); messages.addMessageLast(message);
}catch(Exception e){
log.fatal("Failed to add message to cursor",e);
}
} }
destinationStatistics.getMessages().increment();
} }
reference.decrementReferenceCount();
destinationStatistics.getMessages().increment();
}
public void recoverMessageReference(String messageReference) throws Exception { public void recoverMessageReference(String messageReference) throws Exception{
throw new RuntimeException("Should not be called."); throw new RuntimeException("Should not be called.");
} }
public void finished() { public void finished(){
} }
}); });
}
} }
} }
public synchronized boolean lock(MessageReference node, LockOwner lockOwner) { /**
if (exclusiveOwner == lockOwner) * Lock a node
return true; * @param node
if (exclusiveOwner != null) * @param lockOwner
return false; * @return true if can be locked
if (lockOwner.getLockPriority() < highestSubscriptionPriority) * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.broker.region.LockOwner)
return false; */
if (lockOwner.isLockExclusive()) { public boolean lock(MessageReference node,LockOwner lockOwner){
exclusiveOwner = lockOwner; synchronized(exclusiveLockMutex){
if(exclusiveOwner==lockOwner){
return true;
}
if(exclusiveOwner!=null){
return false;
}
if(lockOwner.getLockPriority()<highestSubscriptionPriority){
return false;
}
if(lockOwner.isLockExclusive()){
exclusiveOwner=lockOwner;
}
} }
return true; return true;
} }
@ -150,16 +172,13 @@ public class Queue implements Destination {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
sub.add(context, this); sub.add(context, this);
destinationStatistics.getConsumers().increment(); destinationStatistics.getConsumers().increment();
maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
// synchronize with dispatch method so that no new messages are sent
// while
// setting up a subscription. avoid out of order messages, duplicates MessageEvaluationContext msgContext=context.getMessageEvaluationContext();
// etc. try{
dispatchValve.turnOff(); synchronized(consumers){
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
synchronized (consumers) {
if (sub.getConsumerInfo().isExclusive()) { if (sub.getConsumerInfo().isExclusive()) {
// Add to front of list to ensure that an exclusive consumer gets all messages // Add to front of list to ensure that an exclusive consumer gets all messages
// before non-exclusive consumers // before non-exclusive consumers
@ -167,40 +186,37 @@ public class Queue implements Destination {
} else { } else {
consumers.add(sub); consumers.add(sub);
} }
if (sub.getConsumerInfo().getPriority() > highestSubscriptionPriority) {
highestSubscriptionPriority = sub.getConsumerInfo().getPriority();
}
} }
// page in messages
//highestSubscriptionPriority = calcHighestSubscriptionPriority(); doPageIn();
// synchronize with dispatch method so that no new messages are sent
// while
// setting up a subscription. avoid out of order messages, duplicates
// etc.
dispatchValve.turnOff();
if (sub.getConsumerInfo().getPriority() > highestSubscriptionPriority) {
highestSubscriptionPriority = sub.getConsumerInfo().getPriority();
}
msgContext.setDestination(destination); msgContext.setDestination(destination);
synchronized(pagedInMessages){
synchronized (messages) {
// Add all the matching messages in the queue to the // Add all the matching messages in the queue to the
// subscription. // subscription.
messages.reset(); for(Iterator i=pagedInMessages.iterator();i.hasNext();){
while(messages.hasNext()) { QueueMessageReference node=(QueueMessageReference)i.next();
if(node.isDropped()){
QueueMessageReference node = (QueueMessageReference) messages.next();
if (node.isDropped()) {
continue; continue;
} }
try{
try {
msgContext.setMessageReference(node); msgContext.setMessageReference(node);
if (sub.matches(node, msgContext)) { if(sub.matches(node,msgContext)){
sub.add(node); sub.add(node);
} }
} }catch(IOException e){
catch (IOException e) { log.warn("Could not load message: "+e,e);
log.warn("Could not load message: " + e, e);
} }
} }
} }
}finally{
}
finally {
msgContext.clear(); msgContext.clear();
dispatchValve.turnOn(); dispatchValve.turnOn();
} }
@ -209,6 +225,7 @@ public class Queue implements Destination {
public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception { public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception {
destinationStatistics.getConsumers().decrement(); destinationStatistics.getConsumers().decrement();
maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
// synchronize with dispatch method so that no new messages are sent // synchronize with dispatch method so that no new messages are sent
// while // while
@ -240,10 +257,9 @@ public class Queue implements Destination {
// lets copy the messages to dispatch to avoid deadlock // lets copy the messages to dispatch to avoid deadlock
List messagesToDispatch = new ArrayList(); List messagesToDispatch = new ArrayList();
synchronized (messages) { synchronized (pagedInMessages) {
messages.reset(); for(Iterator i = pagedInMessages.iterator();i.hasNext();) {
while(messages.hasNext()) { QueueMessageReference node = (QueueMessageReference) i.next();
QueueMessageReference node = (QueueMessageReference) messages.next();
if (node.isDropped()) { if (node.isDropped()) {
continue; continue;
} }
@ -264,7 +280,7 @@ public class Queue implements Destination {
node.incrementRedeliveryCounter(); node.incrementRedeliveryCounter();
node.unlock(); node.unlock();
msgContext.setMessageReference(node); msgContext.setMessageReference(node);
dispatchPolicy.dispatch(context, node, msgContext, consumers); dispatchPolicy.dispatch(node, msgContext, consumers);
} }
} }
finally { finally {
@ -278,40 +294,31 @@ public class Queue implements Destination {
} }
public void send(final ConnectionContext context, final Message message) throws Exception { public void send(final ConnectionContext context,final Message message) throws Exception{
if(context.isProducerFlowControl()){
if (context.isProducerFlowControl()) { if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
} }else{
else {
usageManager.waitForSpace(); usageManager.waitForSpace();
} }
} }
message.setRegionDestination(this); message.setRegionDestination(this);
if (store != null && message.isPersistent()) {
if (store != null && message.isPersistent())
store.addMessage(context, message); store.addMessage(context, message);
final MessageReference node = createMessageReference(message);
try {
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
dispatch(context, node, message);
}
});
}
else {
dispatch(context, node, message);
}
} }
finally { if(context.isInTransaction()){
node.decrementReferenceCount(); context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
sendMessage(context,message);
}
});
}else{
sendMessage(context,message);
} }
} }
public void dispose(ConnectionContext context) throws IOException { public void dispose(ConnectionContext context) throws IOException {
if (store != null) { if (store != null) {
@ -324,30 +331,34 @@ public class Queue implements Destination {
dropEvent(false); dropEvent(false);
} }
public void dropEvent(boolean skipGc) { public void dropEvent(boolean skipGc){
// TODO: need to also decrement when messages expire. // TODO: need to also decrement when messages expire.
destinationStatistics.getMessages().decrement(); destinationStatistics.getMessages().decrement();
synchronized (messages) { synchronized(pagedInMessages){
garbageSize++; garbageSize++;
if (!skipGc && garbageSize > garbageSizeBeforeCollection) { }
gc(); if(!skipGc&&garbageSize>garbageSizeBeforeCollection){
} gc();
} }
} }
public void gc() { public void gc() {
synchronized (messages) { synchronized (pagedInMessages) {
messages.resetForGC(); for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
while(messages.hasNext()) {
// Remove dropped messages from the queue. // Remove dropped messages from the queue.
QueueMessageReference node = (QueueMessageReference) messages.next(); QueueMessageReference node = (QueueMessageReference) i.next();
if (node.isDropped()) { if (node.isDropped()) {
garbageSize--; garbageSize--;
messages.remove(); i.remove();
continue; continue;
} }
} }
} }
try{
taskRunner.wakeup();
}catch(InterruptedException e){
log.warn("Task Runner failed to wakeup ",e);
}
} }
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
@ -390,6 +401,9 @@ public class Queue implements Destination {
} }
public void stop() throws Exception { public void stop() throws Exception {
if( taskRunner!=null ) {
taskRunner.shutdown();
}
} }
// Properties // Properties
@ -455,38 +469,12 @@ public class Queue implements Destination {
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
private MessageReference createMessageReference(Message message) { private MessageReference createMessageReference(Message message) {
return new IndirectMessageReference(this, store, message); MessageReference result = new IndirectMessageReference(this, store, message);
} result.decrementReferenceCount();
return result;
private void dispatch(ConnectionContext context, MessageReference node, Message message) throws Exception {
dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
synchronized (messages) {
messages.addMessageLast(node);
}
synchronized (consumers) {
if (consumers.isEmpty()) {
log.debug("No subscriptions registered, will not dispatch message at this time.");
return;
}
}
msgContext.setDestination(destination);
msgContext.setMessageReference(node);
dispatchPolicy.dispatch(context, node, msgContext, consumers);
}
finally {
msgContext.clear();
dispatchValve.decrement();
}
} }
private int calcHighestSubscriptionPriority() { private int calcHighestSubscriptionPriority() {
int rc = Integer.MIN_VALUE; int rc = Integer.MIN_VALUE;
synchronized (consumers) { synchronized (consumers) {
@ -506,11 +494,28 @@ public class Queue implements Destination {
public Message[] browse() { public Message[] browse() {
ArrayList l = new ArrayList(); ArrayList l = new ArrayList();
synchronized(pagedInMessages) {
for (Iterator i = pagedInMessages.iterator();i.hasNext();) {
MessageReference r = (MessageReference)i.next();
r.incrementReferenceCount();
try {
Message m = r.getMessage();
if (m != null) {
l.add(m);
}
}catch(IOException e){
log.error("caught an exception brwsing " + this,e);
}
finally {
r.decrementReferenceCount();
}
}
}
synchronized (messages) { synchronized (messages) {
messages.reset(); messages.reset();
while(messages.hasNext()) { while(messages.hasNext()) {
try { try {
MessageReference r = (MessageReference) messages.next(); MessageReference r = messages.next();
r.incrementReferenceCount(); r.incrementReferenceCount();
try { try {
Message m = r.getMessage(); Message m = r.getMessage();
@ -523,6 +528,7 @@ public class Queue implements Destination {
} }
} }
catch (IOException e) { catch (IOException e) {
log.error("caught an exception brwsing " + this,e);
} }
} }
} }
@ -535,7 +541,7 @@ public class Queue implements Destination {
messages.reset(); messages.reset();
while(messages.hasNext()) { while(messages.hasNext()) {
try { try {
MessageReference r = (MessageReference) messages.next(); MessageReference r = messages.next();
if (messageId.equals(r.getMessageId().toString())) { if (messageId.equals(r.getMessageId().toString())) {
r.incrementReferenceCount(); r.incrementReferenceCount();
try { try {
@ -551,19 +557,22 @@ public class Queue implements Destination {
} }
} }
catch (IOException e) { catch (IOException e) {
log.error("got an exception retrieving message " + messageId);
} }
} }
} }
return null; return null;
} }
public void purge() { public void purge() throws Exception {
synchronized (messages) {
doDispatch(doPageIn());
synchronized (pagedInMessages) {
ConnectionContext c = createConnectionContext(); ConnectionContext c = createConnectionContext();
messages.reset(); for(Iterator i = pagedInMessages.iterator(); i.hasNext();){
while(messages.hasNext()) {
try { try {
QueueMessageReference r = (QueueMessageReference) messages.next(); QueueMessageReference r = (QueueMessageReference) i.next();
// We should only delete messages that can be locked. // We should only delete messages that can be locked.
if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) { if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
@ -618,20 +627,18 @@ public class Queue implements Destination {
* @return the number of messages removed * @return the number of messages removed
*/ */
public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
doDispatch(doPageIn());
int counter = 0; int counter = 0;
synchronized (messages) { synchronized (pagedInMessages) {
ConnectionContext c = createConnectionContext(); ConnectionContext c = createConnectionContext();
messages.reset(); for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
while(messages.hasNext()) { IndirectMessageReference r = (IndirectMessageReference) i.next();
IndirectMessageReference r = (IndirectMessageReference) messages.next();
if (filter.evaluate(c, r)) { if (filter.evaluate(c, r)) {
// We should only delete messages that can be locked. removeMessage(c, r);
if (lockMessage(r)) { if (++counter >= maximumMessages && maximumMessages > 0) {
removeMessage(c, r); break;
if (++counter >= maximumMessages && maximumMessages > 0) {
break;
}
} }
} }
} }
} }
@ -669,11 +676,11 @@ public class Queue implements Destination {
* @return the number of messages copied * @return the number of messages copied
*/ */
public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
doDispatch(doPageIn());
int counter = 0; int counter = 0;
synchronized (messages) { synchronized (pagedInMessages) {
messages.reset(); for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
while(messages.hasNext()) { MessageReference r = (MessageReference) i.next();
MessageReference r = (MessageReference) messages.next();
if (filter.evaluate(context, r)) { if (filter.evaluate(context, r)) {
r.incrementReferenceCount(); r.incrementReferenceCount();
try { try {
@ -719,11 +726,11 @@ public class Queue implements Destination {
* Moves the messages matching the given filter up to the maximum number of matched messages * Moves the messages matching the given filter up to the maximum number of matched messages
*/ */
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
doDispatch(doPageIn());
int counter = 0; int counter = 0;
synchronized (messages) { synchronized (pagedInMessages) {
messages.reset(); for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
while(messages.hasNext()) { IndirectMessageReference r = (IndirectMessageReference) i.next();
IndirectMessageReference r = (IndirectMessageReference) messages.next();
if (filter.evaluate(context, r)) { if (filter.evaluate(context, r)) {
// We should only move messages that can be locked. // We should only move messages that can be locked.
if (lockMessage(r)) { if (lockMessage(r)) {
@ -745,6 +752,19 @@ public class Queue implements Destination {
} }
return counter; return counter;
} }
/**
* @return
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate(){
try{
doDispatch(doPageIn(false));
}catch(Exception e){
log.error("Failed to page in more queue messages ",e);
}
return false;
}
protected MessageReferenceFilter createMessageIdFilter(final String messageId) { protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
return new MessageReferenceFilter() { return new MessageReferenceFilter() {
@ -771,6 +791,7 @@ public class Queue implements Destination {
}; };
} }
protected void removeMessage(ConnectionContext c, IndirectMessageReference r) throws IOException { protected void removeMessage(ConnectionContext c, IndirectMessageReference r) throws IOException {
MessageAck ack = new MessageAck(); MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
@ -790,4 +811,65 @@ public class Queue implements Destination {
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
return answer; return answer;
} }
private void sendMessage(final ConnectionContext context,Message msg) throws Exception{
synchronized(messages){
messages.addMessageLast(msg);
}
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
doDispatch(doPageIn(false));
}
private List doPageIn() throws Exception{
return doPageIn(true);
}
private List doPageIn(boolean force) throws Exception{
final int toPageIn=maximumPagedInMessages-pagedInMessages.size();
List result=null;
if((force || !consumers.isEmpty())&&toPageIn>0){
try{
dispatchValve.increment();
int count=0;
result=new ArrayList(toPageIn);
synchronized(messages){
messages.reset();
while(messages.hasNext()&&count<toPageIn){
MessageReference node=messages.next();
messages.remove();
node=createMessageReference(node.getMessage());
result.add(node);
count++;
}
}
synchronized(pagedInMessages){
pagedInMessages.addAll(result);
}
}finally{
queueMsgConext.clear();
dispatchValve.decrement();
}
}
return result;
}
private void doDispatch(List list) throws Exception{
if(list!=null&&!list.isEmpty()){
try{
dispatchValve.increment();
for(int i=0;i<list.size();i++){
MessageReference node=(MessageReference)list.get(i);
queueMsgConext.setDestination(destination);
queueMsgConext.setMessageReference(node);
dispatchPolicy.dispatch(node,queueMsgConext,consumers);
}
}finally{
queueMsgConext.clear();
dispatchValve.decrement();
}
}
}
} }

View File

@ -410,7 +410,7 @@ public class Topic implements Destination {
msgContext.setDestination(destination); msgContext.setDestination(destination);
msgContext.setMessageReference(message); msgContext.setMessageReference(message);
if (!dispatchPolicy.dispatch(context, message, msgContext, consumers)) { if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
onMessageWithNoConsumers(context, message); onMessageWithNoConsumers(context, message);
} }
} }

View File

@ -92,4 +92,11 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{
public void resetForGC(){ public void resetForGC(){
reset(); reset();
} }
/**
* @param node
* @see org.apache.activemq.broker.region.cursors.PendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/
public void remove(MessageReference node){
}
} }

View File

@ -117,7 +117,10 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
public void remove(){ public void remove(){
iter.remove(); iter.remove();
} }
public void remove(MessageReference node){
list.remove(node);
}
/** /**
* @return the number of pending messages * @return the number of pending messages
*/ */

View File

@ -119,4 +119,10 @@ public interface PendingMessageCursor extends Service{
* messages from memory only * messages from memory only
*/ */
public void resetForGC(); public void resetForGC();
/**
* remove a node
* @param node
*/
public void remove(MessageReference node);
} }

View File

@ -181,6 +181,13 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
} }
pendingCount--; pendingCount--;
} }
public void remove(MessageReference node){
if(currentCursor!=null){
currentCursor.remove(node);
}
pendingCount--;
}
public synchronized void reset(){ public synchronized void reset(){
for(Iterator i=storePrefetches.iterator();i.hasNext();){ for(Iterator i=storePrefetches.iterator();i.hasNext();){

View File

@ -93,4 +93,19 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor{
public void clear(){ public void clear(){
list.clear(); list.clear();
} }
public void remove(MessageReference node) {
boolean matched = false;
int size = list.size();
for (Iterator i = list.iterator();i.hasNext();) {
MessageReference ref = (MessageReference)i.next();
System.err.println("MATCHIG " + node.getMessageId() + " AGAINST " + ref.getMessageId());
if(node.getMessageId().equals(ref.getMessageId())){
i.remove();
matched = true;
break;
}
}
System.err.println("REMOVED " + node.getMessageId() + " = " + matched + " presize = " + size + " size now = " + list.size());
}
} }

View File

@ -45,6 +45,6 @@ public interface DispatchPolicy {
* *
* @return true if at least one consumer was dispatched or false if there are no active subscriptions that could be dispatched * @return true if at least one consumer was dispatched or false if there are no active subscriptions that could be dispatched
*/ */
boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception; boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception;
} }

View File

@ -18,13 +18,13 @@
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.ConnectionContext; import java.util.Iterator;
import java.util.List;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.commons.logging.Log;
import java.util.Iterator; import org.apache.commons.logging.LogFactory;
import java.util.List;
/** /**
* Simple dispatch policy that sends a message to every subscription that * Simple dispatch policy that sends a message to every subscription that
@ -35,8 +35,17 @@ import java.util.List;
* @version $Revision$ * @version $Revision$
*/ */
public class RoundRobinDispatchPolicy implements DispatchPolicy { public class RoundRobinDispatchPolicy implements DispatchPolicy {
static final Log log=LogFactory.getLog(RoundRobinDispatchPolicy.class);
public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
/**
* @param node
* @param msgContext
* @param consumers
* @return true if dispatched
* @throws Exception
* @see org.apache.activemq.broker.region.policy.DispatchPolicy#dispatch(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
*/
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
// Big synch here so that only 1 message gets dispatched at a time. Ensures // Big synch here so that only 1 message gets dispatched at a time. Ensures
// Everyone sees the same order and that the consumer list is not used while // Everyone sees the same order and that the consumer list is not used while
@ -59,6 +68,7 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy {
try { try {
consumers.add(consumers.remove(0)); consumers.add(consumers.remove(0));
} catch (Throwable bestEffort) { } catch (Throwable bestEffort) {
log.error("Caught error rotating consumers");
} }
return count > 0; return count > 0;
} }

View File

@ -18,14 +18,12 @@
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.ConnectionContext; import java.util.Iterator;
import java.util.List;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import java.util.Iterator;
import java.util.List;
/** /**
* Simple dispatch policy that sends a message to every subscription that * Simple dispatch policy that sends a message to every subscription that
* matches the message. * matches the message.
@ -36,7 +34,7 @@ import java.util.List;
*/ */
public class SimpleDispatchPolicy implements DispatchPolicy { public class SimpleDispatchPolicy implements DispatchPolicy {
public boolean dispatch(ConnectionContext context, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception { public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
int count = 0; int count = 0;
for (Iterator iter = consumers.iterator(); iter.hasNext();) { for (Iterator iter = consumers.iterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next(); Subscription sub = (Subscription) iter.next();

View File

@ -18,14 +18,12 @@
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.ConnectionContext; import java.util.Iterator;
import java.util.List;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import java.util.Iterator;
import java.util.List;
/** /**
* Dispatch policy that causes every subscription to see messages in the same order. * Dispatch policy that causes every subscription to see messages in the same order.
* *
@ -35,7 +33,15 @@ import java.util.List;
*/ */
public class StrictOrderDispatchPolicy implements DispatchPolicy { public class StrictOrderDispatchPolicy implements DispatchPolicy {
public boolean dispatch(ConnectionContext newParam, MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception { /**
* @param node
* @param msgContext
* @param consumers
* @return true if dispatched
* @throws Exception
* @see org.apache.activemq.broker.region.policy.DispatchPolicy#dispatch(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
*/
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception {
// Big synch here so that only 1 message gets dispatched at a time. Ensures // Big synch here so that only 1 message gets dispatched at a time. Ensures
// Everyone sees the same order. // Everyone sees the same order.
synchronized(consumers) { synchronized(consumers) {