ARTEMIS-4129 Add max-saved-replicated-journals-size parameter to primary and replicated policy configuration

Allow override default max-saved-replicated-journals-size value when the server is configured as primary or replicated.
This commit is contained in:
iliya 2023-01-16 14:09:33 +03:00 committed by Justin Bertram
parent 9ff5c0f426
commit e68034518d
13 changed files with 75 additions and 17 deletions

View File

@ -77,7 +77,7 @@ public final class ConfigurationUtils {
} }
case REPLICATED: { case REPLICATED: {
ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf; ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait(), pc.getRetryReplicationWait()); return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(), pc.getQuorumSize(), pc.getVoteRetries(), pc.getVoteRetryWait(), pc.getQuorumVoteWait(), pc.getRetryReplicationWait());
} }
case REPLICA: { case REPLICA: {
ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf; ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;

View File

@ -27,6 +27,8 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
private String clusterName = null; private String clusterName = null;
private int maxSavedReplicatedJournalsSize = ActiveMQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize();
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
private boolean voteOnReplicationFailure = ActiveMQDefaultConfiguration.getDefaultVoteOnReplicationFailure(); private boolean voteOnReplicationFailure = ActiveMQDefaultConfiguration.getDefaultVoteOnReplicationFailure();
@ -77,6 +79,15 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
return this; return this;
} }
public int getMaxSavedReplicatedJournalsSize() {
return maxSavedReplicatedJournalsSize;
}
public ReplicatedPolicyConfiguration setMaxSavedReplicatedJournalsSize(int maxSavedReplicatedJournalsSize) {
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
return this;
}
public long getInitialReplicationSyncTimeout() { public long getInitialReplicationSyncTimeout() {
return initialReplicationSyncTimeout; return initialReplicationSyncTimeout;
} }

View File

@ -33,6 +33,8 @@ public class ReplicationPrimaryPolicyConfiguration implements HAPolicyConfigurat
private String coordinationId = null; private String coordinationId = null;
private int maxSavedReplicatedJournalsSize = ActiveMQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize();
public static ReplicationPrimaryPolicyConfiguration withDefault() { public static ReplicationPrimaryPolicyConfiguration withDefault() {
return new ReplicationPrimaryPolicyConfiguration(); return new ReplicationPrimaryPolicyConfiguration();
} }
@ -111,4 +113,13 @@ public class ReplicationPrimaryPolicyConfiguration implements HAPolicyConfigurat
this.coordinationId = this.coordinationId.replace('-', '.'); this.coordinationId = this.coordinationId.replace('-', '.');
} }
} }
public int getMaxSavedReplicatedJournalsSize() {
return maxSavedReplicatedJournalsSize;
}
public ReplicationPrimaryPolicyConfiguration setMaxSavedReplicatedJournalsSize(int maxSavedReplicatedJournalsSize) {
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
return this;
}
} }

View File

@ -1718,6 +1718,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK)); configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK));
configuration.setMaxSavedReplicatedJournalsSize(getInteger(policyNode, "max-saved-replicated-journals-size", configuration.getMaxSavedReplicatedJournalsSize(), Validators.MINUS_ONE_OR_GE_ZERO));
configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO)); configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO));
configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure())); configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure", configuration.getVoteOnReplicationFailure()));
@ -1781,6 +1783,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setCoordinationId(getString(policyNode, "coordination-id", configuration.getCoordinationId(), Validators.NOT_NULL_OR_EMPTY)); configuration.setCoordinationId(getString(policyNode, "coordination-id", configuration.getCoordinationId(), Validators.NOT_NULL_OR_EMPTY));
configuration.setMaxSavedReplicatedJournalsSize(getInteger(policyNode, "max-saved-replicated-journals-size", configuration.getMaxSavedReplicatedJournalsSize(), Validators.MINUS_ONE_OR_GE_ZERO));
return configuration; return configuration;
} }

View File

@ -33,6 +33,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
private String clusterName; private String clusterName;
private int maxSavedReplicatedJournalsSize = ActiveMQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize();
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
/* /*
@ -75,6 +77,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public ReplicatedPolicy(boolean checkForLiveServer, public ReplicatedPolicy(boolean checkForLiveServer,
String groupName, String groupName,
String clusterName, String clusterName,
int maxSavedReplicatedJournalsSize,
long initialReplicationSyncTimeout, long initialReplicationSyncTimeout,
NetworkHealthCheck networkHealthCheck, NetworkHealthCheck networkHealthCheck,
boolean voteOnReplicationFailure, boolean voteOnReplicationFailure,
@ -86,6 +89,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
this.checkForLiveServer = checkForLiveServer; this.checkForLiveServer = checkForLiveServer;
this.groupName = groupName; this.groupName = groupName;
this.clusterName = clusterName; this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
this.voteOnReplicationFailure = voteOnReplicationFailure; this.voteOnReplicationFailure = voteOnReplicationFailure;
@ -165,6 +169,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
replicaPolicy.setVoteRetries(voteRetries); replicaPolicy.setVoteRetries(voteRetries);
replicaPolicy.setVoteRetryWait(voteRetryWait); replicaPolicy.setVoteRetryWait(voteRetryWait);
replicaPolicy.setretryReplicationWait(retryReplicationWait); replicaPolicy.setretryReplicationWait(retryReplicationWait);
replicaPolicy.setMaxSavedReplicatedJournalsSize(maxSavedReplicatedJournalsSize);
if (clusterName != null && clusterName.length() > 0) { if (clusterName != null && clusterName.length() > 0) {
replicaPolicy.setClusterName(clusterName); replicaPolicy.setClusterName(clusterName);
} }
@ -251,4 +256,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public long getRetryReplicationWait() { public long getRetryReplicationWait() {
return retryReplicationWait; return retryReplicationWait;
} }
public int getMaxSavedReplicatedJournalsSize() {
return maxSavedReplicatedJournalsSize;
}
} }

View File

@ -79,12 +79,14 @@ public class ReplicationBackupPolicy implements HAPolicy<ReplicationBackupActiva
* It creates a companion backup policy for a natural-born primary: it would cause the broker to try failback. * It creates a companion backup policy for a natural-born primary: it would cause the broker to try failback.
*/ */
static ReplicationBackupPolicy failback(long retryReplicationWait, static ReplicationBackupPolicy failback(long retryReplicationWait,
int maxSavedReplicatedJournalsSize,
String clusterName, String clusterName,
String groupName, String groupName,
ReplicationPrimaryPolicy livePolicy, ReplicationPrimaryPolicy livePolicy,
DistributedPrimitiveManagerConfiguration distributedManagerConfiguration) { DistributedPrimitiveManagerConfiguration distributedManagerConfiguration) {
return new ReplicationBackupPolicy(ReplicationBackupPolicyConfiguration.withDefault() return new ReplicationBackupPolicy(ReplicationBackupPolicyConfiguration.withDefault()
.setRetryReplicationWait(retryReplicationWait) .setRetryReplicationWait(retryReplicationWait)
.setMaxSavedReplicatedJournalsSize(maxSavedReplicatedJournalsSize)
.setClusterName(clusterName) .setClusterName(clusterName)
.setGroupName(groupName) .setGroupName(groupName)
.setDistributedManagerConfiguration(distributedManagerConfiguration), .setDistributedManagerConfiguration(distributedManagerConfiguration),

View File

@ -56,7 +56,7 @@ public class ReplicationPrimaryPolicy implements HAPolicy<ReplicationPrimaryActi
initialReplicationSyncTimeout = config.getInitialReplicationSyncTimeout(); initialReplicationSyncTimeout = config.getInitialReplicationSyncTimeout();
distributedManagerConfiguration = config.getDistributedManagerConfiguration(); distributedManagerConfiguration = config.getDistributedManagerConfiguration();
this.allowAutoFailBack = false; this.allowAutoFailBack = false;
backupPolicy = ReplicationBackupPolicy.failback(config.getRetryReplicationWait(), config.getClusterName(), backupPolicy = ReplicationBackupPolicy.failback(config.getRetryReplicationWait(), config.getMaxSavedReplicatedJournalsSize(), config.getClusterName(),
config.getGroupName(), this, config.getGroupName(), this,
config.getDistributedManagerConfiguration()); config.getDistributedManagerConfiguration());
} }

View File

@ -3252,6 +3252,15 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="max-saved-replicated-journals-size" type="xsd:int" default="2" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
This option specifies how many replication backup directories will be kept when server starts as replica.
Every time when server starts as replica all former data moves to 'oldreplica.{id}' directory, where id is growing backup index,
this parameter sets the maximum number of such directories kept on disk.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="check-for-live-server" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0"> <xsd:element name="check-for-live-server" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
@ -3340,9 +3349,9 @@
<xsd:element name="max-saved-replicated-journals-size" type="xsd:int" default="2" maxOccurs="1" minOccurs="0"> <xsd:element name="max-saved-replicated-journals-size" type="xsd:int" default="2" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
This specifies how many times a replicated backup server can restart after moving its files on start. This option specifies how many replication backup directories will be kept when server starts as replica.
Once there are this number of backup journal files the server will stop permanently after if fails Every time when server starts as replica all former data moves to 'oldreplica.{id}' directory, where id is growing backup index,
back. this parameter sets the maximum number of such directories kept on disk.
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
@ -3486,6 +3495,15 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="max-saved-replicated-journals-size" type="xsd:int" default="2" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
This option specifies how many replication backup directories will be kept when server starts as replica.
Every time when server starts as replica all former data moves to 'oldreplica.{id}' directory, where id is growing backup index,
this parameter sets the maximum number of such directories kept on disk.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
<xsd:attributeGroup ref="xml:specialAttrs"/> <xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType> </xsd:complexType>
@ -3518,9 +3536,9 @@
<xsd:element name="max-saved-replicated-journals-size" type="xsd:int" default="2" maxOccurs="1" minOccurs="0"> <xsd:element name="max-saved-replicated-journals-size" type="xsd:int" default="2" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
This specifies how many times a replicated backup server can restart after moving its files on start. This option specifies how many replication backup directories will be kept when server starts as replica.
Once there are this number of backup journal files the server will stop permanently after if fails Every time when server starts as replica all former data moves to 'oldreplica.{id}' directory, where id is growing backup index,
back. this parameter sets the maximum number of such directories kept on disk.
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
@ -3576,9 +3594,9 @@
<xsd:element name="max-saved-replicated-journals-size" type="xsd:int" default="2" maxOccurs="1" minOccurs="0"> <xsd:element name="max-saved-replicated-journals-size" type="xsd:int" default="2" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
This specifies how many times a replicated backup server can restart after moving its files on start. This option specifies how many replication backup directories will be kept when server starts as replica.
Once there are this number of backup journal files the server will stop permanently after if fails Every time when server starts as replica all former data moves to 'oldreplica.{id}' directory, where id is growing backup index,
back. this parameter sets the maximum number of such directories kept on disk.
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>

View File

@ -21,7 +21,6 @@ import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
@ -317,7 +316,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertEquals(policy.getGroupName(), failbackPolicy.getGroupName()); assertEquals(policy.getGroupName(), failbackPolicy.getGroupName());
assertEquals(policy.getBackupGroupName(), failbackPolicy.getBackupGroupName()); assertEquals(policy.getBackupGroupName(), failbackPolicy.getBackupGroupName());
assertEquals(policy.getClusterName(), failbackPolicy.getClusterName()); assertEquals(policy.getClusterName(), failbackPolicy.getClusterName());
assertEquals(failbackPolicy.getMaxSavedReplicatedJournalsSize(), ActiveMQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize()); assertEquals(73, failbackPolicy.getMaxSavedReplicatedJournalsSize());
assertTrue(failbackPolicy.isTryFailback()); assertTrue(failbackPolicy.isTryFailback());
assertTrue(failbackPolicy.isBackup()); assertTrue(failbackPolicy.isBackup());
assertFalse(failbackPolicy.isSharedStore()); assertFalse(failbackPolicy.isSharedStore());
@ -413,6 +412,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertEquals(replicatedPolicy.getClusterName(), "abcdefg"); assertEquals(replicatedPolicy.getClusterName(), "abcdefg");
assertEquals(replicatedPolicy.getInitialReplicationSyncTimeout(), 9876); assertEquals(replicatedPolicy.getInitialReplicationSyncTimeout(), 9876);
assertEquals(replicatedPolicy.getRetryReplicationWait(), 12345); assertEquals(replicatedPolicy.getRetryReplicationWait(), 12345);
assertEquals(replicatedPolicy.getMaxSavedReplicatedJournalsSize(), 73);
} finally { } finally {
server.stop(); server.stop();
} }

View File

@ -27,6 +27,7 @@
<cluster-name>abcdefg</cluster-name> <cluster-name>abcdefg</cluster-name>
<initial-replication-sync-timeout>9876</initial-replication-sync-timeout> <initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
<retry-replication-wait>12345</retry-replication-wait> <retry-replication-wait>12345</retry-replication-wait>
<max-saved-replicated-journals-size>73</max-saved-replicated-journals-size>
<manager> <manager>
<class-name> <class-name>
org.apache.activemq.artemis.core.config.impl.HAPolicyConfigurationTest$FakeDistributedPrimitiveManager org.apache.activemq.artemis.core.config.impl.HAPolicyConfigurationTest$FakeDistributedPrimitiveManager

View File

@ -28,6 +28,7 @@
<cluster-name>abcdefg</cluster-name> <cluster-name>abcdefg</cluster-name>
<initial-replication-sync-timeout>9876</initial-replication-sync-timeout> <initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
<retry-replication-wait>12345</retry-replication-wait> <retry-replication-wait>12345</retry-replication-wait>
<max-saved-replicated-journals-size>73</max-saved-replicated-journals-size>
</master> </master>
</replication> </replication>
</ha-policy> </ha-policy>

View File

@ -191,7 +191,7 @@ log-delegate-factory-class-name | **deprecated** the name of the factory class t
[management-address](management.md#configuring-management)| the name of the management address to send management messages to. | `activemq.management` [management-address](management.md#configuring-management)| the name of the management address to send management messages to. | `activemq.management`
[management-notification-address](management.md#configuring-the-management-notification-address) | the name of the address that consumers bind to receive management notifications. | `activemq.notifications` [management-notification-address](management.md#configuring-the-management-notification-address) | the name of the address that consumers bind to receive management notifications. | `activemq.notifications`
[mask-password](masking-passwords.md) | This option controls whether passwords in server configuration need be masked. If set to "true" the passwords are masked. | `false` [mask-password](masking-passwords.md) | This option controls whether passwords in server configuration need be masked. If set to "true" the passwords are masked. | `false`
[max-saved-replicated-journals-size](ha.md#data-replication) | This specifies how many times a replicated backup server can restart after moving its files on start. Once there are this number of backup journal files the server will stop permanently after if fails back. -1 Means no Limit; 0 don't keep a copy at all. | 2 [max-saved-replicated-journals-size](ha.md#data-replication) | This specifies how many replication backup directories will be kept when server starts as replica. -1 Means no Limit; 0 don't keep a copy at all. | 2
[max-disk-usage](paging.md#max-disk-usage) | The max percentage of data we should use from disks. The broker will block while the disk is full. Disable by setting -1. | 90 [max-disk-usage](paging.md#max-disk-usage) | The max percentage of data we should use from disks. The broker will block while the disk is full. Disable by setting -1. | 90
[memory-measure-interval](perf-tuning.md) | frequency to sample JVM memory in ms (or -1 to disable memory sampling). | -1 [memory-measure-interval](perf-tuning.md) | frequency to sample JVM memory in ms (or -1 to disable memory sampling). | -1
[memory-warning-threshold](perf-tuning.md)| Percentage of available memory which will trigger a warning log. | 25 [memory-warning-threshold](perf-tuning.md)| Percentage of available memory which will trigger a warning log. | 25

View File

@ -350,9 +350,10 @@ group-name
- `max-saved-replicated-journals-size` - `max-saved-replicated-journals-size`
This specifies how many times a replicated backup server can restart This option specifies how many replication backup directories will be kept
after moving its files on start. Once there are this number of backup when server starts as replica. Every time when server starts as replica all
journal files the server will stop permanently after if fails back. former data moves to 'oldreplica.{id}' directory, where id is growing backup index,
this parameter sets the maximum number of such directories kept on disk.
- `allow-failback` - `allow-failback`