diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 238b8b0dbf..131df8fb6f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -316,6 +316,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { Event event = null; while ((event = eventCollector.peek()) != null) { + if (amqpTransport.isTrace()) { + LOG.trace("Processing event: {}", event.getType()); + } switch (event.getType()) { case CONNECTION_REMOTE_OPEN: case CONNECTION_REMOTE_CLOSE: @@ -761,6 +764,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } + public void close() { + closed = true; + } + public boolean isAnonymous() { return anonymous; } @@ -898,7 +905,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { dest = createDestination(remoteTarget); } - ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous); + final ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous); receiver.setContext(producerContext); receiver.flow(flow); @@ -916,7 +923,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } else { receiver.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); } + producerContext.closed = true; receiver.close(); + receiver.free(); } else { receiver.open(); } @@ -1423,7 +1432,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); } subscriptionsByConsumerId.remove(id); + consumerContext.closed = true; sender.close(); + sender.free(); } else { sessionContext.consumers.put(id, consumerContext); sender.open();