ARTEMIS-2713 Added ReplicatedMultipleFailbackTest

This commit is contained in:
Francesco Nigro 2020-04-08 16:56:14 +02:00 committed by Clebert Suconic
parent ae33b771fa
commit e4c5ea719f
14 changed files with 1114 additions and 68 deletions

View File

@ -101,6 +101,9 @@ public class Wait {
assertTrue(DEFAULT_FAILURE_MESSAGE, () -> !condition.isSatisfied(), duration, sleep);
}
/**
 * Asserts that {@code condition} becomes satisfied within {@code duration}, using
 * {@code DEFAULT_FAILURE_MESSAGE} as failure message and {@code SLEEP_MILLIS} as
 * polling interval.
 *
 * @param condition the condition to wait for
 * @param duration  the maximum time to wait (presumably milliseconds, like the sleep interval - confirm)
 */
public static void assertTrue(Condition condition, final long duration) {
assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, SLEEP_MILLIS);
}
public static void assertTrue(String failureMessage, Condition condition) {
assertTrue(failureMessage, condition, MAX_WAIT_MILLIS);

View File

@ -18,23 +18,23 @@ package org.apache.activemq.artemis.core.server.cluster.qourum;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.jboss.logging.Logger;
public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener {
private TransportConfiguration liveTransportConfiguration;
private static final Logger LOGGER = Logger.getLogger(SharedNothingBackupQuorum.class);
public enum BACKUP_ACTIVATION {
FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP;
@ -46,7 +46,6 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
private final NodeManager nodeManager;
private final StorageManager storageManager;
private final ScheduledExecutorService scheduledPool;
private final int quorumSize;
@ -56,8 +55,6 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
private final Object voteGuard = new Object();
private CountDownLatch latch;
private ClientSessionFactoryInternal sessionFactory;
private CoreRemotingConnection connection;
@ -67,6 +64,14 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
private volatile boolean stopped = false;
private final int quorumVoteWait;
private volatile BACKUP_ACTIVATION signal;
private ScheduledFuture<?> decisionGuard;
private CountDownLatch latch;
private final Object onConnectionFailureGuard = new Object();
/**
* This is a safety net in case the live sends the first {@link ReplicationLiveIsStoppingMessage}
* with code {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED} and crashes before sending the second with
@ -76,15 +81,13 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
*/
public static final int WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG = 60;
public SharedNothingBackupQuorum(StorageManager storageManager,
NodeManager nodeManager,
public SharedNothingBackupQuorum(NodeManager nodeManager,
ScheduledExecutorService scheduledPool,
NetworkHealthCheck networkHealthCheck,
int quorumSize,
int voteRetries,
long voteRetryWait,
int quorumVoteWait) {
this.storageManager = storageManager;
this.scheduledPool = scheduledPool;
this.quorumSize = quorumSize;
this.latch = new CountDownLatch(1);
@ -95,29 +98,24 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
this.quorumVoteWait = quorumVoteWait;
}
private volatile BACKUP_ACTIVATION signal;
/**
* safety parameter to make _sure_ we get out of await()
*/
private static final int LATCH_TIMEOUT = 30;
private final Object decisionGuard = new Object();
/**
 * @return the fixed name of this quorum: {@code "SharedNothingBackupQuorum"}
 */
@Override
public String getName() {
return "SharedNothingBackupQuorum";
}
public void decideOnAction(Topology topology) {
//we may get called via multiple paths so need to guard
synchronized (decisionGuard) {
private void onConnectionFailure() {
//we may get called as sessionFactory or connection listener
synchronized (onConnectionFailureGuard) {
if (signal == BACKUP_ACTIVATION.FAIL_OVER) {
LOGGER.debug("Replication connection failure with signal == FAIL_OVER: no need to take any action");
if (networkHealthCheck != null && !networkHealthCheck.check()) {
signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
}
return;
}
//given that we're going to latch.countDown(), there is no need to await any
//scheduled task to complete
stopForcedFailoverAfterDelay();
if (!isLiveDown()) {
//lost connection but don't know if live is down so restart as backup as we can't replicate any more
ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();
@ -140,16 +138,13 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
}
}
latch.countDown();
}
latch.countDown();
}
public void liveIDSet(String liveID) {
targetServerID = liveID;
nodeManager.setNodeID(liveID);
liveTransportConfiguration = quorumManager.getLiveTransportConfiguration(targetServerID);
//now we are replicating we can start waiting for disconnect notifications so we can fail over
// sessionFactory.addFailureListener(this);
}
@Override
@ -166,14 +161,12 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
*/
@Override
public void nodeDown(Topology topology, long eventUID, String nodeID) {
if (targetServerID.equals(nodeID)) {
decideOnAction(topology);
}
//noop: we are NOT interested in topology info coming from connections != this.connection
}
@Override
public void nodeUp(Topology topology) {
//noop
//noop: we are NOT interested in topology info coming from connections != this.connection
}
/**
@ -181,7 +174,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
*/
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
decideOnAction(sessionFactory.getServerLocator().getTopology());
onConnectionFailure();
}
@Override
@ -204,10 +197,10 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
*/
public void setSessionFactory(final ClientSessionFactoryInternal sessionFactory) {
this.sessionFactory = sessionFactory;
this.connection = (CoreRemotingConnection) sessionFactory.getConnection();
connection.addFailureListener(this);
//belts and braces, there are circumstances where the connection listener doesn't get called but the session does.
sessionFactory.addFailureListener(this);
this.sessionFactory.addFailureListener(this);
connection = (CoreRemotingConnection) sessionFactory.getConnection();
connection.addFailureListener(this);
}
/**
@ -217,20 +210,19 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
* backup that it should fail-over.
*/
public synchronized void failOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) {
removeListener();
removeListeners();
signal = BACKUP_ACTIVATION.FAIL_OVER;
if (finalMessage == ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER) {
latch.countDown();
}
if (finalMessage == ReplicationLiveIsStoppingMessage.LiveStopping.STOP_CALLED) {
final CountDownLatch localLatch = latch;
scheduledPool.schedule(new Runnable() {
@Override
public void run() {
localLatch.countDown();
}
switch (finalMessage) {
}, WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG, TimeUnit.SECONDS);
case STOP_CALLED:
scheduleForcedFailoverAfterDelay(latch);
break;
case FAIL_OVER:
stopForcedFailoverAfterDelay();
latch.countDown();
break;
default:
LOGGER.errorf("unsupported LiveStopping type: %s", finalMessage);
}
}
@ -244,10 +236,13 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
latch.countDown();
}
private void removeListener() {
/**
 * Unregisters this quorum as failure listener from both the replication connection
 * and the session factory, skipping whichever of the two has not been set.
 */
private void removeListeners() {
if (connection != null) {
connection.removeFailureListener(this);
}
if (sessionFactory != null) {
sessionFactory.removeFailureListener(this);
}
}
/**
@ -271,13 +266,38 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
* @param explicitSignal the state we want to set the quorum manager to return
*/
public synchronized void causeExit(BACKUP_ACTIVATION explicitSignal) {
stopForcedFailoverAfterDelay();
stopped = true;
removeListener();
removeListeners();
this.signal = explicitSignal;
latch.countDown();
}
/**
 * Schedules on {@code scheduledPool} a task that counts down {@code signalChanged} after
 * {@link #WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG} seconds and tracks it in {@code decisionGuard}.
 * <p>
 * If a previous task is still tracked, a warning describing its state is logged (after trying to
 * cancel it when it has not completed yet) and the new task is scheduled regardless.
 *
 * @param signalChanged the latch released by the scheduled task
 */
private synchronized void scheduleForcedFailoverAfterDelay(CountDownLatch signalChanged) {
   final ScheduledFuture<?> previousTask = decisionGuard;
   if (previousTask != null) {
      final String warning;
      if (previousTask.isDone()) {
         warning = "A completed force failover task wasn't cleaned-up: a new one will be scheduled";
      } else if (previousTask.cancel(false)) {
         warning = "Cancelled an existing uncompleted force failover task: a new one will be scheduled in its place";
      } else {
         warning = "Failed to cancel an existing uncompleted force failover task: a new one will be scheduled anyway";
      }
      LOGGER.warn(warning);
   }
   final Runnable releaseLatch = signalChanged::countDown;
   decisionGuard = scheduledPool.schedule(releaseLatch, WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG, TimeUnit.SECONDS);
}
/**
 * Cancels the tracked forced fail-over task, if any, and stops tracking it.
 *
 * @return {@code true} if a task was tracked and {@code cancel(false)} succeeded on it;
 *         {@code false} if no task was tracked or the cancellation failed
 */
private synchronized boolean stopForcedFailoverAfterDelay() {
   if (decisionGuard == null) {
      return false;
   }
   // deliberately NOT named "stopped": that would hide the volatile field of the same name
   // cancel(false): a task that is already running is not interrupted
   final boolean cancelled = decisionGuard.cancel(false);
   // the reference is dropped whatever the outcome of the cancellation
   decisionGuard = null;
   return cancelled;
}
/**
 * Cancels any tracked forced fail-over task and replaces the latch with a fresh
 * single-count one.
 */
public synchronized void reset() {
stopForcedFailoverAfterDelay();
latch = new CountDownLatch(1);
}
@ -287,13 +307,21 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
* @return the voting decision
*/
private boolean isLiveDown() {
//lets assume live is not down
Boolean decision = false;
int voteAttempts = 0;
int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;
// lets assume live is not down
if (stopped) {
return false;
}
final int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;
synchronized (voteGuard) {
while (!stopped && voteAttempts++ < voteRetries) {
for (int voteAttempts = 0; voteAttempts < voteRetries && !stopped; voteAttempts++) {
if (voteAttempts > 0) {
try {
voteGuard.wait(voteRetryWait);
} catch (InterruptedException e) {
//nothing to do here
}
}
//the live is dead so lets vote for quorum
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID);
@ -308,19 +336,11 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
quorumManager.voteComplete(quorumVote);
decision = quorumVote.getDecision();
if (decision) {
return decision;
}
try {
voteGuard.wait(voteRetryWait);
} catch (InterruptedException e) {
//nothing to do here
if (quorumVote.getDecision()) {
return true;
}
}
}
return decision;
return false;
}
}

View File

@ -131,7 +131,7 @@ public final class SharedNothingBackupActivation extends Activation {
logger.trace("Entered a synchronized");
if (closed)
return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait());
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait());
activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
}

View File

@ -307,6 +307,88 @@
</args>
</configuration>
</execution>
<!-- START JmxReplicatedMultipleFailbackTest -->
<execution>
<phase>test-compile</phase>
<id>create-replicated-failback-master1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<configuration>${basedir}/target/classes/servers/replicated-failback-master1</configuration>
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<instance>${basedir}/target/replicated-failback-master1</instance>
<args>
<!-- this is needed to run the server remotely -->
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-replicated-failback-master2</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<configuration>${basedir}/target/classes/servers/replicated-failback-master2</configuration>
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<instance>${basedir}/target/replicated-failback-master2</instance>
<args>
<!-- this is needed to run the server remotely -->
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-replicated-failback-master3</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<configuration>${basedir}/target/classes/servers/replicated-failback-master3</configuration>
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<instance>${basedir}/target/replicated-failback-master3</instance>
<args>
<!-- this is needed to run the server remotely -->
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-replicated-failback-slave1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<configuration>${basedir}/target/classes/servers/replicated-failback-slave1</configuration>
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<instance>${basedir}/target/replicated-failback-slave1</instance>
<args>
<!-- this is needed to run the server remotely -->
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
</args>
</configuration>
</execution>
<!-- END JmxReplicatedMultipleFailbackTest -->
<execution>
<phase>test-compile</phase>
<id>create-paging</id>

View File

@ -0,0 +1,137 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<name>master1</name>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<ha-policy>
<replication>
<master>
<group-name>a</group-name>
<check-for-live-server>true</check-for-live-server>
<vote-on-replication-failure>true</vote-on-replication-failure>
</master>
</replication>
</ha-policy>
<connectors>
<!-- Connectors announced through cluster connections and notifications -->
<connector name="artemis">tcp://localhost:61616</connector>
<connector name="master2">tcp://localhost:61716</connector>
<connector name="master3">tcp://localhost:61816</connector>
<connector name="slave1">tcp://localhost:61916</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="artemis">tcp://localhost:61616</acceptor>
</acceptors>
<cluster-user>admin</cluster-user>
<cluster-password>password</cluster-password>
<cluster-connections>
<cluster-connection name="my-cluster">
<connector-ref>artemis</connector-ref>
<message-load-balancing>OFF</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>slave1</connector-ref>
<connector-ref>master2</connector-ref>
<connector-ref>master3</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq, guest"/>
<permission type="deleteNonDurableQueue" roles="amq, guest"/>
<permission type="createDurableQueue" roles="amq, guest"/>
<permission type="deleteDurableQueue" roles="amq, guest"/>
<permission type="createAddress" roles="amq, guest"/>
<permission type="deleteAddress" roles="amq, guest"/>
<permission type="consume" roles="amq, guest"/>
<permission type="browse" roles="amq, guest"/>
<permission type="send" roles="amq, guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- per-address limit; with -1 only the global-max-size would be in use for limiting -->
<max-size-bytes>10MB</max-size-bytes>
<page-size-bytes>1MB</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="exampleTopic">
<multicast>
</multicast>
</address>
<address name="exampleQueue">
<anycast>
<queue name="exampleQueue"/>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<management-context xmlns="http://activemq.org/schema">
<connector connector-port="10099" connector-host="localhost"/>
</management-context>

View File

@ -0,0 +1,138 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<name>master2</name>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<ha-policy>
<replication>
<master>
<group-name>b</group-name>
<check-for-live-server>true</check-for-live-server>
<vote-on-replication-failure>true</vote-on-replication-failure>
</master>
</replication>
</ha-policy>
<connectors>
   <!-- Connectors announced through cluster connections and notifications -->
   <connector name="artemis">tcp://localhost:61716</connector>
   <connector name="master1">tcp://localhost:61616</connector>
   <connector name="master3">tcp://localhost:61816</connector>
   <!-- slave1 accepts connections on 61916; 61816 belongs to master3 -->
   <connector name="slave1">tcp://localhost:61916</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="artemis">tcp://localhost:61716</acceptor>
</acceptors>
<cluster-user>admin</cluster-user>
<cluster-password>password</cluster-password>
<cluster-connections>
<cluster-connection name="my-cluster">
<connector-ref>artemis</connector-ref>
<message-load-balancing>OFF</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>master3</connector-ref>
<connector-ref>master1</connector-ref>
<connector-ref>slave1</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq, guest"/>
<permission type="deleteNonDurableQueue" roles="amq, guest"/>
<permission type="createDurableQueue" roles="amq, guest"/>
<permission type="deleteDurableQueue" roles="amq, guest"/>
<permission type="createAddress" roles="amq, guest"/>
<permission type="deleteAddress" roles="amq, guest"/>
<permission type="consume" roles="amq, guest"/>
<permission type="browse" roles="amq, guest"/>
<permission type="send" roles="amq, guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- per-address limit; with -1 only the global-max-size would be in use for limiting -->
<max-size-bytes>10MB</max-size-bytes>
<page-size-bytes>1MB</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="exampleTopic">
<multicast>
</multicast>
</address>
<address name="exampleQueue">
<anycast>
<queue name="exampleQueue"/>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<management-context xmlns="http://activemq.org/schema">
<connector connector-port="10199" connector-host="localhost"/>
</management-context>

View File

@ -0,0 +1,138 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<name>master3</name>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<ha-policy>
<replication>
<master>
<group-name>c</group-name>
<check-for-live-server>true</check-for-live-server>
<vote-on-replication-failure>true</vote-on-replication-failure>
</master>
</replication>
</ha-policy>
<connectors>
<!-- Connectors announced through cluster connections and notifications -->
<connector name="artemis">tcp://localhost:61816</connector>
<connector name="master1">tcp://localhost:61616</connector>
<connector name="master2">tcp://localhost:61716</connector>
<connector name="slave1">tcp://localhost:61916</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="artemis">tcp://localhost:61816</acceptor>
</acceptors>
<cluster-user>admin</cluster-user>
<cluster-password>password</cluster-password>
<cluster-connections>
<cluster-connection name="my-cluster">
<connector-ref>artemis</connector-ref>
<message-load-balancing>OFF</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>master2</connector-ref>
<connector-ref>master1</connector-ref>
<connector-ref>slave1</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq, guest"/>
<permission type="deleteNonDurableQueue" roles="amq, guest"/>
<permission type="createDurableQueue" roles="amq, guest"/>
<permission type="deleteDurableQueue" roles="amq, guest"/>
<permission type="createAddress" roles="amq, guest"/>
<permission type="deleteAddress" roles="amq, guest"/>
<permission type="consume" roles="amq, guest"/>
<permission type="browse" roles="amq, guest"/>
<permission type="send" roles="amq, guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- per-address limit; with -1 only the global-max-size would be in use for limiting -->
<max-size-bytes>10MB</max-size-bytes>
<page-size-bytes>1MB</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="exampleTopic">
<multicast>
</multicast>
</address>
<address name="exampleQueue">
<anycast>
<queue name="exampleQueue"/>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<management-context xmlns="http://activemq.org/schema">
<connector connector-port="10299" connector-host="localhost"/>
</management-context>

View File

@ -0,0 +1,137 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<name>slave1</name>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<ha-policy>
<replication>
<slave>
<group-name>a</group-name>
<allow-failback>true</allow-failback>
</slave>
</replication>
</ha-policy>
<connectors>
<!-- Connector used to be announced through cluster connections and notifications -->
<connector name="artemis">tcp://localhost:61916</connector>
<connector name="master1">tcp://localhost:61616</connector>
<connector name="master2">tcp://localhost:61716</connector>
<connector name="master3">tcp://localhost:61816</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="artemis">tcp://localhost:61916</acceptor>
</acceptors>
<cluster-user>admin</cluster-user>
<cluster-password>password</cluster-password>
<cluster-connections>
<cluster-connection name="my-cluster">
<connector-ref>artemis</connector-ref>
<message-load-balancing>OFF</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>master1</connector-ref>
<connector-ref>master2</connector-ref>
<connector-ref>master3</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq, guest"/>
<permission type="deleteNonDurableQueue" roles="amq, guest"/>
<permission type="createDurableQueue" roles="amq, guest"/>
<permission type="deleteDurableQueue" roles="amq, guest"/>
<permission type="createAddress" roles="amq, guest"/>
<permission type="deleteAddress" roles="amq, guest"/>
<permission type="consume" roles="amq, guest"/>
<permission type="browse" roles="amq, guest"/>
<permission type="send" roles="amq, guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>10MB</max-size-bytes>
<page-size-bytes>1MB</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="exampleTopic">
<multicast>
</multicast>
</address>
<address name="exampleQueue">
<anycast>
<queue name="exampleQueue"/>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<management-context xmlns="http://activemq.org/schema">
<connector connector-port="10399" connector-host="localhost"/>
</management-context>

View File

@ -18,14 +18,15 @@
package org.apache.activemq.artemis.tests.smoke.common;
import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.util.ServerUtil;
import org.junit.After;
public class SmokeTestBase extends ActiveMQTestBase {
ArrayList<Process> processes = new ArrayList();
Set<Process> processes = new HashSet<>();
public static final String basedir = System.getProperty("basedir");
@ -38,13 +39,23 @@ public class SmokeTestBase extends ActiveMQTestBase {
e.printStackTrace();
}
}
processes.clear();
}
public String getServerLocation(String serverName) {
/**
 * Kills a single server process and stops tracking it.
 * <p>
 * The process is removed from {@code processes} before being killed through
 * {@link ServerUtil#killServer(Process)}; any failure raised while killing it is printed and
 * otherwise ignored.
 *
 * @param process the server process to kill
 */
public void killServer(Process process) {
processes.remove(process);
try {
ServerUtil.killServer(process);
} catch (Throwable e) {
e.printStackTrace();
}
}
/**
 * Builds the location of a server instance as {@code basedir + "/target/" + serverName}.
 *
 * @param serverName the name of the server folder
 * @return the path of the server folder under {@code basedir/target}
 */
public static String getServerLocation(String serverName) {
return basedir + "/target/" + serverName;
}
public void cleanupData(String serverName) {
/**
 * Deletes the {@code data} directory found under the location of the given server
 * (see {@code getServerLocation(String)}).
 *
 * @param serverName the name of the server folder whose data has to be removed
 */
public static void cleanupData(String serverName) {
String location = getServerLocation(serverName);
deleteDirectory(new File(location, "data"));
}

View File

@ -0,0 +1,300 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.jmxmultiplefailback;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.StringReader;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ReplicatedMultipleFailbackTest extends SmokeTestBase {
private static final Logger LOGGER = Logger.getLogger(ReplicatedMultipleFailbackTest.class);
/**
 * Function-like callback whose single method is allowed to throw any {@link Throwable}.
 * <p>
 * It is the type of the query passed to {@code queryControl}, which catches whatever the
 * callback throws.
 *
 * @param <T> the type of the input
 * @param <R> the type of the result
 */
@FunctionalInterface
interface ThrowableFunction<T, R> {
/**
 * Applies this function to {@code t}.
 *
 * @param t the input
 * @return the computed result
 * @throws Throwable if the computation fails for any reason
 */
R apply(T t) throws Throwable;
}
/**
 * Runs a single query against a management control exposed through JMX.
 * <p>
 * A JMX connection to {@code serviceURI} is opened just for this call, a proxy of type
 * {@code controlClass} is created for the MBean named {@code objectName}, and
 * {@code queryControl} is evaluated on that proxy. The connection is closed before returning.
 * Any {@link Throwable} raised while connecting, querying or closing is handed to
 * {@code onThrowable}, whose result is returned instead.
 *
 * @param serviceURI   the JMX service URL to connect to
 * @param objectName   the name of the MBean to query
 * @param queryControl the query to evaluate on the control proxy
 * @param controlClass the management interface implemented by the proxy
 * @param onThrowable  maps a failure to a fallback value (possibly {@code null})
 * @param <C>          the type of the control
 * @param <T>          the type of the queried value
 * @return the queried (or fallback) value; empty when that value is {@code null}
 */
private static <C, T> Optional<T> queryControl(JMXServiceURL serviceURI,
                                               ObjectName objectName,
                                               ThrowableFunction<C, T> queryControl,
                                               Class<C> controlClass,
                                               Function<Throwable, T> onThrowable) {
   T result;
   try (JMXConnector connector = JMXConnectorFactory.connect(serviceURI)) {
      final C proxy = MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(), objectName, controlClass, false);
      result = queryControl.apply(proxy);
   } catch (Throwable failure) {
      // covers failures of connect, of the query itself and of closing the connector
      result = onThrowable.apply(failure);
   }
   return Optional.ofNullable(result);
}
/**
 * Asks the broker reachable at {@code serviceURI} whether it is currently a backup.
 *
 * @param serviceURI the JMX service URL of the broker
 * @param builder    builds the object name of the broker's server control
 * @return the value of {@code ActiveMQServerControl#isBackup()}, or empty if the query failed
 * @throws Exception if the server object name cannot be built
 */
private static Optional<Boolean> isBackup(JMXServiceURL serviceURI, ObjectNameBuilder builder) throws Exception {
   final ObjectName serverObjectName = builder.getActiveMQServerObjectName();
   // a failed query maps to null, i.e. to an empty Optional
   return queryControl(serviceURI, serverObjectName, ActiveMQServerControl::isBackup, ActiveMQServerControl.class, failure -> null);
}
/**
 * Reads the node ID of the broker reachable at {@code serviceURI}.
 *
 * @param serviceURI the JMX service URL of the broker
 * @param builder    builds the object name of the broker's server control
 * @return the value of {@code ActiveMQServerControl#getNodeID()}, or empty if the query failed
 * @throws Exception if the server object name cannot be built
 */
private static Optional<String> getNodeID(JMXServiceURL serviceURI, ObjectNameBuilder builder) throws Exception {
   final ObjectName serverObjectName = builder.getActiveMQServerObjectName();
   // a failed query maps to null, i.e. to an empty Optional
   return queryControl(serviceURI, serverObjectName, ActiveMQServerControl::getNodeID, ActiveMQServerControl.class, failure -> null);
}
/**
 * Reads the network topology, as seen by the broker reachable at {@code serviceURI}.
 *
 * @param serviceURI the JMX service URL of the broker
 * @param builder    builds the object name of the broker's server control
 * @return the value of {@code ActiveMQServerControl#listNetworkTopology()}, or empty if the
 *         query failed
 * @throws Exception if the server object name cannot be built
 */
private static Optional<String> listNetworkTopology(JMXServiceURL serviceURI,
                                                    ObjectNameBuilder builder) throws Exception {
   final ObjectName serverObjectName = builder.getActiveMQServerObjectName();
   // a failed query maps to null, i.e. to an empty Optional
   return queryControl(serviceURI, serverObjectName, ActiveMQServerControl::listNetworkTopology, ActiveMQServerControl.class, failure -> null);
}
/**
 * Decodes a network topology JSON document into a map keyed by node ID.
 * <p>
 * The document is expected to be a JSON array of objects, each carrying a mandatory
 * {@code "nodeID"} and optional {@code "live"} and {@code "backup"} entries. Each object becomes
 * a {@code Pair} whose first element is the live entry and whose second is the backup entry;
 * an entry that is missing from the JSON is decoded as {@code null}.
 *
 * @param networkTopologyJson the JSON to decode; {@code null} or empty yields an empty map
 * @return the decoded topology, never {@code null}
 */
private static Map<String, Pair<String, String>> decodeNetworkTopologyJson(String networkTopologyJson) {
   if (networkTopologyJson == null || networkTopologyJson.isEmpty()) {
      return Collections.emptyMap();
   }
   try (JsonReader jsonReader = Json.createReader(new StringReader(networkTopologyJson))) {
      final JsonArray nodeIDs = jsonReader.readArray();
      final int nodeCount = nodeIDs.size();
      Map<String, Pair<String, String>> networkTopology = new HashMap<>(nodeCount);
      for (int i = 0; i < nodeCount; i++) {
         final JsonObject nodePair = nodeIDs.getJsonObject(i);
         final String nodeID = nodePair.getString("nodeID");
         // "live" is read with a null default, exactly like "backup": the topology checks in this
         // class (countMembers, withLive(..., Objects::nonNull)) already treat a null live as a
         // legal state, while the single-argument getString would throw on a missing key and
         // abort the polling instead of letting it retry
         final String live = nodePair.getString("live", null);
         final String backup = nodePair.getString("backup", null);
         networkTopology.put(nodeID, new Pair<>(live, backup));
      }
      return networkTopology;
   }
}
/**
 * Counts the topology entries that have a live broker.
 *
 * @param networkTopology the decoded topology (node ID to live/backup pair)
 * @return the number of entries whose live element is neither {@code null} nor empty
 */
private static long countMembers(Map<String, Pair<String, String>> networkTopology) {
   long members = 0;
   for (Pair<String, String> liveAndBackup : networkTopology.values()) {
      final String live = liveAndBackup.getA();
      if (live != null && !live.isEmpty()) {
         members++;
      }
   }
   return members;
}
/**
 * Counts every broker listed in the topology, whether live or backup.
 *
 * @param networkTopology the decoded topology (node ID to live/backup pair)
 * @return the number of live and backup elements that are neither {@code null} nor empty
 */
private static long countNodes(Map<String, Pair<String, String>> networkTopology) {
   long nodes = 0;
   for (Pair<String, String> liveAndBackup : networkTopology.values()) {
      final String live = liveAndBackup.getA();
      final String backup = liveAndBackup.getB();
      if (live != null && !live.isEmpty()) {
         nodes++;
      }
      if (backup != null && !backup.isEmpty()) {
         nodes++;
      }
   }
   return nodes;
}
/**
 * Decodes {@code networkTopologyJson} and checks the resulting topology against
 * {@code checkTopology}.
 *
 * @param networkTopologyJson the JSON form of the topology
 * @param checkTopology       the condition the decoded topology has to satisfy
 * @return the outcome of {@code checkTopology} on the decoded topology
 */
private static boolean validateNetworkTopology(String networkTopologyJson,
                                               Predicate<Map<String, Pair<String, String>>> checkTopology) {
   return checkTopology.test(decodeNetworkTopologyJson(networkTopologyJson));
}
/**
 * Looks up the backup element registered for {@code nodeID}.
 *
 * @param nodeID          the node ID to look up; it must be a key of {@code networkTopology}
 * @param networkTopology the decoded topology (node ID to live/backup pair)
 * @return the backup element of the pair, possibly {@code null}
 */
private static String backupOf(String nodeID, Map<String, Pair<String, String>> networkTopology) {
   final Pair<String, String> liveAndBackup = networkTopology.get(nodeID);
   return liveAndBackup.getB();
}
/**
 * Looks up the live element registered for {@code nodeID}.
 *
 * @param nodeID          the node ID to look up; it must be a key of {@code networkTopology}
 * @param networkTopology the decoded topology (node ID to live/backup pair)
 * @return the live element of the pair, possibly {@code null}
 */
private static String liveOf(String nodeID, Map<String, Pair<String, String>> networkTopology) {
   final Pair<String, String> liveAndBackup = networkTopology.get(nodeID);
   return liveAndBackup.getA();
}
/**
 * Builds a condition satisfied by topologies that have as many entries as the given node IDs
 * and contain each of them as a key.
 *
 * @param nodeID the expected node IDs; the array itself must not be {@code null}
 * @return the topology condition
 */
private static Predicate<Map<String, Pair<String, String>>> containsExactNodeIds(String... nodeID) {
   Objects.requireNonNull(nodeID);
   return topology -> {
      if (topology.size() != nodeID.length) {
         return false;
      }
      for (String expectedNodeID : nodeID) {
         if (!topology.containsKey(expectedNodeID)) {
            return false;
         }
      }
      return true;
   };
}
/**
 * Builds a condition satisfied by topologies whose {@code countMembers} equals {@code count}.
 *
 * @param count the expected number of entries with a live broker
 * @return the topology condition
 */
private static Predicate<Map<String, Pair<String, String>>> withMembers(int count) {
   return topology -> {
      final long members = countMembers(topology);
      return members == count;
   };
}
/**
 * Builds a condition satisfied by topologies whose {@code countNodes} equals {@code count}.
 *
 * @param count the expected number of live plus backup brokers
 * @return the topology condition
 */
private static Predicate<Map<String, Pair<String, String>>> withNodes(int count) {
   return topology -> {
      final long nodes = countNodes(topology);
      return nodes == count;
   };
}
/**
 * Builds a condition that applies {@code compare} to the backup element registered for
 * {@code nodeId}.
 *
 * @param nodeId  the node ID whose backup is checked; it must be present in the tested topology
 * @param compare the check applied to the backup element, which may be {@code null}
 * @return the topology condition
 */
private static Predicate<Map<String, Pair<String, String>>> withBackup(String nodeId, Predicate<String> compare) {
   return topology -> {
      final String backup = backupOf(nodeId, topology);
      return compare.test(backup);
   };
}
/**
 * Builds a condition that applies {@code compare} to the live element registered for
 * {@code nodeId}.
 *
 * @param nodeId  the node ID whose live is checked; it must be present in the tested topology
 * @param compare the check applied to the live element, which may be {@code null}
 * @return the topology condition
 */
private static Predicate<Map<String, Pair<String, String>>> withLive(String nodeId, Predicate<String> compare) {
   return topology -> {
      final String live = liveOf(nodeId, topology);
      return compare.test(live);
   };
}
// Host used to build the JMX service URL of every broker.
private static final String JMX_SERVER_HOSTNAME = "localhost";
// JMX ports, one per broker, used to build each broker's JMX service URL.
// NOTE(review): presumably these have to match the connector-port of each server's
// management.xml - confirm against the server configurations.
private static final int JMX_PORT_MASTER_1 = 10099;
private static final int JMX_PORT_MASTER_2 = 10199;
private static final int JMX_PORT_MASTER_3 = 10299;
private static final int JMX_PORT_SLAVE_1 = 10399;
// Server names: each one is handed to startServer(...) and to cleanupData(...) by the Broker enum.
private static final String MASTER_1_DATA_FOLDER = "replicated-failback-master1";
private static final String MASTER_2_DATA_FOLDER = "replicated-failback-master2";
private static final String MASTER_3_DATA_FOLDER = "replicated-failback-master3";
private static final String SLAVE_1_DATA_FOLDER = "replicated-failback-slave1";
// Port IDs handed to startServer(...): 0, 100, 200 and 300.
// NOTE(review): presumably an offset applied to the default acceptor port of each broker -
// confirm against SmokeTestBase.startServer.
private static final int MASTER_1_PORT_ID = 0;
private static final int MASTER_2_PORT_ID = MASTER_1_PORT_ID + 100;
private static final int MASTER_3_PORT_ID = MASTER_2_PORT_ID + 100;
private static final int SLAVE_1_PORT_ID = MASTER_3_PORT_ID + 100;
/**
 * The brokers taking part in the test, each bundled with what is needed to start it, wipe its
 * data and query it through JMX.
 * <p>
 * The constant name is also the broker name given to the {@link ObjectNameBuilder}, hence the
 * lower-case names.
 */
private enum Broker {
   master1(JMX_PORT_MASTER_1, MASTER_1_DATA_FOLDER, MASTER_1_PORT_ID),
   master2(JMX_PORT_MASTER_2, MASTER_2_DATA_FOLDER, MASTER_2_PORT_ID),
   master3(JMX_PORT_MASTER_3, MASTER_3_DATA_FOLDER, MASTER_3_PORT_ID),
   slave1(JMX_PORT_SLAVE_1, SLAVE_1_DATA_FOLDER, SLAVE_1_PORT_ID);

   final ObjectNameBuilder objectNameBuilder;
   final String dataFolder;
   final JMXServiceURL jmxServiceURL;
   final int portID;

   Broker(int jmxPort, String dataFolder, int portID) {
      this.portID = portID;
      this.dataFolder = dataFolder;
      this.jmxServiceURL = jmxServiceUrlOf(jmxPort);
      this.objectNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), name(), true);
   }

   /** Builds the JMX service URL for {@code jmxPort}, rethrowing a malformed URL as unchecked. */
   private static JMXServiceURL jmxServiceUrlOf(int jmxPort) {
      try {
         return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + jmxPort + "/jmxrmi");
      } catch (MalformedURLException malformed) {
         throw new RuntimeException(malformed);
      }
   }

   /** Starts this broker through {@code env}, forwarding its data folder, port ID and the timeout. */
   public Process startServer(SmokeTestBase env, int millisTimeout) throws Exception {
      return env.startServer(this.dataFolder, this.portID, millisTimeout);
   }

   /** Deletes the data of this broker. */
   public void cleanupData() {
      ReplicatedMultipleFailbackTest.cleanupData(this.dataFolder);
   }

   /** Queries this broker through JMX for its backup state; empty if the query failed. */
   public Optional<Boolean> isBackup() throws Exception {
      return ReplicatedMultipleFailbackTest.isBackup(this.jmxServiceURL, this.objectNameBuilder);
   }

   /** Queries this broker through JMX for its node ID; empty if the query failed. */
   public Optional<String> getNodeID() throws Exception {
      return ReplicatedMultipleFailbackTest.getNodeID(this.jmxServiceURL, this.objectNameBuilder);
   }

   /** Queries this broker through JMX for its network topology JSON; empty if the query failed. */
   public Optional<String> listNetworkTopology() throws Exception {
      return ReplicatedMultipleFailbackTest.listNetworkTopology(this.jmxServiceURL, this.objectNameBuilder);
   }
}
/**
 * Wipes the data of every broker, then calls {@code disableCheckThread()}.
 */
@Before
public void before() {
   for (Broker broker : Broker.values()) {
      broker.cleanupData();
   }
   disableCheckThread();
}
/**
 * Starts three masters plus one slave and then, {@code failbackRetries} times in a row, kills
 * master1, waits for slave1 to become live, restarts master1 and waits for the original roles to
 * be restored. After every step the network topology reported by each running broker is
 * validated through JMX.
 */
@Test
public void testMultipleFailback() throws Exception {
LOGGER.infof("TEST BOOTSTRAPPING START: STARTING brokers %s", Arrays.toString(Broker.values()));
final int failbackRetries = 10;
final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
// start the masters one at a time, waiting for each one to report itself as not being a backup;
// a failed JMX query (empty Optional) counts as "not yet", so the wait keeps polling
Process master1 = Broker.master1.startServer(this, timeout);
Wait.assertTrue(() -> !Broker.master1.isBackup().orElse(true), timeout);
Broker.master2.startServer(this, timeout);
Wait.assertTrue(() -> !Broker.master2.isBackup().orElse(true), timeout);
Broker.master3.startServer(this, timeout);
Wait.assertTrue(() -> !Broker.master3.isBackup().orElse(true), timeout);
// NOTE(review): timeout 0 presumably means "do not wait for the server to activate", as a
// backup is not expected to become active - confirm against SmokeTestBase.startServer
Broker.slave1.startServer(this, 0);
Wait.assertTrue(() -> Broker.slave1.isBackup().orElse(false), timeout);
final String nodeIDlive1 = Broker.master1.getNodeID().get();
final String nodeIDlive2 = Broker.master2.getNodeID().get();
final String nodeIDlive3 = Broker.master3.getNodeID().get();
// every broker has to see exactly the 3 node IDs, 3 lives and 4 brokers overall, with both a
// live and a backup registered under master1's node ID
for (Broker broker : Broker.values()) {
LOGGER.infof("CHECKING NETWORK TOPOLOGY FOR %s", broker);
Wait.assertTrue(() -> validateNetworkTopology(broker.listNetworkTopology().orElse(""),
containsExactNodeIds(nodeIDlive1, nodeIDlive2, nodeIDlive3)
.and(withLive(nodeIDlive1, Objects::nonNull))
.and(withBackup(nodeIDlive1, Objects::nonNull))
.and(withMembers(3))
.and(withNodes(4))), timeout);
}
// remember the topology entries of master1 and slave1: they are used below to tell which of the
// two is live after each failover/failback
final String urlSlave1 = backupOf(nodeIDlive1, decodeNetworkTopologyJson(Broker.slave1.listNetworkTopology().get()));
Assert.assertNotNull(urlSlave1);
final String urlMaster1 = liveOf(nodeIDlive1, decodeNetworkTopologyJson(Broker.master1.listNetworkTopology().get()));
Assert.assertNotNull(urlMaster1);
Assert.assertNotEquals(urlMaster1, urlSlave1);
LOGGER.infof("Node ID live 1 is %s", nodeIDlive1);
LOGGER.infof("Node ID live 2 is %s", nodeIDlive2);
LOGGER.infof("Node ID live 3 is %s", nodeIDlive3);
LOGGER.infof("%s has url: %s", Broker.master1, urlMaster1);
LOGGER.infof("%s has url: %s", Broker.slave1, urlSlave1);
LOGGER.info("BOOTSTRAPPING ENDED: READ nodeIds and master1/slave1 urls");
for (int i = 0; i < failbackRetries; i++) {
LOGGER.infof("START TEST %d", i + 1);
LOGGER.infof("KILLING master1");
killServer(master1);
// wait until slave1 becomes live
Wait.assertTrue(() -> !Broker.slave1.isBackup().orElse(true), timeout);
LOGGER.info("slave1 is LIVE");
LOGGER.info("VALIDATE TOPOLOGY OF ALIVE BROKERS");
// failover: the surviving brokers have to see slave1 as the live of master1's node ID, with no
// backup, i.e. 3 lives and 3 brokers overall
Stream.of(Broker.master2, Broker.master3, Broker.slave1).forEach(
broker -> Wait.assertTrue(() -> validateNetworkTopology(broker.listNetworkTopology().orElse(""),
containsExactNodeIds(nodeIDlive1, nodeIDlive2, nodeIDlive3)
.and(withLive(nodeIDlive1, urlSlave1::equals))
.and(withBackup(nodeIDlive1, Objects::isNull))
.and(withMembers(3))
.and(withNodes(3))), timeout)
);
// restart master1 and wait for the failback: slave1 back to backup, master1 live again
LOGGER.info("STARTING master1");
master1 = Broker.master1.startServer(this, 0);
Wait.assertTrue(() -> Broker.slave1.isBackup().orElse(false), timeout);
LOGGER.info("slave1 is BACKUP");
Wait.assertTrue(() -> !Broker.master1.isBackup().orElse(true), timeout);
LOGGER.info("master1 is LIVE");
// failback: every broker has to see the original live/backup pair again under master1's node ID
for (Broker broker : Broker.values()) {
LOGGER.infof("CHECKING NETWORK TOPOLOGY FOR %s", broker);
Wait.assertTrue(() -> validateNetworkTopology(broker.listNetworkTopology().orElse(""),
containsExactNodeIds(nodeIDlive1, nodeIDlive2, nodeIDlive3)
.and(withLive(nodeIDlive1, urlMaster1::equals))
.and(withBackup(nodeIDlive1, urlSlave1::equals))
.and(withMembers(3))
.and(withNodes(4))), timeout);
}
}
LOGGER.info("TEST COMPLETED");
}
}