This closes #204 failback changes

This commit is contained in:
Clebert Suconic 2015-10-20 14:55:31 -04:00
commit c35651fd6f
49 changed files with 419 additions and 191 deletions

View File

@ -374,8 +374,8 @@ public final class ActiveMQDefaultConfiguration {
// Whether a server will automatically stop when another places a request to take over its place. The use case is when a regular server stops and its backup takes over its duties, later the main server restarts and requests the server (the former backup) to stop operating. // Whether a server will automatically stop when another places a request to take over its place. The use case is when a regular server stops and its backup takes over its duties, later the main server restarts and requests the server (the former backup) to stop operating.
private static boolean DEFAULT_ALLOW_AUTO_FAILBACK = true; private static boolean DEFAULT_ALLOW_AUTO_FAILBACK = true;
// if we have to start as a replicated server this is the delay to wait before fail-back occurs // When a replica comes online this is how long the replicating server will wait for a confirmation from the replica that the replication synchronization process is complete
private static long DEFAULT_FAILBACK_DELAY = 5000; private static long DEFAULT_INITIAL_REPLICATION_SYNC_TIMEOUT = 30000;
// Will this backup server come live on a normal server shutdown // Will this backup server come live on a normal server shutdown
private static boolean DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN = false; private static boolean DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN = false;
@ -987,8 +987,8 @@ public final class ActiveMQDefaultConfiguration {
/** /**
* if we have to start as a replicated server this is the delay to wait before fail-back occurs * if we have to start as a replicated server this is the delay to wait before fail-back occurs
*/ */
public static long getDefaultFailbackDelay() { public static long getDefaultInitialReplicationSyncTimeout() {
return DEFAULT_FAILBACK_DELAY; return DEFAULT_INITIAL_REPLICATION_SYNC_TIMEOUT;
} }
/** /**

View File

@ -243,6 +243,8 @@ public class PacketImpl implements Packet {
public static final byte SESS_BINDINGQUERY_RESP_V2 = -8; public static final byte SESS_BINDINGQUERY_RESP_V2 = -8;
public static final byte REPLICATION_RESPONSE_V2 = -9;
// Static -------------------------------------------------------- // Static --------------------------------------------------------
public PacketImpl(final byte type) { public PacketImpl(final byte type) {

View File

@ -64,19 +64,19 @@ public final class ConfigurationUtils {
} }
case REPLICATED: { case REPLICATED: {
ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf; ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName()); return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout());
} }
case REPLICA: { case REPLICA: {
ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf; ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getFailbackDelay(), getScaleDownPolicy(pc.getScaleDownConfiguration())); return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()));
} }
case SHARED_STORE_MASTER: { case SHARED_STORE_MASTER: {
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf; SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;
return new SharedStoreMasterPolicy(pc.getFailbackDelay(), pc.isFailoverOnServerShutdown()); return new SharedStoreMasterPolicy(pc.isFailoverOnServerShutdown());
} }
case SHARED_STORE_SLAVE: { case SHARED_STORE_SLAVE: {
SharedStoreSlavePolicyConfiguration pc = (SharedStoreSlavePolicyConfiguration) conf; SharedStoreSlavePolicyConfiguration pc = (SharedStoreSlavePolicyConfiguration) conf;
return new SharedStoreSlavePolicy(pc.getFailbackDelay(), pc.isFailoverOnServerShutdown(), pc.isRestartBackup(), pc.isAllowFailBack(), getScaleDownPolicy(pc.getScaleDownConfiguration())); return new SharedStoreSlavePolicy(pc.isFailoverOnServerShutdown(), pc.isRestartBackup(), pc.isAllowFailBack(), getScaleDownPolicy(pc.getScaleDownConfiguration()));
} }
case COLOCATED: { case COLOCATED: {
ColocatedPolicyConfiguration pc = (ColocatedPolicyConfiguration) conf; ColocatedPolicyConfiguration pc = (ColocatedPolicyConfiguration) conf;

View File

@ -37,7 +37,7 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
* */ * */
private boolean allowFailBack = false; private boolean allowFailBack = false;
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay(); private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
public ReplicaPolicyConfiguration() { public ReplicaPolicyConfiguration() {
} }
@ -101,12 +101,22 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration {
return this; return this;
} }
@Deprecated
public ReplicaPolicyConfiguration setFailbackDelay(long failbackDelay) { public ReplicaPolicyConfiguration setFailbackDelay(long failbackDelay) {
this.failbackDelay = failbackDelay;
return this; return this;
} }
@Deprecated
public long getFailbackDelay() { public long getFailbackDelay() {
return failbackDelay; return -1;
}
public long getInitialReplicationSyncTimeout() {
return initialReplicationSyncTimeout;
}
public ReplicaPolicyConfiguration setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) {
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
return this;
} }
} }

View File

@ -27,6 +27,8 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
private String clusterName = null; private String clusterName = null;
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
public ReplicatedPolicyConfiguration() { public ReplicatedPolicyConfiguration() {
} }
@ -61,4 +63,12 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration {
this.clusterName = clusterName; this.clusterName = clusterName;
return this; return this;
} }
public long getInitialReplicationSyncTimeout() {
return initialReplicationSyncTimeout;
}
public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) {
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
}
} }

View File

@ -21,8 +21,6 @@ import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfiguration { public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfiguration {
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
public SharedStoreMasterPolicyConfiguration() { public SharedStoreMasterPolicyConfiguration() {
@ -33,12 +31,13 @@ public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfigurati
return TYPE.SHARED_STORE_MASTER; return TYPE.SHARED_STORE_MASTER;
} }
@Deprecated
public long getFailbackDelay() { public long getFailbackDelay() {
return failbackDelay; return -1;
} }
@Deprecated
public SharedStoreMasterPolicyConfiguration setFailbackDelay(long failbackDelay) { public SharedStoreMasterPolicyConfiguration setFailbackDelay(long failbackDelay) {
this.failbackDelay = failbackDelay;
return this; return this;
} }

View File

@ -22,8 +22,6 @@ import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
public class SharedStoreSlavePolicyConfiguration implements HAPolicyConfiguration { public class SharedStoreSlavePolicyConfiguration implements HAPolicyConfiguration {
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
private boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup(); private boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup();
@ -76,13 +74,13 @@ public class SharedStoreSlavePolicyConfiguration implements HAPolicyConfiguratio
return this; return this;
} }
@Deprecated
public long getFailbackDelay() { public long getFailbackDelay() {
return failbackDelay; return -1;
} }
@Deprecated
public SharedStoreSlavePolicyConfiguration setFailbackDelay(long failbackDelay) { public SharedStoreSlavePolicyConfiguration setFailbackDelay(long failbackDelay) {
this.failbackDelay = failbackDelay;
return this; return this;
} }
} }

View File

@ -920,6 +920,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK)); configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK));
configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO));
return configuration; return configuration;
} }
@ -932,7 +934,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setAllowFailBack(getBoolean(policyNode, "allow-failback", configuration.isAllowFailBack())); configuration.setAllowFailBack(getBoolean(policyNode, "allow-failback", configuration.isAllowFailBack()));
configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO)); configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO));
configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK)); configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK));
@ -948,8 +950,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown())); configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown()));
configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO));
return configuration; return configuration;
} }
@ -960,8 +960,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown())); configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown()));
configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO));
configuration.setRestartBackup(getBoolean(policyNode, "restart-backup", configuration.isRestartBackup())); configuration.setRestartBackup(getBoolean(policyNode, "restart-backup", configuration.isRestartBackup()));
configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode)); configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode));

View File

@ -335,12 +335,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
Journal getMessageJournal(); Journal getMessageJournal();
/** /**
* @see org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager#startReplication(org.apache.activemq.artemis.core.replication.ReplicationManager, org.apache.activemq.artemis.core.paging.PagingManager, String, boolean) * @see org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager#startReplication(org.apache.activemq.artemis.core.replication.ReplicationManager, org.apache.activemq.artemis.core.paging.PagingManager, String, boolean, long)
*/ */
void startReplication(ReplicationManager replicationManager, void startReplication(ReplicationManager replicationManager,
PagingManager pagingManager, PagingManager pagingManager,
String nodeID, String nodeID,
boolean autoFailBack) throws Exception; boolean autoFailBack,
long initialReplicationSyncTimeout) throws Exception;
/** /**
* Write message to page if we are paging. * Write message to page if we are paging.

View File

@ -305,7 +305,8 @@ public class JournalStorageManager implements StorageManager {
public void startReplication(ReplicationManager replicationManager, public void startReplication(ReplicationManager replicationManager,
PagingManager pagingManager, PagingManager pagingManager,
String nodeID, String nodeID,
final boolean autoFailBack) throws Exception { final boolean autoFailBack,
long initialReplicationSyncTimeout) throws Exception {
if (!started) { if (!started) {
throw new IllegalStateException("JournalStorageManager must be started..."); throw new IllegalStateException("JournalStorageManager must be started...");
} }
@ -376,7 +377,7 @@ public class JournalStorageManager implements StorageManager {
storageManagerLock.writeLock().lock(); storageManagerLock.writeLock().lock();
try { try {
if (replicator != null) { if (replicator != null) {
replicator.sendSynchronizationDone(nodeID); replicator.sendSynchronizationDone(nodeID, initialReplicationSyncTimeout);
performCachedLargeMessageDeletes(); performCachedLargeMessageDeletes();
} }
} }

View File

@ -473,7 +473,8 @@ public class NullStorageManager implements StorageManager {
public void startReplication(final ReplicationManager replicationManager, public void startReplication(final ReplicationManager replicationManager,
final PagingManager pagingManager, final PagingManager pagingManager,
final String nodeID, final String nodeID,
final boolean autoFailBack) throws Exception { final boolean autoFailBack,
long initialReplicationSyncTimeout) throws Exception {
// no-op // no-op
} }

View File

@ -35,6 +35,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_PAGE_WRITE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_PAGE_WRITE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_PREPARE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_PREPARE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT;
@ -64,6 +65,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
@ -120,6 +122,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
packet = new ReplicationResponseMessage(); packet = new ReplicationResponseMessage();
break; break;
} }
case REPLICATION_RESPONSE_V2: {
packet = new ReplicationResponseMessageV2();
break;
}
case REPLICATION_PAGE_WRITE: { case REPLICATION_PAGE_WRITE: {
packet = new ReplicationPageWriteMessage(); packet = new ReplicationPageWriteMessage();
break; break;

View File

@ -18,9 +18,13 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public final class ReplicationResponseMessage extends PacketImpl { public class ReplicationResponseMessage extends PacketImpl {
public ReplicationResponseMessage() { public ReplicationResponseMessage() {
super(PacketImpl.REPLICATION_RESPONSE); super(PacketImpl.REPLICATION_RESPONSE);
} }
public ReplicationResponseMessage(byte replicationResponseV2) {
super(replicationResponseV2);
}
} }

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public final class ReplicationResponseMessageV2 extends ReplicationResponseMessage {
boolean synchronizationIsFinishedAcknowledgement = false;
public ReplicationResponseMessageV2(final boolean synchronizationIsFinishedAcknowledgement) {
super(REPLICATION_RESPONSE_V2);
this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
}
public ReplicationResponseMessageV2() {
super(PacketImpl.REPLICATION_RESPONSE_V2);
}
public boolean isSynchronizationIsFinishedAcknowledgement() {
return synchronizationIsFinishedAcknowledgement;
}
public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeBoolean(synchronizationIsFinishedAcknowledgement);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
synchronizationIsFinishedAcknowledgement = buffer.readBoolean();
}
@Override
public String toString() {
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", synchronizationIsFinishedAcknowledgement=" + synchronizationIsFinishedAcknowledgement);
buf.append("]");
return buf.toString();
}
}

View File

@ -203,4 +203,16 @@ public class ReplicationStartSyncMessage extends PacketImpl {
return false; return false;
return true; return true;
} }
@Override
public String toString() {
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", synchronizationIsFinished=" + synchronizationIsFinished);
buf.append(", dataType=" + dataType);
buf.append(", nodeID=" + nodeID);
buf.append(", ids=" + Arrays.toString(ids));
buf.append(", allowsAutoFailBack=" + allowsAutoFailBack);
buf.append("]");
return buf.toString();
}
} }

View File

@ -68,6 +68,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage.SyncDataType; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage.SyncDataType;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
@ -196,7 +197,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
handleLargeMessageEnd((ReplicationLargeMessageEndMessage) packet); handleLargeMessageEnd((ReplicationLargeMessageEndMessage) packet);
} }
else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC) { else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC) {
handleStartReplicationSynchronization((ReplicationStartSyncMessage) packet); response = handleStartReplicationSynchronization((ReplicationStartSyncMessage) packet);
} }
else if (type == PacketImpl.REPLICATION_SYNC_FILE) { else if (type == PacketImpl.REPLICATION_SYNC_FILE) {
handleReplicationSynchronization((ReplicationSyncFileMessage) packet); handleReplicationSynchronization((ReplicationSyncFileMessage) packet);
@ -476,19 +477,23 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
* *
* @param packet * @param packet
* @throws Exception * @throws Exception
* @return if the incoming packet indicates the synchronization is finished then return an acknowledgement otherwise
* return an empty response
*/ */
private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception { private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception {
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
if (activation.isRemoteBackupUpToDate()) { if (activation.isRemoteBackupUpToDate()) {
throw ActiveMQMessageBundle.BUNDLE.replicationBackupUpToDate(); throw ActiveMQMessageBundle.BUNDLE.replicationBackupUpToDate();
} }
synchronized (this) { synchronized (this) {
if (!started) if (!started)
return; return replicationResponseMessage;
if (packet.isSynchronizationFinished()) { if (packet.isSynchronizationFinished()) {
finishSynchronization(packet.getNodeID()); finishSynchronization(packet.getNodeID());
return; replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
return replicationResponseMessage;
} }
switch (packet.getDataType()) { switch (packet.getDataType()) {
@ -523,6 +528,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType(); throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
} }
} }
return replicationResponseMessage;
} }
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) { private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) {

View File

@ -58,12 +58,15 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageEventMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
/** /**
* Manages replication tasks on the live server (that is the live server side of a "remote backup" * Manages replication tasks on the live server (that is the live server side of a "remote backup"
@ -116,6 +119,8 @@ public final class ReplicationManager implements ActiveMQComponent {
private volatile boolean inSync = true; private volatile boolean inSync = true;
private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
/** /**
* @param remotingConnection * @param remotingConnection
*/ */
@ -392,8 +397,14 @@ public final class ReplicationManager implements ActiveMQComponent {
private final class ResponseHandler implements ChannelHandler { private final class ResponseHandler implements ChannelHandler {
public void handlePacket(final Packet packet) { public void handlePacket(final Packet packet) {
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE) { if (packet.getType() == PacketImpl.REPLICATION_RESPONSE || packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) {
replicated(); replicated();
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) {
ReplicationResponseMessageV2 replicationResponseMessage = (ReplicationResponseMessageV2) packet;
if (replicationResponseMessage.isSynchronizationIsFinishedAcknowledgement()) {
synchronizationIsFinishedAcknowledgement.countDown();
}
}
} }
} }
@ -534,9 +545,18 @@ public final class ReplicationManager implements ActiveMQComponent {
* *
* @param nodeID * @param nodeID
*/ */
public void sendSynchronizationDone(String nodeID) { public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) {
if (enabled) { if (enabled) {
synchronizationIsFinishedAcknowledgement.countUp();
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID)); sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
try {
if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
}
}
catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.debug(e);
}
inSync = false; inSync = false;
} }
} }

View File

@ -359,4 +359,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 119113, value = "Invalid message load balancing type {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 119113, value = "Invalid message load balancing type {0}", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException invalidMessageLoadBalancingType(String val); IllegalArgumentException invalidMessageLoadBalancingType(String val);
@Message(id = 119114, value = "Replication synchronization process timed out after waiting {0} milliseconds", format = Message.Format.MESSAGE_FORMAT)
IllegalStateException replicationSynchronizationTimeout(long timeout);
} }

View File

@ -51,9 +51,11 @@ public abstract class NodeManager implements ActiveMQComponent {
public abstract void awaitLiveNode() throws Exception; public abstract void awaitLiveNode() throws Exception;
public abstract void awaitLiveStatus() throws Exception;
public abstract void startBackup() throws Exception; public abstract void startBackup() throws Exception;
public abstract void startLiveNode() throws Exception; public abstract ActivateCallback startLiveNode() throws Exception;
public abstract void pauseLiveServer() throws Exception; public abstract void pauseLiveServer() throws Exception;

View File

@ -36,7 +36,7 @@ public class ReplicaPolicy extends BackupPolicy {
//used if we create a replicated policy for when we become live. //used if we create a replicated policy for when we become live.
private boolean allowFailback = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback(); private boolean allowFailback = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay(); private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
private ReplicatedPolicy replicatedPolicy; private ReplicatedPolicy replicatedPolicy;
@ -48,14 +48,14 @@ public class ReplicaPolicy extends BackupPolicy {
String groupName, String groupName,
boolean restartBackup, boolean restartBackup,
boolean allowFailback, boolean allowFailback,
long failbackDelay, long initialReplicationSyncTimeout,
ScaleDownPolicy scaleDownPolicy) { ScaleDownPolicy scaleDownPolicy) {
this.clusterName = clusterName; this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.groupName = groupName; this.groupName = groupName;
this.restartBackup = restartBackup; this.restartBackup = restartBackup;
this.allowFailback = allowFailback; this.allowFailback = allowFailback;
this.failbackDelay = failbackDelay; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.scaleDownPolicy = scaleDownPolicy; this.scaleDownPolicy = scaleDownPolicy;
} }
@ -87,7 +87,7 @@ public class ReplicaPolicy extends BackupPolicy {
public ReplicatedPolicy getReplicatedPolicy() { public ReplicatedPolicy getReplicatedPolicy() {
if (replicatedPolicy == null) { if (replicatedPolicy == null) {
replicatedPolicy = new ReplicatedPolicy(false, allowFailback, failbackDelay, groupName, clusterName, this); replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this);
} }
return replicatedPolicy; return replicatedPolicy;
} }
@ -137,12 +137,21 @@ public class ReplicaPolicy extends BackupPolicy {
this.allowFailback = allowFailback; this.allowFailback = allowFailback;
} }
@Deprecated
public long getFailbackDelay() { public long getFailbackDelay() {
return failbackDelay; return -1;
} }
@Deprecated
public void setFailbackDelay(long failbackDelay) { public void setFailbackDelay(long failbackDelay) {
this.failbackDelay = failbackDelay; }
public long getInitialReplicationSyncTimeout() {
return initialReplicationSyncTimeout;
}
public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) {
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
} }
@Override @Override

View File

@ -31,14 +31,14 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
private String clusterName; private String clusterName;
private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
/* /*
* these are only set by the ReplicaPolicy after failover to decide if the live server can failback, these should not * these are only set by the ReplicaPolicy after failover to decide if the live server can failback, these should not
* be exposed in configuration. * be exposed in configuration.
* */ * */
private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback(); private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
/* /*
* this are only used as the policy when the server is started as a live after a failover * this are only used as the policy when the server is started as a live after a failover
* */ * */
@ -48,10 +48,11 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
replicaPolicy = new ReplicaPolicy(clusterName, -1, groupName, this); replicaPolicy = new ReplicaPolicy(clusterName, -1, groupName, this);
} }
public ReplicatedPolicy(boolean checkForLiveServer, String groupName, String clusterName) { public ReplicatedPolicy(boolean checkForLiveServer, String groupName, String clusterName, long initialReplicationSyncTimeout) {
this.checkForLiveServer = checkForLiveServer; this.checkForLiveServer = checkForLiveServer;
this.groupName = groupName; this.groupName = groupName;
this.clusterName = clusterName; this.clusterName = clusterName;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
/* /*
* we create this with sensible defaults in case we start after a failover * we create this with sensible defaults in case we start after a failover
* */ * */
@ -59,7 +60,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public ReplicatedPolicy(boolean checkForLiveServer, public ReplicatedPolicy(boolean checkForLiveServer,
boolean allowAutoFailBack, boolean allowAutoFailBack,
long failbackDelay, long initialReplicationSyncTimeout,
String groupName, String groupName,
String clusterName, String clusterName,
ReplicaPolicy replicaPolicy) { ReplicaPolicy replicaPolicy) {
@ -67,7 +68,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
this.clusterName = clusterName; this.clusterName = clusterName;
this.groupName = groupName; this.groupName = groupName;
this.allowAutoFailBack = allowAutoFailBack; this.allowAutoFailBack = allowAutoFailBack;
this.failbackDelay = failbackDelay; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.replicaPolicy = replicaPolicy; this.replicaPolicy = replicaPolicy;
} }
@ -83,12 +84,21 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
return allowAutoFailBack; return allowAutoFailBack;
} }
@Deprecated
public long getFailbackDelay() { public long getFailbackDelay() {
return failbackDelay; return -1;
} }
@Deprecated
public void setFailbackDelay(long failbackDelay) { public void setFailbackDelay(long failbackDelay) {
this.failbackDelay = failbackDelay; }
public long getInitialReplicationSyncTimeout() {
return initialReplicationSyncTimeout;
}
public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) {
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
} }
public String getClusterName() { public String getClusterName() {

View File

@ -25,8 +25,6 @@ import java.util.Map;
public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> { public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
private SharedStoreSlavePolicy sharedStoreSlavePolicy; private SharedStoreSlavePolicy sharedStoreSlavePolicy;
@ -34,17 +32,17 @@ public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
public SharedStoreMasterPolicy() { public SharedStoreMasterPolicy() {
} }
public SharedStoreMasterPolicy(long failbackDelay, boolean failoverOnServerShutdown) { public SharedStoreMasterPolicy(boolean failoverOnServerShutdown) {
this.failbackDelay = failbackDelay;
this.failoverOnServerShutdown = failoverOnServerShutdown; this.failoverOnServerShutdown = failoverOnServerShutdown;
} }
@Deprecated
public long getFailbackDelay() { public long getFailbackDelay() {
return failbackDelay; return -1;
} }
@Deprecated
public void setFailbackDelay(long failbackDelay) { public void setFailbackDelay(long failbackDelay) {
this.failbackDelay = failbackDelay;
} }
public boolean isFailoverOnServerShutdown() { public boolean isFailoverOnServerShutdown() {

View File

@ -25,8 +25,6 @@ import java.util.Map;
public class SharedStoreSlavePolicy extends BackupPolicy { public class SharedStoreSlavePolicy extends BackupPolicy {
private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay();
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback(); private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
@ -37,24 +35,23 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
public SharedStoreSlavePolicy() { public SharedStoreSlavePolicy() {
} }
public SharedStoreSlavePolicy(long failbackDelay, public SharedStoreSlavePolicy(boolean failoverOnServerShutdown,
boolean failoverOnServerShutdown,
boolean restartBackup, boolean restartBackup,
boolean allowAutoFailBack, boolean allowAutoFailBack,
ScaleDownPolicy scaleDownPolicy) { ScaleDownPolicy scaleDownPolicy) {
this.failbackDelay = failbackDelay;
this.failoverOnServerShutdown = failoverOnServerShutdown; this.failoverOnServerShutdown = failoverOnServerShutdown;
this.restartBackup = restartBackup; this.restartBackup = restartBackup;
this.allowAutoFailBack = allowAutoFailBack; this.allowAutoFailBack = allowAutoFailBack;
this.scaleDownPolicy = scaleDownPolicy; this.scaleDownPolicy = scaleDownPolicy;
} }
@Deprecated
public long getFailbackDelay() { public long getFailbackDelay() {
return failbackDelay; return -1;
} }
@Deprecated
public void setFailbackDelay(long failbackDelay) { public void setFailbackDelay(long failbackDelay) {
this.failbackDelay = failbackDelay;
} }
public boolean isFailoverOnServerShutdown() { public boolean isFailoverOnServerShutdown() {
@ -67,7 +64,7 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
public SharedStoreMasterPolicy getSharedStoreMasterPolicy() { public SharedStoreMasterPolicy getSharedStoreMasterPolicy() {
if (sharedStoreMasterPolicy == null) { if (sharedStoreMasterPolicy == null) {
sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failbackDelay, failoverOnServerShutdown); sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown);
} }
return sharedStoreMasterPolicy; return sharedStoreMasterPolicy;
} }

View File

@ -23,6 +23,7 @@ import java.nio.channels.FileLock;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
@ -153,7 +154,7 @@ public class FileLockNodeManager extends NodeManager {
} }
@Override @Override
public void startLiveNode() throws Exception { public ActivateCallback startLiveNode() throws Exception {
setFailingBack(); setFailingBack();
String timeoutMessage = lockAcquisitionTimeout == -1 ? "indefinitely" : lockAcquisitionTimeout + " milliseconds"; String timeoutMessage = lockAcquisitionTimeout == -1 ? "indefinitely" : lockAcquisitionTimeout + " milliseconds";
@ -164,7 +165,29 @@ public class FileLockNodeManager extends NodeManager {
ActiveMQServerLogger.LOGGER.obtainedLiveLock(); ActiveMQServerLogger.LOGGER.obtainedLiveLock();
setLive(); return new ActivateCallback() {
@Override
public void preActivate() {
}
@Override
public void activated() {
}
@Override
public void deActivate() {
}
@Override
public void activationComplete() {
try {
setLive();
}
catch (Exception e) {
e.printStackTrace();
}
}
};
} }
@Override @Override
@ -183,6 +206,13 @@ public class FileLockNodeManager extends NodeManager {
} }
} }
@Override
public void awaitLiveStatus() throws Exception {
while (getState() != LIVE) {
Thread.sleep(2000);
}
}
private void setLive() throws Exception { private void setLive() throws Exception {
writeFileLockStatus(FileLockNodeManager.LIVE); writeFileLockStatus(FileLockNodeManager.LIVE);
} }

View File

@ -22,6 +22,7 @@ import java.util.concurrent.Semaphore;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -90,16 +91,45 @@ public final class InVMNodeManager extends NodeManager {
} }
} }
@Override
public void awaitLiveStatus() throws Exception {
while (state != LIVE) {
Thread.sleep(10);
}
}
@Override @Override
public void startBackup() throws Exception { public void startBackup() throws Exception {
backupLock.acquire(); backupLock.acquire();
} }
@Override @Override
public void startLiveNode() throws Exception { public ActivateCallback startLiveNode() throws Exception {
state = FAILING_BACK; state = FAILING_BACK;
liveLock.acquire(); liveLock.acquire();
state = LIVE; return new ActivateCallback() {
@Override
public void preActivate() {
}
@Override
public void activated() {
}
@Override
public void deActivate() {
}
@Override
public void activationComplete() {
try {
state = LIVE;
}
catch (Exception e) {
e.printStackTrace();
}
}
};
} }
@Override @Override
@ -110,8 +140,6 @@ public final class InVMNodeManager extends NodeManager {
@Override @Override
public void crashLiveServer() throws Exception { public void crashLiveServer() throws Exception {
//overkill as already set to live
state = LIVE;
liveLock.release(); liveLock.release();
} }

View File

@ -157,7 +157,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
public void run() { public void run() {
try { try {
activeMQServer.getStorageManager().startReplication(replicationManager, activeMQServer.getPagingManager(), activeMQServer.getNodeID().toString(), isFailBackRequest && replicatedPolicy.isAllowAutoFailBack()); activeMQServer.getStorageManager().startReplication(replicationManager, activeMQServer.getPagingManager(), activeMQServer.getNodeID().toString(), isFailBackRequest && replicatedPolicy.isAllowAutoFailBack(), replicatedPolicy.getInitialReplicationSyncTimeout());
clusterConnection.nodeAnnounced(System.currentTimeMillis(), activeMQServer.getNodeID().toString(), replicatedPolicy.getGroupName(), replicatedPolicy.getScaleDownGroupName(), pair, true); clusterConnection.nodeAnnounced(System.currentTimeMillis(), activeMQServer.getNodeID().toString(), replicatedPolicy.getGroupName(), replicatedPolicy.getScaleDownGroupName(), pair, true);
@ -168,13 +168,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
BackupTopologyListener listener1 = new BackupTopologyListener(activeMQServer.getNodeID().toString()); BackupTopologyListener listener1 = new BackupTopologyListener(activeMQServer.getNodeID().toString());
clusterConnection.addClusterTopologyListener(listener1); clusterConnection.addClusterTopologyListener(listener1);
if (listener1.waitForBackup()) { if (listener1.waitForBackup()) {
try { //if we have to many backups kept or are not configured to restart just stop, otherwise restart as a backup
Thread.sleep(replicatedPolicy.getFailbackDelay());
}
catch (InterruptedException e) {
//
}
//if we have to many backups kept or arent configured to restart just stop, otherwise restart as a backup
if (!replicatedPolicy.getReplicaPolicy().isRestartBackup() && activeMQServer.countNumberOfCopiedJournals() >= replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() && replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() >= 0) { if (!replicatedPolicy.getReplicaPolicy().isRestartBackup() && activeMQServer.countNumberOfCopiedJournals() >= replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() && replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() >= 0) {
activeMQServer.stop(true); activeMQServer.stop(true);
ActiveMQServerLogger.LOGGER.stopReplicatedBackupAfterFailback(); ActiveMQServerLogger.LOGGER.stopReplicatedBackupAfterFailback();

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
@ -30,9 +33,6 @@ import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy
import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.ManagementService;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
public final class SharedStoreBackupActivation extends Activation { public final class SharedStoreBackupActivation extends Activation {
//this is how we act as a backup //this is how we act as a backup
@ -191,38 +191,51 @@ public final class SharedStoreBackupActivation extends Activation {
} }
private class FailbackChecker implements Runnable { private class FailbackChecker implements Runnable {
BackupTopologyListener backupListener;
FailbackChecker() {
backupListener = new BackupTopologyListener(activeMQServer.getNodeID().toString());
activeMQServer.getClusterManager().getDefaultConnection(null).addClusterTopologyListener(backupListener);
}
private boolean restarting = false; private boolean restarting = false;
public void run() { public void run() {
try { try {
if (!restarting && activeMQServer.getNodeManager().isAwaitingFailback()) { if (!restarting && activeMQServer.getNodeManager().isAwaitingFailback()) {
ActiveMQServerLogger.LOGGER.awaitFailBack(); if (backupListener.waitForBackup()) {
restarting = true; ActiveMQServerLogger.LOGGER.awaitFailBack();
Thread t = new Thread(new Runnable() { restarting = true;
public void run() { Thread t = new Thread(new Runnable() {
try { public void run() {
ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Stopping live node in favor of failback"); try {
ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Stopping live node in favor of failback");
activeMQServer.stop(true, false, true); NodeManager nodeManager = activeMQServer.getNodeManager();
// We need to wait some time before we start the backup again activeMQServer.stop(true, false, true);
// otherwise we may eventually start before the live had a chance to get it
Thread.sleep(sharedStoreSlavePolicy.getFailbackDelay());
synchronized (failbackCheckerGuard) {
if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
return;
activeMQServer.setHAPolicy(sharedStoreSlavePolicy); // ensure that the server to which we are failing back actually starts fully before we restart
ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Starting backup node now after failback"); nodeManager.start();
activeMQServer.start(); nodeManager.awaitLiveStatus();
nodeManager.stop();
synchronized (failbackCheckerGuard) {
if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
return;
activeMQServer.setHAPolicy(sharedStoreSlavePolicy);
ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Starting backup node now after failback");
activeMQServer.start();
}
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.serverRestartWarning();
e.printStackTrace();
} }
} }
catch (Exception e) { });
ActiveMQServerLogger.LOGGER.serverRestartWarning(); t.start();
} }
}
});
t.start();
} }
} }
catch (Exception e) { catch (Exception e) {

View File

@ -55,10 +55,9 @@ public final class SharedStoreLiveActivation extends LiveActivation {
} }
activeMQServer.getBackupManager().start(); activeMQServer.getBackupManager().start();
activeMQServer.getBackupManager().announceBackup(); activeMQServer.getBackupManager().announceBackup();
Thread.sleep(sharedStoreMasterPolicy.getFailbackDelay());
} }
activeMQServer.getNodeManager().startLiveNode(); activeMQServer.registerActivateCallback(activeMQServer.getNodeManager().startLiveNode());
if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED || activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) { if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED || activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
return; return;

View File

@ -1624,6 +1624,14 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The amount of time to wait for the replica to acknowledge it has received all the necessary data from
the replicating server at the final step of the initial replication synchronization process.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
</xsd:complexType> </xsd:complexType>
<xsd:complexType name="replicaPolicyType"> <xsd:complexType name="replicaPolicyType">
@ -1681,7 +1689,16 @@
<xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0"> <xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
if we have to start as a replicated server this is the delay to wait before fail-back occurs DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back occurs
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
If we have to start as a replicated server this is the amount of time to wait for the replica to
acknowledge it has received all the necessary data from the replicating server at the final step
of the initial replication synchronization process.
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
@ -1736,7 +1753,7 @@
<xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0"> <xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
delay to wait before fail-back occurs on (live's) restart DEPRECATED: delay to wait before fail-back occurs on (live's) restart
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
@ -1764,7 +1781,7 @@
<xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0"> <xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
delay to wait before fail-back occurs on (live's) restart DEPRECATED: delay to wait before fail-back occurs on (live's) restart
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>

View File

@ -121,6 +121,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertEquals(replicatedPolicy.getGroupName(), "purple"); assertEquals(replicatedPolicy.getGroupName(), "purple");
assertTrue(replicatedPolicy.isCheckForLiveServer()); assertTrue(replicatedPolicy.isCheckForLiveServer());
assertEquals(replicatedPolicy.getClusterName(), "abcdefg"); assertEquals(replicatedPolicy.getClusterName(), "abcdefg");
assertEquals(replicatedPolicy.getInitialReplicationSyncTimeout(), 9876);
} }
finally { finally {
server.stop(); server.stop();
@ -142,6 +143,8 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertEquals(replicaPolicy.getMaxSavedReplicatedJournalsSize(), 22); assertEquals(replicaPolicy.getMaxSavedReplicatedJournalsSize(), 22);
assertEquals(replicaPolicy.getClusterName(), "33rrrrr"); assertEquals(replicaPolicy.getClusterName(), "33rrrrr");
assertFalse(replicaPolicy.isRestartBackup()); assertFalse(replicaPolicy.isRestartBackup());
assertTrue(replicaPolicy.isAllowFailback());
assertEquals(replicaPolicy.getInitialReplicationSyncTimeout(), 9876);
ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy(); ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
assertNotNull(scaleDownPolicy); assertNotNull(scaleDownPolicy);
assertEquals(scaleDownPolicy.getGroupName(), "boo!"); assertEquals(scaleDownPolicy.getGroupName(), "boo!");
@ -219,7 +222,6 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
HAPolicy haPolicy = server.getHAPolicy(); HAPolicy haPolicy = server.getHAPolicy();
assertTrue(haPolicy instanceof SharedStoreMasterPolicy); assertTrue(haPolicy instanceof SharedStoreMasterPolicy);
SharedStoreMasterPolicy masterPolicy = (SharedStoreMasterPolicy) haPolicy; SharedStoreMasterPolicy masterPolicy = (SharedStoreMasterPolicy) haPolicy;
assertEquals(masterPolicy.getFailbackDelay(), 3456);
assertFalse(masterPolicy.isFailoverOnServerShutdown()); assertFalse(masterPolicy.isFailoverOnServerShutdown());
} }
finally { finally {
@ -237,11 +239,10 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertTrue(activation instanceof SharedStoreBackupActivation); assertTrue(activation instanceof SharedStoreBackupActivation);
HAPolicy haPolicy = server.getHAPolicy(); HAPolicy haPolicy = server.getHAPolicy();
assertTrue(haPolicy instanceof SharedStoreSlavePolicy); assertTrue(haPolicy instanceof SharedStoreSlavePolicy);
SharedStoreSlavePolicy replicaPolicy = (SharedStoreSlavePolicy) haPolicy; SharedStoreSlavePolicy sharedStoreSlavePolicy = (SharedStoreSlavePolicy) haPolicy;
assertEquals(replicaPolicy.getFailbackDelay(), 9876); assertFalse(sharedStoreSlavePolicy.isFailoverOnServerShutdown());
assertFalse(replicaPolicy.isFailoverOnServerShutdown()); assertFalse(sharedStoreSlavePolicy.isRestartBackup());
assertFalse(replicaPolicy.isRestartBackup()); ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy();
ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
assertNotNull(scaleDownPolicy); assertNotNull(scaleDownPolicy);
assertEquals(scaleDownPolicy.getGroupName(), "boo!"); assertEquals(scaleDownPolicy.getGroupName(), "boo!");
assertEquals(scaleDownPolicy.getDiscoveryGroup(), "wahey"); assertEquals(scaleDownPolicy.getDiscoveryGroup(), "wahey");
@ -264,11 +265,10 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertTrue(activation instanceof SharedStoreBackupActivation); assertTrue(activation instanceof SharedStoreBackupActivation);
HAPolicy haPolicy = server.getHAPolicy(); HAPolicy haPolicy = server.getHAPolicy();
assertTrue(haPolicy instanceof SharedStoreSlavePolicy); assertTrue(haPolicy instanceof SharedStoreSlavePolicy);
SharedStoreSlavePolicy replicaPolicy = (SharedStoreSlavePolicy) haPolicy; SharedStoreSlavePolicy sharedStoreSlavePolicy = (SharedStoreSlavePolicy) haPolicy;
assertEquals(replicaPolicy.getFailbackDelay(), 5678); assertTrue(sharedStoreSlavePolicy.isFailoverOnServerShutdown());
assertTrue(replicaPolicy.isFailoverOnServerShutdown()); assertTrue(sharedStoreSlavePolicy.isRestartBackup());
assertTrue(replicaPolicy.isRestartBackup()); ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy();
ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
assertNotNull(scaleDownPolicy); assertNotNull(scaleDownPolicy);
assertEquals(scaleDownPolicy.getGroupName(), "boo!"); assertEquals(scaleDownPolicy.getGroupName(), "boo!");
assertEquals(scaleDownPolicy.getDiscoveryGroup(), null); assertEquals(scaleDownPolicy.getDiscoveryGroup(), null);
@ -293,11 +293,10 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertTrue(activation instanceof SharedStoreBackupActivation); assertTrue(activation instanceof SharedStoreBackupActivation);
HAPolicy haPolicy = server.getHAPolicy(); HAPolicy haPolicy = server.getHAPolicy();
assertTrue(haPolicy instanceof SharedStoreSlavePolicy); assertTrue(haPolicy instanceof SharedStoreSlavePolicy);
SharedStoreSlavePolicy replicaPolicy = (SharedStoreSlavePolicy) haPolicy; SharedStoreSlavePolicy sharedStoreSlavePolicy = (SharedStoreSlavePolicy) haPolicy;
assertEquals(replicaPolicy.getFailbackDelay(), 5678); assertTrue(sharedStoreSlavePolicy.isFailoverOnServerShutdown());
assertTrue(replicaPolicy.isFailoverOnServerShutdown()); assertTrue(sharedStoreSlavePolicy.isRestartBackup());
assertTrue(replicaPolicy.isRestartBackup()); ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy();
ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy();
assertNull(scaleDownPolicy); assertNull(scaleDownPolicy);
} }
finally { finally {
@ -349,10 +348,8 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
assertNotNull(livePolicy); assertNotNull(livePolicy);
assertFalse(livePolicy.isFailoverOnServerShutdown()); assertFalse(livePolicy.isFailoverOnServerShutdown());
assertEquals(livePolicy.getFailbackDelay(), 1234);
SharedStoreSlavePolicy backupPolicy = (SharedStoreSlavePolicy) colocatedPolicy.getBackupPolicy(); SharedStoreSlavePolicy backupPolicy = (SharedStoreSlavePolicy) colocatedPolicy.getBackupPolicy();
assertNotNull(backupPolicy); assertNotNull(backupPolicy);
assertEquals(backupPolicy.getFailbackDelay(), 44);
assertFalse(backupPolicy.isFailoverOnServerShutdown()); assertFalse(backupPolicy.isFailoverOnServerShutdown());
assertFalse(backupPolicy.isRestartBackup()); assertFalse(backupPolicy.isRestartBackup());
} }

View File

@ -28,19 +28,15 @@
<request-backup>false</request-backup> <request-backup>false</request-backup>
<backup-port-offset>33</backup-port-offset> <backup-port-offset>33</backup-port-offset>
<master> <master>
<failback-delay>1234</failback-delay>
<failover-on-shutdown>false</failover-on-shutdown> <failover-on-shutdown>false</failover-on-shutdown>
</master> </master>
<slave> <slave>
<failback-delay>44</failback-delay>
<failover-on-shutdown>false</failover-on-shutdown> <failover-on-shutdown>false</failover-on-shutdown>
<restart-backup>false</restart-backup> <restart-backup>false</restart-backup>
<scale-down/> <scale-down/>
</slave> </slave>
</colocated> </colocated>
</shared-store> </shared-store>
</ha-policy> </ha-policy>
</core>c </core>
</configuration> </configuration>

View File

@ -30,7 +30,7 @@
<cluster-name>33rrrrr</cluster-name> <cluster-name>33rrrrr</cluster-name>
<restart-backup>false</restart-backup> <restart-backup>false</restart-backup>
<allow-failback>true</allow-failback> <allow-failback>true</allow-failback>
<failback-delay>9876</failback-delay> <initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
<scale-down> <scale-down>
<!--a grouping of servers that can be scaled down to--> <!--a grouping of servers that can be scaled down to-->
<group-name>boo!</group-name> <group-name>boo!</group-name>

View File

@ -26,6 +26,7 @@
<group-name>purple</group-name> <group-name>purple</group-name>
<check-for-live-server>true</check-for-live-server> <check-for-live-server>true</check-for-live-server>
<cluster-name>abcdefg</cluster-name> <cluster-name>abcdefg</cluster-name>
<initial-replication-sync-timeout>9876</initial-replication-sync-timeout>
</master> </master>
</replication> </replication>
</ha-policy> </ha-policy>

View File

@ -22,11 +22,9 @@
<ha-policy> <ha-policy>
<shared-store> <shared-store>
<master> <master>
<failback-delay>3456</failback-delay>
<failover-on-shutdown>false</failover-on-shutdown> <failover-on-shutdown>false</failover-on-shutdown>
</master> </master>
</shared-store> </shared-store>
</ha-policy> </ha-policy>
</core> </core>
</configuration> </configuration>

View File

@ -26,7 +26,6 @@
<shared-store> <shared-store>
<slave> <slave>
<allow-failback>true</allow-failback> <allow-failback>true</allow-failback>
<failback-delay>9876</failback-delay>
<failover-on-shutdown>false</failover-on-shutdown> <failover-on-shutdown>false</failover-on-shutdown>
<restart-backup>false</restart-backup> <restart-backup>false</restart-backup>
<scale-down> <scale-down>

View File

@ -22,7 +22,6 @@
<ha-policy> <ha-policy>
<shared-store> <shared-store>
<slave> <slave>
<failback-delay>5678</failback-delay>
<failover-on-shutdown>true</failover-on-shutdown> <failover-on-shutdown>true</failover-on-shutdown>
<restart-backup>true</restart-backup> <restart-backup>true</restart-backup>
<scale-down> <scale-down>
@ -38,5 +37,4 @@
</shared-store> </shared-store>
</ha-policy> </ha-policy>
</core> </core>
</configuration> </configuration>

View File

@ -22,12 +22,10 @@
<ha-policy> <ha-policy>
<shared-store> <shared-store>
<slave> <slave>
<failback-delay>5678</failback-delay>
<failover-on-shutdown>true</failover-on-shutdown> <failover-on-shutdown>true</failover-on-shutdown>
<restart-backup>true</restart-backup> <restart-backup>true</restart-backup>
</slave> </slave>
</shared-store> </shared-store>
</ha-policy> </ha-policy>
</core> </core>
</configuration> </configuration>

View File

@ -113,8 +113,13 @@ the connection speed.
> **Note** > **Note**
> >
> Synchronization occurs in parallel with current network traffic so > In general, synchronization occurs in parallel with current network traffic so
> this won't cause any blocking on current clients. > this won't cause any blocking on current clients. However, there is a critical
> moment at the end of this process where the replicating server must complete
> the synchronization and ensure the replica acknowledges this completion. This
> exchange between the replicating server and replica will block any journal
> related operations. The maximum length of time that this exchange will block
> is controlled by the `initial-replication-sync-timeout` configuration element.
Replication will create a copy of the data at the backup. One issue to Replication will create a copy of the data at the backup. One issue to
be aware of is: in case of a successful fail-over, the backup's data be aware of is: in case of a successful fail-over, the backup's data
@ -257,11 +262,14 @@ HA strategy Replication for `master`:
</tr> </tr>
<tr> <tr>
<td>`group-name`</td> <td>`group-name`</td>
<td>Whether to check the cluster for a (live) server using our own server ID when starting up. This option is only necessary for performing 'fail-back' on replicating servers.</td> <td>If set, backup servers will only pair with live servers with matching group-name.</td>
</tr> </tr>
<tr> <tr>
<td>`check-for-live-server`</td> <td>`initial-replication-sync-timeout`</td>
<td>If set, backup servers will only pair with live servers with matching group-name.</td> <td>The amount of time the replicating server will wait at the completion of the initial
replication process for the replica to acknowledge it has received all the necessary
data. The default is 30,000 milliseconds. <strong>Note</strong>: during this interval any
journal related operations will be blocked.</td>
</tr> </tr>
</tbody> </tbody>
</table> </table>
@ -309,8 +317,14 @@ HA strategy Replication for `slave`:
failed over</td> failed over</td>
</tr> </tr>
<tr> <tr>
<td>`failback-delay`</td> <td>`initial-replication-sync-timeout`</td>
<td>delay to wait before fail-back occurs on (failed over live's) restart</td> <td>After failover and the slave has become live, this is
set on the new live server. It represents the amount of time
the replicating server will wait at the completion of the
initial replication process for the replica to acknowledge
it has received all the necessary data. The default is
30,000 milliseconds. <strong>Note</strong>: during this interval any
journal related operations will be blocked.</td>
</tr> </tr>
</tbody> </tbody>
</table> </table>
@ -405,16 +419,10 @@ stop. This configuration would look like:
<shared-store> <shared-store>
<slave> <slave>
<allow-failback>true</allow-failback> <allow-failback>true</allow-failback>
<failback-delay>5000</failback-delay>
</slave> </slave>
</shared-store> </shared-store>
</ha-policy> </ha-policy>
The `failback-delay` configures how long the backup must wait after
automatically stopping before it restarts. This is to gives the live
server time to start and obtain its lock.
In replication HA mode you need to set an extra property In replication HA mode you need to set an extra property
`check-for-live-server` to `true` in the `master` configuration. If set `check-for-live-server` to `true` in the `master` configuration. If set
to true, during start-up a live server will first search the cluster for to true, during start-up a live server will first search the cluster for
@ -491,13 +499,6 @@ HA strategy shared store for `master`:
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
<tr>
<td>`failback-delay`</td>
<td>If a backup server is detected as being live,
via the lock file, then the live server will wait
announce itself as a backup and wait this amount
of time (in ms) before starting as a live</td>
</tr>
<tr> <tr>
<td>`failover-on-server-shutdown`</td> <td>`failover-on-server-shutdown`</td>
<td>If set to true then when this server is stopped <td>If set to true then when this server is stopped
@ -512,7 +513,7 @@ HA strategy shared store for `master`:
The following table lists all the `ha-policy` configuration elements for The following table lists all the `ha-policy` configuration elements for
HA strategy Shared Store for `slave`: HA strategy Shared Store for `slave`:
<table summary="HA Replication Slave Policy" border="1"> <table summary="HA Shared Store Slave Policy" border="1">
<colgroup> <colgroup>
<col/> <col/>
<col/> <col/>
@ -539,17 +540,6 @@ HA strategy Shared Store for `slave`:
places a request to take over its place. The use case is places a request to take over its place. The use case is
when the backup has failed over.</td> when the backup has failed over.</td>
</tr> </tr>
<tr>
<td>`failback-delay`</td>
<td>After failover and the slave has become live, this is
set on the new live server. When starting If a backup server
is detected as being live, via the lock file, then the live
server will wait announce itself as a backup and wait this
amount of time (in ms) before starting as a live, however
this is unlikely since this backup has just stopped anyway.
It is also used as the delay after failback before this backup
will restart (if `allow-failback` is set to true.</td>
</tr>
</tbody> </tbody>
</table> </table>

View File

@ -49,7 +49,7 @@ public class NodeManagerAction {
for (int action : work) { for (int action : work) {
switch (action) { switch (action) {
case START_LIVE: case START_LIVE:
nodeManager.startLiveNode(); nodeManager.startLiveNode().activationComplete();
hasLiveLock = true; hasLiveLock = true;
hasBackupLock = false; hasBackupLock = false;
break; break;

View File

@ -249,11 +249,11 @@ public class FailBackAutoTest extends FailoverTestBase {
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setRestartBackup(true)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setRestartBackup(true)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
backupServer = createTestableServer(backupConfig); backupServer = createTestableServer(backupConfig);
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(100)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector); liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector);
liveServer = createTestableServer(liveConfig); liveServer = createTestableServer(liveConfig);
} }

View File

@ -187,11 +187,11 @@ public class FailoverListenerTest extends FailoverTestBase {
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
backupServer = createTestableServer(backupConfig); backupServer = createTestableServer(backupConfig);
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(1000)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector); liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector);
liveServer = createTestableServer(liveConfig); liveServer = createTestableServer(liveConfig);
} }

View File

@ -159,11 +159,11 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
backupServer = createTestableServer(backupConfig); backupServer = createTestableServer(backupConfig);
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(1000)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector); liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
liveServer = createTestableServer(liveConfig); liveServer = createTestableServer(liveConfig);
} }
@ -191,7 +191,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
} }
protected void setupHAPolicyConfiguration() { protected void setupHAPolicyConfiguration() {
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(0).setAllowFailBack(true).setFailbackDelay(5000); ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(0).setAllowFailBack(true);
} }
protected final void adaptLiveConfigForReplicatedFailBack(TestableServer server) { protected final void adaptLiveConfigForReplicatedFailBack(TestableServer server) {

View File

@ -54,11 +54,11 @@ public class LiveToLiveFailoverTest extends FailoverTest {
TransportConfiguration liveConnector0 = getConnectorTransportConfiguration(true, 0); TransportConfiguration liveConnector0 = getConnectorTransportConfiguration(true, 0);
TransportConfiguration liveConnector1 = getConnectorTransportConfiguration(true, 1); TransportConfiguration liveConnector1 = getConnectorTransportConfiguration(true, 1);
backupConfig = super.createDefaultInVMConfig(1).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 1)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setScaleDownConfiguration(new ScaleDownConfiguration().addConnector(liveConnector1.getName())))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector1.getName(), liveConnector0.getName())); backupConfig = super.createDefaultInVMConfig(1).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 1)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setScaleDownConfiguration(new ScaleDownConfiguration().addConnector(liveConnector1.getName())))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector1.getName(), liveConnector0.getName()));
backupServer = createColocatedTestableServer(backupConfig, nodeManager1, nodeManager0, 1); backupServer = createColocatedTestableServer(backupConfig, nodeManager1, nodeManager0, 1);
liveConfig = super.createDefaultInVMConfig(0).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 0)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setBackupRequestRetryInterval(1000).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setScaleDownConfiguration(new ScaleDownConfiguration()))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector0.getName(), liveConnector1.getName())); liveConfig = super.createDefaultInVMConfig(0).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 0)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setBackupRequestRetryInterval(1000).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setScaleDownConfiguration(new ScaleDownConfiguration()))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector0.getName(), liveConnector1.getName()));
liveServer = createColocatedTestableServer(liveConfig, nodeManager0, nodeManager1, 0); liveServer = createColocatedTestableServer(liveConfig, nodeManager0, nodeManager1, 0);
} }

View File

@ -82,7 +82,6 @@ public abstract class MultipleServerFailoverTestBase extends ActiveMQTestBase {
if (isSharedStore()) { if (isSharedStore()) {
haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration(); haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration();
((SharedStoreMasterPolicyConfiguration) haPolicyConfiguration).setFailbackDelay(1000);
} }
else { else {
haPolicyConfiguration = new ReplicatedPolicyConfiguration(); haPolicyConfiguration = new ReplicatedPolicyConfiguration();
@ -128,11 +127,9 @@ public abstract class MultipleServerFailoverTestBase extends ActiveMQTestBase {
if (isSharedStore()) { if (isSharedStore()) {
haPolicyConfiguration = new SharedStoreSlavePolicyConfiguration(); haPolicyConfiguration = new SharedStoreSlavePolicyConfiguration();
((SharedStoreSlavePolicyConfiguration) haPolicyConfiguration).setFailbackDelay(1000);
} }
else { else {
haPolicyConfiguration = new ReplicaPolicyConfiguration(); haPolicyConfiguration = new ReplicaPolicyConfiguration();
((ReplicaPolicyConfiguration) haPolicyConfiguration).setFailbackDelay(1000);
if (getNodeGroupName() != null) { if (getNodeGroupName() != null) {
((ReplicaPolicyConfiguration) haPolicyConfiguration).setGroupName(getNodeGroupName() + "-" + i); ((ReplicaPolicyConfiguration) haPolicyConfiguration).setGroupName(getNodeGroupName() + "-" + i);
} }

View File

@ -126,7 +126,7 @@ public class ReplicatedFailoverTest extends FailoverTest {
protected void setupHAPolicyConfiguration() { protected void setupHAPolicyConfiguration() {
if (isReplicatedFailbackTest) { if (isReplicatedFailbackTest) {
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true);
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true).setFailbackDelay(2000); ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true);
} }
else { else {
super.setupHAPolicyConfiguration(); super.setupHAPolicyConfiguration();

View File

@ -80,7 +80,7 @@ public class SecurityFailoverTest extends FailoverTest {
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setSecurityEnabled(true).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setSecurityEnabled(true).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
backupServer = createTestableServer(backupConfig); backupServer = createTestableServer(backupConfig);
ActiveMQSecurityManagerImpl securityManager = installSecurity(backupServer); ActiveMQSecurityManagerImpl securityManager = installSecurity(backupServer);

View File

@ -60,11 +60,11 @@ public class SharedStoreBackupTest extends FailoverTestBase {
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
System.out.println("backup config created - mnovak"); System.out.println("backup config created - mnovak");
backupConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false)).setRestartBackup(false)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); backupConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false)).setRestartBackup(false)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
backupServer = createTestableServer(backupConfig); backupServer = createTestableServer(backupConfig);
liveConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(1000).setFailoverOnServerShutdown(true)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector); liveConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
liveServer = createTestableServer(liveConfig); liveServer = createTestableServer(liveConfig);
} }

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.replication.ReplicationEndpoint; import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -171,7 +172,7 @@ public class BackupSyncDelay implements Interceptor {
receivedUpToDate = true; receivedUpToDate = true;
assert onHold == null; assert onHold == null;
onHold = packet; onHold = packet;
PacketImpl response = new ReplicationResponseMessage(); PacketImpl response = new ReplicationResponseMessageV2(true);
channel.send(response); channel.send(response);
return; return;
} }

View File

@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.core.cluster.DiscoveryEntry; import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
@ -180,12 +181,33 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
public void awaitLiveNode() throws Exception { public void awaitLiveNode() throws Exception {
} }
@Override
public void awaitLiveStatus() throws Exception {
}
@Override @Override
public void startBackup() throws Exception { public void startBackup() throws Exception {
} }
@Override @Override
public void startLiveNode() throws Exception { public ActivateCallback startLiveNode() throws Exception {
return new ActivateCallback() {
@Override
public void preActivate() {
}
@Override
public void activated() {
}
@Override
public void deActivate() {
}
@Override
public void activationComplete() {
}
};
} }
@Override @Override