This closes #218

This commit is contained in:
Clebert Suconic 2015-10-26 14:08:11 -04:00
commit 7f60ff20a5
1 changed files with 53 additions and 28 deletions

View File

@ -32,6 +32,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery; import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
@ -48,7 +50,7 @@ import org.apache.activemq.artemis.utils.VersionLoader;
/** /**
* The message handler * The message handler
*/ */
public class ActiveMQMessageHandler implements MessageHandler { public class ActiveMQMessageHandler implements MessageHandler, FailoverEventListener {
/** /**
* Trace enabled * Trace enabled
@ -80,6 +82,8 @@ public class ActiveMQMessageHandler implements MessageHandler {
private ClientSessionFactory cf; private ClientSessionFactory cf;
private volatile boolean connected;
public ActiveMQMessageHandler(final ActiveMQActivation activation, public ActiveMQMessageHandler(final ActiveMQActivation activation,
final TransactionManager tm, final TransactionManager tm,
final ClientSessionInternal session, final ClientSessionInternal session,
@ -187,6 +191,8 @@ public class ActiveMQMessageHandler implements MessageHandler {
endpoint = endpointFactory.createEndpoint(null); endpoint = endpointFactory.createEndpoint(null);
useXA = false; useXA = false;
} }
connected = true;
session.addFailoverListener(this);
consumer.setMessageHandler(this); consumer.setMessageHandler(this);
} }
@ -224,39 +230,53 @@ public class ActiveMQMessageHandler implements MessageHandler {
ActiveMQRALogger.LOGGER.debug("Error releasing endpoint " + endpoint, t); ActiveMQRALogger.LOGGER.debug("Error releasing endpoint " + endpoint, t);
} }
try { //only do this if we haven't been disconnected at some point whilst failing over
consumer.close(); if (connected) {
if (activation.getTopicTemporaryQueue() != null) { try {
// We need to delete temporary topics when the activation is stopped or messages will build up on the server consumer.close();
SimpleString tmpQueue = activation.getTopicTemporaryQueue(); if (activation.getTopicTemporaryQueue() != null) {
QueueQuery subResponse = session.queueQuery(tmpQueue); // We need to delete temporary topics when the activation is stopped or messages will build up on the server
if (subResponse.getConsumerCount() == 0) { SimpleString tmpQueue = activation.getTopicTemporaryQueue();
// This is optional really, since we now use temporaryQueues, we could simply ignore this QueueQuery subResponse = session.queueQuery(tmpQueue);
// and the server temporary queue would remove this as soon as the queue was removed if (subResponse.getConsumerCount() == 0) {
session.deleteQueue(tmpQueue); // This is optional really, since we now use temporaryQueues, we could simply ignore this
// and the server temporary queue would remove this as soon as the queue was removed
session.deleteQueue(tmpQueue);
}
} }
} }
} catch (Throwable t) {
catch (Throwable t) { ActiveMQRALogger.LOGGER.debug("Error closing core-queue consumer", t);
ActiveMQRALogger.LOGGER.debug("Error closing core-queue consumer", t); }
}
try { try {
if (session != null) { if (session != null) {
session.close(); session.close();
}
}
catch (Throwable t) {
ActiveMQRALogger.LOGGER.debug("Error releasing session " + session, t);
}
try {
if (cf != null) {
cf.close();
}
}
catch (Throwable t) {
ActiveMQRALogger.LOGGER.debug("Error releasing session factory " + session, t);
} }
} }
catch (Throwable t) { else {
ActiveMQRALogger.LOGGER.debug("Error releasing session " + session, t); //otherwise we just clean up
} try {
if (cf != null) {
try { cf.cleanup();
if (cf != null) { }
cf.close();
} }
} catch (Throwable t) {
catch (Throwable t) { ActiveMQRALogger.LOGGER.debug("Error releasing session factory " + session, t);
ActiveMQRALogger.LOGGER.debug("Error releasing session factory " + session, t); }
} }
} }
@ -364,4 +384,9 @@ public class ActiveMQMessageHandler implements MessageHandler {
public void start() throws ActiveMQException { public void start() throws ActiveMQException {
session.start(); session.start();
} }
@Override
public void failoverEvent(FailoverEventType eventType) {
connected = eventType == FailoverEventType.FAILOVER_COMPLETED;
}
} }