From 3fe7760e4a01cd1effc4b3c5e2546385c57cd2a8 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 6 Oct 2008 14:11:51 +0000 Subject: [PATCH] Applying AMQ-1957.. Thanks for the patch. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@702152 13f79535-47bb-0310-9956-ffa450edef68 --- .../region/IndirectMessageReference.java | 2 +- .../broker/region/PrefetchSubscription.java | 55 +-- .../apache/activemq/broker/region/Queue.java | 56 +-- .../SubscriptionAddRemoveQueueTest.java | 343 ------------------ 4 files changed, 67 insertions(+), 389 deletions(-) delete mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index eaeb2269dc..309db4ace6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -88,7 +88,7 @@ public class IndirectMessageReference implements QueueMessageReference { public boolean lock(LockOwner subscription) { synchronized (this) { - if (dropped || (lockOwner != null && lockOwner != subscription)) { + if (dropped || lockOwner != null) { return false; } lockOwner = subscription; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index cc0e3ca73c..6db933e3a0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -157,9 +157,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription { while (pending.hasNext()) { MessageReference node = pending.next(); if (node.getMessageId().equals(mdn.getMessageId())) { - pending.remove(); - createMessageDispatch(node, node.getMessage()); + // Synchronize between dispatched list and removal of messages from pending list + // related to remove subscription action synchronized(dispatchLock) { + pending.remove(); + createMessageDispatch(node, node.getMessage()); dispatched.add(node); } return; @@ -532,11 +534,18 @@ public abstract class PrefetchSubscription extends AbstractSubscription { List rc = new ArrayList(); synchronized(pendingLock) { super.remove(context, destination); - for (MessageReference r : dispatched) { - if( r.getRegionDestination() == destination ) { - rc.add((QueueMessageReference)r); - } + // Synchronized to DispatchLock + synchronized(dispatchLock) { + for (MessageReference r : dispatched) { + if( r.getRegionDestination() == destination) { + rc.add((QueueMessageReference)r); + } + } } + // TODO Dispatched messages should be decremented from Inflight stat + // Here is a potential problem concerning Inflight stat: + // Messages not already committed or rolled back may not be removed from dispatched list at the moment + // Except if each commit or rollback callback action comes before remove of subscriber. rc.addAll(pending.remove(context, destination)); } return rc; @@ -559,19 +568,23 @@ public abstract class PrefetchSubscription extends AbstractSubscription { break; } - pending.remove(); - if( !isDropped(node) && canDispatch(node)) { + // Synchronize between dispatched list and remove of messageg from pending list + // related to remove subscription action + synchronized(dispatchLock) { + pending.remove(); + if( !isDropped(node) && canDispatch(node)) { - // Message may have been sitting in the pending - // list a while waiting for the consumer to ak the message. - if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) { - //increment number to dispatch - numberToDispatch++; - node.getRegionDestination().messageExpired(context, this, node); - continue; + // Message may have been sitting in the pending + // list a while waiting for the consumer to ak the message. + if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) { + //increment number to dispatch + numberToDispatch++; + node.getRegionDestination().messageExpired(context, this, node); + continue; + } + dispatch(node); + count++; } - dispatch(node); - count++; } } }else { @@ -596,10 +609,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription { final Message message = node.getMessage(); if (message == null) { return false; - } - // Make sure we can dispatch a message. - if (canDispatch(node) && !isSlave()) { - + } + // No reentrant lock - Patch needed to IndirectMessageReference on method lock + if (!isSlave()) { + MessageDispatch md = createMessageDispatch(node, message); // NULL messages don't count... they don't get Acked. if (node != QueueMessageReference.NULL_MESSAGE) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 830dd22f07..6f594fb895 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -959,6 +959,10 @@ public class Queue extends BaseDestination implements Task { if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) { msgContext.setMessageReference(node); if (rd.subscription.matches(node, msgContext)) { + // Log showing message dispatching + if (LOG.isDebugEnabled()) { + LOG.debug(destination.getQualifiedName() + " - Recovery - Message pushed '" + node.hashCode() + " - " + node + "' to subscription: '" + rd.subscription + "'"); + } rd.subscription.add(node); } else { // make sure it gets queued for dispatched again @@ -1063,23 +1067,26 @@ public class Queue extends BaseDestination implements Task { protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException { reference.setAcked(true); // This sends the ack the the journal.. - acknowledge(context, sub, ack, reference); - if (!ack.isInTransaction()) { + acknowledge(context, sub, ack, reference); dropMessage(reference); wakeup(); } else { - context.getTransaction().addSynchronization(new Synchronization() { + try { + acknowledge(context, sub, ack, reference); + } finally { + context.getTransaction().addSynchronization(new Synchronization() { - public void afterCommit() throws Exception { - dropMessage(reference); - wakeup(); - } + public void afterCommit() throws Exception { + dropMessage(reference); + wakeup(); + } - public void afterRollback() throws Exception { - reference.setAcked(false); - } - }); + public void afterRollback() throws Exception { + reference.setAcked(false); + } + }); + } } } @@ -1153,18 +1160,11 @@ public class Queue extends BaseDestination implements Task { private List doPageIn(boolean force) throws Exception { List result = null; + List resultList = null; dispatchLock.lock(); try{ - - int toPageIn = 0; - if (force) { - toPageIn = getMaxPageSize(); - } else { - toPageIn = (getMaxPageSize() + (int) destinationStatistics - .getInflight().getCount()) - - pagedInMessages.size(); - toPageIn = Math.min(toPageIn, getMaxPageSize()); - } + int toPageIn = getMaxPageSize() + Math.max(0, (int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size(); + toPageIn = Math.max(0, Math.min(toPageIn, getMaxPageSize())); if (isLazyDispatch()&& !force) { // Only page in the minimum number of messages which can be dispatched immediately. toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); @@ -1193,16 +1193,24 @@ public class Queue extends BaseDestination implements Task { messages.release(); } } + // Only add new messages, not already pagedIn to avoid multiple dispatch attempts synchronized (pagedInMessages) { - for(QueueMessageReference ref:result) { - pagedInMessages.put(ref.getMessageId(), ref); + resultList = new ArrayList(result.size()); + for(QueueMessageReference ref : result) { + if (!pagedInMessages.containsKey(ref.getMessageId())) { + pagedInMessages.put(ref.getMessageId(), ref); + resultList.add(ref); + } } } + } else { + // Avoid return null list, if condition is not validated + resultList = new ArrayList(); } }finally { dispatchLock.unlock(); } - return result; + return resultList; } private void doDispatch(List list) throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java deleted file mode 100644 index 9ba14c3292..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java +++ /dev/null @@ -1,343 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.region; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import javax.jms.InvalidSelectorException; -import javax.management.ObjectName; -import junit.framework.TestCase; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatchNotification; -import org.apache.activemq.command.MessagePull; -import org.apache.activemq.command.Response; -import org.apache.activemq.filter.MessageEvaluationContext; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.thread.TaskRunnerFactory; - -public class SubscriptionAddRemoveQueueTest extends TestCase { - - Queue queue; - Message msg = new ActiveMQMessage(); - ConsumerInfo info = new ConsumerInfo(); - List subs = new ArrayList(); - ConnectionContext context = new ConnectionContext(); - int numSubscriptions = 1000; - boolean working = true; - int senders = 20; - - - @Override - public void setUp() throws Exception { - BrokerService brokerService = new BrokerService(); - brokerService.start(); - ActiveMQDestination destination = new ActiveMQQueue("TEST"); - DestinationStatistics parentStats = new DestinationStatistics(); - parentStats.setEnabled(true); - - TaskRunnerFactory taskFactory = null; - MessageStore store = null; - - msg.setDestination(destination); - info.setDestination(destination); - info.setPrefetchSize(100); - - queue = new Queue(brokerService, destination, store, parentStats, taskFactory); - queue.initialize(); - } - - public void testNoDispatchToRemovedConsumers() throws Exception { - Runnable sender = new Runnable() { - public void run() { - while (working) { - try { - queue.sendMessage(context, msg); - } catch (Exception e) { - e.printStackTrace(); - fail("unexpected exception in sendMessage, ex:" + e); - } - } - } - }; - - Runnable subRemover = new Runnable() { - public void run() { - for (Subscription sub : subs) { - try { - queue.removeSubscription(context, sub); - } catch (Exception e) { - e.printStackTrace(); - fail("unexpected exception in removeSubscription, ex:" + e); - } - } - } - }; - - for (int i=0;i result = executor.submit(subRemover); - result.get(); - working = false; - assertEquals("there are no subscriptions", 0, queue.getDestinationStatistics().getConsumers().getCount()); - - for (SimpleImmediateDispatchSubscription sub : subs) { - assertTrue("There are no locked messages in any removed subscriptions", !hasSomeLocks(sub.dispatched)); - } - - } - - private boolean hasSomeLocks(List dispatched) { - boolean hasLock = false; - for (MessageReference mr: dispatched) { - QueueMessageReference qmr = (QueueMessageReference) mr; - if (qmr.getLockOwner() != null) { - hasLock = true; - break; - } - } - return hasLock; - } - - public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner { - - List dispatched = - Collections.synchronizedList(new ArrayList()); - - public void acknowledge(ConnectionContext context, MessageAck ack) - throws Exception { - // TODO Auto-generated method stub - - } - - public void add(MessageReference node) throws Exception { - // immediate dispatch - QueueMessageReference qmr = (QueueMessageReference)node; - qmr.lock(this); - dispatched.add(qmr); - } - - public ConnectionContext getContext() { - // TODO - return null; - } - - public void add(ConnectionContext context, Destination destination) - throws Exception { - // TODO Auto-generated method stub - - } - - public void destroy() { - // TODO Auto-generated method stub - - } - - public void gc() { - // TODO Auto-generated method stub - - } - - public ConsumerInfo getConsumerInfo() { - return info; - } - - public long getDequeueCounter() { - // TODO Auto-generated method stub - return 0; - } - - public long getDispatchedCounter() { - // TODO Auto-generated method stub - return 0; - } - - public int getDispatchedQueueSize() { - // TODO Auto-generated method stub - return 0; - } - - public long getEnqueueCounter() { - // TODO Auto-generated method stub - return 0; - } - - public int getInFlightSize() { - // TODO Auto-generated method stub - return 0; - } - - public int getInFlightUsage() { - // TODO Auto-generated method stub - return 0; - } - - public ObjectName getObjectName() { - // TODO Auto-generated method stub - return null; - } - - public int getPendingQueueSize() { - // TODO Auto-generated method stub - return 0; - } - - public int getPrefetchSize() { - // TODO Auto-generated method stub - return 0; - } - - public String getSelector() { - // TODO Auto-generated method stub - return null; - } - - public boolean isBrowser() { - // TODO Auto-generated method stub - return false; - } - - public boolean isFull() { - // TODO Auto-generated method stub - return false; - } - - public boolean isHighWaterMark() { - // TODO Auto-generated method stub - return false; - } - - public boolean isLowWaterMark() { - // TODO Auto-generated method stub - return false; - } - - public boolean isRecoveryRequired() { - // TODO Auto-generated method stub - return false; - } - - public boolean isSlave() { - // TODO Auto-generated method stub - return false; - } - - public boolean matches(MessageReference node, - MessageEvaluationContext context) throws IOException { - return true; - } - - public boolean matches(ActiveMQDestination destination) { - // TODO Auto-generated method stub - return false; - } - - public void processMessageDispatchNotification( - MessageDispatchNotification mdn) throws Exception { - // TODO Auto-generated method stub - - } - - public Response pullMessage(ConnectionContext context, MessagePull pull) - throws Exception { - // TODO Auto-generated method stub - return null; - } - - public List remove(ConnectionContext context, - Destination destination) throws Exception { - return new ArrayList(dispatched); - } - - public void setObjectName(ObjectName objectName) { - // TODO Auto-generated method stub - - } - - public void setSelector(String selector) - throws InvalidSelectorException, UnsupportedOperationException { - // TODO Auto-generated method stub - - } - - public void updateConsumerPrefetch(int newPrefetch) { - // TODO Auto-generated method stub - - } - - public boolean addRecoveredMessage(ConnectionContext context, - MessageReference message) throws Exception { - // TODO Auto-generated method stub - return false; - } - - public ActiveMQDestination getActiveMQDestination() { - // TODO Auto-generated method stub - return null; - } - - public int getLockPriority() { - // TODO Auto-generated method stub - return 0; - } - - public boolean isLockExclusive() { - // TODO Auto-generated method stub - return false; - } - - public void addDestination(Destination destination) { - } - - public void removeDestination(Destination destination) { - } - - /* (non-Javadoc) - * @see org.apache.activemq.broker.region.Subscription#countBeforeFull() - */ - public int countBeforeFull() { - // TODO Auto-generated method stub - return 10; - } - - } -}