ARTEMIS-2868 Protect Topology Updates from Split Brain on broker shutdown as well
This commit is contained in:
parent
1945a1765d
commit
c551df770c
|
@ -24,6 +24,7 @@ import org.apache.activemq.artemis.dto.BrokerDTO;
|
|||
@Command(name = "stop", description = "stops the broker instance")
|
||||
public class Stop extends Configurable {
|
||||
|
||||
public static final String STOP_FILE_NAME = "STOP_ME";
|
||||
@Override
|
||||
public Object execute(ActionContext context) throws Exception {
|
||||
super.execute(context);
|
||||
|
@ -31,7 +32,7 @@ public class Stop extends Configurable {
|
|||
|
||||
File file = broker.server.getConfigurationFile().getParentFile();
|
||||
|
||||
File stopFile = new File(file, "STOP_ME");
|
||||
File stopFile = new File(file, STOP_FILE_NAME);
|
||||
|
||||
stopFile.createNewFile();
|
||||
|
||||
|
|
|
@ -310,6 +310,12 @@ public final class Topology {
|
|||
boolean removeMember(final long uniqueEventID, final String nodeId) {
|
||||
TopologyMemberImpl member;
|
||||
|
||||
|
||||
if (manager != null && !manager.removeMember(uniqueEventID, nodeId)) {
|
||||
logger.debugf("TopologyManager rejected the update towards %s", nodeId);
|
||||
return false;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
member = topology.get(nodeId);
|
||||
if (member != null) {
|
||||
|
|
|
@ -19,4 +19,5 @@ package org.apache.activemq.artemis.core.client.impl;
|
|||
|
||||
public interface TopologyManager {
|
||||
boolean updateMember(long uniqueEventID, String nodeId, TopologyMemberImpl memberInput);
|
||||
boolean removeMember(long uniqueEventID, String nodeId);
|
||||
}
|
||||
|
|
|
@ -521,17 +521,29 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
@Override
|
||||
public boolean updateMember(long uniqueEventID, String nodeId, TopologyMemberImpl memberInput) {
|
||||
if (splitBrainDetection && nodeId.equals(nodeManager.getNodeId().toString())) {
|
||||
TopologyMemberImpl member = topology.getMember(nodeId);
|
||||
if (member != null) {
|
||||
if (member.getLive() != null && memberInput.getLive() != null && !member.getLive().isSameParams(connector)) {
|
||||
if (memberInput.getLive() != null && !memberInput.getLive().isSameParams(connector)) {
|
||||
ActiveMQServerLogger.LOGGER.possibleSplitBrain(nodeId, memberInput.toString());
|
||||
}
|
||||
}
|
||||
memberInput.setLive(connector);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* From topologyManager
|
||||
* @param uniqueEventID
|
||||
* @param nodeId
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public boolean removeMember(final long uniqueEventID, final String nodeId) {
|
||||
if (splitBrainDetection && nodeId.equals(nodeManager.getNodeId().toString())) {
|
||||
ActiveMQServerLogger.LOGGER.possibleSplitBrain(nodeId, nodeId);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSplitBrainDetection(boolean splitBrainDetection) {
|
||||
this.splitBrainDetection = splitBrainDetection;
|
||||
|
|
|
@ -18,12 +18,15 @@
|
|||
package org.apache.activemq.artemis.tests.smoke.common;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.cli.commands.Stop;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.util.ServerUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
||||
public class SmokeTestBase extends ActiveMQTestBase {
|
||||
Set<Process> processes = new HashSet<>();
|
||||
|
@ -51,6 +54,13 @@ public class SmokeTestBase extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
protected static void stopServerWithFile(String serverLocation) throws IOException {
|
||||
File serverPlace = new File(serverLocation);
|
||||
File etcPlace = new File(serverPlace, "etc");
|
||||
File stopMe = new File(etcPlace, Stop.STOP_FILE_NAME);
|
||||
Assert.assertTrue(stopMe.createNewFile());
|
||||
}
|
||||
|
||||
public static String getServerLocation(String serverName) {
|
||||
return basedir + "/target/" + serverName;
|
||||
}
|
||||
|
|
|
@ -663,13 +663,25 @@ public class DNSSwitchTest extends SmokeTestBase {
|
|||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testWithoutPing() throws Throwable {
|
||||
spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP));
|
||||
public void testWithoutPingKill() throws Throwable {
|
||||
spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP), "1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithoutPingRestart() throws Throwable {
|
||||
spawnRun(serverLocation, "testWithoutPing", getServerLocation(SERVER_LIVE), getServerLocation(SERVER_BACKUP), "0");
|
||||
}
|
||||
/**
|
||||
* arg[0] = constant "testWithoutPing" to be used on reflection through main(String arg[])
|
||||
* arg[1] = serverlive
|
||||
* arg[2] = server backup
|
||||
* arg[3] = 1 | 0 (kill the backup = 1, stop the backup = 0);
|
||||
* @param args
|
||||
* @throws Throwable
|
||||
*/
|
||||
public static void testWithoutPing(String[] args) throws Throwable {
|
||||
boolean killTheBackup = Integer.parseInt(args[3]) == 1;
|
||||
NetUtil.netUp(FIRST_IP, "lo:first");
|
||||
NetUtil.netUp(SECOND_IP, "lo:second");
|
||||
|
||||
|
@ -719,7 +731,13 @@ public class DNSSwitchTest extends SmokeTestBase {
|
|||
System.out.println("Forcing backup down and restarting it");
|
||||
System.out.println("*******************************************************************************************************************************");
|
||||
|
||||
if (killTheBackup) {
|
||||
serverBackup.destroyForcibly();
|
||||
} else {
|
||||
String serverLocation = args[2];
|
||||
stopServerWithFile(serverLocation);
|
||||
Assert.assertTrue(serverBackup.waitFor(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
cleanupData(SERVER_BACKUP);
|
||||
|
||||
|
@ -740,6 +758,7 @@ public class DNSSwitchTest extends SmokeTestBase {
|
|||
|
||||
}
|
||||
|
||||
|
||||
private static void connectAndWaitBackup() throws Exception {
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://FIRST:61616?ha=true");
|
||||
Assert.assertTrue(connectionFactory.getServerLocator().isHA());
|
||||
|
|
Loading…
Reference in New Issue