AMQ-7006 Remove STOMP pending acks after client acknowledge

Reworked patch from Avikash Mishra to remove tracked pending acks from a
STOMP subscription that has acked.
(cherry picked from commit 9abbe826ec)
This commit is contained in:
Timothy Bish 2018-07-19 16:58:47 -04:00
parent 63779e2f78
commit 4bcc991d72
2 changed files with 39 additions and 9 deletions

View File

@ -22,6 +22,7 @@ import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -107,15 +108,15 @@ public class ProtocolConverter {
private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<>();
private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>(); private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<>();
private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>(); private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>(); private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<>();
private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>(); private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<>();
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>(); private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<>();
private final StompTransport stompTransport; private final StompTransport stompTransport;
private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>(); private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<>();
private final IdGenerator ACK_ID_GENERATOR = new IdGenerator(); private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
private final Object commnadIdMutex = new Object(); private final Object commnadIdMutex = new Object();
@ -299,7 +300,7 @@ public class ProtocolConverter {
exception.printStackTrace(stream); exception.printStackTrace(stream);
stream.close(); stream.close();
HashMap<String, String> headers = new HashMap<String, String>(); HashMap<String, String> headers = new HashMap<>();
headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage()); headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain"); headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
@ -800,7 +801,7 @@ public class ProtocolConverter {
} }
connected.set(true); connected.set(true);
HashMap<String, String> responseHeaders = new HashMap<String, String>(); HashMap<String, String> responseHeaders = new HashMap<>();
responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId()); responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID); String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
@ -1038,4 +1039,27 @@ public class ProtocolConverter {
return result; return result;
} }
/**
* Remove all pending acknowledgement markers that are batched into the single
* client acknowledge operation.
*
* @param subscription
* The STOMP Subscription that has performed a client acknowledge.
* @param msgIdsToRemove
* List of message IDs that are bound to the subscription that has ack'd
*/
protected void afterClientAck(StompSubscription subscription, ArrayList<String> msgIdsToRemove) {
int count = 0;
for (Map.Entry<String,AckEntry> entry : this.pedingAcks.entrySet()){
AckEntry actEntry = entry.getValue();
if (msgIdsToRemove.contains(actEntry.messageId)) {
this.pedingAcks.remove(entry.getKey());
count++;
}
}
LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count);
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.stomp; package org.apache.activemq.transport.stomp;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList; import java.util.LinkedList;
@ -141,6 +142,8 @@ public class StompSubscription {
ack.setDestination(consumerInfo.getDestination()); ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId()); ack.setConsumerId(consumerInfo.getConsumerId());
final ArrayList<String> acknowledgedMessages = new ArrayList<>();
if (ackMode == CLIENT_ACK) { if (ackMode == CLIENT_ACK) {
if (transactionId == null) { if (transactionId == null) {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
@ -161,6 +164,7 @@ public class StompSubscription {
count++; count++;
} }
} else { } else {
acknowledgedMessages.add(id.toString());
iter.remove(); iter.remove();
count++; count++;
} }
@ -175,6 +179,7 @@ public class StompSubscription {
ack.setTransactionId(transactionId); ack.setTransactionId(transactionId);
} }
this.protocolConverter.afterClientAck(this, acknowledgedMessages);
} else if (ackMode == INDIVIDUAL_ACK) { } else if (ackMode == INDIVIDUAL_ACK) {
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE); ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
ack.setMessageID(msgId); ack.setMessageID(msgId);
@ -186,6 +191,7 @@ public class StompSubscription {
dispatchedMessage.remove(msgId); dispatchedMessage.remove(msgId);
} }
} }
return ack; return ack;
} }