AMQ-6940 - Add flag to disable TopicSubscription in flight stats

To save memory usage in some use cases add a new flag to PolicyEntry
called useTopicSubscriptionInflightStats to allow disabling the
inflight stats

(cherry picked from commit 65b0f2ad0d)
This commit is contained in:
Christopher L. Shannon (cshannon) 2018-03-29 13:23:33 -04:00
parent cae382063e
commit 21a594c8e8
6 changed files with 143 additions and 52 deletions

View File

@ -32,7 +32,6 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@ -72,6 +71,7 @@ public class TopicSubscription extends AbstractSubscription {
protected ActiveMQMessageAudit audit;
protected boolean active = false;
protected boolean discarding = false;
private boolean useTopicSubscriptionInflightStats = true;
//Used for inflight message size calculations
protected final Object dispatchLock = new Object();
@ -258,8 +258,10 @@ public class TopicSubscription extends AbstractSubscription {
synchronized(dispatchLock) {
matched.remove();
getSubscriptionStatistics().getDispatched().increment();
if (isUseTopicSubscriptionInflightStats()) {
dispatched.add(new DispatchedNode(node));
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
}
node.decrementReferenceCount();
}
break;
@ -382,6 +384,8 @@ public class TopicSubscription extends AbstractSubscription {
* @param ack
*/
private void updateStatsOnAck(final MessageAck ack) {
//Allow disabling inflight stats to save memory usage
if (isUseTopicSubscriptionInflightStats()) {
synchronized(dispatchLock) {
boolean inAckRange = false;
List<DispatchedNode> removeList = new ArrayList<>();
@ -402,24 +406,34 @@ public class TopicSubscription extends AbstractSubscription {
for (final DispatchedNode node : removeList) {
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
getSubscriptionStatistics().getDequeues().increment();
final Destination destination = node.getDestination();
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
destination.getDestinationStatistics().getInflight().decrement();
if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().increment();
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().increment();
}
}
incrementStatsOnAck(destination, ack, 1);
if (!ack.isInTransaction()) {
contractPrefetchExtension(1);
}
}
}
} else {
if (singleDestination && destination != null) {
incrementStatsOnAck(destination, ack, ack.getMessageCount());
}
if (!ack.isInTransaction()) {
contractPrefetchExtension(ack.getMessageCount());
}
}
}
private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) {
getSubscriptionStatistics().getDequeues().add(count);
destination.getDestinationStatistics().getDequeues().add(count);
destination.getDestinationStatistics().getInflight().subtract(count);
if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().add(count);
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count);
}
}
@Override
@ -653,9 +667,11 @@ public class TopicSubscription extends AbstractSubscription {
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
synchronized(dispatchLock) {
getSubscriptionStatistics().getDispatched().increment();
if (isUseTopicSubscriptionInflightStats()) {
dispatched.add(new DispatchedNode(node));
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
}
}
// Keep track if this subscription is receiving messages from a single destination.
if (singleDestination) {
@ -764,6 +780,14 @@ public class TopicSubscription extends AbstractSubscription {
}
}
public boolean isUseTopicSubscriptionInflightStats() {
return useTopicSubscriptionInflightStats;
}
public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
}
private static class DispatchedNode {
private final int size;
private final MessageId messageId;

View File

@ -103,6 +103,7 @@ public class PolicyEntry extends DestinationMapEntry {
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
private boolean doOptimzeMessageStorage = true;
private int maxDestinations = -1;
private boolean useTopicSubscriptionInflightStats = true;
/*
* percentage of in-flight messages above which optimize message store is disabled
@ -315,6 +316,7 @@ public class PolicyEntry extends DestinationMapEntry {
configurePrefetch(subscription);
subscription.setUsePrefetchExtension(isUsePrefetchExtension());
subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
subscription.setUseTopicSubscriptionInflightStats(isUseTopicSubscriptionInflightStats());
if (pendingMessageLimitStrategy != null) {
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
@ -1100,4 +1102,12 @@ public class PolicyEntry extends DestinationMapEntry {
public String toString() {
return "PolicyEntry [" + destination + "]";
}
}
public boolean isUseTopicSubscriptionInflightStats() {
return useTopicSubscriptionInflightStats;
}
public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
}

View File

@ -38,6 +38,8 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.Wait;
import org.junit.After;
@ -59,6 +61,7 @@ public abstract class AbstractInflightMessageSizeTest {
protected Destination amqDestination;
protected MessageConsumer consumer;
protected int prefetch = 100;
protected boolean useTopicSubscriptionInflightStats;
final protected int ackType;
final protected boolean optimizeAcknowledge;
final protected String destName = "testDest";
@ -66,20 +69,29 @@ public abstract class AbstractInflightMessageSizeTest {
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ActiveMQSession.SESSION_TRANSACTED, true},
{ActiveMQSession.AUTO_ACKNOWLEDGE, true},
{ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true},
{ActiveMQSession.CLIENT_ACKNOWLEDGE, true},
{ActiveMQSession.SESSION_TRANSACTED, false},
{ActiveMQSession.AUTO_ACKNOWLEDGE, false},
{ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false},
{ActiveMQSession.CLIENT_ACKNOWLEDGE, false}
{ActiveMQSession.SESSION_TRANSACTED, true, true},
{ActiveMQSession.AUTO_ACKNOWLEDGE, true, true},
{ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true, true},
{ActiveMQSession.CLIENT_ACKNOWLEDGE, true, true},
{ActiveMQSession.SESSION_TRANSACTED, false, true},
{ActiveMQSession.AUTO_ACKNOWLEDGE, false, true},
{ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false, true},
{ActiveMQSession.CLIENT_ACKNOWLEDGE, false, true},
{ActiveMQSession.SESSION_TRANSACTED, true, false},
{ActiveMQSession.AUTO_ACKNOWLEDGE, true, false},
{ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true, false},
{ActiveMQSession.CLIENT_ACKNOWLEDGE, true, false},
{ActiveMQSession.SESSION_TRANSACTED, false, false},
{ActiveMQSession.AUTO_ACKNOWLEDGE, false, false},
{ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false, false},
{ActiveMQSession.CLIENT_ACKNOWLEDGE, false, false}
});
}
public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, boolean useTopicSubscriptionInflightStats) {
this.ackType = ackType;
this.optimizeAcknowledge = optimizeAcknowledge;
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
}
@Before
@ -88,6 +100,12 @@ public abstract class AbstractInflightMessageSizeTest {
brokerService.setDeleteAllMessagesOnStartup(true);
TransportConnector tcp = brokerService
.addConnector("tcp://localhost:0");
PolicyEntry policy = new PolicyEntry();
policy.setUseTopicSubscriptionInflightStats(useTopicSubscriptionInflightStats);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(pMap);
brokerService.start();
//used to test optimizeAcknowledge works
String optAckString = optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : "";
@ -129,6 +147,8 @@ public abstract class AbstractInflightMessageSizeTest {
*/
@Test(timeout=15000)
public void testInflightMessageSize() throws Exception {
Assume.assumeTrue(useTopicSubscriptionInflightStats);
final long size = sendMessages(10);
assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
@ -155,6 +175,8 @@ public abstract class AbstractInflightMessageSizeTest {
*/
@Test(timeout=15000)
public void testInflightMessageSizePrefetchFilled() throws Exception {
Assume.assumeTrue(useTopicSubscriptionInflightStats);
final long size = sendMessages(prefetch);
assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
@ -187,6 +209,8 @@ public abstract class AbstractInflightMessageSizeTest {
*/
@Test(timeout=15000)
public void testInflightMessageSizePrefetchNotFilled() throws Exception {
Assume.assumeTrue(useTopicSubscriptionInflightStats);
final long size = sendMessages(prefetch - 10);
assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
@ -227,6 +251,7 @@ public abstract class AbstractInflightMessageSizeTest {
*/
@Test(timeout=15000)
public void testInflightMessageSizeRollback() throws Exception {
Assume.assumeTrue(useTopicSubscriptionInflightStats);
Assume.assumeTrue(ackType == ActiveMQSession.SESSION_TRANSACTED);
final long size = sendMessages(10);

View File

@ -34,8 +34,9 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
super(ackType, optimizeAcknowledge);
public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge,
boolean useTopicSubscriptionInflightStats) {
super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats);
}
@Override

View File

@ -34,8 +34,9 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
super(ackType, optimizeAcknowledge);
public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge,
boolean useTopicSubscriptionInflightStats) {
super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats);
}
@Override

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.statistics;
import static org.junit.Assert.assertTrue;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@ -23,6 +25,9 @@ import javax.jms.MessageConsumer;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -33,8 +38,8 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
super(ackType, optimizeAcknowledge);
public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, boolean useTopicSubscriptionInflightStats) {
super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats);
}
@Override
@ -57,4 +62,29 @@ public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMe
return new ActiveMQTopic(destName);
}
@Test(timeout=15000)
public void testInflightMessageSizeDisabled() throws Exception {
Assume.assumeFalse(useTopicSubscriptionInflightStats);
sendMessages(10);
Thread.sleep(1000);
assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
receiveMessages(10);
Thread.sleep(1000);
assertTrue("Inflight message size should still be 0", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getSubscription().getInFlightMessageSize() == 0;
}
}));
}
}