diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 2c8255c8f6..b333152854 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -1,18 +1,15 @@ /** - * + * * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.broker.region; @@ -34,165 +31,157 @@ import org.apache.activemq.memory.UsageManager; import org.apache.activemq.transaction.Synchronization; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; -public class TopicSubscription extends AbstractSubscription { - private static final Log log = LogFactory.getLog(TopicSubscription.class); - final protected LinkedList matched = new LinkedList(); - final protected ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ"); + +public class TopicSubscription extends AbstractSubscription{ + private static final Log log=LogFactory.getLog(TopicSubscription.class); + final protected LinkedList matched=new LinkedList(); + final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ"); final protected UsageManager usageManager; - protected int dispatched=0; - protected int delivered=0; - private int maximumPendingMessages = -1; - - public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException { - super(broker,context, info); + protected AtomicInteger dispatched=new AtomicInteger(); + protected AtomicInteger delivered=new AtomicInteger(); + private int maximumPendingMessages=-1; + private final Object matchedListMutex=new Object(); + + public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager) + throws InvalidSelectorException{ + super(broker,context,info); this.usageManager=usageManager; } - public void add(MessageReference node) throws InterruptedException, IOException { + public void add(MessageReference node) throws InterruptedException,IOException{ node.incrementReferenceCount(); - if( !isFull() && !isSlaveBroker()) { + if(!isFull()&&!isSlaveBroker()){ // if maximumPendingMessages is set we will only discard messages which // have not been dispatched (i.e. we allow the prefetch buffer to be filled) dispatch(node); - } else { - if (maximumPendingMessages != 0) { - synchronized (matched) { + }else{ + if(maximumPendingMessages!=0){ + synchronized(matchedListMutex){ matched.addLast(node); - // NOTE - be careful about the slaveBroker! - if (maximumPendingMessages > 0) { - log.warn("discarding " + (matched.size() - maximumPendingMessages) + " messages for slow consumer"); + if(maximumPendingMessages>0){ // lets discard old messages as we are a slow consumer - while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { - MessageReference oldMessage = (MessageReference) matched.removeFirst(); + while(!matched.isEmpty()&&matched.size()>maximumPendingMessages){ + MessageReference oldMessage=(MessageReference) matched.removeFirst(); oldMessage.decrementReferenceCount(); + if (log.isDebugEnabled()){ + log.debug("Discarding message " + oldMessage); + } } } } } - } + } } - - public void processMessageDispatchNotification(MessageDispatchNotification mdn){ - synchronized(matched){ - for (Iterator i = matched.iterator(); i.hasNext();){ - MessageReference node = (MessageReference)i.next(); - if (node.getMessageId().equals(mdn.getMessageId())){ + + public void processMessageDispatchNotification(MessageDispatchNotification mdn){ + synchronized(matchedListMutex){ + for(Iterator i=matched.iterator();i.hasNext();){ + MessageReference node=(MessageReference) i.next(); + if(node.getMessageId().equals(mdn.getMessageId())){ i.remove(); - dispatched++; + dispatched.incrementAndGet(); node.decrementReferenceCount(); break; } } } } - - public void acknowledge(final ConnectionContext context, final MessageAck ack) throws Throwable { - + + public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Throwable{ // Handle the standard acknowledgment case. - boolean wasFull = isFull(); - if( ack.isStandardAck() || ack.isPoisonAck() ) { - if ( context.isInTransaction() ) { - delivered += ack.getMessageCount(); + boolean wasFull=isFull(); + if(ack.isStandardAck()||ack.isPoisonAck()){ + if(context.isInTransaction()){ + delivered.addAndGet(ack.getMessageCount()); context.getTransaction().addSynchronization(new Synchronization(){ - public void afterCommit() throws Throwable { - synchronized(TopicSubscription.this) { - dispatched -= ack.getMessageCount(); - delivered = Math.max(0, delivered - ack.getMessageCount()); - } + public void afterCommit() throws Throwable{ + dispatched.addAndGet(-ack.getMessageCount()); + delivered.set(Math.max(0,delivered.get()-ack.getMessageCount())); } }); - } else { - dispatched -= ack.getMessageCount(); - delivered = Math.max(0, delivered - ack.getMessageCount()); + }else{ + dispatched.addAndGet(-ack.getMessageCount()); + delivered.set(Math.max(0,delivered.get()-ack.getMessageCount())); } - - if( wasFull && !isFull() ) { + if(wasFull&&!isFull()){ dispatchMatched(); } return; - - } else if( ack.isDeliveredAck() ) { + }else if(ack.isDeliveredAck()){ // Message was delivered but not acknowledged: update pre-fetch counters. - delivered += ack.getMessageCount(); - if( wasFull && !isFull() ) { + delivered.addAndGet(ack.getMessageCount()); + if(wasFull&&!isFull()){ dispatchMatched(); } return; } - throw new JMSException("Invalid acknowledgment: "+ack); } - + public int pending(){ - return matched.size() - dispatched; + return matched.size()-dispatched.get(); } - + public int dispatched(){ - return dispatched; + return dispatched.get(); } - + public int delivered(){ - return delivered; + return delivered.get(); } - - public int getMaximumPendingMessages() { + + public int getMaximumPendingMessages(){ return maximumPendingMessages; } /** - * Sets the maximum number of pending messages that can be matched against this consumer - * before old messages are discarded. + * Sets the maximum number of pending messages that can be matched against this consumer before old messages are + * discarded. */ - public void setMaximumPendingMessages(int maximumPendingMessages) { - this.maximumPendingMessages = maximumPendingMessages; + public void setMaximumPendingMessages(int maximumPendingMessages){ + this.maximumPendingMessages=maximumPendingMessages; } - private boolean isFull() { - return dispatched-delivered >= info.getPrefetchSize(); + private boolean isFull(){ + return dispatched.get()-delivered.get()>=info.getPrefetchSize(); } - - private void dispatchMatched() throws IOException { - for (Iterator iter = matched.iterator(); iter.hasNext() && !isFull();) { - MessageReference message = (MessageReference) iter.next(); - iter.remove(); - dispatch(message); + + private void dispatchMatched() throws IOException{ + synchronized(matchedListMutex){ + for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){ + MessageReference message=(MessageReference) iter.next(); + iter.remove(); + dispatch(message); + } } } - private void dispatch(final MessageReference node) throws IOException { - - Message message = (Message) node; - - // Make sure we can dispatch a message. - MessageDispatch md = new MessageDispatch(); + private void dispatch(final MessageReference node) throws IOException{ + Message message=(Message) node; + // Make sure we can dispatch a message. + MessageDispatch md=new MessageDispatch(); md.setMessage(message); - md.setConsumerId( info.getConsumerId() ); - md.setDestination( node.getRegionDestination().getActiveMQDestination() ); - - dispatched++; - if( info.isDispatchAsync() ) { + md.setConsumerId(info.getConsumerId()); + md.setDestination(node.getRegionDestination().getActiveMQDestination()); + dispatched.incrementAndGet(); + if(info.isDispatchAsync()){ md.setConsumer(new Runnable(){ - public void run() { + public void run(){ node.decrementReferenceCount(); } }); context.getConnection().dispatchAsync(md); - } else { - context.getConnection().dispatchSync(md); + }else{ + context.getConnection().dispatchSync(md); node.decrementReferenceCount(); - } - } - - public String toString() { - return - "TopicSubscription:" + - " consumer="+info.getConsumerId()+ - ", destinations="+destinations.size()+ - ", dispatched="+dispatched+ - ", delivered="+this.delivered+ - ", matched="+this.matched.size(); + } } + public String toString(){ + return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size() + +", dispatched="+dispatched+", delivered="+this.delivered+", matched="+this.matched.size(); + } }