ARTEMIS-2868 Protect Topology Updates from Split Brain
This commit is contained in:
parent
9842f45a49
commit
27cb9b37b1
|
@ -445,6 +445,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
|
||||
int pos = loadBalancingPolicy.select(initialConnectors.length);
|
||||
|
||||
if (initialConnectors.length == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new Pair(initialConnectors[pos], null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,8 @@ public final class Topology {
|
|||
*/
|
||||
private volatile Object owner;
|
||||
|
||||
private final TopologyManager manager;
|
||||
|
||||
/**
|
||||
* topology describes the other cluster nodes that this server knows about:
|
||||
*
|
||||
|
@ -80,6 +82,11 @@ public final class Topology {
|
|||
}
|
||||
this.executor = executor;
|
||||
this.owner = owner;
|
||||
if (owner instanceof TopologyManager) {
|
||||
manager = (TopologyManager)owner;
|
||||
} else {
|
||||
manager = null;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception("trace"));
|
||||
}
|
||||
|
@ -196,6 +203,11 @@ public final class Topology {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (manager != null && !manager.updateMember(uniqueEventID, nodeId, memberInput)) {
|
||||
logger.debugf("TopologyManager rejected the update towards %s", memberInput);
|
||||
return false;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
TopologyMemberImpl currentMember = topology.get(nodeId);
|
||||
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.client.impl;
|
||||
|
||||
/**
 * Hook that lets the owner of a {@link Topology} inspect a member update before the topology applies it.
 * <p>
 * A {@code Topology} whose owner implements this interface calls {@link #updateMember} first and drops
 * the update when that call returns {@code false}.
 */
public interface TopologyManager {
|
||||
/**
 * Called before a member update is applied to the topology.
 *
 * @param uniqueEventID the event ID that accompanies the update
 * @param nodeId        the ID of the node the update is about
 * @param memberInput   the proposed member data; an implementation may modify it
 * @return {@code true} to let the update proceed, {@code false} to reject it
 */
boolean updateMember(long uniqueEventID, String nodeId, TopologyMemberImpl memberInput);
|
||||
}
|
|
@ -323,6 +323,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
|
|||
}
|
||||
});
|
||||
} catch (RejectedExecutionException ignored) {
|
||||
logger.debug(ignored.getMessage(), ignored);
|
||||
// this could happen during a shutdown and we don't care, if we lost a nodeDown during a shutdown
|
||||
// what can we do anyways?
|
||||
}
|
||||
|
|
|
@ -1700,6 +1700,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
@Message(id = 222292, value = "The metrics-plugin element is ignored because the metrics element is defined", format = Message.Format.MESSAGE_FORMAT)
|
||||
void metricsPluginElementIgnored();
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN) // I really want emphasis on this logger, so adding the stars
|
||||
@Message(id = 222294, value = "\n**************************************************************************************************************************************************************************************************************************************************************\n" +
|
||||
"There is a possible split brain on nodeID {0}, coming from connectors {1}. Topology update ignored.\n" +
|
||||
"**************************************************************************************************************************************************************************************************************************************************************", format = Message.Format.MESSAGE_FORMAT)
|
||||
void possibleSplitBrain(String nodeID, String connectionPairInformation);
|
||||
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||
|
|
|
@ -88,11 +88,15 @@ public class BackupManager implements ActiveMQComponent {
|
|||
return;
|
||||
//deploy the backup connectors using the cluster configuration
|
||||
for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations()) {
|
||||
logger.debug("deploy backup config " + config);
|
||||
deployBackupConnector(config);
|
||||
}
|
||||
//start each connector and if we are backup and shared store announce ourselves. NB with replication we don't do this
|
||||
//as we wait for replication to start and be notified by the replication manager.
|
||||
for (BackupConnector conn : backupConnectors) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debugf("****** BackupManager connecting to %s", conn);
|
||||
}
|
||||
conn.start();
|
||||
if (server.getHAPolicy().isBackup() && server.getHAPolicy().isSharedStore()) {
|
||||
conn.informTopology();
|
||||
|
@ -192,6 +196,11 @@ public class BackupManager implements ActiveMQComponent {
|
|||
private boolean announcingBackup;
|
||||
private boolean backupAnnounced = false;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BackupConnector{" + "name='" + name + '\'' + ", connector=" + connector + '}';
|
||||
}
|
||||
|
||||
private BackupConnector(String name,
|
||||
TransportConfiguration connector,
|
||||
long retryInterval,
|
||||
|
|
|
@ -47,6 +47,13 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
|
|||
|
||||
void removeClusterTopologyListener(ClusterTopologyListener listener);
|
||||
|
||||
/**
 * Turns split brain detection for topology updates on or off.
 * <p>
 * This is needed with replication; it is not needed with shared storage.
 *
 * @param splitBrainDetection {@code true} to enable the detection, {@code false} to disable it
 */
|
||||
void setSplitBrainDetection(boolean splitBrainDetection);
|
||||
|
||||
/**
 * @return {@code true} if split brain detection for topology updates is currently enabled
 */
boolean isSplitBrainDetection();
|
||||
|
||||
/**
|
||||
* Only used for tests?
|
||||
*
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
|
|||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
|
||||
import org.apache.activemq.artemis.core.client.impl.Topology;
|
||||
import org.apache.activemq.artemis.core.client.impl.TopologyManager;
|
||||
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
|
@ -74,7 +75,7 @@ import org.apache.activemq.artemis.utils.FutureLatch;
|
|||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public final class ClusterConnectionImpl implements ClusterConnection, AfterConnectInternalListener {
|
||||
public final class ClusterConnectionImpl implements ClusterConnection, AfterConnectInternalListener, TopologyManager {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ClusterConnectionImpl.class);
|
||||
|
||||
|
@ -174,6 +175,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
|
||||
private final String storeAndForwardPrefix;
|
||||
|
||||
private boolean splitBrainDetection;
|
||||
|
||||
public ClusterConnectionImpl(final ClusterManager manager,
|
||||
final TransportConfiguration[] staticTranspConfigs,
|
||||
final TransportConfiguration connector,
|
||||
|
@ -508,6 +511,37 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
}
|
||||
}
|
||||
|
||||
/** This is the implementation of TopologyManager. It is used to reject eventual updates from a split brain server.
|
||||
*
|
||||
* @param uniqueEventID
|
||||
* @param nodeId
|
||||
* @param memberInput
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public boolean updateMember(long uniqueEventID, String nodeId, TopologyMemberImpl memberInput) {
|
||||
if (splitBrainDetection && nodeId.equals(nodeManager.getNodeId().toString())) {
|
||||
TopologyMemberImpl member = topology.getMember(nodeId);
|
||||
if (member != null) {
|
||||
if (member.getLive() != null && memberInput.getLive() != null && !member.getLive().isSameParams(connector)) {
|
||||
ActiveMQServerLogger.LOGGER.possibleSplitBrain(nodeId, memberInput.toString());
|
||||
}
|
||||
}
|
||||
memberInput.setLive(connector);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
 * Stores the flag that {@code updateMember(long, String, TopologyMemberImpl)} reads to decide whether
 * updates about this server's own node ID must be checked.
 * <p>
 * NOTE(review): the backing field is a plain (non-volatile) boolean; confirm that the thread applying
 * topology updates is guaranteed to observe a value written here by another thread.
 *
 * @param splitBrainDetection {@code true} to enable the check, {@code false} to disable it
 */
@Override
|
||||
public void setSplitBrainDetection(boolean splitBrainDetection) {
|
||||
this.splitBrainDetection = splitBrainDetection;
|
||||
}
|
||||
|
||||
/**
 * @return the current value of the split brain detection flag
 */
@Override
|
||||
public boolean isSplitBrainDetection() {
|
||||
return splitBrainDetection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnection(ClientSessionFactoryInternal sf) {
|
||||
TopologyMember localMember = getLocalMember();
|
||||
|
|
|
@ -141,6 +141,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
|
|||
import org.apache.activemq.artemis.core.server.ServiceComponent;
|
||||
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
||||
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
|
||||
import org.apache.activemq.artemis.core.server.federation.FederationManager;
|
||||
|
@ -3080,8 +3081,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
return fileStoreMonitor;
|
||||
}
|
||||
|
||||
public void completeActivation() throws Exception {
|
||||
public void completeActivation(boolean replicated) throws Exception {
|
||||
setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
|
||||
if (replicated) {
|
||||
if (getClusterManager() != null) {
|
||||
for (ClusterConnection clusterConnection : getClusterManager().getClusterConnections()) {
|
||||
// we need to avoid split brain on topology for replication
|
||||
clusterConnection.setSplitBrainDetection(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
getRemotingService().startAcceptors();
|
||||
activationLatch.countDown();
|
||||
callActivationCompleteCallbacks();
|
||||
|
|
|
@ -75,7 +75,7 @@ public class LiveOnlyActivation extends Activation {
|
|||
|
||||
activeMQServer.initialisePart2(false);
|
||||
|
||||
activeMQServer.completeActivation();
|
||||
activeMQServer.completeActivation(false);
|
||||
|
||||
if (activeMQServer.getIdentity() != null) {
|
||||
ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
|
||||
|
|
|
@ -126,9 +126,7 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
return;
|
||||
}
|
||||
|
||||
logger.trace("Waiting for a synchronize now...");
|
||||
synchronized (this) {
|
||||
logger.trace("Entered a synchronized");
|
||||
if (closed)
|
||||
return;
|
||||
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait(), attemptFailBack);
|
||||
|
@ -154,24 +152,16 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
|
||||
clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator));
|
||||
|
||||
// nodeManager.startBackup();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Starting backup manager");
|
||||
}
|
||||
logger.debug("Starting backup manager");
|
||||
activeMQServer.getBackupManager().start();
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Set backup Quorum");
|
||||
}
|
||||
logger.debug("Set backup Quorum");
|
||||
replicationEndpoint.setBackupQuorum(backupQuorum);
|
||||
|
||||
replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor());
|
||||
EndpointConnector endpointConnector = new EndpointConnector();
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Starting Backup Server");
|
||||
}
|
||||
|
||||
logger.debug("Starting Backup Server");
|
||||
ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(), activeMQServer.getNodeManager().getNodeId());
|
||||
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
|
||||
|
||||
|
@ -180,11 +170,8 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
|
||||
SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;
|
||||
do {
|
||||
|
||||
if (closed) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Activation is closed, so giving up");
|
||||
}
|
||||
logger.debug("Activation is closed, so giving up");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -195,15 +182,13 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
nodeLocator.locateNode();
|
||||
Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration();
|
||||
nodeID = nodeLocator.getNodeID();
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("nodeID = " + nodeID);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Connecting towards a possible live, connection information=" + possibleLive + ", nodeID=" + nodeID);
|
||||
}
|
||||
|
||||
//in a normal (non failback) scenario if we couldn't find our live server we should fail
|
||||
if (!attemptFailBack) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("attemptFailback=false, nodeID=" + nodeID);
|
||||
}
|
||||
logger.debug("attemptFailback=false, nodeID=" + nodeID);
|
||||
|
||||
//this shouldn't happen
|
||||
if (nodeID == null) {
|
||||
|
@ -348,7 +333,7 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
|
||||
logger.trace("completeActivation at the end");
|
||||
|
||||
activeMQServer.completeActivation();
|
||||
activeMQServer.completeActivation(true);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
|
|
@ -98,6 +98,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// Tell cluster connections to not accept split brains updates on the topology
|
||||
if (replicatedPolicy.isCheckForLiveServer() && isNodeIdUsed()) {
|
||||
//set for when we failback
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
@ -116,7 +117,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
|||
|
||||
activeMQServer.initialisePart2(false);
|
||||
|
||||
activeMQServer.completeActivation();
|
||||
activeMQServer.completeActivation(true);
|
||||
|
||||
if (activeMQServer.getIdentity() != null) {
|
||||
ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
|
||||
|
|
|
@ -89,7 +89,7 @@ public final class SharedStoreBackupActivation extends Activation {
|
|||
|
||||
activeMQServer.initialisePart2(scalingDown);
|
||||
|
||||
activeMQServer.completeActivation();
|
||||
activeMQServer.completeActivation(false);
|
||||
|
||||
if (scalingDown) {
|
||||
ActiveMQServerLogger.LOGGER.backupServerScaledDown();
|
||||
|
|
|
@ -82,7 +82,7 @@ public final class SharedStoreLiveActivation extends LiveActivation {
|
|||
|
||||
activeMQServer.initialisePart2(false);
|
||||
|
||||
activeMQServer.completeActivation();
|
||||
activeMQServer.completeActivation(false);
|
||||
|
||||
ActiveMQServerLogger.LOGGER.serverIsLive();
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -650,8 +650,83 @@ public class DNSSwitchTest extends SmokeTestBase {
|
|||
|
||||
Assert.assertTrue(ok);
|
||||
|
||||
//connectAndWaitBackup();
|
||||
} finally {
|
||||
if (serverBackup != null) {
|
||||
serverBackup.destroyForcibly();
|
||||
}
|
||||
if (serverLive != null) {
|
||||
serverLive.destroyForcibly();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
// JUnit entry point for the "without ping" scenario; the scenario itself is the static
// testWithoutPing(String[]) method, which receives the live and backup server locations as arguments.
@Test
|
||||
public void testWithoutPing() throws Throwable {
|
||||
// NOTE(review): presumably spawnRun launches the named static method in a separate process,
// passing the remaining arguments through - confirm against spawnRun's implementation.
spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP));
|
||||
}
|
||||
|
||||
public static void testWithoutPing(String[] args) throws Throwable {
|
||||
NetUtil.netUp(FIRST_IP, "lo:first");
|
||||
NetUtil.netUp(SECOND_IP, "lo:second");
|
||||
|
||||
// notice there's no THIRD_IP anywhere
|
||||
saveConf(hostsFile, FIRST_IP, "FIRST", SECOND_IP, "SECOND");
|
||||
|
||||
Process serverLive = null;
|
||||
Process serverBackup = null;
|
||||
|
||||
try {
|
||||
serverLive = ServerUtil.startServer(args[1], "live", "tcp://FIRST:61616", 0);
|
||||
ActiveMQServerControl liveControl = getServerControl(liveURI, liveNameBuilder, 20_000);
|
||||
|
||||
Wait.assertTrue(liveControl::isStarted);
|
||||
|
||||
// notice the first server does not know about this server at all
|
||||
serverBackup = ServerUtil.startServer(args[2], "backup", "tcp://SECOND:61716", 0);
|
||||
ActiveMQServerControl backupControl = getServerControl(backupURI, backupNameBuilder, 20_000);
|
||||
|
||||
Wait.assertTrue(backupControl::isStarted);
|
||||
Wait.assertTrue(backupControl::isReplicaSync);
|
||||
|
||||
logger.debug("shutdown the Network now");
|
||||
|
||||
// this will remove all the DNS information
|
||||
// I need the pingers to stop responding.
|
||||
// That will only happen if I stop both devices on Linux.
|
||||
// On mac that works regardless
|
||||
NetUtil.netDown(FIRST_IP, "lo:first", false);
|
||||
NetUtil.netDown(SECOND_IP, "lo:second", false);
|
||||
saveConf(hostsFile);
|
||||
|
||||
Wait.assertTrue(backupControl::isActive);
|
||||
|
||||
logger.debug("Starting the network");
|
||||
|
||||
NetUtil.netUp(FIRST_IP, "lo:first");
|
||||
NetUtil.netUp(SECOND_IP, "lo:second");
|
||||
saveConf(hostsFile, FIRST_IP, "FIRST", SECOND_IP, "SECOND");
|
||||
|
||||
// I must wait some time for the backup to have a chance to retry here
|
||||
Thread.sleep(2000);
|
||||
|
||||
logger.debug("Going down now");
|
||||
|
||||
System.out.println("*******************************************************************************************************************************");
|
||||
System.out.println("Forcing backup down and restarting it");
|
||||
System.out.println("*******************************************************************************************************************************");
|
||||
|
||||
serverBackup.destroyForcibly();
|
||||
|
||||
cleanupData(SERVER_BACKUP);
|
||||
|
||||
serverBackup = ServerUtil.startServer(args[2], "backup", "tcp://SECOND:61716", 0);
|
||||
backupControl = getServerControl(backupURI, backupNameBuilder, 20_000);
|
||||
Wait.assertTrue(backupControl::isStarted);
|
||||
Wait.assertTrue(backupControl::isReplicaSync);
|
||||
} finally {
|
||||
if (serverBackup != null) {
|
||||
serverBackup.destroyForcibly();
|
||||
|
|
Loading…
Reference in New Issue