From 4bcc991d72c6b8bccc764ce2a704324ec9544803 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 19 Jul 2018 16:58:47 -0400 Subject: [PATCH] AMQ-7006 Remove STOMP pending acks after client acknowledge Reworked patch from Avikash Mishra to remove tracked pending acks from a STOMP subscription that has acked. (cherry picked from commit 9abbe826ecc47596fb43fd42e094d403c56b158d) --- .../transport/stomp/ProtocolConverter.java | 42 +++++++++++++++---- .../transport/stomp/StompSubscription.java | 6 +++ 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 22e63a1cbb..cb799c6786 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -107,15 +108,15 @@ public class ProtocolConverter { private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); - private final ConcurrentMap resposeHandlers = new ConcurrentHashMap(); - private final ConcurrentMap subscriptionsByConsumerId = new ConcurrentHashMap(); - private final ConcurrentMap subscriptions = new ConcurrentHashMap(); - private final ConcurrentMap tempDestinations = new ConcurrentHashMap(); - private final ConcurrentMap tempDestinationAmqToStompMap = new ConcurrentHashMap(); - private final Map transactions = new ConcurrentHashMap(); + private final ConcurrentMap resposeHandlers = new ConcurrentHashMap<>(); + private final ConcurrentMap subscriptionsByConsumerId = new ConcurrentHashMap<>(); + private final ConcurrentMap subscriptions = new ConcurrentHashMap<>(); + private final ConcurrentMap tempDestinations = new ConcurrentHashMap<>(); + private final ConcurrentMap tempDestinationAmqToStompMap = new ConcurrentHashMap<>(); + private final Map transactions = new ConcurrentHashMap<>(); private final StompTransport stompTransport; - private final ConcurrentMap pedingAcks = new ConcurrentHashMap(); + private final ConcurrentMap pedingAcks = new ConcurrentHashMap<>(); private final IdGenerator ACK_ID_GENERATOR = new IdGenerator(); private final Object commnadIdMutex = new Object(); @@ -299,7 +300,7 @@ public class ProtocolConverter { exception.printStackTrace(stream); stream.close(); - HashMap headers = new HashMap(); + HashMap headers = new HashMap<>(); headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage()); headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain"); @@ -800,7 +801,7 @@ public class ProtocolConverter { } connected.set(true); - HashMap responseHeaders = new HashMap(); + HashMap responseHeaders = new HashMap<>(); responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId()); String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID); @@ -1038,4 +1039,27 @@ public class ProtocolConverter { return result; } + + /** + * Remove all pending acknowledgement markers that are batched into the single + * client acknowledge operation. + * + * @param subscription + * The STOMP Subscription that has performed a client acknowledge. + * @param msgIdsToRemove + * List of message IDs that are bound to the subscription that has ack'd + */ + protected void afterClientAck(StompSubscription subscription, ArrayList msgIdsToRemove) { + int count = 0; + + for (Map.Entry entry : this.pedingAcks.entrySet()){ + AckEntry actEntry = entry.getValue(); + if (msgIdsToRemove.contains(actEntry.messageId)) { + this.pedingAcks.remove(entry.getKey()); + count++; + } + } + + LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count); + } } diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index dbbe871b07..0d8e308aaf 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.stomp; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -141,6 +142,8 @@ public class StompSubscription { ack.setDestination(consumerInfo.getDestination()); ack.setConsumerId(consumerInfo.getConsumerId()); + final ArrayList acknowledgedMessages = new ArrayList<>(); + if (ackMode == CLIENT_ACK) { if (transactionId == null) { ack.setAckType(MessageAck.STANDARD_ACK_TYPE); @@ -161,6 +164,7 @@ public class StompSubscription { count++; } } else { + acknowledgedMessages.add(id.toString()); iter.remove(); count++; } @@ -175,6 +179,7 @@ public class StompSubscription { ack.setTransactionId(transactionId); } + this.protocolConverter.afterClientAck(this, acknowledgedMessages); } else if (ackMode == INDIVIDUAL_ACK) { ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE); ack.setMessageID(msgId); @@ -186,6 +191,7 @@ public class StompSubscription { dispatchedMessage.remove(msgId); } } + return ack; }