git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@490007 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-12-24 08:57:34 +00:00
parent 3be0113228
commit d8674a0fed
1 changed files with 62 additions and 48 deletions

View File

@ -18,12 +18,12 @@
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -56,11 +56,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
final protected LinkedList dispatched=new LinkedList(); final protected LinkedList dispatched=new LinkedList();
protected int prefetchExtension=0; protected int prefetchExtension=0;
boolean dispatching=false;
long enqueueCounter; protected long enqueueCounter;
long dispatchCounter; protected long dispatchCounter;
long dequeueCounter; protected long dequeueCounter;
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor) public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor)
throws InvalidSelectorException{ throws InvalidSelectorException{
@ -123,8 +122,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
public void add(MessageReference node) throws Exception{ public void add(MessageReference node) throws Exception{
try { boolean pendingEmpty=false;
boolean pendingEmpty = false;
synchronized(pending){ synchronized(pending){
pendingEmpty=pending.isEmpty(); pendingEmpty=pending.isEmpty();
enqueueCounter++; enqueueCounter++;
@ -134,18 +132,17 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}else{ }else{
optimizePrefetch(); optimizePrefetch();
synchronized(pending){ synchronized(pending){
if(log.isDebugEnabled() && pending.isEmpty()){ if(pending.isEmpty()&&log.isDebugEnabled()){
log.debug("Prefetch limit."); log.debug("Prefetch limit.");
} }
pending.addMessageLast(node); pending.addMessageLast(node);
} }
} //we might be able to dispatch messages (i.e. not full() anymore)
}catch(Throwable e) { dispatchMatched();
e.printStackTrace();
} }
} }
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{ public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{
synchronized(pending){ synchronized(pending){
try{ try{
@ -169,6 +166,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{ public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
// Handle the standard acknowledgment case. // Handle the standard acknowledgment case.
boolean callDispatchMatched=false;
synchronized(dispatched){ synchronized(dispatched){
if(ack.isStandardAck()){ if(ack.isStandardAck()){
// Acknowledge all dispatched messages up till the message id of the acknowledgment. // Acknowledge all dispatched messages up till the message id of the acknowledgment.
@ -216,13 +214,15 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}else{ }else{
prefetchExtension=Math.max(0,prefetchExtension-(index+1)); prefetchExtension=Math.max(0,prefetchExtension-(index+1));
} }
dispatchMatched(); callDispatchMatched=true;
return; break;
} }
} }
} }
//this only happens after a reconnect - get an ack which is not valid // this only happens after a reconnect - get an ack which is not valid
log.info("Could not correlate acknowledgment with dispatched message: "+ack); if(!callDispatchMatched){
log.info("Could not correlate acknowledgment with dispatched message: "+ack);
}
}else if(ack.isDeliveredAck()){ }else if(ack.isDeliveredAck()){
// Message was delivered but not acknowledged: update pre-fetch counters. // Message was delivered but not acknowledged: update pre-fetch counters.
// Acknowledge all dispatched messages up till the message id of the acknowledgment. // Acknowledge all dispatched messages up till the message id of the acknowledgment.
@ -231,11 +231,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
final MessageReference node=(MessageReference)iter.next(); final MessageReference node=(MessageReference)iter.next();
if(ack.getLastMessageId().equals(node.getMessageId())){ if(ack.getLastMessageId().equals(node.getMessageId())){
prefetchExtension=Math.max(prefetchExtension,index+1); prefetchExtension=Math.max(prefetchExtension,index+1);
dispatchMatched(); callDispatchMatched=true;
return; break;
} }
} }
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); if(!callDispatchMatched){
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
}
}else if(ack.isPoisonAck()){ }else if(ack.isPoisonAck()){
// TODO: what if the message is already in a DLQ??? // TODO: what if the message is already in a DLQ???
// Handle the poison ACK case: we need to send the message to a DLQ // Handle the poison ACK case: we need to send the message to a DLQ
@ -259,14 +261,19 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
acknowledge(context,ack,node); acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){ if(ack.getLastMessageId().equals(messageId)){
prefetchExtension=Math.max(0,prefetchExtension-(index+1)); prefetchExtension=Math.max(0,prefetchExtension-(index+1));
dispatchMatched(); callDispatchMatched=true;
return; break;
} }
} }
} }
if(!callDispatchMatched){
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
}
} }
}
if(callDispatchMatched){
dispatchMatched();
}else{
if(isSlaveBroker()){ if(isSlaveBroker()){
throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack
+") was not in the dispatch list: "+dispatched); +") was not in the dispatch list: "+dispatched);
@ -366,40 +373,47 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
*/ */
} }
public void add(ConnectionContext context, Destination destination) throws Exception { public void add(ConnectionContext context,Destination destination) throws Exception{
super.add(context,destination); super.add(context,destination);
pending.add(context,destination); synchronized(pending){
pending.add(context,destination);
}
} }
public void remove(ConnectionContext context, Destination destination) throws Exception { public void remove(ConnectionContext context,Destination destination) throws Exception{
super.remove(context,destination); super.remove(context,destination);
pending.remove(context,destination); synchronized(pending){
pending.remove(context,destination);
}
} }
protected void dispatchMatched() throws IOException{ protected void dispatchMatched() throws IOException{
List toDispatch=null;
synchronized(pending){ synchronized(pending){
if(!dispatching){ try{
dispatching=true; pending.reset();
try{ while(pending.hasNext()&&!isFull()){
pending.reset(); MessageReference node=pending.next();
while(pending.hasNext()&&!isFull()){ pending.remove();
MessageReference node=pending.next(); // Message may have been sitting in the pending list a while
pending.remove(); // waiting for the consumer to ak the message.
if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
// Message may have been sitting in the pending list a while continue; // just drop it.
// waiting for the consumer to ak the message.
if( node != QueueMessageReference.NULL_MESSAGE && node.isExpired() ) {
continue; // just drop it.
}
dispatch(node);
} }
}finally{ if(toDispatch==null){
pending.release(); toDispatch=new ArrayList();
dispatching=false; }
toDispatch.add(node);
} }
}finally{
pending.release();
}
}
if(toDispatch!=null){
for(int i=0;i<toDispatch.size();i++){
MessageReference node=(MessageReference)toDispatch.get(i);
dispatch(node);
} }
} }
} }