ARTEMIS-256 orchestrate failback deterministically
The failback process needs to be deterministic rather than relying on various incarnations of Thread.sleep() at crucial points. Important aspects of this change include: 1) Make the initial replication synchronization process block at the very last step and wait for a response from the replica to ensure the replica has all the necessary data. This is a critical piece of knowledge during the failback process because it allows the soon-to-become-backup server to know for sure when it can shut itself down and allow the soon-to-become-live server to take over. Also, introduce a new configuration element called "initial-replication-sync-timeout" to control how long this blocking will occur. 2) Set the state of the server as 'LIVE' only after the server is fully started. This is necessary because once the soon-to-be-backup server shuts down it needs to know that the soon-to-be-live server has started fully before it restarts itself as the new backup. If the soon-to-be-backup server restarts before the soon-to-be-live is fully started then it won't actually become a backup server but instead will become a live server which will break the failback process. 3) Wait to receive the announcement of a backup server before failing-back.
This commit is contained in:
parent
30b3463748
commit
ef5a9809f2
|
@ -374,8 +374,8 @@ public final class ActiveMQDefaultConfiguration {
|
|||
// Whether a server will automatically stop when another places a request to take over its place. The use case is when a regular server stops and its backup takes over its duties, later the main server restarts and requests the server (the former backup) to stop operating.
|
||||
private static boolean DEFAULT_ALLOW_AUTO_FAILBACK = true;
|
||||
|
||||
// if we have to start as a replicated server this is the delay to wait before fail-back occurs
|
||||
private static long DEFAULT_FAILBACK_DELAY = 5000;
|
||||
// When a replica comes online this is how long the replicating server will wait for a confirmation from the replica that the replication synchronization process is complete
|
||||
private static long DEFAULT_INITIAL_REPLICATION_SYNC_TIMEOUT = 30000;
|
||||
|
||||
// Will this backup server come live on a normal server shutdown
|
||||
private static boolean DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN = false;
|
||||
|
@ -987,8 +987,8 @@ public final class ActiveMQDefaultConfiguration {
|
|||
/**
|
||||
* if we have to start as a replicated server this is the delay to wait before fail-back occurs
|
||||
*/
|
||||
public static long getDefaultFailbackDelay() {
|
||||
return DEFAULT_FAILBACK_DELAY;
|
||||
public static long getDefaultInitialReplicationSyncTimeout() {
|
||||
return DEFAULT_INITIAL_REPLICATION_SYNC_TIMEOUT;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -243,6 +243,8 @@ public class PacketImpl implements Packet {
|
|||
|
||||
public static final byte SESS_BINDINGQUERY_RESP_V2 = -8;
|
||||
|
||||
public static final byte REPLICATION_RESPONSE_V2 = -9;
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
public PacketImpl(final byte type) {
|
||||
|
|
|
@ -64,19 +64,19 @@ public final class ConfigurationUtils {
|
|||
}
|
||||
case REPLICATED: {
|
||||
ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
|
||||
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName());
|
||||
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout());
|
||||
}
|
||||
case REPLICA: {
|
||||
ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
|
||||
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getFailbackDelay(), getScaleDownPolicy(pc.getScaleDownConfiguration()));
|
||||
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()));
|
||||
}
|
||||
case SHARED_STORE_MASTER: {
|
||||
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;
|
||||
return new SharedStoreMasterPolicy(pc.getFailbackDelay(), pc.isFailoverOnServerShutdown());
|
||||
return new SharedStoreMasterPolicy(pc.isFailoverOnServerShutdown());
|
||||
}
|
||||
case SHARED_STORE_SLAVE: {
|
||||
SharedStoreSlavePolicyConfiguration pc = (SharedStoreSlavePolicyConfiguration) conf;
|
||||
return new SharedStoreSlavePolicy(pc.getFailbackDelay(), pc.isFailoverOnServerShutdown(), pc.isRestartBackup(), pc.isAllowFailBack(), getScaleDownPolicy(pc.getScaleDownConfiguration()));
|
||||
return new SharedStoreSlavePolicy(pc.isFailoverOnServerShutdown(), pc.isRestartBackup(), pc.isAllowFailBack(), getScaleDownPolicy(pc.getScaleDownConfiguration()));
|
||||
}
|
||||
case COLOCATED: {
|
||||
ColocatedPolicyConfiguration pc = (ColocatedPolicyConfiguration) conf;
|
||||
|
|
|
@ -37,7 +37,7 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
|
|||
* */
|
||||
private boolean allowFailBack = false;
|
||||
|
||||
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
|
||||
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
|
||||
|
||||
public ReplicaPolicyConfiguration() {
|
||||
}
|
||||
|
@ -101,12 +101,22 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public ReplicaPolicyConfiguration setFailbackDelay(long failbackDelay) {
|
||||
this.failbackDelay = failbackDelay;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public long getFailbackDelay() {
|
||||
return failbackDelay;
|
||||
return -1;
|
||||
}
|
||||
|
||||
public long getInitialReplicationSyncTimeout() {
|
||||
return initialReplicationSyncTimeout;
|
||||
}
|
||||
|
||||
public ReplicaPolicyConfiguration setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) {
|
||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
|
|||
|
||||
private String clusterName = null;
|
||||
|
||||
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
|
||||
|
||||
public ReplicatedPolicyConfiguration() {
|
||||
}
|
||||
|
||||
|
@ -61,4 +63,12 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
|
|||
this.clusterName = clusterName;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long getInitialReplicationSyncTimeout() {
|
||||
return initialReplicationSyncTimeout;
|
||||
}
|
||||
|
||||
public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) {
|
||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,8 +21,6 @@ import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
|
|||
|
||||
public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfiguration {
|
||||
|
||||
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
|
||||
|
||||
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
|
||||
|
||||
public SharedStoreMasterPolicyConfiguration() {
|
||||
|
@ -33,12 +31,13 @@ public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfigurati
|
|||
return TYPE.SHARED_STORE_MASTER;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public long getFailbackDelay() {
|
||||
return failbackDelay;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public SharedStoreMasterPolicyConfiguration setFailbackDelay(long failbackDelay) {
|
||||
this.failbackDelay = failbackDelay;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,8 +22,6 @@ import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
|
|||
|
||||
public class SharedStoreSlavePolicyConfiguration implements HAPolicyConfiguration {
|
||||
|
||||
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
|
||||
|
||||
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
|
||||
|
||||
private boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup();
|
||||
|
@ -76,13 +74,13 @@ public class SharedStoreSlavePolicyConfiguration implements HAPolicyConfiguratio
|
|||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public long getFailbackDelay() {
|
||||
return failbackDelay;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public SharedStoreSlavePolicyConfiguration setFailbackDelay(long failbackDelay) {
|
||||
this.failbackDelay = failbackDelay;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -920,6 +920,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK));
|
||||
|
||||
configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO));
|
||||
|
||||
return configuration;
|
||||
}
|
||||
|
||||
|
@ -932,7 +934,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
configuration.setAllowFailBack(getBoolean(policyNode, "allow-failback", configuration.isAllowFailBack()));
|
||||
|
||||
configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO));
|
||||
configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO));
|
||||
|
||||
configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK));
|
||||
|
||||
|
@ -948,8 +950,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown()));
|
||||
|
||||
configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO));
|
||||
|
||||
return configuration;
|
||||
}
|
||||
|
||||
|
@ -960,8 +960,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown()));
|
||||
|
||||
configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO));
|
||||
|
||||
configuration.setRestartBackup(getBoolean(policyNode, "restart-backup", configuration.isRestartBackup()));
|
||||
|
||||
configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode));
|
||||
|
|
|
@ -335,12 +335,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
|||
Journal getMessageJournal();
|
||||
|
||||
/**
|
||||
* @see org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager#startReplication(org.apache.activemq.artemis.core.replication.ReplicationManager, org.apache.activemq.artemis.core.paging.PagingManager, String, boolean)
|
||||
* @see org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager#startReplication(org.apache.activemq.artemis.core.replication.ReplicationManager, org.apache.activemq.artemis.core.paging.PagingManager, String, boolean, long)
|
||||
*/
|
||||
void startReplication(ReplicationManager replicationManager,
|
||||
PagingManager pagingManager,
|
||||
String nodeID,
|
||||
boolean autoFailBack) throws Exception;
|
||||
boolean autoFailBack,
|
||||
long initialReplicationSyncTimeout) throws Exception;
|
||||
|
||||
/**
|
||||
* Write message to page if we are paging.
|
||||
|
|
|
@ -305,7 +305,8 @@ public class JournalStorageManager implements StorageManager {
|
|||
public void startReplication(ReplicationManager replicationManager,
|
||||
PagingManager pagingManager,
|
||||
String nodeID,
|
||||
final boolean autoFailBack) throws Exception {
|
||||
final boolean autoFailBack,
|
||||
long initialReplicationSyncTimeout) throws Exception {
|
||||
if (!started) {
|
||||
throw new IllegalStateException("JournalStorageManager must be started...");
|
||||
}
|
||||
|
@ -376,7 +377,7 @@ public class JournalStorageManager implements StorageManager {
|
|||
storageManagerLock.writeLock().lock();
|
||||
try {
|
||||
if (replicator != null) {
|
||||
replicator.sendSynchronizationDone(nodeID);
|
||||
replicator.sendSynchronizationDone(nodeID, initialReplicationSyncTimeout);
|
||||
performCachedLargeMessageDeletes();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -473,7 +473,8 @@ public class NullStorageManager implements StorageManager {
|
|||
public void startReplication(final ReplicationManager replicationManager,
|
||||
final PagingManager pagingManager,
|
||||
final String nodeID,
|
||||
final boolean autoFailBack) throws Exception {
|
||||
final boolean autoFailBack,
|
||||
long initialReplicationSyncTimeout) throws Exception {
|
||||
// no-op
|
||||
}
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP
|
|||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_PAGE_WRITE;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_PREPARE;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT;
|
||||
|
@ -64,6 +65,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
|
||||
|
@ -120,6 +122,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
|
|||
packet = new ReplicationResponseMessage();
|
||||
break;
|
||||
}
|
||||
case REPLICATION_RESPONSE_V2: {
|
||||
packet = new ReplicationResponseMessageV2();
|
||||
break;
|
||||
}
|
||||
case REPLICATION_PAGE_WRITE: {
|
||||
packet = new ReplicationPageWriteMessage();
|
||||
break;
|
||||
|
|
|
@ -18,9 +18,13 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
|||
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
|
||||
public final class ReplicationResponseMessage extends PacketImpl {
|
||||
public class ReplicationResponseMessage extends PacketImpl {
|
||||
|
||||
public ReplicationResponseMessage() {
|
||||
super(PacketImpl.REPLICATION_RESPONSE);
|
||||
}
|
||||
|
||||
public ReplicationResponseMessage(byte replicationResponseV2) {
|
||||
super(replicationResponseV2);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
|
||||
public final class ReplicationResponseMessageV2 extends ReplicationResponseMessage {
|
||||
boolean synchronizationIsFinishedAcknowledgement = false;
|
||||
|
||||
public ReplicationResponseMessageV2(final boolean synchronizationIsFinishedAcknowledgement) {
|
||||
super(REPLICATION_RESPONSE_V2);
|
||||
|
||||
this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
|
||||
}
|
||||
|
||||
public ReplicationResponseMessageV2() {
|
||||
super(PacketImpl.REPLICATION_RESPONSE_V2);
|
||||
}
|
||||
|
||||
public boolean isSynchronizationIsFinishedAcknowledgement() {
|
||||
return synchronizationIsFinishedAcknowledgement;
|
||||
}
|
||||
|
||||
public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
|
||||
this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||
super.encodeRest(buffer);
|
||||
buffer.writeBoolean(synchronizationIsFinishedAcknowledgement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
super.decodeRest(buffer);
|
||||
synchronizationIsFinishedAcknowledgement = buffer.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buf = new StringBuffer(getParentString());
|
||||
buf.append(", synchronizationIsFinishedAcknowledgement=" + synchronizationIsFinishedAcknowledgement);
|
||||
buf.append("]");
|
||||
return buf.toString();
|
||||
}
|
||||
}
|
|
@ -203,4 +203,16 @@ public class ReplicationStartSyncMessage extends PacketImpl {
|
|||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buf = new StringBuffer(getParentString());
|
||||
buf.append(", synchronizationIsFinished=" + synchronizationIsFinished);
|
||||
buf.append(", dataType=" + dataType);
|
||||
buf.append(", nodeID=" + nodeID);
|
||||
buf.append(", ids=" + Arrays.toString(ids));
|
||||
buf.append(", allowsAutoFailBack=" + allowsAutoFailBack);
|
||||
buf.append("]");
|
||||
return buf.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage.SyncDataType;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
||||
|
@ -196,7 +197,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
handleLargeMessageEnd((ReplicationLargeMessageEndMessage) packet);
|
||||
}
|
||||
else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC) {
|
||||
handleStartReplicationSynchronization((ReplicationStartSyncMessage) packet);
|
||||
response = handleStartReplicationSynchronization((ReplicationStartSyncMessage) packet);
|
||||
}
|
||||
else if (type == PacketImpl.REPLICATION_SYNC_FILE) {
|
||||
handleReplicationSynchronization((ReplicationSyncFileMessage) packet);
|
||||
|
@ -476,19 +477,23 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
*
|
||||
* @param packet
|
||||
* @throws Exception
|
||||
* @return if the incoming packet indicates the synchronization is finished then return an acknowledgement otherwise
|
||||
* return an empty response
|
||||
*/
|
||||
private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception {
|
||||
private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception {
|
||||
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
|
||||
if (activation.isRemoteBackupUpToDate()) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.replicationBackupUpToDate();
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
if (!started)
|
||||
return;
|
||||
return replicationResponseMessage;
|
||||
|
||||
if (packet.isSynchronizationFinished()) {
|
||||
finishSynchronization(packet.getNodeID());
|
||||
return;
|
||||
replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
|
||||
return replicationResponseMessage;
|
||||
}
|
||||
|
||||
switch (packet.getDataType()) {
|
||||
|
@ -523,6 +528,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
|
||||
}
|
||||
}
|
||||
|
||||
return replicationResponseMessage;
|
||||
}
|
||||
|
||||
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) {
|
||||
|
|
|
@ -58,12 +58,15 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
|
||||
/**
|
||||
* Manages replication tasks on the live server (that is the live server side of a "remote backup"
|
||||
|
@ -116,6 +119,8 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
|
||||
private volatile boolean inSync = true;
|
||||
|
||||
private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
|
||||
|
||||
/**
|
||||
* @param remotingConnection
|
||||
*/
|
||||
|
@ -392,8 +397,14 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
private final class ResponseHandler implements ChannelHandler {
|
||||
|
||||
public void handlePacket(final Packet packet) {
|
||||
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE) {
|
||||
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE || packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) {
|
||||
replicated();
|
||||
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) {
|
||||
ReplicationResponseMessageV2 replicationResponseMessage = (ReplicationResponseMessageV2) packet;
|
||||
if (replicationResponseMessage.isSynchronizationIsFinishedAcknowledgement()) {
|
||||
synchronizationIsFinishedAcknowledgement.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -534,9 +545,18 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
*
|
||||
* @param nodeID
|
||||
*/
|
||||
public void sendSynchronizationDone(String nodeID) {
|
||||
public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) {
|
||||
if (enabled) {
|
||||
synchronizationIsFinishedAcknowledgement.countUp();
|
||||
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
|
||||
try {
|
||||
if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
ActiveMQServerLogger.LOGGER.debug(e);
|
||||
}
|
||||
inSync = false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -359,4 +359,7 @@ public interface ActiveMQMessageBundle {
|
|||
|
||||
@Message(id = 119113, value = "Invalid message load balancing type {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
IllegalArgumentException invalidMessageLoadBalancingType(String val);
|
||||
|
||||
@Message(id = 119114, value = "Replication synchronization process timed out after waiting {0} milliseconds", format = Message.Format.MESSAGE_FORMAT)
|
||||
IllegalStateException replicationSynchronizationTimeout(long timeout);
|
||||
}
|
||||
|
|
|
@ -51,9 +51,11 @@ public abstract class NodeManager implements ActiveMQComponent {
|
|||
|
||||
public abstract void awaitLiveNode() throws Exception;
|
||||
|
||||
public abstract void awaitLiveStatus() throws Exception;
|
||||
|
||||
public abstract void startBackup() throws Exception;
|
||||
|
||||
public abstract void startLiveNode() throws Exception;
|
||||
public abstract ActivateCallback startLiveNode() throws Exception;
|
||||
|
||||
public abstract void pauseLiveServer() throws Exception;
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ public class ReplicaPolicy extends BackupPolicy {
|
|||
//used if we create a replicated policy for when we become live.
|
||||
private boolean allowFailback = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
|
||||
|
||||
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
|
||||
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
|
||||
|
||||
private ReplicatedPolicy replicatedPolicy;
|
||||
|
||||
|
@ -48,14 +48,14 @@ public class ReplicaPolicy extends BackupPolicy {
|
|||
String groupName,
|
||||
boolean restartBackup,
|
||||
boolean allowFailback,
|
||||
long failbackDelay,
|
||||
long initialReplicationSyncTimeout,
|
||||
ScaleDownPolicy scaleDownPolicy) {
|
||||
this.clusterName = clusterName;
|
||||
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
|
||||
this.groupName = groupName;
|
||||
this.restartBackup = restartBackup;
|
||||
this.allowFailback = allowFailback;
|
||||
this.failbackDelay = failbackDelay;
|
||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||
this.scaleDownPolicy = scaleDownPolicy;
|
||||
}
|
||||
|
||||
|
@ -87,7 +87,7 @@ public class ReplicaPolicy extends BackupPolicy {
|
|||
|
||||
public ReplicatedPolicy getReplicatedPolicy() {
|
||||
if (replicatedPolicy == null) {
|
||||
replicatedPolicy = new ReplicatedPolicy(false, allowFailback, failbackDelay, groupName, clusterName, this);
|
||||
replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this);
|
||||
}
|
||||
return replicatedPolicy;
|
||||
}
|
||||
|
@ -137,12 +137,21 @@ public class ReplicaPolicy extends BackupPolicy {
|
|||
this.allowFailback = allowFailback;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public long getFailbackDelay() {
|
||||
return failbackDelay;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void setFailbackDelay(long failbackDelay) {
|
||||
this.failbackDelay = failbackDelay;
|
||||
}
|
||||
|
||||
public long getInitialReplicationSyncTimeout() {
|
||||
return initialReplicationSyncTimeout;
|
||||
}
|
||||
|
||||
public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) {
|
||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -31,14 +31,14 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
|
|||
|
||||
private String clusterName;
|
||||
|
||||
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
|
||||
|
||||
/*
|
||||
* these are only set by the ReplicaPolicy after failover to decide if the live server can failback, these should not
|
||||
* be exposed in configuration.
|
||||
* */
|
||||
private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
|
||||
|
||||
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
|
||||
|
||||
/*
|
||||
* this are only used as the policy when the server is started as a live after a failover
|
||||
* */
|
||||
|
@ -48,10 +48,11 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
|
|||
replicaPolicy = new ReplicaPolicy(clusterName, -1, groupName, this);
|
||||
}
|
||||
|
||||
public ReplicatedPolicy(boolean checkForLiveServer, String groupName, String clusterName) {
|
||||
public ReplicatedPolicy(boolean checkForLiveServer, String groupName, String clusterName, long initialReplicationSyncTimeout) {
|
||||
this.checkForLiveServer = checkForLiveServer;
|
||||
this.groupName = groupName;
|
||||
this.clusterName = clusterName;
|
||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||
/*
|
||||
* we create this with sensible defaults in case we start after a failover
|
||||
* */
|
||||
|
@ -59,7 +60,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
|
|||
|
||||
public ReplicatedPolicy(boolean checkForLiveServer,
|
||||
boolean allowAutoFailBack,
|
||||
long failbackDelay,
|
||||
long initialReplicationSyncTimeout,
|
||||
String groupName,
|
||||
String clusterName,
|
||||
ReplicaPolicy replicaPolicy) {
|
||||
|
@ -67,7 +68,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
|
|||
this.clusterName = clusterName;
|
||||
this.groupName = groupName;
|
||||
this.allowAutoFailBack = allowAutoFailBack;
|
||||
this.failbackDelay = failbackDelay;
|
||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||
this.replicaPolicy = replicaPolicy;
|
||||
}
|
||||
|
||||
|
@ -83,12 +84,21 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
|
|||
return allowAutoFailBack;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public long getFailbackDelay() {
|
||||
return failbackDelay;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void setFailbackDelay(long failbackDelay) {
|
||||
this.failbackDelay = failbackDelay;
|
||||
}
|
||||
|
||||
public long getInitialReplicationSyncTimeout() {
|
||||
return initialReplicationSyncTimeout;
|
||||
}
|
||||
|
||||
public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) {
|
||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||
}
|
||||
|
||||
public String getClusterName() {
|
||||
|
|
|
@ -25,8 +25,6 @@ import java.util.Map;
|
|||
|
||||
public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
|
||||
|
||||
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
|
||||
|
||||
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
|
||||
|
||||
private SharedStoreSlavePolicy sharedStoreSlavePolicy;
|
||||
|
@ -34,17 +32,17 @@ public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
|
|||
public SharedStoreMasterPolicy() {
|
||||
}
|
||||
|
||||
public SharedStoreMasterPolicy(long failbackDelay, boolean failoverOnServerShutdown) {
|
||||
this.failbackDelay = failbackDelay;
|
||||
public SharedStoreMasterPolicy(boolean failoverOnServerShutdown) {
|
||||
this.failoverOnServerShutdown = failoverOnServerShutdown;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public long getFailbackDelay() {
|
||||
return failbackDelay;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void setFailbackDelay(long failbackDelay) {
|
||||
this.failbackDelay = failbackDelay;
|
||||
}
|
||||
|
||||
public boolean isFailoverOnServerShutdown() {
|
||||
|
|
|
@ -25,8 +25,6 @@ import java.util.Map;
|
|||
|
||||
public class SharedStoreSlavePolicy extends BackupPolicy {
|
||||
|
||||
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
|
||||
|
||||
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
|
||||
|
||||
private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
|
||||
|
@ -37,24 +35,23 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
|
|||
public SharedStoreSlavePolicy() {
|
||||
}
|
||||
|
||||
public SharedStoreSlavePolicy(long failbackDelay,
|
||||
boolean failoverOnServerShutdown,
|
||||
public SharedStoreSlavePolicy(boolean failoverOnServerShutdown,
|
||||
boolean restartBackup,
|
||||
boolean allowAutoFailBack,
|
||||
ScaleDownPolicy scaleDownPolicy) {
|
||||
this.failbackDelay = failbackDelay;
|
||||
this.failoverOnServerShutdown = failoverOnServerShutdown;
|
||||
this.restartBackup = restartBackup;
|
||||
this.allowAutoFailBack = allowAutoFailBack;
|
||||
this.scaleDownPolicy = scaleDownPolicy;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public long getFailbackDelay() {
|
||||
return failbackDelay;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void setFailbackDelay(long failbackDelay) {
|
||||
this.failbackDelay = failbackDelay;
|
||||
}
|
||||
|
||||
public boolean isFailoverOnServerShutdown() {
|
||||
|
@ -67,7 +64,7 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
|
|||
|
||||
public SharedStoreMasterPolicy getSharedStoreMasterPolicy() {
|
||||
if (sharedStoreMasterPolicy == null) {
|
||||
sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failbackDelay, failoverOnServerShutdown);
|
||||
sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown);
|
||||
}
|
||||
return sharedStoreMasterPolicy;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.nio.channels.FileLock;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.ActivateCallback;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.utils.UUID;
|
||||
|
@ -153,7 +154,7 @@ public class FileLockNodeManager extends NodeManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void startLiveNode() throws Exception {
|
||||
public ActivateCallback startLiveNode() throws Exception {
|
||||
setFailingBack();
|
||||
|
||||
String timeoutMessage = lockAcquisitionTimeout == -1 ? "indefinitely" : lockAcquisitionTimeout + " milliseconds";
|
||||
|
@ -164,7 +165,29 @@ public class FileLockNodeManager extends NodeManager {
|
|||
|
||||
ActiveMQServerLogger.LOGGER.obtainedLiveLock();
|
||||
|
||||
setLive();
|
||||
return new ActivateCallback() {
|
||||
@Override
|
||||
public void preActivate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activated() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deActivate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activationComplete() {
|
||||
try {
|
||||
setLive();
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -183,6 +206,13 @@ public class FileLockNodeManager extends NodeManager {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitLiveStatus() throws Exception {
|
||||
while (getState() != LIVE) {
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
}
|
||||
|
||||
private void setLive() throws Exception {
|
||||
writeFileLockStatus(FileLockNodeManager.LIVE);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.concurrent.Semaphore;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.ActivateCallback;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
||||
|
@ -90,16 +91,45 @@ public final class InVMNodeManager extends NodeManager {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitLiveStatus() throws Exception {
|
||||
while (state != LIVE) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startBackup() throws Exception {
|
||||
backupLock.acquire();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startLiveNode() throws Exception {
|
||||
public ActivateCallback startLiveNode() throws Exception {
|
||||
state = FAILING_BACK;
|
||||
liveLock.acquire();
|
||||
state = LIVE;
|
||||
return new ActivateCallback() {
|
||||
@Override
|
||||
public void preActivate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activated() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deActivate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activationComplete() {
|
||||
try {
|
||||
state = LIVE;
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -110,8 +140,6 @@ public final class InVMNodeManager extends NodeManager {
|
|||
|
||||
@Override
|
||||
public void crashLiveServer() throws Exception {
|
||||
//overkill as already set to live
|
||||
state = LIVE;
|
||||
liveLock.release();
|
||||
}
|
||||
|
||||
|
|
|
@ -157,7 +157,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
|||
Thread t = new Thread(new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
activeMQServer.getStorageManager().startReplication(replicationManager, activeMQServer.getPagingManager(), activeMQServer.getNodeID().toString(), isFailBackRequest && replicatedPolicy.isAllowAutoFailBack());
|
||||
activeMQServer.getStorageManager().startReplication(replicationManager, activeMQServer.getPagingManager(), activeMQServer.getNodeID().toString(), isFailBackRequest && replicatedPolicy.isAllowAutoFailBack(), replicatedPolicy.getInitialReplicationSyncTimeout());
|
||||
|
||||
clusterConnection.nodeAnnounced(System.currentTimeMillis(), activeMQServer.getNodeID().toString(), replicatedPolicy.getGroupName(), replicatedPolicy.getScaleDownGroupName(), pair, true);
|
||||
|
||||
|
@ -168,13 +168,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
|||
BackupTopologyListener listener1 = new BackupTopologyListener(activeMQServer.getNodeID().toString());
|
||||
clusterConnection.addClusterTopologyListener(listener1);
|
||||
if (listener1.waitForBackup()) {
|
||||
try {
|
||||
Thread.sleep(replicatedPolicy.getFailbackDelay());
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
//
|
||||
}
|
||||
//if we have to many backups kept or arent configured to restart just stop, otherwise restart as a backup
|
||||
//if we have to many backups kept or are not configured to restart just stop, otherwise restart as a backup
|
||||
if (!replicatedPolicy.getReplicaPolicy().isRestartBackup() && activeMQServer.countNumberOfCopiedJournals() >= replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() && replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() >= 0) {
|
||||
activeMQServer.stop(true);
|
||||
ActiveMQServerLogger.LOGGER.stopReplicatedBackupAfterFailback();
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.impl;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
|
@ -30,9 +33,6 @@ import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy
|
|||
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public final class SharedStoreBackupActivation extends Activation {
|
||||
|
||||
//this is how we act as a backup
|
||||
|
@ -191,38 +191,51 @@ public final class SharedStoreBackupActivation extends Activation {
|
|||
}
|
||||
|
||||
private class FailbackChecker implements Runnable {
|
||||
BackupTopologyListener backupListener;
|
||||
|
||||
FailbackChecker() {
|
||||
backupListener = new BackupTopologyListener(activeMQServer.getNodeID().toString());
|
||||
activeMQServer.getClusterManager().getDefaultConnection(null).addClusterTopologyListener(backupListener);
|
||||
}
|
||||
|
||||
private boolean restarting = false;
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
if (!restarting && activeMQServer.getNodeManager().isAwaitingFailback()) {
|
||||
ActiveMQServerLogger.LOGGER.awaitFailBack();
|
||||
restarting = true;
|
||||
Thread t = new Thread(new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Stopping live node in favor of failback");
|
||||
if (backupListener.waitForBackup()) {
|
||||
ActiveMQServerLogger.LOGGER.awaitFailBack();
|
||||
restarting = true;
|
||||
Thread t = new Thread(new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Stopping live node in favor of failback");
|
||||
|
||||
activeMQServer.stop(true, false, true);
|
||||
// We need to wait some time before we start the backup again
|
||||
// otherwise we may eventually start before the live had a chance to get it
|
||||
Thread.sleep(sharedStoreSlavePolicy.getFailbackDelay());
|
||||
synchronized (failbackCheckerGuard) {
|
||||
if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
|
||||
return;
|
||||
NodeManager nodeManager = activeMQServer.getNodeManager();
|
||||
activeMQServer.stop(true, false, true);
|
||||
|
||||
activeMQServer.setHAPolicy(sharedStoreSlavePolicy);
|
||||
ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Starting backup node now after failback");
|
||||
activeMQServer.start();
|
||||
// ensure that the server to which we are failing back actually starts fully before we restart
|
||||
nodeManager.start();
|
||||
nodeManager.awaitLiveStatus();
|
||||
nodeManager.stop();
|
||||
|
||||
synchronized (failbackCheckerGuard) {
|
||||
if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
|
||||
return;
|
||||
|
||||
activeMQServer.setHAPolicy(sharedStoreSlavePolicy);
|
||||
ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Starting backup node now after failback");
|
||||
activeMQServer.start();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.serverRestartWarning();
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.serverRestartWarning();
|
||||
}
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
});
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
@ -55,10 +55,9 @@ public final class SharedStoreLiveActivation extends LiveActivation {
|
|||
}
|
||||
activeMQServer.getBackupManager().start();
|
||||
activeMQServer.getBackupManager().announceBackup();
|
||||
Thread.sleep(sharedStoreMasterPolicy.getFailbackDelay());
|
||||
}
|
||||
|
||||
activeMQServer.getNodeManager().startLiveNode();
|
||||
activeMQServer.registerActivateCallback(activeMQServer.getNodeManager().startLiveNode());
|
||||
|
||||
if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED || activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
|
||||
return;
|
||||
|
|
|
@ -1624,6 +1624,14 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
<xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
The amount of time to wait for the replica to acknowledge it has received all the necessary data from
|
||||
the replicating server at the final step of the initial replication synchronization process.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:all>
|
||||
</xsd:complexType>
|
||||
<xsd:complexType name="replicaPolicyType">
|
||||
|
@ -1681,7 +1689,16 @@
|
|||
<xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
if we have to start as a replicated server this is the delay to wait before fail-back occurs
|
||||
DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back occurs
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
<xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
If we have to start as a replicated server this is the amount of time to wait for the replica to
|
||||
acknowledge it has received all the necessary data from the replicating server at the final step
|
||||
of the initial replication synchronization process.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
@ -1736,7 +1753,7 @@
|
|||
<xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
delay to wait before fail-back occurs on (live's) restart
|
||||
DEPRECATED: delay to wait before fail-back occurs on (live's) restart
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
@ -1764,7 +1781,7 @@
|
|||
<xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
delay to wait before fail-back occurs on (live's) restart
|
||||
DEPRECATED: delay to wait before fail-back occurs on (live's) restart
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
|
|
@ -121,6 +121,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
|
|||
assertEquals(replicatedPolicy.getGroupName(), "purple");
|
||||
assertTrue(replicatedPolicy.isCheckForLiveServer());
|
||||
assertEquals(replicatedPolicy.getClusterName(), "abcdefg");
|
||||
assertEquals(replicatedPolicy.getInitialReplicationSyncTimeout(), 9876);
|
||||
}
|
||||
finally {
|
||||
server.stop();
|
||||
|
@ -142,6 +143,8 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
|
|||
assertEquals(replicaPolicy.getMaxSavedReplicatedJournalsSize(), 22);
|
||||
assertEquals(replicaPolicy.getClusterName(), "33rrrrr");
|
||||
assertFalse(replicaPolicy.isRestartBackup());
|
||||
assertTrue(replicaPolicy.isAllowFailback());
|
||||
assertEquals(replicaPolicy.getInitialReplicationSyncTimeout(), 9876);
|
||||
ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
|
||||
assertNotNull(scaleDownPolicy);
|
||||
assertEquals(scaleDownPolicy.getGroupName(), "boo!");
|
||||
|
@ -219,7 +222,6 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
|
|||
HAPolicy haPolicy = server.getHAPolicy();
|
||||
assertTrue(haPolicy instanceof SharedStoreMasterPolicy);
|
||||
SharedStoreMasterPolicy masterPolicy = (SharedStoreMasterPolicy) haPolicy;
|
||||
assertEquals(masterPolicy.getFailbackDelay(), 3456);
|
||||
assertFalse(masterPolicy.isFailoverOnServerShutdown());
|
||||
}
|
||||
finally {
|
||||
|
@ -237,11 +239,10 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
|
|||
assertTrue(activation instanceof SharedStoreBackupActivation);
|
||||
HAPolicy haPolicy = server.getHAPolicy();
|
||||
assertTrue(haPolicy instanceof SharedStoreSlavePolicy);
|
||||
SharedStoreSlavePolicy replicaPolicy = (SharedStoreSlavePolicy) haPolicy;
|
||||
assertEquals(replicaPolicy.getFailbackDelay(), 9876);
|
||||
assertFalse(replicaPolicy.isFailoverOnServerShutdown());
|
||||
assertFalse(replicaPolicy.isRestartBackup());
|
||||
ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
|
||||
SharedStoreSlavePolicy sharedStoreSlavePolicy = (SharedStoreSlavePolicy) haPolicy;
|
||||
assertFalse(sharedStoreSlavePolicy.isFailoverOnServerShutdown());
|
||||
assertFalse(sharedStoreSlavePolicy.isRestartBackup());
|
||||
ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy();
|
||||
assertNotNull(scaleDownPolicy);
|
||||
assertEquals(scaleDownPolicy.getGroupName(), "boo!");
|
||||
assertEquals(scaleDownPolicy.getDiscoveryGroup(), "wahey");
|
||||
|
@ -264,11 +265,10 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
|
|||
assertTrue(activation instanceof SharedStoreBackupActivation);
|
||||
HAPolicy haPolicy = server.getHAPolicy();
|
||||
assertTrue(haPolicy instanceof SharedStoreSlavePolicy);
|
||||
SharedStoreSlavePolicy replicaPolicy = (SharedStoreSlavePolicy) haPolicy;
|
||||
assertEquals(replicaPolicy.getFailbackDelay(), 5678);
|
||||
assertTrue(replicaPolicy.isFailoverOnServerShutdown());
|
||||
assertTrue(replicaPolicy.isRestartBackup());
|
||||
ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
|
||||
SharedStoreSlavePolicy sharedStoreSlavePolicy = (SharedStoreSlavePolicy) haPolicy;
|
||||
assertTrue(sharedStoreSlavePolicy.isFailoverOnServerShutdown());
|
||||
assertTrue(sharedStoreSlavePolicy.isRestartBackup());
|
||||
ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy();
|
||||
assertNotNull(scaleDownPolicy);
|
||||
assertEquals(scaleDownPolicy.getGroupName(), "boo!");
|
||||
assertEquals(scaleDownPolicy.getDiscoveryGroup(), null);
|
||||
|
@ -293,11 +293,10 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
|
|||
assertTrue(activation instanceof SharedStoreBackupActivation);
|
||||
HAPolicy haPolicy = server.getHAPolicy();
|
||||
assertTrue(haPolicy instanceof SharedStoreSlavePolicy);
|
||||
SharedStoreSlavePolicy replicaPolicy = (SharedStoreSlavePolicy) haPolicy;
|
||||
assertEquals(replicaPolicy.getFailbackDelay(), 5678);
|
||||
assertTrue(replicaPolicy.isFailoverOnServerShutdown());
|
||||
assertTrue(replicaPolicy.isRestartBackup());
|
||||
ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
|
||||
SharedStoreSlavePolicy sharedStoreSlavePolicy = (SharedStoreSlavePolicy) haPolicy;
|
||||
assertTrue(sharedStoreSlavePolicy.isFailoverOnServerShutdown());
|
||||
assertTrue(sharedStoreSlavePolicy.isRestartBackup());
|
||||
ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy();
|
||||
assertNull(scaleDownPolicy);
|
||||
}
|
||||
finally {
|
||||
|
@ -349,10 +348,8 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
|
|||
assertNotNull(livePolicy);
|
||||
|
||||
assertFalse(livePolicy.isFailoverOnServerShutdown());
|
||||
assertEquals(livePolicy.getFailbackDelay(), 1234);
|
||||
SharedStoreSlavePolicy backupPolicy = (SharedStoreSlavePolicy) colocatedPolicy.getBackupPolicy();
|
||||
assertNotNull(backupPolicy);
|
||||
assertEquals(backupPolicy.getFailbackDelay(), 44);
|
||||
assertFalse(backupPolicy.isFailoverOnServerShutdown());
|
||||
assertFalse(backupPolicy.isRestartBackup());
|
||||
}
|
||||
|
|
|
@ -28,19 +28,15 @@
|
|||
<request-backup>false</request-backup>
|
||||
<backup-port-offset>33</backup-port-offset>
|
||||
<master>
|
||||
<failback-delay>1234</failback-delay>
|
||||
<failover-on-shutdown>false</failover-on-shutdown>
|
||||
</master>
|
||||
<slave>
|
||||
<failback-delay>44</failback-delay>
|
||||
<failover-on-shutdown>false</failover-on-shutdown>
|
||||
<restart-backup>false</restart-backup>
|
||||
<scale-down/>
|
||||
</slave>
|
||||
</colocated>
|
||||
|
||||
</shared-store>
|
||||
</ha-policy>
|
||||
</core>c
|
||||
|
||||
</core>
|
||||
</configuration>
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
<cluster-name>33rrrrr</cluster-name>
|
||||
<restart-backup>false</restart-backup>
|
||||
<allow-failback>true</allow-failback>
|
||||
<failback-delay>9876</failback-delay>
|
||||
<initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
|
||||
<scale-down>
|
||||
<!--a grouping of servers that can be scaled down to-->
|
||||
<group-name>boo!</group-name>
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
<group-name>purple</group-name>
|
||||
<check-for-live-server>true</check-for-live-server>
|
||||
<cluster-name>abcdefg</cluster-name>
|
||||
<initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
|
||||
</master>
|
||||
</replication>
|
||||
</ha-policy>
|
||||
|
|
|
@ -22,11 +22,9 @@
|
|||
<ha-policy>
|
||||
<shared-store>
|
||||
<master>
|
||||
<failback-delay>3456</failback-delay>
|
||||
<failover-on-shutdown>false</failover-on-shutdown>
|
||||
</master>
|
||||
</shared-store>
|
||||
</ha-policy>
|
||||
</core>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -26,7 +26,6 @@
|
|||
<shared-store>
|
||||
<slave>
|
||||
<allow-failback>true</allow-failback>
|
||||
<failback-delay>9876</failback-delay>
|
||||
<failover-on-shutdown>false</failover-on-shutdown>
|
||||
<restart-backup>false</restart-backup>
|
||||
<scale-down>
|
||||
|
|
|
@ -22,7 +22,6 @@
|
|||
<ha-policy>
|
||||
<shared-store>
|
||||
<slave>
|
||||
<failback-delay>5678</failback-delay>
|
||||
<failover-on-shutdown>true</failover-on-shutdown>
|
||||
<restart-backup>true</restart-backup>
|
||||
<scale-down>
|
||||
|
@ -38,5 +37,4 @@
|
|||
</shared-store>
|
||||
</ha-policy>
|
||||
</core>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -22,12 +22,10 @@
|
|||
<ha-policy>
|
||||
<shared-store>
|
||||
<slave>
|
||||
<failback-delay>5678</failback-delay>
|
||||
<failover-on-shutdown>true</failover-on-shutdown>
|
||||
<restart-backup>true</restart-backup>
|
||||
</slave>
|
||||
</shared-store>
|
||||
</ha-policy>
|
||||
</core>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -113,8 +113,13 @@ the connection speed.
|
|||
|
||||
> **Note**
|
||||
>
|
||||
> Synchronization occurs in parallel with current network traffic so
|
||||
> this won't cause any blocking on current clients.
|
||||
> In general, synchronization occurs in parallel with current network traffic so
|
||||
> this won't cause any blocking on current clients. However, there is a critical
|
||||
> moment at the end of this process where the replicating server must complete
|
||||
> the synchronization and ensure the replica acknowledges this completion. This
|
||||
> exchange between the replicating server and replica will block any journal
|
||||
> related operations. The maximum length of time that this exchange will block
|
||||
> is controlled by the `initial-replication-sync-timeout` configuration element.
|
||||
|
||||
Replication will create a copy of the data at the backup. One issue to
|
||||
be aware of is: in case of a successful fail-over, the backup's data
|
||||
|
@ -257,11 +262,14 @@ HA strategy Replication for `master`:
|
|||
</tr>
|
||||
<tr>
|
||||
<td>`group-name`</td>
|
||||
<td>Whether to check the cluster for a (live) server using our own server ID when starting up. This option is only necessary for performing 'fail-back' on replicating servers.</td>
|
||||
<td>If set, backup servers will only pair with live servers with matching group-name.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>`check-for-live-server`</td>
|
||||
<td>If set, backup servers will only pair with live servers with matching group-name.</td>
|
||||
<td>`initial-replication-sync-timeout`</td>
|
||||
<td>The amount of time the replicating server will wait at the completion of the initial
|
||||
replication process for the replica to acknowledge it has received all the necessary
|
||||
data. The default is 30,000 milliseconds. <strong>Note</strong>: during this interval any
|
||||
journal related operations will be blocked.</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
@ -309,8 +317,14 @@ HA strategy Replication for `slave`:
|
|||
failed over</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>`failback-delay`</td>
|
||||
<td>delay to wait before fail-back occurs on (failed over live's) restart</td>
|
||||
<td>`initial-replication-sync-timeout`</td>
|
||||
<td>After failover and the slave has become live, this is
|
||||
set on the new live server. It represents the amount of time
|
||||
the replicating server will wait at the completion of the
|
||||
initial replication process for the replica to acknowledge
|
||||
it has received all the necessary data. The default is
|
||||
30,000 milliseconds. <strong>Note</strong>: during this interval any
|
||||
journal related operations will be blocked.</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
@ -405,16 +419,10 @@ stop. This configuration would look like:
|
|||
<shared-store>
|
||||
<slave>
|
||||
<allow-failback>true</allow-failback>
|
||||
<failback-delay>5000</failback-delay>
|
||||
</slave>
|
||||
</shared-store>
|
||||
</ha-policy>
|
||||
|
||||
|
||||
The `failback-delay` configures how long the backup must wait after
|
||||
automatically stopping before it restarts. This is to gives the live
|
||||
server time to start and obtain its lock.
|
||||
|
||||
In replication HA mode you need to set an extra property
|
||||
`check-for-live-server` to `true` in the `master` configuration. If set
|
||||
to true, during start-up a live server will first search the cluster for
|
||||
|
@ -491,13 +499,6 @@ HA strategy shared store for `master`:
|
|||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<tr>
|
||||
<td>`failback-delay`</td>
|
||||
<td>If a backup server is detected as being live,
|
||||
via the lock file, then the live server will wait
|
||||
announce itself as a backup and wait this amount
|
||||
of time (in ms) before starting as a live</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>`failover-on-server-shutdown`</td>
|
||||
<td>If set to true then when this server is stopped
|
||||
|
@ -512,7 +513,7 @@ HA strategy shared store for `master`:
|
|||
The following table lists all the `ha-policy` configuration elements for
|
||||
HA strategy Shared Store for `slave`:
|
||||
|
||||
<table summary="HA Replication Slave Policy" border="1">
|
||||
<table summary="HA Shared Store Slave Policy" border="1">
|
||||
<colgroup>
|
||||
<col/>
|
||||
<col/>
|
||||
|
@ -539,17 +540,6 @@ HA strategy Shared Store for `slave`:
|
|||
places a request to take over its place. The use case is
|
||||
when the backup has failed over.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>`failback-delay`</td>
|
||||
<td>After failover and the slave has become live, this is
|
||||
set on the new live server. When starting If a backup server
|
||||
is detected as being live, via the lock file, then the live
|
||||
server will wait announce itself as a backup and wait this
|
||||
amount of time (in ms) before starting as a live, however
|
||||
this is unlikely since this backup has just stopped anyway.
|
||||
It is also used as the delay after failback before this backup
|
||||
will restart (if `allow-failback` is set to true.</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ public class NodeManagerAction {
|
|||
for (int action : work) {
|
||||
switch (action) {
|
||||
case START_LIVE:
|
||||
nodeManager.startLiveNode();
|
||||
nodeManager.startLiveNode().activationComplete();
|
||||
hasLiveLock = true;
|
||||
hasBackupLock = false;
|
||||
break;
|
||||
|
|
|
@ -249,11 +249,11 @@ public class FailBackAutoTest extends FailoverTestBase {
|
|||
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
||||
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
||||
|
||||
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setRestartBackup(true)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
||||
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setRestartBackup(true)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
||||
|
||||
backupServer = createTestableServer(backupConfig);
|
||||
|
||||
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(100)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector);
|
||||
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector);
|
||||
|
||||
liveServer = createTestableServer(liveConfig);
|
||||
}
|
||||
|
|
|
@ -187,11 +187,11 @@ public class FailoverListenerTest extends FailoverTestBase {
|
|||
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
||||
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
||||
|
||||
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
||||
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
||||
|
||||
backupServer = createTestableServer(backupConfig);
|
||||
|
||||
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(1000)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector);
|
||||
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector);
|
||||
|
||||
liveServer = createTestableServer(liveConfig);
|
||||
}
|
||||
|
|
|
@ -159,11 +159,11 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
|
|||
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
||||
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
||||
|
||||
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
||||
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
||||
|
||||
backupServer = createTestableServer(backupConfig);
|
||||
|
||||
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(1000)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
|
||||
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
|
||||
|
||||
liveServer = createTestableServer(liveConfig);
|
||||
}
|
||||
|
@ -191,7 +191,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
protected void setupHAPolicyConfiguration() {
|
||||
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(0).setAllowFailBack(true).setFailbackDelay(5000);
|
||||
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(0).setAllowFailBack(true);
|
||||
}
|
||||
|
||||
protected final void adaptLiveConfigForReplicatedFailBack(TestableServer server) {
|
||||
|
|
|
@ -54,11 +54,11 @@ public class LiveToLiveFailoverTest extends FailoverTest {
|
|||
TransportConfiguration liveConnector0 = getConnectorTransportConfiguration(true, 0);
|
||||
TransportConfiguration liveConnector1 = getConnectorTransportConfiguration(true, 1);
|
||||
|
||||
backupConfig = super.createDefaultInVMConfig(1).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 1)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setScaleDownConfiguration(new ScaleDownConfiguration().addConnector(liveConnector1.getName())))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector1.getName(), liveConnector0.getName()));
|
||||
backupConfig = super.createDefaultInVMConfig(1).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 1)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setScaleDownConfiguration(new ScaleDownConfiguration().addConnector(liveConnector1.getName())))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector1.getName(), liveConnector0.getName()));
|
||||
|
||||
backupServer = createColocatedTestableServer(backupConfig, nodeManager1, nodeManager0, 1);
|
||||
|
||||
liveConfig = super.createDefaultInVMConfig(0).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 0)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setBackupRequestRetryInterval(1000).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setScaleDownConfiguration(new ScaleDownConfiguration()))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector0.getName(), liveConnector1.getName()));
|
||||
liveConfig = super.createDefaultInVMConfig(0).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 0)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setBackupRequestRetryInterval(1000).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setScaleDownConfiguration(new ScaleDownConfiguration()))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector0.getName(), liveConnector1.getName()));
|
||||
|
||||
liveServer = createColocatedTestableServer(liveConfig, nodeManager0, nodeManager1, 0);
|
||||
}
|
||||
|
|
|
@ -82,7 +82,6 @@ public abstract class MultipleServerFailoverTestBase extends ActiveMQTestBase {
|
|||
|
||||
if (isSharedStore()) {
|
||||
haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration();
|
||||
((SharedStoreMasterPolicyConfiguration) haPolicyConfiguration).setFailbackDelay(1000);
|
||||
}
|
||||
else {
|
||||
haPolicyConfiguration = new ReplicatedPolicyConfiguration();
|
||||
|
@ -128,11 +127,9 @@ public abstract class MultipleServerFailoverTestBase extends ActiveMQTestBase {
|
|||
|
||||
if (isSharedStore()) {
|
||||
haPolicyConfiguration = new SharedStoreSlavePolicyConfiguration();
|
||||
((SharedStoreSlavePolicyConfiguration) haPolicyConfiguration).setFailbackDelay(1000);
|
||||
}
|
||||
else {
|
||||
haPolicyConfiguration = new ReplicaPolicyConfiguration();
|
||||
((ReplicaPolicyConfiguration) haPolicyConfiguration).setFailbackDelay(1000);
|
||||
if (getNodeGroupName() != null) {
|
||||
((ReplicaPolicyConfiguration) haPolicyConfiguration).setGroupName(getNodeGroupName() + "-" + i);
|
||||
}
|
||||
|
|
|
@ -126,7 +126,7 @@ public class ReplicatedFailoverTest extends FailoverTest {
|
|||
protected void setupHAPolicyConfiguration() {
|
||||
if (isReplicatedFailbackTest) {
|
||||
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true);
|
||||
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true).setFailbackDelay(2000);
|
||||
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true);
|
||||
}
|
||||
else {
|
||||
super.setupHAPolicyConfiguration();
|
||||
|
|
|
@ -80,7 +80,7 @@ public class SecurityFailoverTest extends FailoverTest {
|
|||
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
||||
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
||||
|
||||
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setSecurityEnabled(true).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
||||
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setSecurityEnabled(true).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
||||
|
||||
backupServer = createTestableServer(backupConfig);
|
||||
ActiveMQSecurityManagerImpl securityManager = installSecurity(backupServer);
|
||||
|
|
|
@ -60,11 +60,11 @@ public class SharedStoreBackupTest extends FailoverTestBase {
|
|||
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
||||
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
||||
System.out.println("backup config created - mnovak");
|
||||
backupConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false)).setRestartBackup(false)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
||||
backupConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false)).setRestartBackup(false)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
||||
|
||||
backupServer = createTestableServer(backupConfig);
|
||||
|
||||
liveConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(1000).setFailoverOnServerShutdown(true)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
|
||||
liveConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
|
||||
|
||||
liveServer = createTestableServer(liveConfig);
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
|||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
|
||||
import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
|
@ -171,7 +172,7 @@ public class BackupSyncDelay implements Interceptor {
|
|||
receivedUpToDate = true;
|
||||
assert onHold == null;
|
||||
onHold = packet;
|
||||
PacketImpl response = new ReplicationResponseMessage();
|
||||
PacketImpl response = new ReplicationResponseMessageV2(true);
|
||||
channel.send(response);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Map;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.core.server.ActivateCallback;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
|
||||
|
@ -180,12 +181,33 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
|
|||
public void awaitLiveNode() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitLiveStatus() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startBackup() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startLiveNode() throws Exception {
|
||||
public ActivateCallback startLiveNode() throws Exception {
|
||||
return new ActivateCallback() {
|
||||
@Override
|
||||
public void preActivate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activated() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deActivate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activationComplete() {
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue