mirror of https://github.com/apache/activemq.git
AMQ-7006 Remove STOMP pending acks after client acknowledge
Reworked patch from Avikash Mishra to remove tracked pending acks from a STOMP subscription that has acked.
This commit is contained in:
parent
d34967e019
commit
9abbe826ec
|
@ -22,6 +22,7 @@ import java.io.InputStream;
|
|||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
@ -107,15 +108,15 @@ public class ProtocolConverter {
|
|||
private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
|
||||
private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
|
||||
|
||||
private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
|
||||
private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
|
||||
private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
|
||||
private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
|
||||
private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
|
||||
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
|
||||
private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<>();
|
||||
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<>();
|
||||
private final StompTransport stompTransport;
|
||||
|
||||
private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
|
||||
private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<>();
|
||||
private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
|
||||
|
||||
private final Object commnadIdMutex = new Object();
|
||||
|
@ -299,7 +300,7 @@ public class ProtocolConverter {
|
|||
exception.printStackTrace(stream);
|
||||
stream.close();
|
||||
|
||||
HashMap<String, String> headers = new HashMap<String, String>();
|
||||
HashMap<String, String> headers = new HashMap<>();
|
||||
headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
|
||||
headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
|
||||
|
||||
|
@ -800,7 +801,7 @@ public class ProtocolConverter {
|
|||
}
|
||||
|
||||
connected.set(true);
|
||||
HashMap<String, String> responseHeaders = new HashMap<String, String>();
|
||||
HashMap<String, String> responseHeaders = new HashMap<>();
|
||||
|
||||
responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
|
||||
String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
|
||||
|
@ -1038,4 +1039,27 @@ public class ProtocolConverter {
|
|||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all pending acknowledgement markers that are batched into the single
|
||||
* client acknowledge operation.
|
||||
*
|
||||
* @param subscription
|
||||
* The STOMP Subscription that has performed a client acknowledge.
|
||||
* @param msgIdsToRemove
|
||||
* List of message IDs that are bound to the subscription that has ack'd
|
||||
*/
|
||||
protected void afterClientAck(StompSubscription subscription, ArrayList<String> msgIdsToRemove) {
|
||||
int count = 0;
|
||||
|
||||
for (Map.Entry<String,AckEntry> entry : this.pedingAcks.entrySet()){
|
||||
AckEntry actEntry = entry.getValue();
|
||||
if (msgIdsToRemove.contains(actEntry.messageId)) {
|
||||
this.pedingAcks.remove(entry.getKey());
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.transport.stomp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
|
@ -141,6 +142,8 @@ public class StompSubscription {
|
|||
ack.setDestination(consumerInfo.getDestination());
|
||||
ack.setConsumerId(consumerInfo.getConsumerId());
|
||||
|
||||
final ArrayList<String> acknowledgedMessages = new ArrayList<>();
|
||||
|
||||
if (ackMode == CLIENT_ACK) {
|
||||
if (transactionId == null) {
|
||||
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
|
||||
|
@ -161,6 +164,7 @@ public class StompSubscription {
|
|||
count++;
|
||||
}
|
||||
} else {
|
||||
acknowledgedMessages.add(id.toString());
|
||||
iter.remove();
|
||||
count++;
|
||||
}
|
||||
|
@ -175,6 +179,7 @@ public class StompSubscription {
|
|||
ack.setTransactionId(transactionId);
|
||||
}
|
||||
|
||||
this.protocolConverter.afterClientAck(this, acknowledgedMessages);
|
||||
} else if (ackMode == INDIVIDUAL_ACK) {
|
||||
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
|
||||
ack.setMessageID(msgId);
|
||||
|
@ -186,6 +191,7 @@ public class StompSubscription {
|
|||
dispatchedMessage.remove(msgId);
|
||||
}
|
||||
}
|
||||
|
||||
return ack;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue