This commit is contained in:
Clebert Suconic 2018-09-13 09:19:08 -04:00
commit bf0eede933
15 changed files with 147 additions and 11 deletions

View File

@ -504,6 +504,8 @@ public final class ActiveMQDefaultConfiguration {
//how long we wait for vote result, 30 secs
private static int DEFAULT_QUORUM_VOTE_WAIT = 30;
private static long DEFAULT_RETRY_REPLICATION_WAIT = 2000;
public static int DEFAULT_QUORUM_SIZE = -1;
public static final boolean DEFAULT_ANALYZE_CRITICAL = true;
@ -1384,4 +1386,8 @@ public final class ActiveMQDefaultConfiguration {
/**
 * @return how long (in seconds) to wait for a quorum vote result
 */
public static int getDefaultQuorumVoteWait() {
   return DEFAULT_QUORUM_VOTE_WAIT;
}

/**
 * @return how long (in milliseconds) a backup waits before retrying
 *         replication after failing to find a live server to replicate from
 */
public static long getDefaultRetryReplicationWait() {
   return DEFAULT_RETRY_REPLICATION_WAIT;
}
}

View File

@ -73,11 +73,11 @@ public final class ConfigurationUtils {
}
case REPLICATED: {
ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait());
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait(), pc.getRetryReplicationWait());
}
case REPLICA: {
ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait());
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait(), pc.getRetryReplicationWait());
}
case SHARED_STORE_MASTER: {
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;

View File

@ -49,6 +49,8 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
private int quorumVoteWait = ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait();
private long retryReplicationWait = ActiveMQDefaultConfiguration.getDefaultRetryReplicationWait();
// No-arg constructor: every field keeps the ActiveMQDefaultConfiguration
// default declared at its field initializer (e.g. retryReplicationWait = 2000ms).
public ReplicaPolicyConfiguration() {
}
@ -170,4 +172,12 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
/** @param quorumVoteWait how long (in seconds) to wait for a quorum vote result */
public void setQuorumVoteWait(int quorumVoteWait) {
   this.quorumVoteWait = quorumVoteWait;
}

/** @return wait (in milliseconds) before retrying replication after failing to find a live server */
public long getRetryReplicationWait() {
   return retryReplicationWait;
}

/** @param retryReplicationWait wait (in milliseconds) before retrying replication */
public void setRetryReplicationWait(long retryReplicationWait) {
   this.retryReplicationWait = retryReplicationWait;
}
}

View File

@ -39,6 +39,8 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
private int quorumVoteWait = ActiveMQDefaultConfiguration.getDefaultQuorumVoteWait();
private long retryReplicationWait = ActiveMQDefaultConfiguration.getDefaultRetryReplicationWait();
// No-arg constructor: every field keeps the ActiveMQDefaultConfiguration
// default declared at its field initializer.
public ReplicatedPolicyConfiguration() {
}
@ -129,4 +131,12 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
this.quorumVoteWait = quorumVoteWait;
return this;
}
/**
 * Sets how long (in milliseconds) to wait before retrying replication after
 * failing to find a replica.
 * <p>
 * Returns {@code this} for chaining, matching the fluent convention of this
 * class's other setters (e.g. {@code setQuorumVoteWait} above ends with
 * {@code return this}); previously this setter alone returned {@code void}.
 * Source-compatible with existing statement-style callers.
 */
public ReplicatedPolicyConfiguration setRetryReplicationWait(Long retryReplicationWait) {
   this.retryReplicationWait = retryReplicationWait;
   return this;
}

/** @return wait (in milliseconds) before retrying replication after a failed attempt */
public Long getRetryReplicationWait() {
   return retryReplicationWait;
}
}

View File

@ -1336,6 +1336,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));
configuration.setRetryReplicationWait(getLong(policyNode, "retry-replication-wait", configuration.getRetryReplicationWait(), Validators.GT_ZERO));
configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));
return configuration;
@ -1367,6 +1369,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setVoteRetryWait(getLong(policyNode, "vote-retry-wait", configuration.getVoteRetryWait(), Validators.GT_ZERO));
configuration.setRetryReplicationWait(getLong(policyNode, "retry-replication-wait", configuration.getRetryReplicationWait(), Validators.GT_ZERO));
configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(), Validators.MINUS_ONE_OR_GT_ZERO));
return configuration;

View File

@ -59,6 +59,8 @@ public class ReplicaPolicy extends BackupPolicy {
private final int quorumVoteWait;
private long retryReplicationWait;
public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck, int quorumVoteWait) {
this.networkHealthCheck = networkHealthCheck;
this.quorumVoteWait = quorumVoteWait;
@ -84,7 +86,8 @@ public class ReplicaPolicy extends BackupPolicy {
int quorumSize,
int voteRetries,
long voteRetryWait,
int quorumVoteWait) {
int quorumVoteWait,
long retryReplicationWait) {
this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.groupName = groupName;
@ -94,10 +97,12 @@ public class ReplicaPolicy extends BackupPolicy {
this.quorumSize = quorumSize;
this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait;
this.retryReplicationWait = retryReplicationWait;
this.scaleDownPolicy = scaleDownPolicy;
this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure;
this.quorumVoteWait = quorumVoteWait;
}
public ReplicaPolicy(String clusterName,
@ -247,4 +252,12 @@ public class ReplicaPolicy extends BackupPolicy {
/** @return how long (in seconds) to wait for a quorum vote result */
public int getQuorumVoteWait() {
   return quorumVoteWait;
}

/** @return wait (in milliseconds) before retrying replication after failing to find a live server */
public long getRetryReplicationWait() {
   return retryReplicationWait;
}

/**
 * Sets the wait (in milliseconds) before retrying replication after failing
 * to find a live server to replicate from.
 */
public void setRetryReplicationWait(long retryReplicationWait) {
   this.retryReplicationWait = retryReplicationWait;
}

/**
 * @deprecated mis-cased name (violates lowerCamelCase); use
 *             {@link #setRetryReplicationWait(long)} instead. Kept as a
 *             delegate so existing callers still compile.
 */
@Deprecated
public void setretryReplicationWait(long retryReplicationWait) {
   setRetryReplicationWait(retryReplicationWait);
}
}

View File

@ -54,6 +54,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
private long voteRetryWait;
private long retryReplicationWait;
/*
* this are only used as the policy when the server is started as a live after a failover
* */
@ -78,7 +80,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
int quorumSize,
int voteRetries,
long voteRetryWait,
int quorumVoteWait) {
int quorumVoteWait,
long retryReplicationWait) {
this.checkForLiveServer = checkForLiveServer;
this.groupName = groupName;
this.clusterName = clusterName;
@ -89,6 +92,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
this.voteRetries = voteRetries;
this.voteRetryWait = voteRetryWait;
this.quorumVoteWait = quorumVoteWait;
this.retryReplicationWait = retryReplicationWait;
}
public ReplicatedPolicy(boolean checkForLiveServer,
@ -159,6 +163,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure);
replicaPolicy.setVoteRetries(voteRetries);
replicaPolicy.setVoteRetryWait(voteRetryWait);
replicaPolicy.setretryReplicationWait(retryReplicationWait);
if (clusterName != null && clusterName.length() > 0) {
replicaPolicy.setClusterName(clusterName);
}
@ -241,4 +246,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
/** @return how long (in seconds) to wait for a quorum vote result */
public int getQuorumVoteWait() {
   return quorumVoteWait;
}

/** @return wait (in milliseconds) before retrying replication after a failed attempt */
public long getRetryReplicationWait() {
   return retryReplicationWait;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -40,14 +41,16 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private final ActiveMQServerImpl server;
private final long retryReplicationWait;
Map<String, Pair<TransportConfiguration, TransportConfiguration>> untriedConnectors = new HashMap<>();
Map<String, Pair<TransportConfiguration, TransportConfiguration>> triedConnectors = new HashMap<>();
private String nodeID;
public AnyLiveNodeLocatorForReplication(SharedNothingBackupQuorum backupQuorum, ActiveMQServerImpl server) {
/**
 * @param backupQuorum         quorum this locator reports topology events to (via super)
 * @param server               owning server instance
 * @param retryReplicationWait milliseconds to wait before re-offering previously
 *                             tried connectors (used as the await timeout in useConnectors)
 */
public AnyLiveNodeLocatorForReplication(SharedNothingBackupQuorum backupQuorum, ActiveMQServerImpl server, long retryReplicationWait) {
   super(backupQuorum);
   this.retryReplicationWait = retryReplicationWait;
   this.server = server;
}
@Override
@ -66,7 +69,9 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
ConcurrentUtil.await(condition, timeout);
} else {
while (untriedConnectors.isEmpty()) {
condition.await();
condition.await(retryReplicationWait, TimeUnit.MILLISECONDS);
untriedConnectors.putAll(triedConnectors);
triedConnectors.clear();
}
}
} catch (InterruptedException e) {

View File

@ -16,8 +16,10 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -40,13 +42,16 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private final String backupGroupName;
private final long retryReplicationWait;
private Queue<Pair<TransportConfiguration, TransportConfiguration>> liveConfigurations = new LinkedList<>();
private ArrayList<Pair<TransportConfiguration, TransportConfiguration>> triedConfigurations = new ArrayList<>();
private String nodeID;
public NamedLiveNodeLocatorForReplication(String backupGroupName, SharedNothingBackupQuorum quorumManager) {
/**
 * @param backupGroupName      group name this locator restricts candidate lives to
 * @param quorumManager        quorum this locator reports topology events to (via super)
 * @param retryReplicationWait milliseconds to wait before re-offering previously
 *                             tried configurations (used as the await timeout)
 */
public NamedLiveNodeLocatorForReplication(String backupGroupName, SharedNothingBackupQuorum quorumManager, long retryReplicationWait) {
   super(quorumManager);
   this.retryReplicationWait = retryReplicationWait;
   this.backupGroupName = backupGroupName;
}
@Override
@ -64,7 +69,9 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
ConcurrentUtil.await(condition, timeout);
} else {
while (liveConfigurations.size() == 0) {
condition.await();
condition.await(retryReplicationWait, TimeUnit.MILLISECONDS);
liveConfigurations.addAll(triedConfigurations);
triedConfigurations.clear();
}
}
} catch (InterruptedException e) {
@ -112,7 +119,7 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
public void notifyRegistrationFailed(boolean alreadyReplicating) {
try {
lock.lock();
liveConfigurations.poll();
triedConfigurations.add(liveConfigurations.poll());
super.notifyRegistrationFailed(alreadyReplicating);
} finally {
lock.unlock();

View File

@ -142,7 +142,7 @@ public final class SharedNothingBackupActivation extends Activation {
TopologyMember member = (TopologyMember) activationParams.get(ActivationParams.REPLICATION_ENDPOINT);
nodeLocator = new NamedNodeIdNodeLocator(member.getNodeId(), new Pair<>(member.getLive(), member.getBackup()));
} else {
nodeLocator = replicaPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForReplication(backupQuorum, activeMQServer) : new NamedLiveNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum);
nodeLocator = replicaPolicy.getGroupName() == null ? new AnyLiveNodeLocatorForReplication(backupQuorum, activeMQServer, replicaPolicy.getRetryReplicationWait()) : new NamedLiveNodeLocatorForReplication(replicaPolicy.getGroupName(), backupQuorum, replicaPolicy.getRetryReplicationWait());
}
ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
clusterController.addClusterTopologyListenerForReplication(nodeLocator);

View File

@ -2277,6 +2277,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="retry-replication-wait" type="xsd:long" default="2000" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
If we start as a replica, how long to wait (in milliseconds) before trying to replicate again after failing to find a live server to replicate from
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
@ -2380,6 +2387,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="retry-replication-wait" type="xsd:long" default="2000" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
How long to wait (in milliseconds) before trying to replicate again after failing to find a replica
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="quorum-vote-wait" type="xsd:integer" default="30" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>

View File

@ -139,6 +139,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertTrue(replicatedPolicy.isCheckForLiveServer());
assertEquals(replicatedPolicy.getClusterName(), "abcdefg");
assertEquals(replicatedPolicy.getInitialReplicationSyncTimeout(), 9876);
assertEquals(replicatedPolicy.getRetryReplicationWait(), 12345);
} finally {
server.stop();
}
@ -161,6 +162,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertFalse(replicaPolicy.isRestartBackup());
assertTrue(replicaPolicy.isAllowFailback());
assertEquals(replicaPolicy.getInitialReplicationSyncTimeout(), 9876);
assertEquals(replicaPolicy.getRetryReplicationWait(), 12345);
ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
assertNotNull(scaleDownPolicy);
assertEquals(scaleDownPolicy.getGroupName(), "boo!");

View File

@ -31,6 +31,7 @@
<restart-backup>false</restart-backup>
<allow-failback>true</allow-failback>
<initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
<retry-replication-wait>12345</retry-replication-wait>
<scale-down>
<!--a grouping of servers that can be scaled down to-->
<group-name>boo!</group-name>

View File

@ -27,6 +27,7 @@
<check-for-live-server>true</check-for-live-server>
<cluster-name>abcdefg</cluster-name>
<initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
<retry-replication-wait>12345</retry-replication-wait>
</master>
</replication>
</ha-policy>

View File

@ -25,8 +25,10 @@ import java.util.concurrent.TimeUnit;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.component.WebServerComponent;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -35,6 +37,7 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
import org.apache.activemq.artemis.dto.AppDTO;
import org.apache.activemq.artemis.dto.WebServerDTO;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -45,11 +48,13 @@ import org.junit.runner.Description;
public class ReplicatedFailoverTest extends FailoverTest {
boolean isReplicatedFailbackTest = false;
boolean isExtraBackupGroupNameReplicates = false;
@Rule
public TestRule watcher = new TestWatcher() {
@Override
protected void starting(Description description) {
isReplicatedFailbackTest = description.getMethodName().equals("testReplicatedFailback") || description.getMethodName().equals("testLoop");
isExtraBackupGroupNameReplicates = description.getMethodName().equals("testExtraBackupGroupNameReplicates");
}
};
@ -73,6 +78,49 @@ public class ReplicatedFailoverTest extends FailoverTest {
Wait.waitFor(server::isReplicaSync);
}
@Test
public void testExtraBackupReplicates() throws Exception {
// Clone the existing backup's config for a second backup on a distinct acceptor/journal.
Configuration secondBackupConfig = backupConfig.copy();
TransportConfiguration tc = secondBackupConfig.getAcceptorConfigurations().iterator().next();
TestableServer secondBackupServer = createTestableServer(secondBackupConfig);
// NOTE(review): the acceptor's serverId is mutated AFTER createTestableServer —
// presumably the server reads the params lazily at start(); confirm ordering is intentional.
tc.getParams().put("serverId", "2");
secondBackupConfig.setBindingsDirectory(getBindingsDir(1, true)).setJournalDirectory(getJournalDir(1, true)).setPagingDirectory(getPageDir(1, true)).setLargeMessagesDirectory(getLargeMessagesDir(1, true)).setSecurityEnabled(false);
// Ensure the first backup is in sync before introducing the second one.
waitForRemoteBackupSynchronization(backupServer.getServer());
secondBackupServer.start();
// NOTE(review): fixed sleep to let the second backup announce itself — a
// condition-based wait would be less flaky/slow; confirm no waitable signal exists here.
Thread.sleep(5000);
// Kill the first backup; the second backup should take over replication.
backupServer.stop();
waitForSync(secondBackupServer.getServer());
waitForRemoteBackupSynchronization(secondBackupServer.getServer());
}
@Test
public void testExtraBackupGroupNameReplicates() throws Exception {
// Same scenario as testExtraBackupReplicates, but all three servers share
// group name "foo" so group-name matching governs which backup replicates.
ReplicaPolicyConfiguration backupReplicaPolicyConfiguration = (ReplicaPolicyConfiguration) backupServer.getServer().getConfiguration().getHAPolicyConfiguration();
backupReplicaPolicyConfiguration.setGroupName("foo");
ReplicatedPolicyConfiguration replicatedPolicyConfiguration = (ReplicatedPolicyConfiguration) liveServer.getServer().getConfiguration().getHAPolicyConfiguration();
replicatedPolicyConfiguration.setGroupName("foo");
Configuration secondBackupConfig = backupConfig.copy();
TransportConfiguration tc = secondBackupConfig.getAcceptorConfigurations().iterator().next();
TestableServer secondBackupServer = createTestableServer(secondBackupConfig);
// NOTE(review): serverId mutated after createTestableServer — see sibling test; confirm intentional.
tc.getParams().put("serverId", "2");
secondBackupConfig.setBindingsDirectory(getBindingsDir(1, true)).setJournalDirectory(getJournalDir(1, true)).setPagingDirectory(getPageDir(1, true)).setLargeMessagesDirectory(getLargeMessagesDir(1, true)).setSecurityEnabled(false);
ReplicaPolicyConfiguration replicaPolicyConfiguration = (ReplicaPolicyConfiguration) secondBackupConfig.getHAPolicyConfiguration();
replicaPolicyConfiguration.setGroupName("foo");
waitForRemoteBackupSynchronization(backupServer.getServer());
secondBackupServer.start();
// NOTE(review): fixed sleep — prefer a condition-based wait if a signal is available.
Thread.sleep(5000);
// Stop the first backup; second backup (same group) should replicate from the live.
backupServer.stop();
waitForSync(secondBackupServer.getServer());
waitForRemoteBackupSynchronization(secondBackupServer.getServer());
}
@Test(timeout = 120000)
/*
* default maxSavedReplicatedJournalsSize is 2, this means the backup will fall back to replicated only twice, after this
@ -213,6 +261,12 @@ public class ReplicatedFailoverTest extends FailoverTest {
} else {
super.setupHAPolicyConfiguration();
}
if (isExtraBackupGroupNameReplicates) {
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setGroupName("foo");
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setGroupName("foo");
}
}
@Override