Applying AMQ-1957.. Thanks for the patch.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@702152 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-10-06 14:11:51 +00:00
parent 44953e41ac
commit 3fe7760e4a
4 changed files with 67 additions and 389 deletions

View File

@ -88,7 +88,7 @@ public class IndirectMessageReference implements QueueMessageReference {
public boolean lock(LockOwner subscription) {
synchronized (this) {
if (dropped || (lockOwner != null && lockOwner != subscription)) {
if (dropped || lockOwner != null) {
return false;
}
lockOwner = subscription;

View File

@ -157,9 +157,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
while (pending.hasNext()) {
MessageReference node = pending.next();
if (node.getMessageId().equals(mdn.getMessageId())) {
pending.remove();
createMessageDispatch(node, node.getMessage());
// Synchronize between dispatched list and removal of messages from pending list
// related to remove subscription action
synchronized(dispatchLock) {
pending.remove();
createMessageDispatch(node, node.getMessage());
dispatched.add(node);
}
return;
@ -532,11 +534,18 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
List<MessageReference> rc = new ArrayList<MessageReference>();
synchronized(pendingLock) {
super.remove(context, destination);
for (MessageReference r : dispatched) {
if( r.getRegionDestination() == destination ) {
rc.add((QueueMessageReference)r);
}
// Synchronized to DispatchLock
synchronized(dispatchLock) {
for (MessageReference r : dispatched) {
if( r.getRegionDestination() == destination) {
rc.add((QueueMessageReference)r);
}
}
}
// TODO Dispatched messages should be decremented from Inflight stat
// Here is a potential problem concerning Inflight stat:
// Messages not already committed or rolled back may not be removed from dispatched list at the moment
// Except if each commit or rollback callback action comes before remove of subscriber.
rc.addAll(pending.remove(context, destination));
}
return rc;
@ -559,19 +568,23 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
break;
}
pending.remove();
if( !isDropped(node) && canDispatch(node)) {
// Synchronize between dispatched list and remove of messageg from pending list
// related to remove subscription action
synchronized(dispatchLock) {
pending.remove();
if( !isDropped(node) && canDispatch(node)) {
// Message may have been sitting in the pending
// list a while waiting for the consumer to ak the message.
if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
//increment number to dispatch
numberToDispatch++;
node.getRegionDestination().messageExpired(context, this, node);
continue;
// Message may have been sitting in the pending
// list a while waiting for the consumer to ak the message.
if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
//increment number to dispatch
numberToDispatch++;
node.getRegionDestination().messageExpired(context, this, node);
continue;
}
dispatch(node);
count++;
}
dispatch(node);
count++;
}
}
}else {
@ -596,10 +609,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
final Message message = node.getMessage();
if (message == null) {
return false;
}
// Make sure we can dispatch a message.
if (canDispatch(node) && !isSlave()) {
}
// No reentrant lock - Patch needed to IndirectMessageReference on method lock
if (!isSlave()) {
MessageDispatch md = createMessageDispatch(node, message);
// NULL messages don't count... they don't get Acked.
if (node != QueueMessageReference.NULL_MESSAGE) {

View File

@ -959,6 +959,10 @@ public class Queue extends BaseDestination implements Task {
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
msgContext.setMessageReference(node);
if (rd.subscription.matches(node, msgContext)) {
// Log showing message dispatching
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getQualifiedName() + " - Recovery - Message pushed '" + node.hashCode() + " - " + node + "' to subscription: '" + rd.subscription + "'");
}
rd.subscription.add(node);
} else {
// make sure it gets queued for dispatched again
@ -1063,23 +1067,26 @@ public class Queue extends BaseDestination implements Task {
protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException {
reference.setAcked(true);
// This sends the ack the the journal..
acknowledge(context, sub, ack, reference);
if (!ack.isInTransaction()) {
acknowledge(context, sub, ack, reference);
dropMessage(reference);
wakeup();
} else {
context.getTransaction().addSynchronization(new Synchronization() {
try {
acknowledge(context, sub, ack, reference);
} finally {
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
dropMessage(reference);
wakeup();
}
public void afterCommit() throws Exception {
dropMessage(reference);
wakeup();
}
public void afterRollback() throws Exception {
reference.setAcked(false);
}
});
public void afterRollback() throws Exception {
reference.setAcked(false);
}
});
}
}
}
@ -1153,18 +1160,11 @@ public class Queue extends BaseDestination implements Task {
private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
List<QueueMessageReference> result = null;
List<QueueMessageReference> resultList = null;
dispatchLock.lock();
try{
int toPageIn = 0;
if (force) {
toPageIn = getMaxPageSize();
} else {
toPageIn = (getMaxPageSize() + (int) destinationStatistics
.getInflight().getCount())
- pagedInMessages.size();
toPageIn = Math.min(toPageIn, getMaxPageSize());
}
int toPageIn = getMaxPageSize() + Math.max(0, (int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
toPageIn = Math.max(0, Math.min(toPageIn, getMaxPageSize()));
if (isLazyDispatch()&& !force) {
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
@ -1193,16 +1193,24 @@ public class Queue extends BaseDestination implements Task {
messages.release();
}
}
// Only add new messages, not already pagedIn to avoid multiple dispatch attempts
synchronized (pagedInMessages) {
for(QueueMessageReference ref:result) {
pagedInMessages.put(ref.getMessageId(), ref);
resultList = new ArrayList<QueueMessageReference>(result.size());
for(QueueMessageReference ref : result) {
if (!pagedInMessages.containsKey(ref.getMessageId())) {
pagedInMessages.put(ref.getMessageId(), ref);
resultList.add(ref);
}
}
}
} else {
// Avoid return null list, if condition is not validated
resultList = new ArrayList<QueueMessageReference>();
}
}finally {
dispatchLock.unlock();
}
return result;
return resultList;
}
private void doDispatch(List<QueueMessageReference> list) throws Exception {

View File

@ -1,343 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
public class SubscriptionAddRemoveQueueTest extends TestCase {
Queue queue;
Message msg = new ActiveMQMessage();
ConsumerInfo info = new ConsumerInfo();
List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
ConnectionContext context = new ConnectionContext();
int numSubscriptions = 1000;
boolean working = true;
int senders = 20;
@Override
public void setUp() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.start();
ActiveMQDestination destination = new ActiveMQQueue("TEST");
DestinationStatistics parentStats = new DestinationStatistics();
parentStats.setEnabled(true);
TaskRunnerFactory taskFactory = null;
MessageStore store = null;
msg.setDestination(destination);
info.setDestination(destination);
info.setPrefetchSize(100);
queue = new Queue(brokerService, destination, store, parentStats, taskFactory);
queue.initialize();
}
public void testNoDispatchToRemovedConsumers() throws Exception {
Runnable sender = new Runnable() {
public void run() {
while (working) {
try {
queue.sendMessage(context, msg);
} catch (Exception e) {
e.printStackTrace();
fail("unexpected exception in sendMessage, ex:" + e);
}
}
}
};
Runnable subRemover = new Runnable() {
public void run() {
for (Subscription sub : subs) {
try {
queue.removeSubscription(context, sub);
} catch (Exception e) {
e.printStackTrace();
fail("unexpected exception in removeSubscription, ex:" + e);
}
}
}
};
for (int i=0;i<numSubscriptions; i++) {
SimpleImmediateDispatchSubscription sub = new SimpleImmediateDispatchSubscription();
subs.add(sub);
queue.addSubscription(context, sub);
}
assertEquals("there are X subscriptions", numSubscriptions, queue.getDestinationStatistics().getConsumers().getCount());
ExecutorService executor = Executors.newCachedThreadPool();
for (int i=0; i<senders ; i++) {
executor.submit(sender);
}
Thread.sleep(1000);
for (SimpleImmediateDispatchSubscription sub : subs) {
assertTrue("There are some locked messages in the subscription", hasSomeLocks(sub.dispatched));
}
Future<?> result = executor.submit(subRemover);
result.get();
working = false;
assertEquals("there are no subscriptions", 0, queue.getDestinationStatistics().getConsumers().getCount());
for (SimpleImmediateDispatchSubscription sub : subs) {
assertTrue("There are no locked messages in any removed subscriptions", !hasSomeLocks(sub.dispatched));
}
}
private boolean hasSomeLocks(List<MessageReference> dispatched) {
boolean hasLock = false;
for (MessageReference mr: dispatched) {
QueueMessageReference qmr = (QueueMessageReference) mr;
if (qmr.getLockOwner() != null) {
hasLock = true;
break;
}
}
return hasLock;
}
public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
List<MessageReference> dispatched =
Collections.synchronizedList(new ArrayList<MessageReference>());
public void acknowledge(ConnectionContext context, MessageAck ack)
throws Exception {
// TODO Auto-generated method stub
}
public void add(MessageReference node) throws Exception {
// immediate dispatch
QueueMessageReference qmr = (QueueMessageReference)node;
qmr.lock(this);
dispatched.add(qmr);
}
public ConnectionContext getContext() {
// TODO
return null;
}
public void add(ConnectionContext context, Destination destination)
throws Exception {
// TODO Auto-generated method stub
}
public void destroy() {
// TODO Auto-generated method stub
}
public void gc() {
// TODO Auto-generated method stub
}
public ConsumerInfo getConsumerInfo() {
return info;
}
public long getDequeueCounter() {
// TODO Auto-generated method stub
return 0;
}
public long getDispatchedCounter() {
// TODO Auto-generated method stub
return 0;
}
public int getDispatchedQueueSize() {
// TODO Auto-generated method stub
return 0;
}
public long getEnqueueCounter() {
// TODO Auto-generated method stub
return 0;
}
public int getInFlightSize() {
// TODO Auto-generated method stub
return 0;
}
public int getInFlightUsage() {
// TODO Auto-generated method stub
return 0;
}
public ObjectName getObjectName() {
// TODO Auto-generated method stub
return null;
}
public int getPendingQueueSize() {
// TODO Auto-generated method stub
return 0;
}
public int getPrefetchSize() {
// TODO Auto-generated method stub
return 0;
}
public String getSelector() {
// TODO Auto-generated method stub
return null;
}
public boolean isBrowser() {
// TODO Auto-generated method stub
return false;
}
public boolean isFull() {
// TODO Auto-generated method stub
return false;
}
public boolean isHighWaterMark() {
// TODO Auto-generated method stub
return false;
}
public boolean isLowWaterMark() {
// TODO Auto-generated method stub
return false;
}
public boolean isRecoveryRequired() {
// TODO Auto-generated method stub
return false;
}
public boolean isSlave() {
// TODO Auto-generated method stub
return false;
}
public boolean matches(MessageReference node,
MessageEvaluationContext context) throws IOException {
return true;
}
public boolean matches(ActiveMQDestination destination) {
// TODO Auto-generated method stub
return false;
}
public void processMessageDispatchNotification(
MessageDispatchNotification mdn) throws Exception {
// TODO Auto-generated method stub
}
public Response pullMessage(ConnectionContext context, MessagePull pull)
throws Exception {
// TODO Auto-generated method stub
return null;
}
public List<MessageReference> remove(ConnectionContext context,
Destination destination) throws Exception {
return new ArrayList<MessageReference>(dispatched);
}
public void setObjectName(ObjectName objectName) {
// TODO Auto-generated method stub
}
public void setSelector(String selector)
throws InvalidSelectorException, UnsupportedOperationException {
// TODO Auto-generated method stub
}
public void updateConsumerPrefetch(int newPrefetch) {
// TODO Auto-generated method stub
}
public boolean addRecoveredMessage(ConnectionContext context,
MessageReference message) throws Exception {
// TODO Auto-generated method stub
return false;
}
public ActiveMQDestination getActiveMQDestination() {
// TODO Auto-generated method stub
return null;
}
public int getLockPriority() {
// TODO Auto-generated method stub
return 0;
}
public boolean isLockExclusive() {
// TODO Auto-generated method stub
return false;
}
public void addDestination(Destination destination) {
}
public void removeDestination(Destination destination) {
}
/* (non-Javadoc)
* @see org.apache.activemq.broker.region.Subscription#countBeforeFull()
*/
public int countBeforeFull() {
// TODO Auto-generated method stub
return 10;
}
}
}