https://issues.apache.org/jira/browse/AMQ-3193 - fix regression with spring listener test. IndirectMessageRef cannot do equals bc it will break composite destination delivery to a single consumer, possible to break inflight count as ack of one message removes others from dispatched when message ref matches in error.

Fix is to ensure message ref is passed about such that there is a single ref per message. rework some internals to pass ref rather than message.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1077886 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-03-04 11:31:01 +00:00
parent 2f47288da2
commit 1b45e3be04
6 changed files with 40 additions and 49 deletions

View File

@ -22,6 +22,7 @@ import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.BrokerSupport;
@ -35,7 +36,8 @@ public class QueueView extends DestinationView implements QueueViewMBean {
}
public CompositeData getMessage(String messageId) throws OpenDataException {
Message rc = ((Queue)destination).getMessage(messageId);
QueueMessageReference ref = ((Queue)destination).getMessage(messageId);
Message rc = ref.getMessage();
if (rc == null) {
return null;
}
@ -99,14 +101,13 @@ public class QueueView extends DestinationView implements QueueViewMBean {
*/
public boolean retryMessage(String messageId) throws Exception {
Queue queue = (Queue) destination;
Message rc = queue.getMessage(messageId);
QueueMessageReference ref = queue.getMessage(messageId);
Message rc = ref.getMessage();
if (rc != null) {
rc = rc.copy();
rc.getMessage().setRedeliveryCounter(0);
ActiveMQDestination originalDestination = rc.getOriginalDestination();
if (originalDestination != null) {
ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
return queue.moveMessageTo(context, rc, originalDestination);
return queue.moveMessageTo(context, ref, originalDestination);
}
else {
throw new JMSException("No original destination for message: "+ messageId);

View File

@ -72,17 +72,6 @@ public class IndirectMessageReference implements QueueMessageReference {
return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
}
@Override
public boolean equals(Object obj) {
return this == obj || (obj instanceof IndirectMessageReference &&
message.getMessageId().equals(((IndirectMessageReference)obj).getMessage().getMessageId()));
}
@Override
public int hashCode() {
return message.hashCode();
}
public void incrementRedeliveryCounter() {
message.incrementRedeliveryCounter();
}

View File

@ -353,6 +353,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: "
+ getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+ getDestinationStatistics().getDispatched().getCount() + ", inflight: "
+ getDestinationStatistics().getInflight().getCount());
}
super.addSubscription(context, sub);
// synchronize with dispatch method so that no new messages are sent
// while setting up a subscription. avoid out of order messages,
@ -427,7 +434,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
pagedInPendingDispatchLock.writeLock().lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
+ getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+ getDestinationStatistics().getDispatched().getCount() + ", inflight: "
+ getDestinationStatistics().getInflight().getCount());
@ -1058,13 +1065,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
public Message getMessage(String id) {
public QueueMessageReference getMessage(String id) {
MessageId msgId = new MessageId(id);
pagedInMessagesLock.readLock().lock();
try{
QueueMessageReference r = this.pagedInMessages.get(msgId);
if (r != null) {
return r.getMessage();
QueueMessageReference ref = this.pagedInMessages.get(msgId);
if (ref != null) {
return ref;
}
}finally {
pagedInMessagesLock.readLock().unlock();
@ -1074,15 +1081,12 @@ public class Queue extends BaseDestination implements Task, UsageListener {
try {
messages.reset();
while (messages.hasNext()) {
MessageReference r = messages.next();
r.decrementReferenceCount();
messages.rollback(r.getMessageId());
if (msgId.equals(r.getMessageId())) {
Message m = r.getMessage();
if (m != null) {
return m;
}
break;
MessageReference mr = messages.next();
QueueMessageReference qmr = createMessageReference(mr.getMessage());
qmr.decrementReferenceCount();
messages.rollback(qmr.getMessageId());
if (msgId.equals(qmr.getMessageId())) {
return qmr;
}
}
} finally {
@ -1261,22 +1265,21 @@ public class Queue extends BaseDestination implements Task, UsageListener {
/**
* Move a message
*
*
* @param context
* connection context
* @param m
* message
* QueueMessageReference
* @param dest
* ActiveMQDestination
* @throws Exception
*/
public boolean moveMessageTo(ConnectionContext context, Message m, ActiveMQDestination dest) throws Exception {
QueueMessageReference r = createMessageReference(m);
BrokerSupport.resend(context, m, dest);
removeMessage(context, r);
public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
BrokerSupport.resend(context, m.getMessage(), dest);
removeMessage(context, m);
messagesLock.writeLock().lock();
try{
messages.rollback(r.getMessageId());
messages.rollback(m.getMessageId());
}finally {
messagesLock.writeLock().unlock();
}
@ -1317,7 +1320,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
ActiveMQDestination dest, int maximumMessages) throws Exception {
int movedCounter = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
Set<QueueMessageReference> set = new CopyOnWriteArraySet<QueueMessageReference>();
do {
doPageIn(true);
pagedInMessagesLock.readLock().lock();
@ -1326,13 +1329,12 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}finally {
pagedInMessagesLock.readLock().unlock();
}
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
for (QueueMessageReference ref : list) {
if (filter.evaluate(context, ref)) {
// We should only move messages that can be locked.
moveMessageTo(context, ref.getMessage(), dest);
set.remove(r);
moveMessageTo(context, ref, dest);
set.remove(ref);
if (++movedCounter >= maximumMessages && maximumMessages > 0) {
return movedCounter;
}

View File

@ -106,7 +106,7 @@ public class InactivityMonitor extends TransportFilter {
public void run() {
long now = System.currentTimeMillis();
if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check.");
LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
}
lastRunTime = now;
@ -142,7 +142,7 @@ public class InactivityMonitor extends TransportFilter {
if (!commandSent.get() && useKeepAlive) {
if (LOG.isTraceEnabled()) {
LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
}
ASYNC_TASKS.execute(new Runnable() {
public void run() {
@ -160,7 +160,7 @@ public class InactivityMonitor extends TransportFilter {
});
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Message sent since last write check, resetting flag");
LOG.trace(this + " message sent since last write check, resetting flag");
}
}

View File

@ -55,6 +55,7 @@ public final class BrokerSupport {
message.setDestination(deadLetterDestination);
message.setTransactionId(null);
message.setMemoryUsage(null);
message.setRedeliveryCounter(0);
boolean originalFlowControl = context.isProducerFlowControl();
try {
context.setProducerFlowControl(false);

View File

@ -279,8 +279,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("browse queue size", initialQueueSize, actualCount);
assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
assertEquals("dlq still has memory usage", dlqMemUsage, dlq.getMemoryPercentUsage());
}
public void testMoveMessagesBySelector() throws Exception {