Ensure that the consumer and producer context instances are marked as
closed when creation at the broker end fails.
This commit is contained in:
Timothy Bish 2015-01-30 10:35:35 -05:00
parent ae595c95b1
commit 2a0be3b0f0
1 changed files with 12 additions and 1 deletions

View File

@ -316,6 +316,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
Event event = null; Event event = null;
while ((event = eventCollector.peek()) != null) { while ((event = eventCollector.peek()) != null) {
if (amqpTransport.isTrace()) {
LOG.trace("Processing event: {}", event.getType());
}
switch (event.getType()) { switch (event.getType()) {
case CONNECTION_REMOTE_OPEN: case CONNECTION_REMOTE_OPEN:
case CONNECTION_REMOTE_CLOSE: case CONNECTION_REMOTE_CLOSE:
@ -761,6 +764,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} }
} }
public void close() {
closed = true;
}
public boolean isAnonymous() { public boolean isAnonymous() {
return anonymous; return anonymous;
} }
@ -898,7 +905,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
dest = createDestination(remoteTarget); dest = createDestination(remoteTarget);
} }
ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous); final ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous);
receiver.setContext(producerContext); receiver.setContext(producerContext);
receiver.flow(flow); receiver.flow(flow);
@ -916,7 +923,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} else { } else {
receiver.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); receiver.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
} }
producerContext.closed = true;
receiver.close(); receiver.close();
receiver.free();
} else { } else {
receiver.open(); receiver.open();
} }
@ -1423,7 +1432,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
} }
subscriptionsByConsumerId.remove(id); subscriptionsByConsumerId.remove(id);
consumerContext.closed = true;
sender.close(); sender.close();
sender.free();
} else { } else {
sessionContext.consumers.put(id, consumerContext); sessionContext.consumers.put(id, consumerContext);
sender.open(); sender.open();