AMQ-7035 - use NonCachedMessageEvaluationContext in place of MessageEvaluationContext to avoid unnecessary reference count management and subsequent leaks. Rework AMQ-6465 with additional JMX related tests

(cherry picked from commit 50d27e7e54)
This commit is contained in:
gtully 2018-08-15 16:21:57 +01:00 committed by Timothy Bish
parent d1746e4ade
commit 4a99103e70
9 changed files with 63 additions and 31 deletions

View File

@ -27,6 +27,7 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ConnectionState;
@ -64,7 +65,7 @@ public class ConnectionContext {
private XATransactionId xid;
public ConnectionContext() {
this.messageEvaluationContext = new MessageEvaluationContext();
this.messageEvaluationContext = new NonCachedMessageEvaluationContext();
}
public ConnectionContext(MessageEvaluationContext messageEvaluationContext) {

View File

@ -51,7 +51,7 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.URISupport;
@ -213,7 +213,7 @@ public class DestinationView implements DestinationViewMBean {
Message[] messages = destination.browse();
ArrayList<CompositeData> c = new ArrayList<CompositeData>();
MessageEvaluationContext ctx = new MessageEvaluationContext();
NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
ctx.setDestination(destination.getActiveMQDestination());
BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
@ -256,7 +256,7 @@ public class DestinationView implements DestinationViewMBean {
Message[] messages = destination.browse();
ArrayList<Object> answer = new ArrayList<Object>();
MessageEvaluationContext ctx = new MessageEvaluationContext();
NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
ctx.setDestination(destination.getActiveMQDestination());
BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
@ -297,7 +297,7 @@ public class DestinationView implements DestinationViewMBean {
TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
TabularDataSupport rc = new TabularDataSupport(tt);
MessageEvaluationContext ctx = new MessageEvaluationContext();
NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
ctx.setDestination(destination.getActiveMQDestination());
BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);

View File

@ -841,7 +841,7 @@ public abstract class BaseDestination implements Destination {
}
public ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
ConnectionContext answer = new ConnectionContext();
answer.setBroker(this.broker);
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);

View File

@ -484,7 +484,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private void discardExpiredMessage(MessageReference reference) {
LOG.debug("Discarding expired message {}", reference);
if (reference.isExpired() && broker.isExpired(reference)) {
ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
}

View File

@ -86,7 +86,7 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
@ -1303,13 +1303,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// for durable subs, suppression via filter leaves dangling acks so we
// need to check here and allow the ack irrespective
if (sub.getLocalInfo().isDurable()) {
MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
NonCachedMessageEvaluationContext messageEvalContext = new NonCachedMessageEvaluationContext();
messageEvalContext.setMessageReference(md.getMessage());
messageEvalContext.setDestination(md.getDestination());
suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
//AMQ-6465 - Need to decrement the reference count after checking matches() as
//the call above will increment the reference count by 1
messageEvalContext.getMessageReference().decrementReferenceCount();
}
return suppress;
}

View File

@ -452,15 +452,19 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queueNew = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
int movedSize = MESSAGE_COUNT-3;
assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
assertEquals("Unexpected number of messages ",movedSize,queueNew.getQueueSize());
// now lets remove them by selector
queue.removeMatchingMessages("counter > 2");
queueNew.removeMatchingMessages("counter > 2");
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queueNew.getQueueSize());
assertEquals("dest has no memory usage", 0, queueNew.getMemoryPercentUsage());
assertEquals("dest has 0 memory usage", 0, queueNew.getMemoryUsageByteCount());
queue.purge();
assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
}
public void testCopyMessagesBySelector() throws Exception {
@ -478,17 +482,47 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
QueueViewMBean queueTwo = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
LOG.info("Queue: " + queueViewMBeanName + " now has: " + queueTwo.getQueueSize() + " message(s)");
assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queueTwo.getQueueSize());
// now lets remove them by selector
queue.removeMatchingMessages("counter > 2");
queueTwo.removeMatchingMessages("counter > 2");
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queueTwo.getQueueSize());
assertEquals("dest has no memory usage", 0, queueTwo.getMemoryPercentUsage());
assertEquals("dest has 0 memory usage", 0, queueTwo.getMemoryUsageByteCount());
queue.purge();
assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
}
public void testSelectorBrowseUsage() throws Exception {
connection = connectionFactory.createConnection();
useConnection(connection);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
final String someSelectorExp = "JMSType = '22'";
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
queue.browse(someSelectorExp);
queue.purge();
assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
connection.close();
connection = connectionFactory.createConnection();
useConnection(connection);
queue.browseMessages(someSelectorExp);
queue.purge();
assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
connection.close();
connection = connectionFactory.createConnection();
useConnection(connection);
queue.browseAsTable(someSelectorExp);
queue.purge();
assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
}
public void testCopyPurgeCopyBack() throws Exception {
connection = connectionFactory.createConnection();

View File

@ -405,7 +405,9 @@ public class SelectorTest extends TestCase {
MessageEvaluationContext context = new MessageEvaluationContext();
context.setMessageReference((org.apache.activemq.command.Message)message);
boolean value = selector.matches(context);
context.clear();
assertEquals("Selector for: " + text, expected, value);
assertEquals("ref 0", 0, ((ActiveMQMessage)message).getReferenceCount());
}
protected Message createMessage(String subject) throws JMSException {

View File

@ -25,7 +25,7 @@ import javax.jms.Message;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.junit.Before;
import org.junit.Test;
@ -170,10 +170,11 @@ public class UnknownHandlingSelectorTest {
protected void assertSelector(String text, boolean matches) throws JMSException {
BooleanExpression selector = SelectorParser.parse(text);
assertTrue("Created a valid selector", selector != null);
MessageEvaluationContext context = new MessageEvaluationContext();
NonCachedMessageEvaluationContext context = new NonCachedMessageEvaluationContext();
context.setMessageReference((org.apache.activemq.command.Message)message);
boolean value = selector.matches(context);
assertEquals("Selector for: " + text, matches, value);
assertEquals("ref 0", 0, ((ActiveMQMessage)message).getReferenceCount());
}
private static String not(String selector) {

View File

@ -84,8 +84,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport {
Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
destination);
ConnectionContext context = new ConnectionContext(
new NonCachedMessageEvaluationContext());
ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getBroker());
context.getMessageEvaluationContext().setDestination(destination);
@ -133,8 +132,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport {
Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
destination);
ConnectionContext context = new ConnectionContext(
new NonCachedMessageEvaluationContext());
ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getBroker());
context.getMessageEvaluationContext().setDestination(destination);
@ -179,8 +177,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport {
Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
destination);
ConnectionContext context = new ConnectionContext(
new NonCachedMessageEvaluationContext());
ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getBroker());
context.getMessageEvaluationContext().setDestination(destination);