release the connection even if broker communication fails

This commit is contained in:
Romain Manni-Bucau 2015-11-19 07:48:56 -08:00
parent 64aac4ce73
commit 934f3cea7e
1 changed files with 26 additions and 24 deletions

View File

@ -678,34 +678,36 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.activeTempDestinations.clear(); this.activeTempDestinations.clear();
if (isConnectionInfoSentToBroker) { try {
// If we announced ourselves to the broker.. Try to let the broker if (isConnectionInfoSentToBroker) {
// know that the connection is being shutdown. // If we announced ourselves to the broker.. Try to let the broker
RemoveInfo removeCommand = info.createRemoveCommand(); // know that the connection is being shutdown.
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); RemoveInfo removeCommand = info.createRemoveCommand();
try { removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
doSyncSendPacket(removeCommand, closeTimeout); try {
} catch (JMSException e) { doSyncSendPacket(removeCommand, closeTimeout);
if (e.getCause() instanceof RequestTimedOutIOException) { } catch (JMSException e) {
// expected if (e.getCause() instanceof RequestTimedOutIOException) {
} else { // expected
throw e; } else {
throw e;
}
} }
doAsyncSendPacket(new ShutdownInfo());
} }
doAsyncSendPacket(new ShutdownInfo()); } finally { // release anyway even if previous communication fails
} started.set(false);
started.set(false); // TODO if we move the TaskRunnerFactory to the connection
// factory
// TODO if we move the TaskRunnerFactory to the connection // then we may need to call
// factory // factory.onConnectionClose(this);
// then we may need to call if (sessionTaskRunner != null) {
// factory.onConnectionClose(this); sessionTaskRunner.shutdown();
if (sessionTaskRunner != null) { }
sessionTaskRunner.shutdown(); closed.set(true);
closing.set(false);
} }
closed.set(true);
closing.set(false);
} }
} }
} finally { } finally {