This commit is contained in:
Clebert Suconic 2018-04-03 16:22:13 -04:00
commit a8e81f2267
3 changed files with 71 additions and 21 deletions

View File

@ -93,7 +93,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final SimpleString name;
private final Queue queue;
protected final Queue queue;
private final Filter filter;
@ -1010,6 +1010,27 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
scheduleRetryConnectFixedTimeout(timeout);
}
// To be called by the topology update
// This logic will be updated on the cluster connection
protected void nodeUP(TopologyMember member, boolean last) {
ClientSessionInternal sessionToUse = session;
RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null;
if (member != null && this.targetNodeID != null && this.targetNodeID.equals(member.getNodeId())) {
// this could be an update of the topology say after a backup started
BridgeImpl.this.targetNode = member;
} else {
// we don't need synchronization here, but we need to make sure we won't get a NPE on races
if (connectionToUse != null && member.isMember(connectionToUse)) {
this.targetNode = member;
this.targetNodeID = member.getNodeId();
}
}
}
// Inner classes -------------------------------------------------
protected void scheduleRetryConnectFixedTimeout(final long milliseconds) {
@ -1159,20 +1180,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// ClusterListener
@Override
public void nodeUP(TopologyMember member, boolean last) {
ClientSessionInternal sessionToUse = session;
RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null;
if (member != null && BridgeImpl.this.targetNodeID != null && BridgeImpl.this.targetNodeID.equals(member.getNodeId())) {
// this could be an update of the topology say after a backup started
BridgeImpl.this.targetNode = member;
} else {
// we don't need synchronization here, but we need to make sure we won't get a NPE on races
if (connectionToUse != null && member.isMember(connectionToUse)) {
BridgeImpl.this.targetNode = member;
BridgeImpl.this.targetNodeID = member.getNodeId();
}
}
BridgeImpl.this.nodeUP(member, last);
}
@Override

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
@ -58,7 +59,6 @@ import org.jboss.logging.Logger;
* Such as such adding extra properties and setting up notifications between the nodes.
*/
public class ClusterConnectionBridge extends BridgeImpl {
private static final Logger logger = Logger.getLogger(ClusterConnectionBridge.class);
private final ClusterConnection clusterConnection;
@ -127,9 +127,6 @@ public class ClusterConnectionBridge extends BridgeImpl {
this.managementNotificationAddress = managementNotificationAddress;
this.flowRecord = flowRecord;
// we need to disable DLQ check on the clustered bridges
queue.setInternalQueue(true);
if (logger.isTraceEnabled()) {
logger.trace("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception("trace"));
}
@ -356,6 +353,20 @@ public class ClusterConnectionBridge extends BridgeImpl {
return filterString;
}
@Override
protected void nodeUP(TopologyMember member, boolean last) {
if (member != null && targetNodeID != null && !this.targetNodeID.equals(member.getNodeId())) {
//A ClusterConnectionBridge (identified by holding an internal queue)
//never re-connects to another node here. It only connects to its original
//target node (from the ClusterConnection) or its backups. That's why
//we put a return here.
return;
}
super.nodeUP(member, last);
}
@Override
protected void afterConnect() throws Exception {
super.afterConnect();

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -32,6 +33,8 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
@ -73,16 +76,18 @@ public class BackupSyncJournalTest extends FailoverTestBase {
return n_msgs;
}
protected final FailoverWaiter failoverWaiter = new FailoverWaiter();
@Override
@Before
public void setUp() throws Exception {
startBackupServer = false;
super.setUp();
setNumberOfMessages(defaultNMsgs);
locator = (ServerLocatorInternal) getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15);
locator = (ServerLocatorInternal) getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15).setRetryInterval(200);
sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
sessionFactory.addFailoverListener(failoverWaiter);
syncDelay = new BackupSyncDelay(backupServer, liveServer);
}
@Test
@ -326,8 +331,13 @@ public class BackupSyncJournalTest extends FailoverTestBase {
liveServer.removeInterceptor(syncDelay);
backupServer.start();
waitForBackup(sessionFactory, BACKUP_WAIT_TIME);
failoverWaiter.reset();
crash(session);
backupServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
//for some system the retryAttempts and retryInterval may be too small
//so that during failover all attempts have failed before the backup
//server is fully activated.
assertTrue("Session didn't failover, the maxRetryAttempts and retryInterval may be too small", failoverWaiter.waitFailoverComplete());
}
protected void createProducerSendSomeMessages() throws ActiveMQException {
@ -384,4 +394,25 @@ public class BackupSyncJournalTest extends FailoverTestBase {
protected TransportConfiguration getConnectorTransportConfiguration(boolean live) {
return TransportConfigurationUtils.getInVMConnector(live);
}
private class FailoverWaiter implements FailoverEventListener {
private CountDownLatch latch;
public void reset() {
latch = new CountDownLatch(1);
}
@Override
public void failoverEvent(FailoverEventType eventType) {
if (eventType == FailoverEventType.FAILOVER_COMPLETED) {
latch.countDown();
}
}
public boolean waitFailoverComplete() throws InterruptedException {
return latch.await(10, TimeUnit.SECONDS);
}
}
}