This commit is contained in:
Clebert Suconic 2019-02-19 16:10:40 -05:00
commit 8ff0cba3d6
4 changed files with 79 additions and 1 deletions

View File

@ -37,6 +37,7 @@ public final class TopologyMemberImpl implements TopologyMember {
private final String scaleDownGroupName;
/**
* transient to avoid serialization changes
*/
@ -155,4 +156,35 @@ public final class TopologyMemberImpl implements TopologyMember {
public String toString() {
return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]";
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TopologyMemberImpl that = (TopologyMemberImpl) o;
// note the uniqueEventId is not park of the equals and hashmap key
if (connector != null ? !connector.equals(that.connector) : that.connector != null)
return false;
if (backupGroupName != null ? !backupGroupName.equals(that.backupGroupName) : that.backupGroupName != null)
return false;
if (scaleDownGroupName != null ? !scaleDownGroupName.equals(that.scaleDownGroupName) : that.scaleDownGroupName != null)
return false;
return nodeId != null ? nodeId.equals(that.nodeId) : that.nodeId == null;
}
@Override
public int hashCode() {
// note the uniqueEventId is not park of the equals and hashmap key
int result = connector != null ? connector.hashCode() : 0;
result = 31 * result + (backupGroupName != null ? backupGroupName.hashCode() : 0);
result = 31 * result + (scaleDownGroupName != null ? scaleDownGroupName.hashCode() : 0);
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
return result;
}
}

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -135,6 +136,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// on cases where sub-classes need a consumer
protected volatile ClientSessionInternal sessionConsumer;
// this will happen if a disconnect happened
// upon reconnection we need to send the nodeUP back into the topology
protected volatile boolean disconnectedAndDown = false;
protected String targetNodeID;
protected TopologyMember targetNode;
@ -853,6 +859,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
logger.debug(this + "\n\t::fail being called, permanently=" + permanently);
//we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
if (targetNodeID != null) {
this.disconnectedAndDown = true;
serverLocator.notifyNodeDown(System.currentTimeMillis(), targetNodeID);
}
if (queue != null) {
@ -874,6 +881,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
/* Hook for doing extra stuff after connection */
protected void afterConnect() throws Exception {
if (disconnectedAndDown && targetNodeID != null && targetNode != null) {
serverLocator.notifyNodeUp(System.currentTimeMillis(), targetNodeID, targetNode.getBackupGroupName(), targetNode.getScaleDownGroupName(),
new Pair<>(targetNode.getLive(), targetNode.getBackup()), false);
disconnectedAndDown = false;
}
retryCount = 0;
reconnectAttemptsInUse = reconnectAttempts;
if (futureScheduledReconnection != null) {

View File

@ -1077,6 +1077,9 @@ public class BridgeTest extends ActiveMQTestBase {
sf1.close();
SimpleString queueName1Str = new SimpleString(queueName1);
Wait.assertTrue(() -> server1.locateQueue(queueName1Str) == null);
server1.stop();
session0.close();
@ -1084,7 +1087,8 @@ public class BridgeTest extends ActiveMQTestBase {
sf0.close();
closeFields();
assertEquals(0, loadQueues(server0).size());
Wait.assertEquals(0, () -> loadQueues(server0).size());
}

View File

@ -16,15 +16,20 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
import java.util.ArrayList;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@ -81,6 +86,9 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
MessageFlowRecord record = connection.getRecords().values().toArray(new MessageFlowRecord[1])[0];
ClusterConnectionBridge bridge = (ClusterConnectionBridge) record.getBridge();
Wait.assertEquals(2, bridge.getSessionFactory().getServerLocator().getTopology().getMembers()::size);
ArrayList<TopologyMemberImpl> originalmembers = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers());
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage msg = session0.createMessage(true);
producer.send(msg);
@ -91,6 +99,28 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
}
}
Wait.assertEquals(2, bridge.getSessionFactory().getServerLocator().getTopology().getMembers()::size);
ArrayList<TopologyMemberImpl> afterReconnectedMembers = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers());
boolean allFound = true;
for (TopologyMemberImpl originalMember : originalmembers) {
boolean found = false;
for (TopologyMember reconnectedMember : afterReconnectedMembers) {
if (originalMember.equals(reconnectedMember)) {
found = true;
break;
}
}
if (!found) {
allFound = false;
}
}
Assert.assertTrue("The topology is slightly different after a reconnect", allFound);
int cons0Count = 0, cons1Count = 0;
while (true) {