From 2a0be3b0f09a5473e5bd7944fce5286742c80a58 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 30 Jan 2015 10:35:35 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5550 Ensure that the consumer and producer context instances are marked as closed when creation at the broker end fails. --- .../transport/amqp/AmqpProtocolConverter.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 238b8b0dbf..131df8fb6f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -316,6 +316,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { Event event = null; while ((event = eventCollector.peek()) != null) { + if (amqpTransport.isTrace()) { + LOG.trace("Processing event: {}", event.getType()); + } switch (event.getType()) { case CONNECTION_REMOTE_OPEN: case CONNECTION_REMOTE_CLOSE: @@ -761,6 +764,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } + public void close() { + closed = true; + } + public boolean isAnonymous() { return anonymous; } @@ -898,7 +905,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { dest = createDestination(remoteTarget); } - ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous); + final ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous); receiver.setContext(producerContext); receiver.flow(flow); @@ -916,7 +923,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } else { receiver.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); } + producerContext.closed = true; receiver.close(); + receiver.free(); } else { receiver.open(); } @@ -1423,7 +1432,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); } subscriptionsByConsumerId.remove(id); + consumerContext.closed = true; sender.close(); + sender.free(); } else { sessionContext.consumers.put(id, consumerContext); sender.open();