diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java index 149eea2e12..f6a85355d2 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java @@ -32,6 +32,8 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.FailoverEventListener; +import org.apache.activemq.artemis.api.core.client.FailoverEventType; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; @@ -48,7 +50,7 @@ import org.apache.activemq.artemis.utils.VersionLoader; /** * The message handler */ -public class ActiveMQMessageHandler implements MessageHandler { +public class ActiveMQMessageHandler implements MessageHandler, FailoverEventListener { /** * Trace enabled @@ -80,6 +82,8 @@ public class ActiveMQMessageHandler implements MessageHandler { private ClientSessionFactory cf; + private volatile boolean connected; + public ActiveMQMessageHandler(final ActiveMQActivation activation, final TransactionManager tm, final ClientSessionInternal session, @@ -187,6 +191,8 @@ public class ActiveMQMessageHandler implements MessageHandler { endpoint = endpointFactory.createEndpoint(null); useXA = false; } + connected = true; + session.addFailoverListener(this); consumer.setMessageHandler(this); } @@ -224,39 +230,53 @@ public class ActiveMQMessageHandler implements MessageHandler { ActiveMQRALogger.LOGGER.debug("Error releasing endpoint " + endpoint, t); } - try { - consumer.close(); - if (activation.getTopicTemporaryQueue() != null) { - // We need to delete temporary topics when the activation is stopped or messages will build up on the server - SimpleString tmpQueue = activation.getTopicTemporaryQueue(); - QueueQuery subResponse = session.queueQuery(tmpQueue); - if (subResponse.getConsumerCount() == 0) { - // This is optional really, since we now use temporaryQueues, we could simply ignore this - // and the server temporary queue would remove this as soon as the queue was removed - session.deleteQueue(tmpQueue); + //only do this if we haven't been disconnected at some point whilst failing over + if (connected) { + try { + consumer.close(); + if (activation.getTopicTemporaryQueue() != null) { + // We need to delete temporary topics when the activation is stopped or messages will build up on the server + SimpleString tmpQueue = activation.getTopicTemporaryQueue(); + QueueQuery subResponse = session.queueQuery(tmpQueue); + if (subResponse.getConsumerCount() == 0) { + // This is optional really, since we now use temporaryQueues, we could simply ignore this + // and the server temporary queue would remove this as soon as the queue was removed + session.deleteQueue(tmpQueue); + } } } - } - catch (Throwable t) { - ActiveMQRALogger.LOGGER.debug("Error closing core-queue consumer", t); - } + catch (Throwable t) { + ActiveMQRALogger.LOGGER.debug("Error closing core-queue consumer", t); + } - try { - if (session != null) { - session.close(); + try { + if (session != null) { + session.close(); + } + } + catch (Throwable t) { + ActiveMQRALogger.LOGGER.debug("Error releasing session " + session, t); + } + try { + if (cf != null) { + cf.close(); + } + } + catch (Throwable t) { + ActiveMQRALogger.LOGGER.debug("Error releasing session factory " + session, t); } } - catch (Throwable t) { - ActiveMQRALogger.LOGGER.debug("Error releasing session " + session, t); - } - - try { - if (cf != null) { - cf.close(); + else { + //otherwise we just clean up + try { + if (cf != null) { + cf.cleanup(); + } } - } - catch (Throwable t) { - ActiveMQRALogger.LOGGER.debug("Error releasing session factory " + session, t); + catch (Throwable t) { + ActiveMQRALogger.LOGGER.debug("Error releasing session factory " + session, t); + } + } } @@ -364,4 +384,9 @@ public class ActiveMQMessageHandler implements MessageHandler { public void start() throws ActiveMQException { session.start(); } + + @Override + public void failoverEvent(FailoverEventType eventType) { + connected = eventType == FailoverEventType.FAILOVER_COMPLETED; + } }