ARTEMIS-1779 Small refactoring to logic on BridgeImpl::nodeUp logic

This commit is contained in:
Clebert Suconic 2018-04-03 08:58:39 -04:00
parent 262990fa67
commit f3e1ab337c
2 changed files with 38 additions and 22 deletions

View File

@ -93,7 +93,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final SimpleString name; private final SimpleString name;
private final Queue queue; protected final Queue queue;
private final Filter filter; private final Filter filter;
@ -1010,6 +1010,27 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
scheduleRetryConnectFixedTimeout(timeout); scheduleRetryConnectFixedTimeout(timeout);
} }
// To be called by the topology update
// This logic will be updated on the cluster connection
protected void nodeUP(TopologyMember member, boolean last) {
ClientSessionInternal sessionToUse = session;
RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null;
if (member != null && this.targetNodeID != null && this.targetNodeID.equals(member.getNodeId())) {
// this could be an update of the topology say after a backup started
BridgeImpl.this.targetNode = member;
} else {
// we don't need synchronization here, but we need to make sure we won't get a NPE on races
if (connectionToUse != null && member.isMember(connectionToUse)) {
this.targetNode = member;
this.targetNodeID = member.getNodeId();
}
}
}
// Inner classes ------------------------------------------------- // Inner classes -------------------------------------------------
protected void scheduleRetryConnectFixedTimeout(final long milliseconds) { protected void scheduleRetryConnectFixedTimeout(final long milliseconds) {
@ -1159,27 +1180,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// ClusterListener // ClusterListener
@Override @Override
public void nodeUP(TopologyMember member, boolean last) { public void nodeUP(TopologyMember member, boolean last) {
if (BridgeImpl.this.queue.isInternalQueue() && member != null && BridgeImpl.this.targetNodeID != null && !BridgeImpl.this.targetNodeID.equals(member.getNodeId())) { BridgeImpl.this.nodeUP(member, last);
//A ClusterConnectionBridge (identified by holding an internal queue)
//never re-connects to another node here. It only connects to its original
//target node (from the ClusterConnection) or its backups. That's why
//we put a return here.
return;
}
ClientSessionInternal sessionToUse = session;
RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null;
if (member != null && BridgeImpl.this.targetNodeID != null && BridgeImpl.this.targetNodeID.equals(member.getNodeId())) {
// this could be an update of the topology say after a backup started
BridgeImpl.this.targetNode = member;
} else {
// we don't need synchronization here, but we need to make sure we won't get a NPE on races
if (connectionToUse != null && member.isMember(connectionToUse)) {
BridgeImpl.this.targetNode = member;
BridgeImpl.this.targetNodeID = member.getNodeId();
}
}
} }
@Override @Override

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
@ -352,6 +353,20 @@ public class ClusterConnectionBridge extends BridgeImpl {
return filterString; return filterString;
} }
@Override
protected void nodeUP(TopologyMember member, boolean last) {
if (member != null && targetNodeID != null && !this.targetNodeID.equals(member.getNodeId())) {
//A ClusterConnectionBridge (identified by holding an internal queue)
//never re-connects to another node here. It only connects to its original
//target node (from the ClusterConnection) or its backups. That's why
//we put a return here.
return;
}
super.nodeUP(member, last);
}
@Override @Override
protected void afterConnect() throws Exception { protected void afterConnect() throws Exception {
super.afterConnect(); super.afterConnect();