mirror of https://github.com/apache/activemq.git
AMQ-6940 - Reduce memory footprint for inflight statistics
For the TopicSubscription case we can reduce the inflight statistics
memory footprint by not storing the entire message reference for in
flight messages and instead just a subset of the information needed.
(cherry picked from commit f69fd6f002
)
This commit is contained in:
parent
d36412314f
commit
cae382063e
|
@ -32,6 +32,7 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
|
|||
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
|
||||
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
|
||||
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ConsumerControl;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.Message;
|
||||
|
@ -74,7 +75,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
|
||||
//Used for inflight message size calculations
|
||||
protected final Object dispatchLock = new Object();
|
||||
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
|
||||
protected final List<DispatchedNode> dispatched = new ArrayList<>();
|
||||
|
||||
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
|
||||
super(broker, context, info);
|
||||
|
@ -257,7 +258,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
synchronized(dispatchLock) {
|
||||
matched.remove();
|
||||
getSubscriptionStatistics().getDispatched().increment();
|
||||
dispatched.add(node);
|
||||
dispatched.add(new DispatchedNode(node));
|
||||
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
|
||||
node.decrementReferenceCount();
|
||||
}
|
||||
|
@ -383,8 +384,8 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
private void updateStatsOnAck(final MessageAck ack) {
|
||||
synchronized(dispatchLock) {
|
||||
boolean inAckRange = false;
|
||||
List<MessageReference> removeList = new ArrayList<MessageReference>();
|
||||
for (final MessageReference node : dispatched) {
|
||||
List<DispatchedNode> removeList = new ArrayList<>();
|
||||
for (final DispatchedNode node : dispatched) {
|
||||
MessageId messageId = node.getMessageId();
|
||||
if (ack.getFirstMessageId() == null
|
||||
|| ack.getFirstMessageId().equals(messageId)) {
|
||||
|
@ -398,17 +399,21 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
}
|
||||
}
|
||||
|
||||
for (final MessageReference node : removeList) {
|
||||
for (final DispatchedNode node : removeList) {
|
||||
dispatched.remove(node);
|
||||
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
|
||||
getSubscriptionStatistics().getDequeues().increment();
|
||||
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
|
||||
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
|
||||
if (info.isNetworkSubscription()) {
|
||||
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
|
||||
}
|
||||
if (ack.isExpiredAck()) {
|
||||
destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
|
||||
|
||||
final Destination destination = node.getDestination();
|
||||
if (destination != null) {
|
||||
destination.getDestinationStatistics().getDequeues().increment();
|
||||
destination.getDestinationStatistics().getInflight().decrement();
|
||||
if (info.isNetworkSubscription()) {
|
||||
destination.getDestinationStatistics().getForwards().increment();
|
||||
}
|
||||
if (ack.isExpiredAck()) {
|
||||
destination.getDestinationStatistics().getExpired().increment();
|
||||
}
|
||||
}
|
||||
if (!ack.isInTransaction()) {
|
||||
contractPrefetchExtension(1);
|
||||
|
@ -648,7 +653,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
|
||||
synchronized(dispatchLock) {
|
||||
getSubscriptionStatistics().getDispatched().increment();
|
||||
dispatched.add(node);
|
||||
dispatched.add(new DispatchedNode(node));
|
||||
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
|
||||
}
|
||||
|
||||
|
@ -759,4 +764,30 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
}
|
||||
}
|
||||
|
||||
private static class DispatchedNode {
|
||||
private final int size;
|
||||
private final MessageId messageId;
|
||||
private final Destination destination;
|
||||
|
||||
public DispatchedNode(final MessageReference node) {
|
||||
super();
|
||||
this.size = node.getSize();
|
||||
this.messageId = node.getMessageId();
|
||||
this.destination = node.getRegionDestination() instanceof Destination ?
|
||||
((Destination)node.getRegionDestination()) : null;
|
||||
}
|
||||
|
||||
public long getSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
public MessageId getMessageId() {
|
||||
return messageId;
|
||||
}
|
||||
|
||||
public Destination getDestination() {
|
||||
return destination;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue