diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index fe96eab3ab..d3fefb39a2 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -179,7 +179,7 @@ class AmqpProtocolConverter { long nextProducerId = 0; long nextConsumerId = 0; - final LinkedList consumers = new LinkedList(); + final Map consumers = new HashMap(); public AmqpSessionContext(ConnectionId connectionId, long id) { sessionId = new SessionId(connectionId, id); @@ -656,7 +656,7 @@ class AmqpProtocolConverter { } AmqpSessionContext context = (AmqpSessionContext) receiver.getSession().getContext(); - for (ConsumerContext consumer : context.consumers) { + for (ConsumerContext consumer : context.consumers.values()) { if (operation == TransactionInfo.ROLLBACK) { consumer.doRollback(); } else { @@ -682,7 +682,7 @@ class AmqpProtocolConverter { } }); - for (ConsumerContext consumer : context.consumers) { + for (ConsumerContext consumer : context.consumers.values()) { if (operation == TransactionInfo.ROLLBACK) { consumer.pumpOutbound(); } @@ -815,6 +815,12 @@ class AmqpProtocolConverter { public void onClose() throws Exception { if (!closed) { closed = true; + + AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext(); + if (session != null) { + session.consumers.remove(info.getConsumerId()); + } + sendToActiveMQ(new RemoveInfo(consumerId), null); } } @@ -1188,7 +1194,7 @@ class AmqpProtocolConverter { subscriptionsByConsumerId.remove(id); sender.close(); } else { - sessionContext.consumers.add(consumerContext); + sessionContext.consumers.put(id, consumerContext); sender.open(); } pumpProtonToSocket();