diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java index 093e3eed0a..6ec3a5e7de 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java @@ -73,6 +73,10 @@ public interface ActiveMQRALogger extends BasicLogger { @Message(id = 151005, value = "awaiting server availability", format = Message.Format.MESSAGE_FORMAT) void awaitingJMSServerCreation(); + @LogMessage(level = Logger.Level.INFO) + @Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections.", format = Message.Format.MESSAGE_FORMAT) + void rebalancingConnections(); + @LogMessage(level = Logger.Level.WARN) @Message(id = 152001, value = "problem resetting xa session after failure", format = Message.Format.MESSAGE_FORMAT) void problemResettingXASession(); diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java index 8e55061c14..cb439f9aa3 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java @@ -30,9 +30,11 @@ import javax.resource.spi.work.WorkManager; import javax.transaction.xa.XAResource; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -42,6 +44,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; +import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory; @@ -111,8 +115,14 @@ public class ActiveMQActivation { private ActiveMQConnectionFactory factory; + private List nodes = Collections.synchronizedList(new ArrayList()); + + private Map removedNodes = new ConcurrentHashMap(); + + private boolean lastReceived = false; + // Whether we are in the failure recovery loop - private final AtomicBoolean inFailure = new AtomicBoolean(false); + private final AtomicBoolean inReconnect = new AtomicBoolean(false); private XARecoveryConfig resourceRecovery; static { @@ -338,6 +348,9 @@ public class ActiveMQActivation { Map recoveryConfProps = new HashMap(); recoveryConfProps.put(XARecoveryConfig.JNDI_NAME_PROPERTY_KEY, ra.getJndiName()); resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword(), recoveryConfProps); + if (spec.isRebalanceConnections()) { + factory.getServerLocator().addClusterTopologyListener(new RebalancingListener()); + } ActiveMQRALogger.LOGGER.debug("Setup complete " + this); } @@ -431,6 +444,9 @@ public class ActiveMQActivation { factory = null; } + nodes.clear(); + lastReceived = false; + ActiveMQRALogger.LOGGER.debug("Tearing down complete " + this); } @@ -610,27 +626,34 @@ public class ActiveMQActivation { return buffer.toString(); } + public void rebalance() { + ActiveMQRALogger.LOGGER.rebalancingConnections(); + reconnect(null); + } + /** - * Handles any failure by trying to reconnect + * Drops all existing connection-related resources and reconnects * - * @param failure the reason for the failure + * @param failure if reconnecting in the event of a failure */ - public void handleFailure(Throwable failure) { - if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) { - ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination()); - } - else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED) { - ActiveMQRALogger.LOGGER.awaitingJMSServerCreation(); - } - else { - ActiveMQRALogger.LOGGER.failureInActivation(failure, spec); + public void reconnect(Throwable failure) { + if (failure != null) { + if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) { + ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination()); + } + else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED) { + ActiveMQRALogger.LOGGER.awaitingJMSServerCreation(); + } + else { + ActiveMQRALogger.LOGGER.failureInActivation(failure, spec); + } } int reconnectCount = 0; int setupAttempts = spec.getSetupAttempts(); long setupInterval = spec.getSetupInterval(); - // Only enter the failure loop once - if (inFailure.getAndSet(true)) + // Only enter the reconnect loop once + if (inReconnect.getAndSet(true)) return; try { Throwable lastException = failure; @@ -675,7 +698,7 @@ public class ActiveMQActivation { } finally { // Leaving failure recovery loop - inFailure.set(false); + inReconnect.set(false); } } @@ -693,11 +716,55 @@ public class ActiveMQActivation { setup(); } catch (Throwable t) { - handleFailure(t); + reconnect(t); } } public void release() { } } + + private class RebalancingListener implements ClusterTopologyListener { + @Override + public void nodeUP(TopologyMember member, boolean last) { + boolean newNode = false; + + String id = member.getNodeId(); + if (!nodes.contains(id)) { + if (removedNodes.get(id) == null || (removedNodes.get(id) != null && removedNodes.get(id) < member.getUniqueEventID())) { + nodes.add(id); + newNode = true; + } + } + + if (lastReceived && newNode) { + Runnable runnable = new Runnable() { + @Override + public void run() { + rebalance(); + } + }; + Thread t = new Thread(runnable, "NodeUP Connection Rebalancer"); + t.start(); + } + else if (last) { + lastReceived = true; + } + } + + @Override + public void nodeDown(long eventUID, String nodeID) { + if (nodes.remove(nodeID)) { + removedNodes.put(nodeID, eventUID); + Runnable runnable = new Runnable() { + @Override + public void run() { + rebalance(); + } + }; + Thread t = new Thread(runnable, "NodeDOWN Connection Rebalancer"); + t.start(); + } + } + } } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java index f80342b4f4..32d253e3d8 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java @@ -135,6 +135,8 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen // undefined by default, default is specified at the RA level in ActiveMQRAProperties private Long setupInterval; + private Boolean rebalanceConnections = false; + /** * Constructor */ @@ -626,6 +628,14 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen this.localTx = localTx; } + public boolean isRebalanceConnections() { + return rebalanceConnections; + } + + public void setRebalanceConnections(boolean rebalanceConnections) { + this.rebalanceConnections = rebalanceConnections; + } + public int getSetupAttempts() { if (ActiveMQActivationSpec.trace) { ActiveMQRALogger.LOGGER.trace("getSetupAttempts()"); @@ -846,6 +856,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen if (parsedJndiParams != null ? !parsedJndiParams.equals(that.parsedJndiParams) : that.parsedJndiParams != null) return false; if (localTx != null ? !localTx.equals(that.localTx) : that.localTx != null) return false; + if (rebalanceConnections != null ? !rebalanceConnections.equals(that.rebalanceConnections) : that.rebalanceConnections != null) return false; if (setupAttempts != null ? !setupAttempts.equals(that.setupAttempts) : that.setupAttempts != null) return false; return !(setupInterval != null ? !setupInterval.equals(that.setupInterval) : that.setupInterval != null); @@ -873,6 +884,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen result = 31 * result + (jndiParams != null ? jndiParams.hashCode() : 0); result = 31 * result + (parsedJndiParams != null ? parsedJndiParams.hashCode() : 0); result = 31 * result + (localTx != null ? localTx.hashCode() : 0); + result = 31 * result + (rebalanceConnections != null ? rebalanceConnections.hashCode() : 0); result = 31 * result + (setupAttempts != null ? setupAttempts.hashCode() : 0); result = 31 * result + (setupInterval != null ? setupInterval.hashCode() : 0); return result; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 048ab08fdd..76ac5ab0bb 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -810,6 +810,10 @@ public abstract class ActiveMQTestBase extends Assert { deleteDirectory(file); file.mkdirs(); + recreateDataDirectories(testDir1, index, backup); + } + + protected void recreateDataDirectories(String testDir1, int index, boolean backup) { recreateDirectory(getJournalDir(testDir1, index, backup)); recreateDirectory(getBindingsDir(testDir1, index, backup)); recreateDirectory(getPageDir(testDir1, index, backup)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java index fbab7f023a..3ffac2ea24 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java @@ -170,4 +170,76 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase { } } } + + @Test + public void testRebalance() throws Exception { + final int CONSUMER_COUNT = 10; + secondaryJmsServer.createQueue(true, MDBQUEUE, null, true, "/jms/" + MDBQUEUE); + + ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); + MyBootstrapContext ctx = new MyBootstrapContext(); + qResourceAdapter.start(ctx); + ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); + spec.setResourceAdapter(qResourceAdapter); + spec.setUseJNDI(false); + spec.setDestinationType("javax.jms.Queue"); + spec.setDestination(MDBQUEUE); + spec.setRebalanceConnections(true); + spec.setMaxSession(CONSUMER_COUNT); + spec.setSetupAttempts(5); + spec.setSetupInterval(200); + spec.setHA(true); // if this isn't true then the toplogy listener won't get nodeDown notifications + spec.setCallTimeout(500L); // if this isn't set then it may take a long time for tearDown to occur on the MDB connection + qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY); + CountDownLatch latch = new CountDownLatch(1); + DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch); + DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false); + qResourceAdapter.endpointActivation(endpointFactory, spec); + + Queue primaryQueue = server.locateQueue(MDBQUEUEPREFIXEDSIMPLE); + Queue secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE); + + assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT); + assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT); + assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT); + + ClientSession session = addClientSession(locator.createSessionFactory().createSession()); + ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED); + ClientMessage message = session.createMessage(true); + message.getBodyBuffer().writeString("test"); + clientProducer.send(message); + + latch.await(5, TimeUnit.SECONDS); + + assertNotNull(endpoint.lastMessage); + assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "test"); + + for (int i = 0; i < 10; i++) { + secondaryServer.stop(); + + long mark = System.currentTimeMillis(); + long timeout = 5000; + while (primaryQueue.getConsumerCount() < CONSUMER_COUNT && (System.currentTimeMillis() - mark) < timeout) { + Thread.sleep(100); + } + + assertTrue(primaryQueue.getConsumerCount() == CONSUMER_COUNT); + + secondaryServer.start(); + waitForServerToStart(secondaryServer); + secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE); + + mark = System.currentTimeMillis(); + while (((primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount()) < (CONSUMER_COUNT) || primaryQueue.getConsumerCount() == CONSUMER_COUNT) && (System.currentTimeMillis() - mark) <= timeout) { + Thread.sleep(100); + } + + assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT); + assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT); + assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT); + } + + qResourceAdapter.endpointDeactivation(endpointFactory, spec); + qResourceAdapter.stop(); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java index a2e69e1486..c49d38a9cc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java @@ -45,34 +45,49 @@ public class ActiveMQRAClusteredTestBase extends ActiveMQRATestBase { params.put(TransportConstants.SERVER_ID_PROP_NAME, "1"); secondaryConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params); - secondaryServer = addServer(ActiveMQServers.newActiveMQServer(createSecondaryDefaultConfig(true, true), mbeanServer, usePersistence())); + secondaryServer = addServer(ActiveMQServers.newActiveMQServer(createSecondaryDefaultConfig(true), mbeanServer, usePersistence())); addServer(secondaryServer); secondaryJmsServer = new JMSServerManagerImpl(secondaryServer); secondaryJmsServer.start(); waitForTopology(secondaryServer, 2); + } protected Configuration createDefaultConfig(boolean netty) throws Exception { - return createSecondaryDefaultConfig(netty, false); + return createSecondaryDefaultConfig(false); } - protected Configuration createSecondaryDefaultConfig(boolean netty, boolean secondary) throws Exception { + protected Configuration createSecondaryDefaultConfig(boolean secondary) throws Exception { HashMap invmMap = new HashMap(); HashMap nettyMap = new HashMap(); String primaryConnectorName = "invm2"; String secondaryConnectorName = "invm"; - String directoryPrefix = "first"; + int index = 0; if (secondary) { invmMap.put(TransportConstants.SERVER_ID_PROP_NAME, "1"); nettyMap.put("port", "5545"); primaryConnectorName = "invm"; secondaryConnectorName = "invm2"; - directoryPrefix = "second"; + index = 1; } - ConfigurationImpl configuration = createBasicConfig().setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap)).setJournalDirectory(getTestDir() + "/" + directoryPrefix + "Journal/").setBindingsDirectory(getTestDir() + "/" + directoryPrefix + "Bind/").setLargeMessagesDirectory(getTestDir() + "/" + directoryPrefix + "Large/").setPagingDirectory(getTestDir() + "/" + directoryPrefix + "Page/").addConnectorConfiguration(secondaryConnectorName, secondaryConnector).addConnectorConfiguration(primaryConnectorName, primaryConnector).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName)); + ConfigurationImpl configuration = createBasicConfig(index) + .setJMXManagementEnabled(false) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)) + .addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap)) + .addConnectorConfiguration(secondaryConnectorName, secondaryConnector) + .addConnectorConfiguration(primaryConnectorName, primaryConnector) + .addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName).setReconnectAttempts(0)); + + recreateDataDirectories(getTestDir(), index, false); return configuration; } + + @Override + protected boolean usePersistence() { + return true; + } }