ensure consumers get removed from the session on close. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1509291 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-08-01 15:25:23 +00:00
parent 5cadb04ad3
commit 0f6561bae0
1 changed files with 10 additions and 4 deletions

View File

@ -179,7 +179,7 @@ class AmqpProtocolConverter {
long nextProducerId = 0; long nextProducerId = 0;
long nextConsumerId = 0; long nextConsumerId = 0;
final LinkedList<ConsumerContext> consumers = new LinkedList<ConsumerContext>(); final Map<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
public AmqpSessionContext(ConnectionId connectionId, long id) { public AmqpSessionContext(ConnectionId connectionId, long id) {
sessionId = new SessionId(connectionId, id); sessionId = new SessionId(connectionId, id);
@ -656,7 +656,7 @@ class AmqpProtocolConverter {
} }
AmqpSessionContext context = (AmqpSessionContext) receiver.getSession().getContext(); AmqpSessionContext context = (AmqpSessionContext) receiver.getSession().getContext();
for (ConsumerContext consumer : context.consumers) { for (ConsumerContext consumer : context.consumers.values()) {
if (operation == TransactionInfo.ROLLBACK) { if (operation == TransactionInfo.ROLLBACK) {
consumer.doRollback(); consumer.doRollback();
} else { } else {
@ -682,7 +682,7 @@ class AmqpProtocolConverter {
} }
}); });
for (ConsumerContext consumer : context.consumers) { for (ConsumerContext consumer : context.consumers.values()) {
if (operation == TransactionInfo.ROLLBACK) { if (operation == TransactionInfo.ROLLBACK) {
consumer.pumpOutbound(); consumer.pumpOutbound();
} }
@ -815,6 +815,12 @@ class AmqpProtocolConverter {
public void onClose() throws Exception { public void onClose() throws Exception {
if (!closed) { if (!closed) {
closed = true; closed = true;
AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
if (session != null) {
session.consumers.remove(info.getConsumerId());
}
sendToActiveMQ(new RemoveInfo(consumerId), null); sendToActiveMQ(new RemoveInfo(consumerId), null);
} }
} }
@ -1188,7 +1194,7 @@ class AmqpProtocolConverter {
subscriptionsByConsumerId.remove(id); subscriptionsByConsumerId.remove(id);
sender.close(); sender.close();
} else { } else {
sessionContext.consumers.add(consumerContext); sessionContext.consumers.put(id, consumerContext);
sender.open(); sender.open();
} }
pumpProtonToSocket(); pumpProtonToSocket();