diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index c01bbab844..4e0798c064 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -1,27 +1,22 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.broker.region; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -45,91 +40,86 @@ import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + /** * A subscription that honors the pre-fetch option of the ConsumerInfo. * * @version $Revision: 1.15 $ */ abstract public class PrefetchSubscription extends AbstractSubscription{ - + static private final Log log=LogFactory.getLog(PrefetchSubscription.class); final protected PendingMessageCursor pending; final protected LinkedList dispatched=new LinkedList(); - protected int prefetchExtension=0; - protected long enqueueCounter; protected long dispatchCounter; protected long dequeueCounter; - private AtomicBoolean dispatching = new AtomicBoolean(); - - public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor) - throws InvalidSelectorException{ + private AtomicBoolean dispatching=new AtomicBoolean(); + + public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,PendingMessageCursor cursor) + throws InvalidSelectorException{ super(broker,context,info); - pending = cursor; - } - - public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info) - throws InvalidSelectorException{ - this(broker,context,info,new VMPendingMessageCursor()); + pending=cursor; + } + + public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info) + throws InvalidSelectorException{ + this(broker,context,info,new VMPendingMessageCursor()); } - /** * Allows a message to be pulled on demand by a client */ - public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { - // The slave should not deliver pull messages. TODO: when the slave becomes a master, - // He should send a NULL message to all the consumers to 'wake them up' in case - // they were waiting for a message. - if (getPrefetchSize() == 0 && !isSlaveBroker()) { + public synchronized Response pullMessage(ConnectionContext context,MessagePull pull) throws Exception{ + // The slave should not deliver pull messages. TODO: when the slave becomes a master, + // He should send a NULL message to all the consumers to 'wake them up' in case + // they were waiting for a message. + if(getPrefetchSize()==0&&!isSlaveBroker()){ prefetchExtension++; - - final long dispatchCounterBeforePull = dispatchCounter; + final long dispatchCounterBeforePull=dispatchCounter; dispatchMatched(); - - // If there was nothing dispatched.. we may need to setup a timeout. - if( dispatchCounterBeforePull == dispatchCounter ) { - // imediate timeout used by receiveNoWait() - if( pull.getTimeout() == -1 ) { - // Send a NULL message. - add(QueueMessageReference.NULL_MESSAGE); - dispatchMatched(); - } - if( pull.getTimeout() > 0 ) { - Scheduler.executeAfterDelay(new Runnable(){ - public void run() { - pullTimeout(dispatchCounterBeforePull); - } - }, pull.getTimeout()); - } - } + // If there was nothing dispatched.. we may need to setup a timeout. + if(dispatchCounterBeforePull==dispatchCounter){ + // imediate timeout used by receiveNoWait() + if(pull.getTimeout()==-1){ + // Send a NULL message. + add(QueueMessageReference.NULL_MESSAGE); + dispatchMatched(); + } + if(pull.getTimeout()>0){ + Scheduler.executeAfterDelay(new Runnable(){ + + public void run(){ + pullTimeout(dispatchCounterBeforePull); + } + },pull.getTimeout()); + } + } } return null; } - + /** - * Occurs when a pull times out. If nothing has been dispatched - * since the timeout was setup, then send the NULL message. + * Occurs when a pull times out. If nothing has been dispatched since the timeout was setup, then send the NULL + * message. */ - private void pullTimeout(long dispatchCounterBeforePull) { - if( dispatchCounterBeforePull == dispatchCounter ) { - try { - add(QueueMessageReference.NULL_MESSAGE); - dispatchMatched(); - } catch (Exception e) { - context.getConnection().serviceException(e); - } - } - } - - public void add(MessageReference node) throws Exception{ - boolean pendingEmpty=false; - - synchronized(pending){ - pendingEmpty=pending.isEmpty(); - enqueueCounter++; + private void pullTimeout(long dispatchCounterBeforePull){ + if(dispatchCounterBeforePull==dispatchCounter){ + try{ + add(QueueMessageReference.NULL_MESSAGE); + dispatchMatched(); + }catch(Exception e){ + context.getConnection().serviceException(e); + } } + } + + public synchronized void add(MessageReference node) throws Exception{ + boolean pendingEmpty=false; + pendingEmpty=pending.isEmpty(); + enqueueCounter++; + if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){ dispatch(node); }else{ @@ -137,142 +127,133 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ synchronized(pending){ if(pending.isEmpty()&&log.isDebugEnabled()){ log.debug("Prefetch limit."); - } + } pending.addMessageLast(node); } - //we might be able to dispatch messages (i.e. not full() anymore) - dispatchMatched(); } } - - public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{ - synchronized(pending){ - try{ - pending.reset(); - while(pending.hasNext()){ - MessageReference node=pending.next(); - if(node.getMessageId().equals(mdn.getMessageId())){ - pending.remove(); - createMessageDispatch(node,node.getMessage()); - dispatched.addLast(node); - - return; - } + public synchronized void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{ + try{ + pending.reset(); + while(pending.hasNext()){ + MessageReference node=pending.next(); + if(node.getMessageId().equals(mdn.getMessageId())){ + pending.remove(); + createMessageDispatch(node,node.getMessage()); + dispatched.addLast(node); + return; } - }finally{ - pending.release(); } - throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId() - +") was not in the pending list"); + }finally{ + pending.release(); } + throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId() + +") was not in the pending list"); } - public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{ + public synchronized void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{ // Handle the standard acknowledgment case. boolean callDispatchMatched=false; - synchronized(dispatched){ - if(ack.isStandardAck()){ - // Acknowledge all dispatched messages up till the message id of the acknowledgment. - int index=0; - boolean inAckRange=false; - for(Iterator iter=dispatched.iterator();iter.hasNext();){ - final MessageReference node=(MessageReference)iter.next(); - MessageId messageId=node.getMessageId(); - if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ - inAckRange=true; - } - if(inAckRange){ - // Don't remove the nodes until we are committed. - if(!context.isInTransaction()){ - dequeueCounter++; - node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); - iter.remove(); - }else{ - // setup a Synchronization to remove nodes from the dispatched list. - context.getTransaction().addSynchronization(new Synchronization(){ + if(ack.isStandardAck()){ + // Acknowledge all dispatched messages up till the message id of the acknowledgment. + int index=0; + boolean inAckRange=false; + for(Iterator iter=dispatched.iterator();iter.hasNext();){ + final MessageReference node=(MessageReference)iter.next(); + MessageId messageId=node.getMessageId(); + if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ + inAckRange=true; + } + if(inAckRange){ + // Don't remove the nodes until we are committed. + if(!context.isInTransaction()){ + dequeueCounter++; + node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); + iter.remove(); + }else{ + // setup a Synchronization to remove nodes from the dispatched list. + context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Exception{ - synchronized(PrefetchSubscription.this){ - dequeueCounter++; - dispatched.remove(node); - node.getRegionDestination().getDestinationStatistics().getDequeues() - .increment(); - prefetchExtension--; - } + public void afterCommit() throws Exception{ + synchronized(PrefetchSubscription.this){ + dequeueCounter++; + dispatched.remove(node); + node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); + prefetchExtension--; } - - public void afterRollback() throws Exception{ - super.afterRollback(); - } - }); - } - index++; - acknowledge(context,ack,node); - if(ack.getLastMessageId().equals(messageId)){ - if(context.isInTransaction()){ - // extend prefetch window only if not a pulling consumer - if(getPrefetchSize()!=0){ - prefetchExtension=Math.max(prefetchExtension,index+1); - } - }else{ - prefetchExtension=Math.max(0,prefetchExtension-(index+1)); } - callDispatchMatched=true; - break; - } + + public void afterRollback() throws Exception{ + super.afterRollback(); + } + }); } - } - // this only happens after a reconnect - get an ack which is not valid - if(!callDispatchMatched){ - log.info("Could not correlate acknowledgment with dispatched message: "+ack); - } - }else if(ack.isDeliveredAck()){ - // Message was delivered but not acknowledged: update pre-fetch counters. - // Acknowledge all dispatched messages up till the message id of the acknowledgment. - int index=0; - for(Iterator iter=dispatched.iterator();iter.hasNext();index++){ - final MessageReference node=(MessageReference)iter.next(); - if(ack.getLastMessageId().equals(node.getMessageId())){ - prefetchExtension=Math.max(prefetchExtension,index+1); + index++; + acknowledge(context,ack,node); + if(ack.getLastMessageId().equals(messageId)){ + if(context.isInTransaction()){ + // extend prefetch window only if not a pulling consumer + if(getPrefetchSize()!=0){ + prefetchExtension=Math.max(prefetchExtension,index+1); + } + }else{ + prefetchExtension=Math.max(0,prefetchExtension-(index+1)); + } callDispatchMatched=true; break; } } - if(!callDispatchMatched){ - throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); + } + // this only happens after a reconnect - get an ack which is not valid + if(!callDispatchMatched){ + log.info("Could not correlate acknowledgment with dispatched message: "+ack); + } + }else if(ack.isDeliveredAck()){ + // Message was delivered but not acknowledged: update pre-fetch counters. + // Acknowledge all dispatched messages up till the message id of the acknowledgment. + int index=0; + for(Iterator iter=dispatched.iterator();iter.hasNext();index++){ + final MessageReference node=(MessageReference)iter.next(); + if(ack.getLastMessageId().equals(node.getMessageId())){ + prefetchExtension=Math.max(prefetchExtension,index+1); + callDispatchMatched=true; + break; } - }else if(ack.isPoisonAck()){ - // TODO: what if the message is already in a DLQ??? - // Handle the poison ACK case: we need to send the message to a DLQ - if(ack.isInTransaction()) - throw new JMSException("Poison ack cannot be transacted: "+ack); - // Acknowledge all dispatched messages up till the message id of the acknowledgment. - int index=0; - boolean inAckRange=false; - for(Iterator iter=dispatched.iterator();iter.hasNext();){ - final MessageReference node=(MessageReference)iter.next(); - MessageId messageId=node.getMessageId(); - if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ - inAckRange=true; - } - if(inAckRange){ - sendToDLQ(context,node); - node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); - iter.remove(); - dequeueCounter++; - index++; - acknowledge(context,ack,node); - if(ack.getLastMessageId().equals(messageId)){ - prefetchExtension=Math.max(0,prefetchExtension-(index+1)); - callDispatchMatched=true; - break; - } + } + if(!callDispatchMatched){ + throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); + } + }else if(ack.isPoisonAck()){ + // TODO: what if the message is already in a DLQ??? + // Handle the poison ACK case: we need to send the message to a DLQ + if(ack.isInTransaction()) + throw new JMSException("Poison ack cannot be transacted: "+ack); + // Acknowledge all dispatched messages up till the message id of the acknowledgment. + int index=0; + boolean inAckRange=false; + for(Iterator iter=dispatched.iterator();iter.hasNext();){ + final MessageReference node=(MessageReference)iter.next(); + MessageId messageId=node.getMessageId(); + if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){ + inAckRange=true; + } + if(inAckRange){ + sendToDLQ(context,node); + node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); + iter.remove(); + dequeueCounter++; + index++; + acknowledge(context,ack,node); + if(ack.getLastMessageId().equals(messageId)){ + prefetchExtension=Math.max(0,prefetchExtension-(index+1)); + callDispatchMatched=true; + break; } } - if(!callDispatchMatched){ - throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); - } + } + if(!callDispatchMatched){ + throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); } } if(callDispatchMatched){ @@ -293,7 +274,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ * @throws IOException * @throws Exception */ - protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception { + protected void sendToDLQ(final ConnectionContext context,final MessageReference node) throws IOException,Exception{ // Send the message to the DLQ Message message=node.getMessage(); if(message!=null){ @@ -301,142 +282,118 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ // sent, // it is only populated if the message is routed to another destination like the DLQ DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy(); - ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination()); - BrokerSupport.resend(context, message, deadLetterDestination); - + ActiveMQDestination deadLetterDestination=deadLetterStrategy + .getDeadLetterQueueFor(message.getDestination()); + BrokerSupport.resend(context,message,deadLetterDestination); } } /** * Used to determine if the broker can dispatch to the consumer. + * * @return */ - protected boolean isFull(){ - return isSlaveBroker() || dispatched.size()-prefetchExtension>=info.getPrefetchSize(); + protected synchronized boolean isFull(){ + return isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize(); } - + /** * @return true when 60% or more room is left for dispatching messages */ public boolean isLowWaterMark(){ - return (dispatched.size()-prefetchExtension) <= (info.getPrefetchSize() *.4); + return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4); } - + /** * @return true when 10% or less room is left for dispatching messages */ public boolean isHighWaterMark(){ - return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9); + return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9); } - - public int countBeforeFull() { - return info.getPrefetchSize() + prefetchExtension - dispatched.size(); + + public synchronized int countBeforeFull(){ + return info.getPrefetchSize()+prefetchExtension-dispatched.size(); } - + public int getPendingQueueSize(){ - synchronized(pending) { - return pending.size(); - } + synchronized(pending){ + return pending.size(); + } } - + public int getDispatchedQueueSize(){ synchronized(dispatched){ return dispatched.size(); } } - + synchronized public long getDequeueCounter(){ return dequeueCounter; } - - synchronized public long getDispatchedCounter() { + + synchronized public long getDispatchedCounter(){ return dispatchCounter; } - - synchronized public long getEnqueueCounter() { + + synchronized public long getEnqueueCounter(){ return enqueueCounter; } - + public boolean isRecoveryRequired(){ return pending.isRecoveryRequired(); } - + /** * optimize message consumer prefetch if the consumer supports it - * + * */ public void optimizePrefetch(){ - /* - if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null - &&context.getConnection().isManageable()){ - if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){ - info.setCurrentPrefetchSize(info.getPrefetchSize()); - updateConsumerPrefetch(info.getPrefetchSize()); - }else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){ - // want to purge any outstanding acks held by the consumer - info.setCurrentPrefetchSize(1); - updateConsumerPrefetch(1); - } - } - */ + /* + * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null + * &&context.getConnection().isManageable()){ if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && + * isLowWaterMark()){ info.setCurrentPrefetchSize(info.getPrefetchSize()); + * updateConsumerPrefetch(info.getPrefetchSize()); }else + * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){ // want to purge any + * outstanding acks held by the consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } } + */ } - - public void add(ConnectionContext context,Destination destination) throws Exception{ + + public synchronized void add(ConnectionContext context,Destination destination) throws Exception{ super.add(context,destination); - synchronized(pending){ - pending.add(context,destination); - } + pending.add(context,destination); } - public void remove(ConnectionContext context,Destination destination) throws Exception{ + public synchronized void remove(ConnectionContext context,Destination destination) throws Exception{ super.remove(context,destination); - synchronized(pending){ - pending.remove(context,destination); - } + pending.remove(context,destination); } - protected void dispatchMatched() throws IOException{ - if(!broker.isSlaveBroker() && dispatching.compareAndSet(false,true)){ + if(!broker.isSlaveBroker()&&dispatching.compareAndSet(false,true)){ try{ - List toDispatch=null; - synchronized(pending){ - try{ - int numberToDispatch=countBeforeFull(); - if(numberToDispatch>0){ - int count=0; - pending.reset(); - while(pending.hasNext()&&!isFull()&&count0){ + int count=0; + pending.reset(); + while(pending.hasNext()&&!isFull()&&count