ARTEMIS-2075 - allow Extra backups to try to replicate more than once

https://issues.apache.org/jira/browse/ARTEMIS-2075
This commit is contained in:
andytaylor 2018-09-05 08:41:06 +01:00
parent ec24ee4561
commit 3b34127bb3
15 changed files with 147 additions and 11 deletions

View File

@ -504,6 +504,8 @@ public final class ActiveMQDefaultConfiguration {
//how long we wait for vote result, 30 secs //how long we wait for vote result, 30 secs
private static int DEFAULT_QUORUM_VOTE_WAIT = 30; private static int DEFAULT_QUORUM_VOTE_WAIT = 30;
private static long DEFAULT_RETRY_REPLICATION_WAIT = 2000;
public static int DEFAULT_QUORUM_SIZE = -1; public static int DEFAULT_QUORUM_SIZE = -1;
public static final boolean DEFAULT_ANALYZE_CRITICAL = true; public static final boolean DEFAULT_ANALYZE_CRITICAL = true;
@ -1384,4 +1386,8 @@ public final class ActiveMQDefaultConfiguration {
public static int getDefaultQuorumVoteWait() { public static int getDefaultQuorumVoteWait() {
return DEFAULT_QUORUM_VOTE_WAIT; return DEFAULT_QUORUM_VOTE_WAIT;
} }
public static long getDefaultRetryReplicationWait() {
return DEFAULT_RETRY_REPLICATION_WAIT;
}
} }

View File

@ -73,11 +73,11 @@ public final class ConfigurationUtils {
} }
case REPLICATED: { case REPLICATED: {
ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf; ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait()); return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait(), pc.getRetryReplicationWait());
} }
case REPLICA: { case REPLICA: {
ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf; ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait()); return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait(), pc.getRetryReplicationWait());
} }
case SHARED_STORE_MASTER: { case SHARED_STORE_MASTER: {
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf; SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;

View File

@ -49,6 +49,8 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
private int quorumVoteWait = ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait(); private int quorumVoteWait = ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait();
private long retryReplicationWait = ActiveMQDefaultConfiguration.getDefaultRetryReplicationWait();
public ReplicaPolicyConfiguration() { public ReplicaPolicyConfiguration() {
} }
@ -170,4 +172,12 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
public void setQuorumVoteWait(int quorumVoteWait) { public void setQuorumVoteWait(int quorumVoteWait) {
this.quorumVoteWait = quorumVoteWait; this.quorumVoteWait = quorumVoteWait;
} }
public long getRetryReplicationWait() {
return retryReplicationWait;
}
public void setRetryReplicationWait(long retryReplicationWait) {
this.retryReplicationWait = retryReplicationWait;
}
} }

View File

@ -39,6 +39,8 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
private int quorumVoteWait = ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait(); private int quorumVoteWait = ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait();
private Long retryReplicationWait = ActiveMQDefaultConfiguration.getDefaultRetryReplicationWait();
public ReplicatedPolicyConfiguration() { public ReplicatedPolicyConfiguration() {
} }
@ -129,4 +131,12 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
this.quorumVoteWait = quorumVoteWait; this.quorumVoteWait = quorumVoteWait;
return this; return this;
} }
public void setRetryReplicationWait(Long retryReplicationWait) {
this.retryReplicationWait = retryReplicationWait;
}
public Long getRetryReplicationWait() {
return retryReplicationWait;
}
} }

View File

@ -1336,6 +1336,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO)); configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));
configuration.setRetryReplicationWait(getLong(policyNode, "retry-replication-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));
configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO)); configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));
return configuration; return configuration;
@ -1367,6 +1369,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO)); configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));
configuration.setRetryReplicationWait(getLong(policyNode, "retry-replication-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));
configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO)); configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));
return configuration; return configuration;

View File

@ -59,6 +59,8 @@ public class ReplicaPolicy extends BackupPolicy {
private final int quorumVoteWait; private final int quorumVoteWait;
private long retryReplicationWait;
public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck, int quorumVoteWait) { public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck, int quorumVoteWait) {
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.quorumVoteWait = quorumVoteWait; this.quorumVoteWait = quorumVoteWait;
@ -84,7 +86,8 @@ public class ReplicaPolicy extends BackupPolicy {
int quorumSize, int quorumSize,
int voteRetries, int voteRetries,
long voteRetryWait, long voteRetryWait,
int quorumVoteWait) { int quorumVoteWait,
long retryReplicationWait) {
this.clusterName = clusterName; this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.groupName = groupName; this.groupName = groupName;
@ -94,10 +97,12 @@ public class ReplicaPolicy extends BackupPolicy {
this.quorumSize = quorumSize; this.quorumSize = quorumSize;
this.voteRetries = voteRetries; this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait; this.voteRetryWait = voteRetryWait;
this.retryReplicationWait = retryReplicationWait;
this.scaleDownPolicy = scaleDownPolicy; this.scaleDownPolicy = scaleDownPolicy;
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure; this.voteOnReplicationFailure = voteOnReplicationFailure;
this.quorumVoteWait = quorumVoteWait; this.quorumVoteWait = quorumVoteWait;
this.retryReplicationWait = retryReplicationWait;
} }
public ReplicaPolicy(String clusterName, public ReplicaPolicy(String clusterName,
@ -247,4 +252,12 @@ public class ReplicaPolicy extends BackupPolicy {
public int getQuorumVoteWait() { public int getQuorumVoteWait() {
return quorumVoteWait; return quorumVoteWait;
} }
public long getRetryReplicationWait() {
return retryReplicationWait;
}
public void setretryReplicationWait(long retryReplicationWait) {
this.retryReplicationWait = retryReplicationWait;
}
} }

View File

@ -54,6 +54,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
private long voteRetryWait; private long voteRetryWait;
private long retryReplicationWait;
/* /*
* this are only used as the policy when the server is started as a live after a failover * this are only used as the policy when the server is started as a live after a failover
* */ * */
@ -78,7 +80,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
int quorumSize, int quorumSize,
int voteRetries, int voteRetries,
long voteRetryWait, long voteRetryWait,
int quorumVoteWait) { int quorumVoteWait,
long retryReplicationWait) {
this.checkForLiveServer = checkForLiveServer; this.checkForLiveServer = checkForLiveServer;
this.groupName = groupName; this.groupName = groupName;
this.clusterName = clusterName; this.clusterName = clusterName;
@ -89,6 +92,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
this.voteRetries = voteRetries; this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait; this.voteRetryWait = voteRetryWait;
this.quorumVoteWait = quorumVoteWait; this.quorumVoteWait = quorumVoteWait;
this.retryReplicationWait = retryReplicationWait;
} }
public ReplicatedPolicy(boolean checkForLiveServer, public ReplicatedPolicy(boolean checkForLiveServer,
@ -159,6 +163,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure); replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure);
replicaPolicy.setVoteRetries(voteRetries); replicaPolicy.setVoteRetries(voteRetries);
replicaPolicy.setVoteRetryWait(voteRetryWait); replicaPolicy.setVoteRetryWait(voteRetryWait);
replicaPolicy.setretryReplicationWait(retryReplicationWait);
if (clusterName != null && clusterName.length() > 0) { if (clusterName != null && clusterName.length() > 0) {
replicaPolicy.setClusterName(clusterName); replicaPolicy.setClusterName(clusterName);
} }
@ -241,4 +246,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public int getQuorumVoteWait() { public int getQuorumVoteWait() {
return quorumVoteWait; return quorumVoteWait;
} }
public long getRetryReplicationWait() {
return retryReplicationWait;
}
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -40,14 +41,16 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
private final Lock lock = new ReentrantLock(); private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition(); private final Condition condition = lock.newCondition();
private final ActiveMQServerImpl server; private final ActiveMQServerImpl server;
private final long retryReplicationWait;
Map<String, Pair<TransportConfiguration, TransportConfiguration>> untriedConnectors = new HashMap<>(); Map<String, Pair<TransportConfiguration, TransportConfiguration>> untriedConnectors = new HashMap<>();
Map<String, Pair<TransportConfiguration, TransportConfiguration>> triedConnectors = new HashMap<>(); Map<String, Pair<TransportConfiguration, TransportConfiguration>> triedConnectors = new HashMap<>();
private String nodeID; private String nodeID;
public AnyLiveNodeLocatorForReplication(SharedNothingBackupQuorum backupQuorum, ActiveMQServerImpl server) { public AnyLiveNodeLocatorForReplication(SharedNothingBackupQuorum backupQuorum, ActiveMQServerImpl server, long retryReplicationWait) {
super(backupQuorum); super(backupQuorum);
this.server = server; this.server = server;
this.retryReplicationWait = retryReplicationWait;
} }
@Override @Override
@ -66,7 +69,9 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
ConcurrentUtil.await(condition, timeout); ConcurrentUtil.await(condition, timeout);
} else { } else {
while (untriedConnectors.isEmpty()) { while (untriedConnectors.isEmpty()) {
condition.await(); condition.await(retryReplicationWait, TimeUnit.MILLISECONDS);
untriedConnectors.putAll(triedConnectors);
triedConnectors.clear();
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -16,8 +16,10 @@
*/ */
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -40,13 +42,16 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
private final Lock lock = new ReentrantLock(); private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition(); private final Condition condition = lock.newCondition();
private final String backupGroupName; private final String backupGroupName;
private final long retryReplicationWait;
private Queue<Pair<TransportConfiguration, TransportConfiguration>> liveConfigurations = new LinkedList<>(); private Queue<Pair<TransportConfiguration, TransportConfiguration>> liveConfigurations = new LinkedList<>();
private ArrayList<Pair<TransportConfiguration, TransportConfiguration>> triedConfigurations = new ArrayList<>();
private String nodeID; private String nodeID;
public NamedLiveNodeLocatorForReplication(String backupGroupName, SharedNothingBackupQuorum quorumManager) { public NamedLiveNodeLocatorForReplication(String backupGroupName, SharedNothingBackupQuorum quorumManager, long retryReplicationWait) {
super(quorumManager); super(quorumManager);
this.backupGroupName = backupGroupName; this.backupGroupName = backupGroupName;
this.retryReplicationWait = retryReplicationWait;
} }
@Override @Override
@ -64,7 +69,9 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
ConcurrentUtil.await(condition, timeout); ConcurrentUtil.await(condition, timeout);
} else { } else {
while (liveConfigurations.size() == 0) { while (liveConfigurations.size() == 0) {
condition.await(); condition.await(retryReplicationWait, TimeUnit.MILLISECONDS);
liveConfigurations.addAll(triedConfigurations);
triedConfigurations.clear();
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -112,7 +119,7 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
public void notifyRegistrationFailed(boolean alreadyReplicating) { public void notifyRegistrationFailed(boolean alreadyReplicating) {
try { try {
lock.lock(); lock.lock();
liveConfigurations.poll(); triedConfigurations.add(liveConfigurations.poll());
super.notifyRegistrationFailed(alreadyReplicating); super.notifyRegistrationFailed(alreadyReplicating);
} finally { } finally {
lock.unlock(); lock.unlock();

View File

@ -142,7 +142,7 @@ public final class SharedNothingBackupActivation extends Activation {
TopologyMember member = (TopologyMember) activationParams.get(ActivationParams.REPLICATION_ENDPOINT); TopologyMember member = (TopologyMember) activationParams.get(ActivationParams.REPLICATION_ENDPOINT);
nodeLocator = new NamedNodeIdNodeLocator(member.getNodeId(), new Pair<>(member.getLive(), member.getBackup())); nodeLocator = new NamedNodeIdNodeLocator(member.getNodeId(), new Pair<>(member.getLive(), member.getBackup()));
} else { } else {
nodeLocator = replicaPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForReplication(backupQuorum, activeMQServer) : new NamedLiveNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum); nodeLocator = replicaPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForReplication(backupQuorum, activeMQServer, replicaPolicy.getRetryReplicationWait()) : new NamedLiveNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum, replicaPolicy.getRetryReplicationWait());
} }
ClusterController clusterController = activeMQServer.getClusterManager().getClusterController(); ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
clusterController.addClusterTopologyListenerForReplication(nodeLocator); clusterController.addClusterTopologyListenerForReplication(nodeLocator);

View File

@ -2277,6 +2277,13 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="retry-replication-wait" type="xsd:long" default="2000" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
If we start as a replica how long to wait (in milliseconds) before trying to replicate again after failing to find a replica
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
<xsd:attributeGroup ref="xml:specialAttrs"/> <xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType> </xsd:complexType>
@ -2380,6 +2387,13 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="retry-replication-wait" type="xsd:long" default="2000" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
How long to wait (in milliseconds) before trying to replicate again after failing to find a replica
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="quorum-vote-wait" type="xsd:integer" default="30" minOccurs="0" maxOccurs="1"> <xsd:element name="quorum-vote-wait" type="xsd:integer" default="30" minOccurs="0" maxOccurs="1">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>

View File

@ -139,6 +139,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertTrue(replicatedPolicy.isCheckForLiveServer()); assertTrue(replicatedPolicy.isCheckForLiveServer());
assertEquals(replicatedPolicy.getClusterName(), "abcdefg"); assertEquals(replicatedPolicy.getClusterName(), "abcdefg");
assertEquals(replicatedPolicy.getInitialReplicationSyncTimeout(), 9876); assertEquals(replicatedPolicy.getInitialReplicationSyncTimeout(), 9876);
assertEquals(replicatedPolicy.getRetryReplicationWait(), 12345);
} finally { } finally {
server.stop(); server.stop();
} }
@ -161,6 +162,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertFalse(replicaPolicy.isRestartBackup()); assertFalse(replicaPolicy.isRestartBackup());
assertTrue(replicaPolicy.isAllowFailback()); assertTrue(replicaPolicy.isAllowFailback());
assertEquals(replicaPolicy.getInitialReplicationSyncTimeout(), 9876); assertEquals(replicaPolicy.getInitialReplicationSyncTimeout(), 9876);
assertEquals(replicaPolicy.getRetryReplicationWait(), 12345);
ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy(); ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
assertNotNull(scaleDownPolicy); assertNotNull(scaleDownPolicy);
assertEquals(scaleDownPolicy.getGroupName(), "boo!"); assertEquals(scaleDownPolicy.getGroupName(), "boo!");

View File

@ -31,6 +31,7 @@
<restart-backup>false</restart-backup> <restart-backup>false</restart-backup>
<allow-failback>true</allow-failback> <allow-failback>true</allow-failback>
<initial-replication-sync-timeout>9876</initial-replication-sync-timeout> <initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
<retry-replication-wait>12345</retry-replication-wait>
<scale-down> <scale-down>
<!--a grouping of servers that can be scaled down to--> <!--a grouping of servers that can be scaled down to-->
<group-name>boo!</group-name> <group-name>boo!</group-name>

View File

@ -27,6 +27,7 @@
<check-for-live-server>true</check-for-live-server> <check-for-live-server>true</check-for-live-server>
<cluster-name>abcdefg</cluster-name> <cluster-name>abcdefg</cluster-name>
<initial-replication-sync-timeout>9876</initial-replication-sync-timeout> <initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
<retry-replication-wait>12345</retry-replication-wait>
</master> </master>
</replication> </replication>
</ha-policy> </ha-policy>

View File

@ -25,8 +25,10 @@ import java.util.concurrent.TimeUnit;
import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer; import com.sun.net.httpserver.HttpServer;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.component.WebServerComponent; import org.apache.activemq.artemis.component.WebServerComponent;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -35,6 +37,7 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
import org.apache.activemq.artemis.dto.AppDTO; import org.apache.activemq.artemis.dto.AppDTO;
import org.apache.activemq.artemis.dto.WebServerDTO; import org.apache.activemq.artemis.dto.WebServerDTO;
import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -45,11 +48,13 @@ import org.junit.runner.Description;
public class ReplicatedFailoverTest extends FailoverTest { public class ReplicatedFailoverTest extends FailoverTest {
boolean isReplicatedFailbackTest = false; boolean isReplicatedFailbackTest = false;
boolean isExtraBackupGroupNameReplicates = false;
@Rule @Rule
public TestRule watcher = new TestWatcher() { public TestRule watcher = new TestWatcher() {
@Override @Override
protected void starting(Description description) { protected void starting(Description description) {
isReplicatedFailbackTest = description.getMethodName().equals("testReplicatedFailback") || description.getMethodName().equals("testLoop"); isReplicatedFailbackTest = description.getMethodName().equals("testReplicatedFailback") || description.getMethodName().equals("testLoop");
isExtraBackupGroupNameReplicates = description.getMethodName().equals("testExtraBackupGroupNameReplicates");
} }
}; };
@ -73,6 +78,49 @@ public class ReplicatedFailoverTest extends FailoverTest {
Wait.waitFor(server::isReplicaSync); Wait.waitFor(server::isReplicaSync);
} }
@Test
public void testExtraBackupReplicates() throws Exception {
Configuration secondBackupConfig = backupConfig.copy();
TransportConfiguration tc = secondBackupConfig.getAcceptorConfigurations().iterator().next();
TestableServer secondBackupServer = createTestableServer(secondBackupConfig);
tc.getParams().put("serverId", "2");
secondBackupConfig.setBindingsDirectory(getBindingsDir(1, true)).setJournalDirectory(getJournalDir(1, true)).setPagingDirectory(getPageDir(1, true)).setLargeMessagesDirectory(getLargeMessagesDir(1, true)).setSecurityEnabled(false);
waitForRemoteBackupSynchronization(backupServer.getServer());
secondBackupServer.start();
Thread.sleep(5000);
backupServer.stop();
waitForSync(secondBackupServer.getServer());
waitForRemoteBackupSynchronization(secondBackupServer.getServer());
}
@Test
public void testExtraBackupGroupNameReplicates() throws Exception {
ReplicaPolicyConfiguration backupReplicaPolicyConfiguration = (ReplicaPolicyConfiguration) backupServer.getServer().getConfiguration().getHAPolicyConfiguration();
backupReplicaPolicyConfiguration.setGroupName("foo");
ReplicatedPolicyConfiguration replicatedPolicyConfiguration = (ReplicatedPolicyConfiguration) liveServer.getServer().getConfiguration().getHAPolicyConfiguration();
replicatedPolicyConfiguration.setGroupName("foo");
Configuration secondBackupConfig = backupConfig.copy();
TransportConfiguration tc = secondBackupConfig.getAcceptorConfigurations().iterator().next();
TestableServer secondBackupServer = createTestableServer(secondBackupConfig);
tc.getParams().put("serverId", "2");
secondBackupConfig.setBindingsDirectory(getBindingsDir(1, true)).setJournalDirectory(getJournalDir(1, true)).setPagingDirectory(getPageDir(1, true)).setLargeMessagesDirectory(getLargeMessagesDir(1, true)).setSecurityEnabled(false);
ReplicaPolicyConfiguration replicaPolicyConfiguration = (ReplicaPolicyConfiguration) secondBackupConfig.getHAPolicyConfiguration();
replicaPolicyConfiguration.setGroupName("foo");
waitForRemoteBackupSynchronization(backupServer.getServer());
secondBackupServer.start();
Thread.sleep(5000);
backupServer.stop();
waitForSync(secondBackupServer.getServer());
waitForRemoteBackupSynchronization(secondBackupServer.getServer());
}
@Test(timeout = 120000) @Test(timeout = 120000)
/* /*
* default maxSavedReplicatedJournalsSize is 2, this means the backup will fall back to replicated only twice, after this * default maxSavedReplicatedJournalsSize is 2, this means the backup will fall back to replicated only twice, after this
@ -213,6 +261,12 @@ public class ReplicatedFailoverTest extends FailoverTest {
} else { } else {
super.setupHAPolicyConfiguration(); super.setupHAPolicyConfiguration();
} }
if (isExtraBackupGroupNameReplicates) {
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setGroupName("foo");
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setGroupName("foo");
}
} }
@Override @Override