mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-28 14:29:31 +00:00
This closes #2078
This commit is contained in:
commit
512d675049
@ -220,12 +220,14 @@ public final class SharedNothingBackupActivation extends Activation {
|
||||
clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getA());
|
||||
} catch (Exception e) {
|
||||
logger.debug(e.getMessage(), e);
|
||||
if (possibleLive.getB() != null) {
|
||||
if (possibleLive != null && possibleLive.getB() != null) {
|
||||
try {
|
||||
clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getB());
|
||||
} catch (Exception e1) {
|
||||
clusterControl = null;
|
||||
}
|
||||
} else {
|
||||
clusterControl = null;
|
||||
}
|
||||
}
|
||||
if (clusterControl == null) {
|
||||
|
@ -1137,7 +1137,6 @@ public abstract class ActiveMQTestBase extends Assert {
|
||||
for (TopologyMemberImpl member : topology.getMembers()) {
|
||||
if (member.getLive() != null) {
|
||||
liveNodesCount++;
|
||||
ActiveMQServerLogger.LOGGER.info("Found live server connected to " + server.getNodeID());
|
||||
}
|
||||
if (member.getBackup() != null) {
|
||||
backupNodesCount++;
|
||||
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.junit.Wait;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
@ -226,22 +227,9 @@ public abstract class MultipleServerFailoverTestBase extends ActiveMQTestBase {
|
||||
protected void waitForDistribution(SimpleString address, ActiveMQServer server, int messageCount) throws Exception {
|
||||
ActiveMQServerLogger.LOGGER.debug("waiting for distribution of messages on server " + server);
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
long timeout = 5000;
|
||||
|
||||
Queue q = (Queue) server.getPostOffice().getBinding(address).getBindable();
|
||||
|
||||
do {
|
||||
Wait.waitFor(() -> getMessageCount(q) >= messageCount);
|
||||
|
||||
if (getMessageCount(q) >= messageCount) {
|
||||
return;
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
}
|
||||
while (System.currentTimeMillis() - start < timeout);
|
||||
|
||||
throw new Exception();
|
||||
}
|
||||
}
|
||||
|
@ -18,19 +18,29 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.junit.Wait;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ReplicatedMultipleServerFailoverExtraBackupsTest extends ReplicatedMultipleServerFailoverTest {
|
||||
|
||||
private void waitForSync(ActiveMQServer server) throws Exception {
|
||||
Wait.waitFor(server::isReplicaSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
public void testStartLiveFirst() throws Exception {
|
||||
@ -40,8 +50,8 @@ public class ReplicatedMultipleServerFailoverExtraBackupsTest extends Replicated
|
||||
startServers(liveServers);
|
||||
backupServers.get(0).start();
|
||||
backupServers.get(1).start();
|
||||
waitForRemoteBackupSynchronization(backupServers.get(0).getServer());
|
||||
waitForRemoteBackupSynchronization(backupServers.get(1).getServer());
|
||||
waitForSync(backupServers.get(0).getServer());
|
||||
waitForSync(backupServers.get(1).getServer());
|
||||
|
||||
// wait to start the other 2 backups so the first 2 can sync with the 2 live servers
|
||||
backupServers.get(2).start();
|
||||
@ -112,10 +122,16 @@ public class ReplicatedMultipleServerFailoverExtraBackupsTest extends Replicated
|
||||
}
|
||||
}
|
||||
|
||||
CountDownLatch failoverHappened = new CountDownLatch(1);
|
||||
|
||||
session0.addFailoverListener((FailoverEventType type) -> failoverHappened.countDown());
|
||||
|
||||
for (TestableServer testableServer : toCrash) {
|
||||
testableServer.crash();
|
||||
testableServer.crash().await(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
Assert.assertTrue(failoverHappened.await(10, TimeUnit.SECONDS));
|
||||
|
||||
ClientConsumer consumer0 = session0.createConsumer(ADDRESS);
|
||||
ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
|
||||
session0.start();
|
||||
@ -125,7 +141,7 @@ public class ReplicatedMultipleServerFailoverExtraBackupsTest extends Replicated
|
||||
ClientMessage message = consumer0.receive(1000);
|
||||
assertNotNull("expecting durable msg " + i, message);
|
||||
message.acknowledge();
|
||||
consumer1.receive(1000);
|
||||
message = consumer1.receive(1000);
|
||||
assertNotNull("expecting durable msg " + i, message);
|
||||
message.acknowledge();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user