ARTEMIS-1779 ClusterConnectionBridge may connect to other nodes than its target

The cluster connection bridge has a TopologyListener and connects to a new node
each time it receives a nodeUp() event. It needs to put a check here to make
sure that the cluster bridge only connects to its target node and it's backups.

This issue shows up when you run LiveToLiveFailoverTest.testConsumerTransacted
test.

Also in this commit improvement of BackupSyncJournalTest so that it runs more
stable.
This commit is contained in:
Howard Gao 2018-04-02 19:16:30 +08:00 committed by Clebert Suconic
parent 650c79ee0f
commit 262990fa67
3 changed files with 40 additions and 6 deletions

View File

@ -1159,6 +1159,13 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// ClusterListener
@Override
public void nodeUP(TopologyMember member, boolean last) {
if (BridgeImpl.this.queue.isInternalQueue() && member != null && BridgeImpl.this.targetNodeID != null && !BridgeImpl.this.targetNodeID.equals(member.getNodeId())) {
//A ClusterConnectionBridge (identified by holding an internal queue)
//never re-connects to another node here. It only connects to its original
//target node (from the ClusterConnection) or its backups. That's why
//we put a return here.
return;
}
ClientSessionInternal sessionToUse = session;
RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null;

View File

@ -58,7 +58,6 @@ import org.jboss.logging.Logger;
* Such as such adding extra properties and setting up notifications between the nodes.
*/
public class ClusterConnectionBridge extends BridgeImpl {
private static final Logger logger = Logger.getLogger(ClusterConnectionBridge.class);
private final ClusterConnection clusterConnection;
@ -127,9 +126,6 @@ public class ClusterConnectionBridge extends BridgeImpl {
this.managementNotificationAddress = managementNotificationAddress;
this.flowRecord = flowRecord;
// we need to disable DLQ check on the clustered bridges
queue.setInternalQueue(true);
if (logger.isTraceEnabled()) {
logger.trace("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception("trace"));
}

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -32,6 +33,8 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
@ -73,16 +76,18 @@ public class BackupSyncJournalTest extends FailoverTestBase {
return n_msgs;
}
protected final FailoverWaiter failoverWaiter = new FailoverWaiter();
@Override
@Before
public void setUp() throws Exception {
startBackupServer = false;
super.setUp();
setNumberOfMessages(defaultNMsgs);
locator = (ServerLocatorInternal) getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15);
locator = (ServerLocatorInternal) getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15).setRetryInterval(200);
sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
sessionFactory.addFailoverListener(failoverWaiter);
syncDelay = new BackupSyncDelay(backupServer, liveServer);
}
@Test
@ -326,8 +331,13 @@ public class BackupSyncJournalTest extends FailoverTestBase {
liveServer.removeInterceptor(syncDelay);
backupServer.start();
waitForBackup(sessionFactory, BACKUP_WAIT_TIME);
failoverWaiter.reset();
crash(session);
backupServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
//for some system the retryAttempts and retryInterval may be too small
//so that during failover all attempts have failed before the backup
//server is fully activated.
assertTrue("Session didn't failover, the maxRetryAttempts and retryInterval may be too small", failoverWaiter.waitFailoverComplete());
}
protected void createProducerSendSomeMessages() throws ActiveMQException {
@ -384,4 +394,25 @@ public class BackupSyncJournalTest extends FailoverTestBase {
protected TransportConfiguration getConnectorTransportConfiguration(boolean live) {
return TransportConfigurationUtils.getInVMConnector(live);
}
private class FailoverWaiter implements FailoverEventListener {
private CountDownLatch latch;
public void reset() {
latch = new CountDownLatch(1);
}
@Override
public void failoverEvent(FailoverEventType eventType) {
if (eventType == FailoverEventType.FAILOVER_COMPLETED) {
latch.countDown();
}
}
public boolean waitFailoverComplete() throws InterruptedException {
return latch.await(10, TimeUnit.SECONDS);
}
}
}