From 3b34127bb3ce6720ff794551a2877ba9794f0960 Mon Sep 17 00:00:00 2001 From: andytaylor Date: Wed, 5 Sep 2018 08:41:06 +0100 Subject: [PATCH] ARTEMIS-2075 - allow Extra backups to try to replicate more than once https://issues.apache.org/jira/browse/ARTEMIS-2075 --- .../config/ActiveMQDefaultConfiguration.java | 6 +++ .../core/config/ConfigurationUtils.java | 4 +- .../config/ha/ReplicaPolicyConfiguration.java | 10 ++++ .../ha/ReplicatedPolicyConfiguration.java | 10 ++++ .../impl/FileConfigurationParser.java | 4 ++ .../core/server/cluster/ha/ReplicaPolicy.java | 15 +++++- .../server/cluster/ha/ReplicatedPolicy.java | 13 ++++- .../AnyLiveNodeLocatorForReplication.java | 9 +++- .../NamedLiveNodeLocatorForReplication.java | 13 +++-- .../impl/SharedNothingBackupActivation.java | 2 +- .../schema/artemis-configuration.xsd | 14 +++++ .../impl/HAPolicyConfigurationTest.java | 2 + .../resources/replica-hapolicy-config.xml | 1 + .../resources/replicated-hapolicy-config.xml | 1 + .../failover/ReplicatedFailoverTest.java | 54 +++++++++++++++++++ 15 files changed, 147 insertions(+), 11 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 45397f386c..bd7ce5167f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -504,6 +504,8 @@ public final class ActiveMQDefaultConfiguration { //how long we wait for vote result, 30 secs private static int DEFAULT_QUORUM_VOTE_WAIT = 30; + private static long DEFAULT_RETRY_REPLICATION_WAIT = 2000; + public static int DEFAULT_QUORUM_SIZE = -1; public static final boolean DEFAULT_ANALYZE_CRITICAL = true; @@ -1384,4 +1386,8 @@ public final class ActiveMQDefaultConfiguration { public static int getDefaultQuorumVoteWait() { return DEFAULT_QUORUM_VOTE_WAIT; } + + public static long getDefaultRetryReplicationWait() { + return DEFAULT_RETRY_REPLICATION_WAIT; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java index 6d4661b462..a3149473a6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java @@ -73,11 +73,11 @@ public final class ConfigurationUtils { } case REPLICATED: { ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf; - return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait()); + return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait(), pc.getRetryReplicationWait()); } case REPLICA: { ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf; - return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait()); + return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait(), pc.getRetryReplicationWait()); } case SHARED_STORE_MASTER: { SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java index e02bb5603a..4bf24a7263 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java @@ -49,6 +49,8 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration { private int quorumVoteWait = ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait(); + private long retryReplicationWait = ActiveMQDefaultConfiguration.getDefaultRetryReplicationWait(); + public ReplicaPolicyConfiguration() { } @@ -170,4 +172,12 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration { public void setQuorumVoteWait(int quorumVoteWait) { this.quorumVoteWait = quorumVoteWait; } + + public long getRetryReplicationWait() { + return retryReplicationWait; + } + + public void setRetryReplicationWait(long retryReplicationWait) { + this.retryReplicationWait = retryReplicationWait; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java index 3acad77767..162f09559c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java @@ -39,6 +39,8 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration { private int quorumVoteWait = ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait(); + private Long retryReplicationWait = ActiveMQDefaultConfiguration.getDefaultRetryReplicationWait(); + public ReplicatedPolicyConfiguration() { } @@ -129,4 +131,12 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration { this.quorumVoteWait = quorumVoteWait; return this; } + + public void setRetryReplicationWait(Long retryReplicationWait) { + this.retryReplicationWait = retryReplicationWait; + } + + public Long getRetryReplicationWait() { + return retryReplicationWait; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index b1b2c0ec08..9bc292b8c3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1336,6 +1336,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO)); + configuration.setRetryReplicationWait(getLong(policyNode, "retry-replication-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO)); + configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO)); return configuration; @@ -1367,6 +1369,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO)); + configuration.setRetryReplicationWait(getLong(policyNode, "retry-replication-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO)); + configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO)); return configuration; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java index 7a4b06bc04..36e65f02e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java @@ -59,6 +59,8 @@ public class ReplicaPolicy extends BackupPolicy { private final int quorumVoteWait; + private long retryReplicationWait; + public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck, int quorumVoteWait) { this.networkHealthCheck = networkHealthCheck; this.quorumVoteWait = quorumVoteWait; @@ -84,7 +86,8 @@ public class ReplicaPolicy extends BackupPolicy { int quorumSize, int voteRetries, long voteRetryWait, - int quorumVoteWait) { + int quorumVoteWait, + long retryReplicationWait) { this.clusterName = clusterName; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.groupName = groupName; @@ -94,10 +97,12 @@ public class ReplicaPolicy extends BackupPolicy { this.quorumSize = quorumSize; this.voteRetries = voteRetries; this.voteRetryWait = voteRetryWait; + this.retryReplicationWait = retryReplicationWait; this.scaleDownPolicy = scaleDownPolicy; this.networkHealthCheck = networkHealthCheck; this.voteOnReplicationFailure = voteOnReplicationFailure; this.quorumVoteWait = quorumVoteWait; + this.retryReplicationWait = retryReplicationWait; } public ReplicaPolicy(String clusterName, @@ -247,4 +252,12 @@ public class ReplicaPolicy extends BackupPolicy { public int getQuorumVoteWait() { return quorumVoteWait; } + + public long getRetryReplicationWait() { + return retryReplicationWait; + } + + public void setretryReplicationWait(long retryReplicationWait) { + this.retryReplicationWait = retryReplicationWait; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java index e170429ac4..99b98fec52 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java @@ -54,6 +54,8 @@ public class ReplicatedPolicy implements HAPolicy { private long voteRetryWait; + private long retryReplicationWait; + /* * this are only used as the policy when the server is started as a live after a failover * */ @@ -78,7 +80,8 @@ public class ReplicatedPolicy implements HAPolicy { int quorumSize, int voteRetries, long voteRetryWait, - int quorumVoteWait) { + int quorumVoteWait, + long retryReplicationWait) { this.checkForLiveServer = checkForLiveServer; this.groupName = groupName; this.clusterName = clusterName; @@ -89,6 +92,7 @@ public class ReplicatedPolicy implements HAPolicy { this.voteRetries = voteRetries; this.voteRetryWait = voteRetryWait; this.quorumVoteWait = quorumVoteWait; + this.retryReplicationWait = retryReplicationWait; } public ReplicatedPolicy(boolean checkForLiveServer, @@ -159,6 +163,7 @@ public class ReplicatedPolicy implements HAPolicy { replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure); replicaPolicy.setVoteRetries(voteRetries); replicaPolicy.setVoteRetryWait(voteRetryWait); + replicaPolicy.setretryReplicationWait(retryReplicationWait); if (clusterName != null && clusterName.length() > 0) { replicaPolicy.setClusterName(clusterName); } @@ -241,4 +246,8 @@ public class ReplicatedPolicy implements HAPolicy { public int getQuorumVoteWait() { return quorumVoteWait; } -} \ No newline at end of file + + public long getRetryReplicationWait() { + return retryReplicationWait; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java index a446b2e701..015339aafe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -40,14 +41,16 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator { private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); private final ActiveMQServerImpl server; + private final long retryReplicationWait; Map> untriedConnectors = new HashMap<>(); Map> triedConnectors = new HashMap<>(); private String nodeID; - public AnyLiveNodeLocatorForReplication(SharedNothingBackupQuorum backupQuorum, ActiveMQServerImpl server) { + public AnyLiveNodeLocatorForReplication(SharedNothingBackupQuorum backupQuorum, ActiveMQServerImpl server, long retryReplicationWait) { super(backupQuorum); this.server = server; + this.retryReplicationWait = retryReplicationWait; } @Override @@ -66,7 +69,9 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator { ConcurrentUtil.await(condition, timeout); } else { while (untriedConnectors.isEmpty()) { - condition.await(); + condition.await(retryReplicationWait, TimeUnit.MILLISECONDS); + untriedConnectors.putAll(triedConnectors); + triedConnectors.clear(); } } } catch (InterruptedException e) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java index a3c50fbe12..624808d1f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java @@ -16,8 +16,10 @@ */ package org.apache.activemq.artemis.core.server.impl; +import java.util.ArrayList; import java.util.LinkedList; import java.util.Queue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -40,13 +42,16 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator { private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); private final String backupGroupName; + private final long retryReplicationWait; private Queue> liveConfigurations = new LinkedList<>(); + private ArrayList> triedConfigurations = new ArrayList<>(); private String nodeID; - public NamedLiveNodeLocatorForReplication(String backupGroupName, SharedNothingBackupQuorum quorumManager) { + public NamedLiveNodeLocatorForReplication(String backupGroupName, SharedNothingBackupQuorum quorumManager, long retryReplicationWait) { super(quorumManager); this.backupGroupName = backupGroupName; + this.retryReplicationWait = retryReplicationWait; } @Override @@ -64,7 +69,9 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator { ConcurrentUtil.await(condition, timeout); } else { while (liveConfigurations.size() == 0) { - condition.await(); + condition.await(retryReplicationWait, TimeUnit.MILLISECONDS); + liveConfigurations.addAll(triedConfigurations); + triedConfigurations.clear(); } } } catch (InterruptedException e) { @@ -112,7 +119,7 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator { public void notifyRegistrationFailed(boolean alreadyReplicating) { try { lock.lock(); - liveConfigurations.poll(); + triedConfigurations.add(liveConfigurations.poll()); super.notifyRegistrationFailed(alreadyReplicating); } finally { lock.unlock(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index 863923de06..d1f0a05cb8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -142,7 +142,7 @@ public final class SharedNothingBackupActivation extends Activation { TopologyMember member = (TopologyMember) activationParams.get(ActivationParams.REPLICATION_ENDPOINT); nodeLocator = new NamedNodeIdNodeLocator(member.getNodeId(), new Pair<>(member.getLive(), member.getBackup())); } else { - nodeLocator = replicaPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForReplication(backupQuorum, activeMQServer) : new NamedLiveNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum); + nodeLocator = replicaPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForReplication(backupQuorum, activeMQServer, replicaPolicy.getRetryReplicationWait()) : new NamedLiveNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum, replicaPolicy.getRetryReplicationWait()); } ClusterController clusterController = activeMQServer.getClusterManager().getClusterController(); clusterController.addClusterTopologyListenerForReplication(nodeLocator); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 04e29316aa..685dec4dfb 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2277,6 +2277,13 @@ + + + + If we start as a replica how long to wait (in milliseconds) before trying to replicate again after failing to find a replica + + + @@ -2380,6 +2387,13 @@ + + + + How long to wait (in milliseconds) before trying to replicate again after failing to find a replica + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java index 2a236134a2..fd9b5238b5 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java @@ -139,6 +139,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase { assertTrue(replicatedPolicy.isCheckForLiveServer()); assertEquals(replicatedPolicy.getClusterName(), "abcdefg"); assertEquals(replicatedPolicy.getInitialReplicationSyncTimeout(), 9876); + assertEquals(replicatedPolicy.getRetryReplicationWait(), 12345); } finally { server.stop(); } @@ -161,6 +162,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase { assertFalse(replicaPolicy.isRestartBackup()); assertTrue(replicaPolicy.isAllowFailback()); assertEquals(replicaPolicy.getInitialReplicationSyncTimeout(), 9876); + assertEquals(replicaPolicy.getRetryReplicationWait(), 12345); ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy(); assertNotNull(scaleDownPolicy); assertEquals(scaleDownPolicy.getGroupName(), "boo!"); diff --git a/artemis-server/src/test/resources/replica-hapolicy-config.xml b/artemis-server/src/test/resources/replica-hapolicy-config.xml index 3b2c3baa69..2ed3724205 100644 --- a/artemis-server/src/test/resources/replica-hapolicy-config.xml +++ b/artemis-server/src/test/resources/replica-hapolicy-config.xml @@ -31,6 +31,7 @@ false true 9876 + 12345 boo! diff --git a/artemis-server/src/test/resources/replicated-hapolicy-config.xml b/artemis-server/src/test/resources/replicated-hapolicy-config.xml index fb2a60243c..22274790dd 100644 --- a/artemis-server/src/test/resources/replicated-hapolicy-config.xml +++ b/artemis-server/src/test/resources/replicated-hapolicy-config.xml @@ -27,6 +27,7 @@ true abcdefg 9876 + 12345 diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java index 01a3ba64b4..c6ee8b4a73 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java @@ -25,8 +25,10 @@ import java.util.concurrent.TimeUnit; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.component.WebServerComponent; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -35,6 +37,7 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy; import org.apache.activemq.artemis.dto.AppDTO; import org.apache.activemq.artemis.dto.WebServerDTO; import org.apache.activemq.artemis.junit.Wait; +import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -45,11 +48,13 @@ import org.junit.runner.Description; public class ReplicatedFailoverTest extends FailoverTest { boolean isReplicatedFailbackTest = false; + boolean isExtraBackupGroupNameReplicates = false; @Rule public TestRule watcher = new TestWatcher() { @Override protected void starting(Description description) { isReplicatedFailbackTest = description.getMethodName().equals("testReplicatedFailback") || description.getMethodName().equals("testLoop"); + isExtraBackupGroupNameReplicates = description.getMethodName().equals("testExtraBackupGroupNameReplicates"); } }; @@ -73,6 +78,49 @@ public class ReplicatedFailoverTest extends FailoverTest { Wait.waitFor(server::isReplicaSync); } + @Test + public void testExtraBackupReplicates() throws Exception { + Configuration secondBackupConfig = backupConfig.copy(); + TransportConfiguration tc = secondBackupConfig.getAcceptorConfigurations().iterator().next(); + TestableServer secondBackupServer = createTestableServer(secondBackupConfig); + tc.getParams().put("serverId", "2"); + secondBackupConfig.setBindingsDirectory(getBindingsDir(1, true)).setJournalDirectory(getJournalDir(1, true)).setPagingDirectory(getPageDir(1, true)).setLargeMessagesDirectory(getLargeMessagesDir(1, true)).setSecurityEnabled(false); + + waitForRemoteBackupSynchronization(backupServer.getServer()); + + secondBackupServer.start(); + Thread.sleep(5000); + backupServer.stop(); + waitForSync(secondBackupServer.getServer()); + waitForRemoteBackupSynchronization(secondBackupServer.getServer()); + + } + + @Test + public void testExtraBackupGroupNameReplicates() throws Exception { + ReplicaPolicyConfiguration backupReplicaPolicyConfiguration = (ReplicaPolicyConfiguration) backupServer.getServer().getConfiguration().getHAPolicyConfiguration(); + backupReplicaPolicyConfiguration.setGroupName("foo"); + + ReplicatedPolicyConfiguration replicatedPolicyConfiguration = (ReplicatedPolicyConfiguration) liveServer.getServer().getConfiguration().getHAPolicyConfiguration(); + replicatedPolicyConfiguration.setGroupName("foo"); + + Configuration secondBackupConfig = backupConfig.copy(); + TransportConfiguration tc = secondBackupConfig.getAcceptorConfigurations().iterator().next(); + TestableServer secondBackupServer = createTestableServer(secondBackupConfig); + tc.getParams().put("serverId", "2"); + secondBackupConfig.setBindingsDirectory(getBindingsDir(1, true)).setJournalDirectory(getJournalDir(1, true)).setPagingDirectory(getPageDir(1, true)).setLargeMessagesDirectory(getLargeMessagesDir(1, true)).setSecurityEnabled(false); + ReplicaPolicyConfiguration replicaPolicyConfiguration = (ReplicaPolicyConfiguration) secondBackupConfig.getHAPolicyConfiguration(); + replicaPolicyConfiguration.setGroupName("foo"); + waitForRemoteBackupSynchronization(backupServer.getServer()); + + secondBackupServer.start(); + Thread.sleep(5000); + backupServer.stop(); + waitForSync(secondBackupServer.getServer()); + waitForRemoteBackupSynchronization(secondBackupServer.getServer()); + + } + @Test(timeout = 120000) /* * default maxSavedReplicatedJournalsSize is 2, this means the backup will fall back to replicated only twice, after this @@ -213,6 +261,12 @@ public class ReplicatedFailoverTest extends FailoverTest { } else { super.setupHAPolicyConfiguration(); } + + if (isExtraBackupGroupNameReplicates) { + ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setGroupName("foo"); + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setGroupName("foo"); + + } } @Override