ARTEMIS-293 rebalance inflow on topology change

This commit is contained in:
jbertram 2015-10-30 11:20:06 -05:00
parent 63d5f3fc17
commit 3f6089891d
6 changed files with 196 additions and 22 deletions

View File

@ -73,6 +73,10 @@ public interface ActiveMQRALogger extends BasicLogger {
@Message(id = 151005, value = "awaiting server availability", format = Message.Format.MESSAGE_FORMAT)
void awaitingJMSServerCreation();
@LogMessage(level = Logger.Level.INFO)
@Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections.", format = Message.Format.MESSAGE_FORMAT)
void rebalancingConnections();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 152001, value = "problem resetting xa session after failure", format = Message.Format.MESSAGE_FORMAT)
void problemResettingXASession();

View File

@ -30,9 +30,11 @@ import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -42,6 +44,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory;
@ -111,8 +115,14 @@ public class ActiveMQActivation {
private ActiveMQConnectionFactory factory;
private List<String> nodes = Collections.synchronizedList(new ArrayList<String>());
private Map<String, Long> removedNodes = new ConcurrentHashMap<String, Long>();
private boolean lastReceived = false;
// Whether we are in the failure recovery loop
private final AtomicBoolean inFailure = new AtomicBoolean(false);
private final AtomicBoolean inReconnect = new AtomicBoolean(false);
private XARecoveryConfig resourceRecovery;
static {
@ -338,6 +348,9 @@ public class ActiveMQActivation {
Map<String, String> recoveryConfProps = new HashMap<String, String>();
recoveryConfProps.put(XARecoveryConfig.JNDI_NAME_PROPERTY_KEY, ra.getJndiName());
resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword(), recoveryConfProps);
if (spec.isRebalanceConnections()) {
factory.getServerLocator().addClusterTopologyListener(new RebalancingListener());
}
ActiveMQRALogger.LOGGER.debug("Setup complete " + this);
}
@ -431,6 +444,9 @@ public class ActiveMQActivation {
factory = null;
}
nodes.clear();
lastReceived = false;
ActiveMQRALogger.LOGGER.debug("Tearing down complete " + this);
}
@ -610,12 +626,18 @@ public class ActiveMQActivation {
return buffer.toString();
}
public void rebalance() {
ActiveMQRALogger.LOGGER.rebalancingConnections();
reconnect(null);
}
/**
* Handles any failure by trying to reconnect
* Drops all existing connection-related resources and reconnects
*
* @param failure the reason for the failure
* @param failure if reconnecting in the event of a failure
*/
public void handleFailure(Throwable failure) {
public void reconnect(Throwable failure) {
if (failure != null) {
if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) {
ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
}
@ -625,12 +647,13 @@ public class ActiveMQActivation {
else {
ActiveMQRALogger.LOGGER.failureInActivation(failure, spec);
}
}
int reconnectCount = 0;
int setupAttempts = spec.getSetupAttempts();
long setupInterval = spec.getSetupInterval();
// Only enter the failure loop once
if (inFailure.getAndSet(true))
// Only enter the reconnect loop once
if (inReconnect.getAndSet(true))
return;
try {
Throwable lastException = failure;
@ -675,7 +698,7 @@ public class ActiveMQActivation {
}
finally {
// Leaving failure recovery loop
inFailure.set(false);
inReconnect.set(false);
}
}
@ -693,11 +716,55 @@ public class ActiveMQActivation {
setup();
}
catch (Throwable t) {
handleFailure(t);
reconnect(t);
}
}
public void release() {
}
}
private class RebalancingListener implements ClusterTopologyListener {
@Override
public void nodeUP(TopologyMember member, boolean last) {
boolean newNode = false;
String id = member.getNodeId();
if (!nodes.contains(id)) {
if (removedNodes.get(id) == null || (removedNodes.get(id) != null && removedNodes.get(id) < member.getUniqueEventID())) {
nodes.add(id);
newNode = true;
}
}
if (lastReceived && newNode) {
Runnable runnable = new Runnable() {
@Override
public void run() {
rebalance();
}
};
Thread t = new Thread(runnable, "NodeUP Connection Rebalancer");
t.start();
}
else if (last) {
lastReceived = true;
}
}
@Override
public void nodeDown(long eventUID, String nodeID) {
if (nodes.remove(nodeID)) {
removedNodes.put(nodeID, eventUID);
Runnable runnable = new Runnable() {
@Override
public void run() {
rebalance();
}
};
Thread t = new Thread(runnable, "NodeDOWN Connection Rebalancer");
t.start();
}
}
}
}

View File

@ -135,6 +135,8 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
// undefined by default, default is specified at the RA level in ActiveMQRAProperties
private Long setupInterval;
private Boolean rebalanceConnections = false;
/**
* Constructor
*/
@ -626,6 +628,14 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
this.localTx = localTx;
}
public boolean isRebalanceConnections() {
return rebalanceConnections;
}
public void setRebalanceConnections(boolean rebalanceConnections) {
this.rebalanceConnections = rebalanceConnections;
}
public int getSetupAttempts() {
if (ActiveMQActivationSpec.trace) {
ActiveMQRALogger.LOGGER.trace("getSetupAttempts()");
@ -846,6 +856,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
if (parsedJndiParams != null ? !parsedJndiParams.equals(that.parsedJndiParams) : that.parsedJndiParams != null)
return false;
if (localTx != null ? !localTx.equals(that.localTx) : that.localTx != null) return false;
if (rebalanceConnections != null ? !rebalanceConnections.equals(that.rebalanceConnections) : that.rebalanceConnections != null) return false;
if (setupAttempts != null ? !setupAttempts.equals(that.setupAttempts) : that.setupAttempts != null) return false;
return !(setupInterval != null ? !setupInterval.equals(that.setupInterval) : that.setupInterval != null);
@ -873,6 +884,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
result = 31 * result + (jndiParams != null ? jndiParams.hashCode() : 0);
result = 31 * result + (parsedJndiParams != null ? parsedJndiParams.hashCode() : 0);
result = 31 * result + (localTx != null ? localTx.hashCode() : 0);
result = 31 * result + (rebalanceConnections != null ? rebalanceConnections.hashCode() : 0);
result = 31 * result + (setupAttempts != null ? setupAttempts.hashCode() : 0);
result = 31 * result + (setupInterval != null ? setupInterval.hashCode() : 0);
return result;

View File

@ -810,6 +810,10 @@ public abstract class ActiveMQTestBase extends Assert {
deleteDirectory(file);
file.mkdirs();
recreateDataDirectories(testDir1, index, backup);
}
protected void recreateDataDirectories(String testDir1, int index, boolean backup) {
recreateDirectory(getJournalDir(testDir1, index, backup));
recreateDirectory(getBindingsDir(testDir1, index, backup));
recreateDirectory(getPageDir(testDir1, index, backup));

View File

@ -170,4 +170,76 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
}
}
}
@Test
public void testRebalance() throws Exception {
final int CONSUMER_COUNT = 10;
secondaryJmsServer.createQueue(true, MDBQUEUE, null, true, "/jms/" + MDBQUEUE);
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
spec.setResourceAdapter(qResourceAdapter);
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
spec.setRebalanceConnections(true);
spec.setMaxSession(CONSUMER_COUNT);
spec.setSetupAttempts(5);
spec.setSetupInterval(200);
spec.setHA(true); // if this isn't true then the toplogy listener won't get nodeDown notifications
spec.setCallTimeout(500L); // if this isn't set then it may take a long time for tearDown to occur on the MDB connection
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(1);
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
Queue primaryQueue = server.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
Queue secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT);
assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT);
assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT);
ClientSession session = addClientSession(locator.createSessionFactory().createSession());
ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("test");
clientProducer.send(message);
latch.await(5, TimeUnit.SECONDS);
assertNotNull(endpoint.lastMessage);
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "test");
for (int i = 0; i < 10; i++) {
secondaryServer.stop();
long mark = System.currentTimeMillis();
long timeout = 5000;
while (primaryQueue.getConsumerCount() < CONSUMER_COUNT && (System.currentTimeMillis() - mark) < timeout) {
Thread.sleep(100);
}
assertTrue(primaryQueue.getConsumerCount() == CONSUMER_COUNT);
secondaryServer.start();
waitForServerToStart(secondaryServer);
secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
mark = System.currentTimeMillis();
while (((primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount()) < (CONSUMER_COUNT) || primaryQueue.getConsumerCount() == CONSUMER_COUNT) && (System.currentTimeMillis() - mark) <= timeout) {
Thread.sleep(100);
}
assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT);
assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT);
assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT);
}
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
qResourceAdapter.stop();
}
}

View File

@ -45,34 +45,49 @@ public class ActiveMQRAClusteredTestBase extends ActiveMQRATestBase {
params.put(TransportConstants.SERVER_ID_PROP_NAME, "1");
secondaryConnector = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
secondaryServer = addServer(ActiveMQServers.newActiveMQServer(createSecondaryDefaultConfig(true, true), mbeanServer, usePersistence()));
secondaryServer = addServer(ActiveMQServers.newActiveMQServer(createSecondaryDefaultConfig(true), mbeanServer, usePersistence()));
addServer(secondaryServer);
secondaryJmsServer = new JMSServerManagerImpl(secondaryServer);
secondaryJmsServer.start();
waitForTopology(secondaryServer, 2);
}
protected Configuration createDefaultConfig(boolean netty) throws Exception {
return createSecondaryDefaultConfig(netty, false);
return createSecondaryDefaultConfig(false);
}
protected Configuration createSecondaryDefaultConfig(boolean netty, boolean secondary) throws Exception {
protected Configuration createSecondaryDefaultConfig(boolean secondary) throws Exception {
HashMap invmMap = new HashMap();
HashMap nettyMap = new HashMap();
String primaryConnectorName = "invm2";
String secondaryConnectorName = "invm";
String directoryPrefix = "first";
int index = 0;
if (secondary) {
invmMap.put(TransportConstants.SERVER_ID_PROP_NAME, "1");
nettyMap.put("port", "5545");
primaryConnectorName = "invm";
secondaryConnectorName = "invm2";
directoryPrefix = "second";
index = 1;
}
ConfigurationImpl configuration = createBasicConfig().setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap)).setJournalDirectory(getTestDir() + "/" + directoryPrefix + "Journal/").setBindingsDirectory(getTestDir() + "/" + directoryPrefix + "Bind/").setLargeMessagesDirectory(getTestDir() + "/" + directoryPrefix + "Large/").setPagingDirectory(getTestDir() + "/" + directoryPrefix + "Page/").addConnectorConfiguration(secondaryConnectorName, secondaryConnector).addConnectorConfiguration(primaryConnectorName, primaryConnector).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName));
ConfigurationImpl configuration = createBasicConfig(index)
.setJMXManagementEnabled(false)
.clearAcceptorConfigurations()
.addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap))
.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap))
.addConnectorConfiguration(secondaryConnectorName, secondaryConnector)
.addConnectorConfiguration(primaryConnectorName, primaryConnector)
.addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName).setReconnectAttempts(0));
recreateDataDirectories(getTestDir(), index, false);
return configuration;
}
@Override
protected boolean usePersistence() {
return true;
}
}