mirror of https://github.com/apache/activemq.git
Ensure that pendingAcks map is cleaned up either on a single message ACK / NACK or on TX commit or rollback
This commit is contained in:
parent
7d136de422
commit
52d95ee01c
|
@ -369,7 +369,7 @@ public class ProtocolConverter {
|
||||||
boolean nacked = false;
|
boolean nacked = false;
|
||||||
|
|
||||||
if (ackId != null) {
|
if (ackId != null) {
|
||||||
AckEntry pendingAck = this.pedingAcks.get(ackId);
|
AckEntry pendingAck = this.pedingAcks.remove(ackId);
|
||||||
if (pendingAck != null) {
|
if (pendingAck != null) {
|
||||||
messageId = pendingAck.getMessageId();
|
messageId = pendingAck.getMessageId();
|
||||||
MessageAck ack = pendingAck.onMessageNack(activemqTx);
|
MessageAck ack = pendingAck.onMessageNack(activemqTx);
|
||||||
|
@ -425,8 +425,7 @@ public class ProtocolConverter {
|
||||||
boolean acked = false;
|
boolean acked = false;
|
||||||
|
|
||||||
if (ackId != null) {
|
if (ackId != null) {
|
||||||
|
AckEntry pendingAck = this.pedingAcks.remove(ackId);
|
||||||
AckEntry pendingAck = this.pedingAcks.get(ackId);
|
|
||||||
if (pendingAck != null) {
|
if (pendingAck != null) {
|
||||||
messageId = pendingAck.getMessageId();
|
messageId = pendingAck.getMessageId();
|
||||||
MessageAck ack = pendingAck.onMessageAck(activemqTx);
|
MessageAck ack = pendingAck.onMessageAck(activemqTx);
|
||||||
|
@ -437,7 +436,6 @@ public class ProtocolConverter {
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (subscriptionId != null) {
|
} else if (subscriptionId != null) {
|
||||||
|
|
||||||
StompSubscription sub = this.subscriptions.get(subscriptionId);
|
StompSubscription sub = this.subscriptions.get(subscriptionId);
|
||||||
if (sub != null) {
|
if (sub != null) {
|
||||||
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
|
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
|
||||||
|
@ -446,13 +444,10 @@ public class ProtocolConverter {
|
||||||
acked = true;
|
acked = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
// STOMP v1.0: acking with just a message id is very bogus since the same message id
|
// STOMP v1.0: acking with just a message id is very bogus since the same message id
|
||||||
// could have been sent to 2 different subscriptions on the same Stomp connection.
|
// could have been sent to 2 different subscriptions on the same Stomp connection.
|
||||||
// For example, when 2 subs are created on the same topic.
|
// For example, when 2 subs are created on the same topic.
|
||||||
|
|
||||||
for (StompSubscription sub : subscriptionsByConsumerId.values()) {
|
for (StompSubscription sub : subscriptionsByConsumerId.values()) {
|
||||||
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
|
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
|
||||||
if (ack != null) {
|
if (ack != null) {
|
||||||
|
@ -513,6 +508,8 @@ public class ProtocolConverter {
|
||||||
sub.onStompCommit(activemqTx);
|
sub.onStompCommit(activemqTx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pedingAcks.clear();
|
||||||
|
|
||||||
TransactionInfo tx = new TransactionInfo();
|
TransactionInfo tx = new TransactionInfo();
|
||||||
tx.setConnectionId(connectionId);
|
tx.setConnectionId(connectionId);
|
||||||
tx.setTransactionId(activemqTx);
|
tx.setTransactionId(activemqTx);
|
||||||
|
@ -542,6 +539,8 @@ public class ProtocolConverter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pedingAcks.clear();
|
||||||
|
|
||||||
TransactionInfo tx = new TransactionInfo();
|
TransactionInfo tx = new TransactionInfo();
|
||||||
tx.setConnectionId(connectionId);
|
tx.setConnectionId(connectionId);
|
||||||
tx.setTransactionId(activemqTx);
|
tx.setTransactionId(activemqTx);
|
||||||
|
|
Loading…
Reference in New Issue