diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java index 8c4db9a139..3eba5d8ce7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -27,6 +27,7 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ConnectionState; @@ -64,7 +65,7 @@ public class ConnectionContext { private XATransactionId xid; public ConnectionContext() { - this.messageEvaluationContext = new MessageEvaluationContext(); + this.messageEvaluationContext = new NonCachedMessageEvaluationContext(); } public ConnectionContext(MessageEvaluationContext messageEvaluationContext) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 769d796a03..3055b574ed 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -51,7 +51,7 @@ import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.Message; import org.apache.activemq.filter.BooleanExpression; -import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageStore; import org.apache.activemq.util.URISupport; @@ -213,7 +213,7 @@ public class DestinationView implements DestinationViewMBean { Message[] messages = destination.browse(); ArrayList c = new ArrayList(); - MessageEvaluationContext ctx = new MessageEvaluationContext(); + NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext(); ctx.setDestination(destination.getActiveMQDestination()); BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); @@ -256,7 +256,7 @@ public class DestinationView implements DestinationViewMBean { Message[] messages = destination.browse(); ArrayList answer = new ArrayList(); - MessageEvaluationContext ctx = new MessageEvaluationContext(); + NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext(); ctx.setDestination(destination.getActiveMQDestination()); BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); @@ -297,7 +297,7 @@ public class DestinationView implements DestinationViewMBean { TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" }); TabularDataSupport rc = new TabularDataSupport(tt); - MessageEvaluationContext ctx = new MessageEvaluationContext(); + NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext(); ctx.setDestination(destination.getActiveMQDestination()); BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index aa2f7b52d6..10841dd6a7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -841,7 +841,7 @@ public abstract class BaseDestination implements Destination { } public ConnectionContext createConnectionContext() { - ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); + ConnectionContext answer = new ConnectionContext(); answer.setBroker(this.broker); answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 20b2bc534d..f23d8172b4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -484,7 +484,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple private void discardExpiredMessage(MessageReference reference) { LOG.debug("Discarding expired message {}", reference); if (reference.isExpired() && broker.isExpired(reference)) { - ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); + ConnectionContext context = new ConnectionContext(); context.setBroker(broker); ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage())); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 394cccdf88..7ce0339bd6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -86,7 +86,7 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; @@ -1303,13 +1303,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // for durable subs, suppression via filter leaves dangling acks so we // need to check here and allow the ack irrespective if (sub.getLocalInfo().isDurable()) { - MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); + NonCachedMessageEvaluationContext messageEvalContext = new NonCachedMessageEvaluationContext(); messageEvalContext.setMessageReference(md.getMessage()); messageEvalContext.setDestination(md.getDestination()); suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); - //AMQ-6465 - Need to decrement the reference count after checking matches() as - //the call above will increment the reference count by 1 - messageEvalContext.getMessageReference().decrementReferenceCount(); } return suppress; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index 0ccf1cb478..8ecfee3c32 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -452,15 +452,19 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination); - queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + QueueViewMBean queueNew = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); int movedSize = MESSAGE_COUNT-3; - assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); + assertEquals("Unexpected number of messages ",movedSize,queueNew.getQueueSize()); // now lets remove them by selector - queue.removeMatchingMessages("counter > 2"); + queueNew.removeMatchingMessages("counter > 2"); - assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); - assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queueNew.getQueueSize()); + assertEquals("dest has no memory usage", 0, queueNew.getMemoryPercentUsage()); + assertEquals("dest has 0 memory usage", 0, queueNew.getMemoryUsageByteCount()); + + queue.purge(); + assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount()); } public void testCopyMessagesBySelector() throws Exception { @@ -478,17 +482,47 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination); - queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + QueueViewMBean queueTwo = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); - assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize()); + LOG.info("Queue: " + queueViewMBeanName + " now has: " + queueTwo.getQueueSize() + " message(s)"); + assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queueTwo.getQueueSize()); // now lets remove them by selector - queue.removeMatchingMessages("counter > 2"); + queueTwo.removeMatchingMessages("counter > 2"); - assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); - assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queueTwo.getQueueSize()); + assertEquals("dest has no memory usage", 0, queueTwo.getMemoryPercentUsage()); + assertEquals("dest has 0 memory usage", 0, queueTwo.getMemoryUsageByteCount()); + + queue.purge(); + assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount()); } + public void testSelectorBrowseUsage() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + final String someSelectorExp = "JMSType = '22'"; + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + queue.browse(someSelectorExp); + queue.purge(); + assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount()); + + connection.close(); + connection = connectionFactory.createConnection(); + useConnection(connection); + queue.browseMessages(someSelectorExp); + queue.purge(); + assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount()); + + connection.close(); + connection = connectionFactory.createConnection(); + useConnection(connection); + queue.browseAsTable(someSelectorExp); + queue.purge(); + assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount()); + } public void testCopyPurgeCopyBack() throws Exception { connection = connectionFactory.createConnection(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java index c65674de23..66a7af4d94 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java @@ -405,7 +405,9 @@ public class SelectorTest extends TestCase { MessageEvaluationContext context = new MessageEvaluationContext(); context.setMessageReference((org.apache.activemq.command.Message)message); boolean value = selector.matches(context); + context.clear(); assertEquals("Selector for: " + text, expected, value); + assertEquals("ref 0", 0, ((ActiveMQMessage)message).getReferenceCount()); } protected Message createMessage(String subject) throws JMSException { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java index 5c9a8eecf8..f580b2e8ba 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java @@ -25,7 +25,7 @@ import javax.jms.Message; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.filter.BooleanExpression; -import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.junit.Before; import org.junit.Test; @@ -170,10 +170,11 @@ public class UnknownHandlingSelectorTest { protected void assertSelector(String text, boolean matches) throws JMSException { BooleanExpression selector = SelectorParser.parse(text); assertTrue("Created a valid selector", selector != null); - MessageEvaluationContext context = new MessageEvaluationContext(); + NonCachedMessageEvaluationContext context = new NonCachedMessageEvaluationContext(); context.setMessageReference((org.apache.activemq.command.Message)message); boolean value = selector.matches(context); assertEquals("Selector for: " + text, matches, value); + assertEquals("ref 0", 0, ((ActiveMQMessage)message).getReferenceCount()); } private static String not(String selector) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java index f69f1b7c69..dbd3f61dee 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java @@ -84,8 +84,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport { Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get( destination); - ConnectionContext context = new ConnectionContext( - new NonCachedMessageEvaluationContext()); + ConnectionContext context = new ConnectionContext(); context.setBroker(broker.getBroker()); context.getMessageEvaluationContext().setDestination(destination); @@ -133,8 +132,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport { Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get( destination); - ConnectionContext context = new ConnectionContext( - new NonCachedMessageEvaluationContext()); + ConnectionContext context = new ConnectionContext(); context.setBroker(broker.getBroker()); context.getMessageEvaluationContext().setDestination(destination); @@ -179,8 +177,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport { Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get( destination); - ConnectionContext context = new ConnectionContext( - new NonCachedMessageEvaluationContext()); + ConnectionContext context = new ConnectionContext(); context.setBroker(broker.getBroker()); context.getMessageEvaluationContext().setDestination(destination);