From 1b45e3be04db0f382470d435bbd3cce589ae39a3 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 4 Mar 2011 11:31:01 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3193 - fix regression with spring listener test. IndirectMessageRef cannot do equals bc it will break composite destination delivery to a single consumer, possible to break inflight count as ack of one message removes others from dispatched when message ref matches in error. Fix is to ensure message ref is passed about such that there is a single ref per message. rework some internals to pass ref rather than message. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1077886 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/jmx/QueueView.java | 11 ++-- .../region/IndirectMessageReference.java | 11 ---- .../apache/activemq/broker/region/Queue.java | 58 ++++++++++--------- .../activemq/transport/InactivityMonitor.java | 6 +- .../apache/activemq/util/BrokerSupport.java | 1 + .../apache/activemq/broker/jmx/MBeanTest.java | 2 - 6 files changed, 40 insertions(+), 49 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java index 07710ff7b2..2d409b1a93 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java @@ -22,6 +22,7 @@ import javax.jms.JMSException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.util.BrokerSupport; @@ -35,7 +36,8 @@ public class QueueView extends DestinationView implements QueueViewMBean { } public CompositeData getMessage(String messageId) throws OpenDataException { - Message rc = ((Queue)destination).getMessage(messageId); + QueueMessageReference ref = ((Queue)destination).getMessage(messageId); + Message rc = ref.getMessage(); if (rc == null) { return null; } @@ -99,14 +101,13 @@ public class QueueView extends DestinationView implements QueueViewMBean { */ public boolean retryMessage(String messageId) throws Exception { Queue queue = (Queue) destination; - Message rc = queue.getMessage(messageId); + QueueMessageReference ref = queue.getMessage(messageId); + Message rc = ref.getMessage(); if (rc != null) { - rc = rc.copy(); - rc.getMessage().setRedeliveryCounter(0); ActiveMQDestination originalDestination = rc.getOriginalDestination(); if (originalDestination != null) { ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); - return queue.moveMessageTo(context, rc, originalDestination); + return queue.moveMessageTo(context, ref, originalDestination); } else { throw new JMSException("No original destination for message: "+ messageId); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 2e63918dd8..131974c621 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -72,17 +72,6 @@ public class IndirectMessageReference implements QueueMessageReference { return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null); } - @Override - public boolean equals(Object obj) { - return this == obj || (obj instanceof IndirectMessageReference && - message.getMessageId().equals(((IndirectMessageReference)obj).getMessage().getMessageId())); - } - - @Override - public int hashCode() { - return message.hashCode(); - } - public void incrementRedeliveryCounter() { message.incrementRedeliveryCounter(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index b0d402a094..f8bc4e739d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -353,6 +353,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { LinkedList browserDispatches = new LinkedList(); public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: " + + getDestinationStatistics().getDequeues().getCount() + ", dispatched: " + + getDestinationStatistics().getDispatched().getCount() + ", inflight: " + + getDestinationStatistics().getInflight().getCount()); + } + super.addSubscription(context, sub); // synchronize with dispatch method so that no new messages are sent // while setting up a subscription. avoid out of order messages, @@ -427,7 +434,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { pagedInPendingDispatchLock.writeLock().lock(); try { if (LOG.isDebugEnabled()) { - LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " + LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " + getDestinationStatistics().getDequeues().getCount() + ", dispatched: " + getDestinationStatistics().getDispatched().getCount() + ", inflight: " + getDestinationStatistics().getInflight().getCount()); @@ -1058,13 +1065,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - public Message getMessage(String id) { + public QueueMessageReference getMessage(String id) { MessageId msgId = new MessageId(id); pagedInMessagesLock.readLock().lock(); try{ - QueueMessageReference r = this.pagedInMessages.get(msgId); - if (r != null) { - return r.getMessage(); + QueueMessageReference ref = this.pagedInMessages.get(msgId); + if (ref != null) { + return ref; } }finally { pagedInMessagesLock.readLock().unlock(); @@ -1074,15 +1081,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { try { messages.reset(); while (messages.hasNext()) { - MessageReference r = messages.next(); - r.decrementReferenceCount(); - messages.rollback(r.getMessageId()); - if (msgId.equals(r.getMessageId())) { - Message m = r.getMessage(); - if (m != null) { - return m; - } - break; + MessageReference mr = messages.next(); + QueueMessageReference qmr = createMessageReference(mr.getMessage()); + qmr.decrementReferenceCount(); + messages.rollback(qmr.getMessageId()); + if (msgId.equals(qmr.getMessageId())) { + return qmr; } } } finally { @@ -1261,22 +1265,21 @@ public class Queue extends BaseDestination implements Task, UsageListener { /** * Move a message - * + * * @param context * connection context * @param m - * message + * QueueMessageReference * @param dest * ActiveMQDestination * @throws Exception */ - public boolean moveMessageTo(ConnectionContext context, Message m, ActiveMQDestination dest) throws Exception { - QueueMessageReference r = createMessageReference(m); - BrokerSupport.resend(context, m, dest); - removeMessage(context, r); + public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception { + BrokerSupport.resend(context, m.getMessage(), dest); + removeMessage(context, m); messagesLock.writeLock().lock(); try{ - messages.rollback(r.getMessageId()); + messages.rollback(m.getMessageId()); }finally { messagesLock.writeLock().unlock(); } @@ -1317,7 +1320,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { int movedCounter = 0; - Set set = new CopyOnWriteArraySet(); + Set set = new CopyOnWriteArraySet(); do { doPageIn(true); pagedInMessagesLock.readLock().lock(); @@ -1326,13 +1329,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { }finally { pagedInMessagesLock.readLock().unlock(); } - List list = new ArrayList(set); - for (MessageReference ref : list) { - IndirectMessageReference r = (IndirectMessageReference) ref; - if (filter.evaluate(context, r)) { + List list = new ArrayList(set); + for (QueueMessageReference ref : list) { + if (filter.evaluate(context, ref)) { // We should only move messages that can be locked. - moveMessageTo(context, ref.getMessage(), dest); - set.remove(r); + moveMessageTo(context, ref, dest); + set.remove(ref); if (++movedCounter >= maximumMessages && maximumMessages > 0) { return movedCounter; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 0e10e23d11..1becea8bac 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -106,7 +106,7 @@ public class InactivityMonitor extends TransportFilter { public void run() { long now = System.currentTimeMillis(); if( lastRunTime != 0 && LOG.isDebugEnabled() ) { - LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check."); + LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check."); } lastRunTime = now; @@ -142,7 +142,7 @@ public class InactivityMonitor extends TransportFilter { if (!commandSent.get() && useKeepAlive) { if (LOG.isTraceEnabled()) { - LOG.trace("No message sent since last write check, sending a KeepAliveInfo"); + LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo"); } ASYNC_TASKS.execute(new Runnable() { public void run() { @@ -160,7 +160,7 @@ public class InactivityMonitor extends TransportFilter { }); } else { if (LOG.isTraceEnabled()) { - LOG.trace("Message sent since last write check, resetting flag"); + LOG.trace(this + " message sent since last write check, resetting flag"); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java b/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java index 64a8d2db03..f3f3b78544 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java @@ -55,6 +55,7 @@ public final class BrokerSupport { message.setDestination(deadLetterDestination); message.setTransactionId(null); message.setMemoryUsage(null); + message.setRedeliveryCounter(0); boolean originalFlowControl = context.isProducerFlowControl(); try { context.setProducerFlowControl(false); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index dbdcfbc1cd..9cf5363451 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -279,8 +279,6 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertEquals("browse queue size", initialQueueSize, actualCount); assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage()); - assertEquals("dlq still has memory usage", dlqMemUsage, dlq.getMemoryPercentUsage()); - } public void testMoveMessagesBySelector() throws Exception {