mirror of https://github.com/apache/activemq.git
Ensure that the subscriptionByConsumerId map gets cleaned up when a sender is closed.
This commit is contained in:
parent
d25c52ccb2
commit
9bd070a8f6
|
@ -985,11 +985,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
||||||
public void onClose() throws Exception {
|
public void onClose() throws Exception {
|
||||||
if (!closed) {
|
if (!closed) {
|
||||||
closed = true;
|
closed = true;
|
||||||
|
sender.setContext(null);
|
||||||
|
subscriptionsByConsumerId.remove(consumerId);
|
||||||
|
|
||||||
AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
|
AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
|
||||||
if (session != null) {
|
if (session != null) {
|
||||||
session.consumers.remove(info.getConsumerId());
|
session.consumers.remove(info.getConsumerId());
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoveInfo removeCommand = new RemoveInfo(consumerId);
|
RemoveInfo removeCommand = new RemoveInfo(consumerId);
|
||||||
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
|
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
|
||||||
sendToActiveMQ(removeCommand, null);
|
sendToActiveMQ(removeCommand, null);
|
||||||
|
|
Loading…
Reference in New Issue