diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 21d97a31c4..6d8542a034 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -374,8 +374,8 @@ public final class ActiveMQDefaultConfiguration { // Whether a server will automatically stop when another places a request to take over its place. The use case is when a regular server stops and its backup takes over its duties, later the main server restarts and requests the server (the former backup) to stop operating. private static boolean DEFAULT_ALLOW_AUTO_FAILBACK = true; - // if we have to start as a replicated server this is the delay to wait before fail-back occurs - private static long DEFAULT_FAILBACK_DELAY = 5000; + // When a replica comes online this is how long the replicating server will wait for a confirmation from the replica that the replication synchronization process is complete + private static long DEFAULT_INITIAL_REPLICATION_SYNC_TIMEOUT = 30000; // Will this backup server come live on a normal server shutdown private static boolean DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN = false; @@ -987,8 +987,8 @@ public final class ActiveMQDefaultConfiguration { /** * if we have to start as a replicated server this is the delay to wait before fail-back occurs */ - public static long getDefaultFailbackDelay() { - return DEFAULT_FAILBACK_DELAY; + public static long getDefaultInitialReplicationSyncTimeout() { + return DEFAULT_INITIAL_REPLICATION_SYNC_TIMEOUT; } /** diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 06e259bb6a..fffdec175c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -243,6 +243,8 @@ public class PacketImpl implements Packet { public static final byte SESS_BINDINGQUERY_RESP_V2 = -8; + public static final byte REPLICATION_RESPONSE_V2 = -9; + // Static -------------------------------------------------------- public PacketImpl(final byte type) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java index e2a0a44a62..f1bb89c441 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java @@ -64,19 +64,19 @@ public final class ConfigurationUtils { } case REPLICATED: { ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf; - return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName()); + return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout()); } case REPLICA: { ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf; - return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getFailbackDelay(), getScaleDownPolicy(pc.getScaleDownConfiguration())); + return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration())); } case SHARED_STORE_MASTER: { SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf; - return new SharedStoreMasterPolicy(pc.getFailbackDelay(), pc.isFailoverOnServerShutdown()); + return new SharedStoreMasterPolicy(pc.isFailoverOnServerShutdown()); } case SHARED_STORE_SLAVE: { SharedStoreSlavePolicyConfiguration pc = (SharedStoreSlavePolicyConfiguration) conf; - return new SharedStoreSlavePolicy(pc.getFailbackDelay(), pc.isFailoverOnServerShutdown(), pc.isRestartBackup(), pc.isAllowFailBack(), getScaleDownPolicy(pc.getScaleDownConfiguration())); + return new SharedStoreSlavePolicy(pc.isFailoverOnServerShutdown(), pc.isRestartBackup(), pc.isAllowFailBack(), getScaleDownPolicy(pc.getScaleDownConfiguration())); } case COLOCATED: { ColocatedPolicyConfiguration pc = (ColocatedPolicyConfiguration) conf; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java index 9663d24458..17c83d44a2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java @@ -37,7 +37,7 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration { * */ private boolean allowFailBack = false; - private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay(); + private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); public ReplicaPolicyConfiguration() { } @@ -101,12 +101,22 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration { return this; } + @Deprecated public ReplicaPolicyConfiguration setFailbackDelay(long failbackDelay) { - this.failbackDelay = failbackDelay; return this; } + @Deprecated public long getFailbackDelay() { - return failbackDelay; + return -1; + } + + public long getInitialReplicationSyncTimeout() { + return initialReplicationSyncTimeout; + } + + public ReplicaPolicyConfiguration setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) { + this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; + return this; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java index ce6244314d..3b84bb7121 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java @@ -27,6 +27,8 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration { private String clusterName = null; + private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); + public ReplicatedPolicyConfiguration() { } @@ -61,4 +63,12 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration { this.clusterName = clusterName; return this; } + + public long getInitialReplicationSyncTimeout() { + return initialReplicationSyncTimeout; + } + + public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) { + this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/SharedStoreMasterPolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/SharedStoreMasterPolicyConfiguration.java index c868022998..6668695bf0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/SharedStoreMasterPolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/SharedStoreMasterPolicyConfiguration.java @@ -21,8 +21,6 @@ import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfiguration { - private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay(); - private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); public SharedStoreMasterPolicyConfiguration() { @@ -33,12 +31,13 @@ public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfigurati return TYPE.SHARED_STORE_MASTER; } + @Deprecated public long getFailbackDelay() { - return failbackDelay; + return -1; } + @Deprecated public SharedStoreMasterPolicyConfiguration setFailbackDelay(long failbackDelay) { - this.failbackDelay = failbackDelay; return this; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/SharedStoreSlavePolicyConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/SharedStoreSlavePolicyConfiguration.java index 8e220771c7..f29c1d09bc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/SharedStoreSlavePolicyConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/SharedStoreSlavePolicyConfiguration.java @@ -22,8 +22,6 @@ import org.apache.activemq.artemis.core.config.ScaleDownConfiguration; public class SharedStoreSlavePolicyConfiguration implements HAPolicyConfiguration { - private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay(); - private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); private boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup(); @@ -76,13 +74,13 @@ public class SharedStoreSlavePolicyConfiguration implements HAPolicyConfiguratio return this; } + @Deprecated public long getFailbackDelay() { - return failbackDelay; + return -1; } + @Deprecated public SharedStoreSlavePolicyConfiguration setFailbackDelay(long failbackDelay) { - this.failbackDelay = failbackDelay; return this; } - } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 0b081ad1cb..0e75719d22 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -920,6 +920,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK)); + configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO)); + return configuration; } @@ -932,7 +934,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { configuration.setAllowFailBack(getBoolean(policyNode, "allow-failback", configuration.isAllowFailBack())); - configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO)); + configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout", configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO)); configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK)); @@ -948,8 +950,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown())); - configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO)); - return configuration; } @@ -960,8 +960,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown())); - configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO)); - configuration.setRestartBackup(getBoolean(policyNode, "restart-backup", configuration.isRestartBackup())); configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 76cb1bcf74..23dff8d594 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -335,12 +335,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { Journal getMessageJournal(); /** - * @see org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager#startReplication(org.apache.activemq.artemis.core.replication.ReplicationManager, org.apache.activemq.artemis.core.paging.PagingManager, String, boolean) + * @see org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager#startReplication(org.apache.activemq.artemis.core.replication.ReplicationManager, org.apache.activemq.artemis.core.paging.PagingManager, String, boolean, long) */ void startReplication(ReplicationManager replicationManager, PagingManager pagingManager, String nodeID, - boolean autoFailBack) throws Exception; + boolean autoFailBack, + long initialReplicationSyncTimeout) throws Exception; /** * Write message to page if we are paging. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 440bd625d2..cac2c00fea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -305,7 +305,8 @@ public class JournalStorageManager implements StorageManager { public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager, String nodeID, - final boolean autoFailBack) throws Exception { + final boolean autoFailBack, + long initialReplicationSyncTimeout) throws Exception { if (!started) { throw new IllegalStateException("JournalStorageManager must be started..."); } @@ -376,7 +377,7 @@ public class JournalStorageManager implements StorageManager { storageManagerLock.writeLock().lock(); try { if (replicator != null) { - replicator.sendSynchronizationDone(nodeID); + replicator.sendSynchronizationDone(nodeID, initialReplicationSyncTimeout); performCachedLargeMessageDeletes(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index e6c1fe0cad..289cb77783 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -473,7 +473,8 @@ public class NullStorageManager implements StorageManager { public void startReplication(final ReplicationManager replicationManager, final PagingManager pagingManager, final String nodeID, - final boolean autoFailBack) throws Exception { + final boolean autoFailBack, + long initialReplicationSyncTimeout) throws Exception { // no-op } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index 796fb60cc2..44578085a9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -35,6 +35,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_PAGE_WRITE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_PREPARE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT; @@ -64,6 +65,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage; @@ -120,6 +122,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder { packet = new ReplicationResponseMessage(); break; } + case REPLICATION_RESPONSE_V2: { + packet = new ReplicationResponseMessageV2(); + break; + } case REPLICATION_PAGE_WRITE: { packet = new ReplicationPageWriteMessage(); break; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java index ac06997918..c7eff85466 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java @@ -18,9 +18,13 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -public final class ReplicationResponseMessage extends PacketImpl { +public class ReplicationResponseMessage extends PacketImpl { public ReplicationResponseMessage() { super(PacketImpl.REPLICATION_RESPONSE); } + + public ReplicationResponseMessage(byte replicationResponseV2) { + super(replicationResponseV2); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java new file mode 100644 index 0000000000..146a3de8dc --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; + +public final class ReplicationResponseMessageV2 extends ReplicationResponseMessage { + boolean synchronizationIsFinishedAcknowledgement = false; + + public ReplicationResponseMessageV2(final boolean synchronizationIsFinishedAcknowledgement) { + super(REPLICATION_RESPONSE_V2); + + this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement; + } + + public ReplicationResponseMessageV2() { + super(PacketImpl.REPLICATION_RESPONSE_V2); + } + + public boolean isSynchronizationIsFinishedAcknowledgement() { + return synchronizationIsFinishedAcknowledgement; + } + + public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) { + this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeBoolean(synchronizationIsFinishedAcknowledgement); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + synchronizationIsFinishedAcknowledgement = buffer.readBoolean(); + } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(getParentString()); + buf.append(", synchronizationIsFinishedAcknowledgement=" + synchronizationIsFinishedAcknowledgement); + buf.append("]"); + return buf.toString(); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java index d6d6753d69..56c946109c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java @@ -203,4 +203,16 @@ public class ReplicationStartSyncMessage extends PacketImpl { return false; return true; } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(getParentString()); + buf.append(", synchronizationIsFinished=" + synchronizationIsFinished); + buf.append(", dataType=" + dataType); + buf.append(", nodeID=" + nodeID); + buf.append(", ids=" + Arrays.toString(ids)); + buf.append(", allowsAutoFailBack=" + allowsAutoFailBack); + buf.append("]"); + return buf.toString(); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index c96a10f6e3..c79e572bfd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -68,6 +68,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage.SyncDataType; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; @@ -196,7 +197,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon handleLargeMessageEnd((ReplicationLargeMessageEndMessage) packet); } else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC) { - handleStartReplicationSynchronization((ReplicationStartSyncMessage) packet); + response = handleStartReplicationSynchronization((ReplicationStartSyncMessage) packet); } else if (type == PacketImpl.REPLICATION_SYNC_FILE) { handleReplicationSynchronization((ReplicationSyncFileMessage) packet); @@ -476,19 +477,23 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon * * @param packet * @throws Exception + * @return if the incoming packet indicates the synchronization is finished then return an acknowledgement otherwise + * return an empty response */ - private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception { + private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception { + ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); if (activation.isRemoteBackupUpToDate()) { throw ActiveMQMessageBundle.BUNDLE.replicationBackupUpToDate(); } synchronized (this) { if (!started) - return; + return replicationResponseMessage; if (packet.isSynchronizationFinished()) { finishSynchronization(packet.getNodeID()); - return; + replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); + return replicationResponseMessage; } switch (packet.getDataType()) { @@ -523,6 +528,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType(); } } + + return replicationResponseMessage; } private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index fa2b72c8d8..d276474eb1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -58,12 +58,15 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageEventMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.ReusableLatch; /** * Manages replication tasks on the live server (that is the live server side of a "remote backup" @@ -116,6 +119,8 @@ public final class ReplicationManager implements ActiveMQComponent { private volatile boolean inSync = true; + private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0); + /** * @param remotingConnection */ @@ -392,8 +397,14 @@ public final class ReplicationManager implements ActiveMQComponent { private final class ResponseHandler implements ChannelHandler { public void handlePacket(final Packet packet) { - if (packet.getType() == PacketImpl.REPLICATION_RESPONSE) { + if (packet.getType() == PacketImpl.REPLICATION_RESPONSE || packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) { replicated(); + if (packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) { + ReplicationResponseMessageV2 replicationResponseMessage = (ReplicationResponseMessageV2) packet; + if (replicationResponseMessage.isSynchronizationIsFinishedAcknowledgement()) { + synchronizationIsFinishedAcknowledgement.countDown(); + } + } } } @@ -534,9 +545,18 @@ public final class ReplicationManager implements ActiveMQComponent { * * @param nodeID */ - public void sendSynchronizationDone(String nodeID) { + public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) { if (enabled) { + synchronizationIsFinishedAcknowledgement.countUp(); sendReplicatePacket(new ReplicationStartSyncMessage(nodeID)); + try { + if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) { + throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout); + } + } + catch (InterruptedException e) { + ActiveMQServerLogger.LOGGER.debug(e); + } inSync = false; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index e4402f4730..3d57adec41 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -359,4 +359,7 @@ public interface ActiveMQMessageBundle { @Message(id = 119113, value = "Invalid message load balancing type {0}", format = Message.Format.MESSAGE_FORMAT) IllegalArgumentException invalidMessageLoadBalancingType(String val); + + @Message(id = 119114, value = "Replication synchronization process timed out after waiting {0} milliseconds", format = Message.Format.MESSAGE_FORMAT) + IllegalStateException replicationSynchronizationTimeout(long timeout); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java index 421daaa21f..9d54b928c5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java @@ -51,9 +51,11 @@ public abstract class NodeManager implements ActiveMQComponent { public abstract void awaitLiveNode() throws Exception; + public abstract void awaitLiveStatus() throws Exception; + public abstract void startBackup() throws Exception; - public abstract void startLiveNode() throws Exception; + public abstract ActivateCallback startLiveNode() throws Exception; public abstract void pauseLiveServer() throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java index 6ec85b49d2..c32b446e63 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java @@ -36,7 +36,7 @@ public class ReplicaPolicy extends BackupPolicy { //used if we create a replicated policy for when we become live. private boolean allowFailback = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback(); - private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay(); + private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); private ReplicatedPolicy replicatedPolicy; @@ -48,14 +48,14 @@ public class ReplicaPolicy extends BackupPolicy { String groupName, boolean restartBackup, boolean allowFailback, - long failbackDelay, + long initialReplicationSyncTimeout, ScaleDownPolicy scaleDownPolicy) { this.clusterName = clusterName; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.groupName = groupName; this.restartBackup = restartBackup; this.allowFailback = allowFailback; - this.failbackDelay = failbackDelay; + this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.scaleDownPolicy = scaleDownPolicy; } @@ -87,7 +87,7 @@ public class ReplicaPolicy extends BackupPolicy { public ReplicatedPolicy getReplicatedPolicy() { if (replicatedPolicy == null) { - replicatedPolicy = new ReplicatedPolicy(false, allowFailback, failbackDelay, groupName, clusterName, this); + replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this); } return replicatedPolicy; } @@ -137,12 +137,21 @@ public class ReplicaPolicy extends BackupPolicy { this.allowFailback = allowFailback; } + @Deprecated public long getFailbackDelay() { - return failbackDelay; + return -1; } + @Deprecated public void setFailbackDelay(long failbackDelay) { - this.failbackDelay = failbackDelay; + } + + public long getInitialReplicationSyncTimeout() { + return initialReplicationSyncTimeout; + } + + public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) { + this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java index 85fde90850..295a862809 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java @@ -31,14 +31,14 @@ public class ReplicatedPolicy implements HAPolicy { private String clusterName; + private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout(); + /* * these are only set by the ReplicaPolicy after failover to decide if the live server can failback, these should not * be exposed in configuration. * */ private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback(); - private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay(); - /* * this are only used as the policy when the server is started as a live after a failover * */ @@ -48,10 +48,11 @@ public class ReplicatedPolicy implements HAPolicy { replicaPolicy = new ReplicaPolicy(clusterName, -1, groupName, this); } - public ReplicatedPolicy(boolean checkForLiveServer, String groupName, String clusterName) { + public ReplicatedPolicy(boolean checkForLiveServer, String groupName, String clusterName, long initialReplicationSyncTimeout) { this.checkForLiveServer = checkForLiveServer; this.groupName = groupName; this.clusterName = clusterName; + this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; /* * we create this with sensible defaults in case we start after a failover * */ @@ -59,7 +60,7 @@ public class ReplicatedPolicy implements HAPolicy { public ReplicatedPolicy(boolean checkForLiveServer, boolean allowAutoFailBack, - long failbackDelay, + long initialReplicationSyncTimeout, String groupName, String clusterName, ReplicaPolicy replicaPolicy) { @@ -67,7 +68,7 @@ public class ReplicatedPolicy implements HAPolicy { this.clusterName = clusterName; this.groupName = groupName; this.allowAutoFailBack = allowAutoFailBack; - this.failbackDelay = failbackDelay; + this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.replicaPolicy = replicaPolicy; } @@ -83,12 +84,21 @@ public class ReplicatedPolicy implements HAPolicy { return allowAutoFailBack; } + @Deprecated public long getFailbackDelay() { - return failbackDelay; + return -1; } + @Deprecated public void setFailbackDelay(long failbackDelay) { - this.failbackDelay = failbackDelay; + } + + public long getInitialReplicationSyncTimeout() { + return initialReplicationSyncTimeout; + } + + public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) { + this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; } public String getClusterName() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java index 653cd93e97..d1fcb65470 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java @@ -25,8 +25,6 @@ import java.util.Map; public class SharedStoreMasterPolicy implements HAPolicy { - private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay(); - private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); private SharedStoreSlavePolicy sharedStoreSlavePolicy; @@ -34,17 +32,17 @@ public class SharedStoreMasterPolicy implements HAPolicy { public SharedStoreMasterPolicy() { } - public SharedStoreMasterPolicy(long failbackDelay, boolean failoverOnServerShutdown) { - this.failbackDelay = failbackDelay; + public SharedStoreMasterPolicy(boolean failoverOnServerShutdown) { this.failoverOnServerShutdown = failoverOnServerShutdown; } + @Deprecated public long getFailbackDelay() { - return failbackDelay; + return -1; } + @Deprecated public void setFailbackDelay(long failbackDelay) { - this.failbackDelay = failbackDelay; } public boolean isFailoverOnServerShutdown() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java index 7f2693c151..af6a95538a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java @@ -25,8 +25,6 @@ import java.util.Map; public class SharedStoreSlavePolicy extends BackupPolicy { - private long failbackDelay = ActiveMQDefaultConfiguration.getDefaultFailbackDelay(); - private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback(); @@ -37,24 +35,23 @@ public class SharedStoreSlavePolicy extends BackupPolicy { public SharedStoreSlavePolicy() { } - public SharedStoreSlavePolicy(long failbackDelay, - boolean failoverOnServerShutdown, + public SharedStoreSlavePolicy(boolean failoverOnServerShutdown, boolean restartBackup, boolean allowAutoFailBack, ScaleDownPolicy scaleDownPolicy) { - this.failbackDelay = failbackDelay; this.failoverOnServerShutdown = failoverOnServerShutdown; this.restartBackup = restartBackup; this.allowAutoFailBack = allowAutoFailBack; this.scaleDownPolicy = scaleDownPolicy; } + @Deprecated public long getFailbackDelay() { - return failbackDelay; + return -1; } + @Deprecated public void setFailbackDelay(long failbackDelay) { - this.failbackDelay = failbackDelay; } public boolean isFailoverOnServerShutdown() { @@ -67,7 +64,7 @@ public class SharedStoreSlavePolicy extends BackupPolicy { public SharedStoreMasterPolicy getSharedStoreMasterPolicy() { if (sharedStoreMasterPolicy == null) { - sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failbackDelay, failoverOnServerShutdown); + sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown); } return sharedStoreMasterPolicy; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java index acb431d388..6169ceff7d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java @@ -23,6 +23,7 @@ import java.nio.channels.FileLock; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.utils.UUID; @@ -153,7 +154,7 @@ public class FileLockNodeManager extends NodeManager { } @Override - public void startLiveNode() throws Exception { + public ActivateCallback startLiveNode() throws Exception { setFailingBack(); String timeoutMessage = lockAcquisitionTimeout == -1 ? "indefinitely" : lockAcquisitionTimeout + " milliseconds"; @@ -164,7 +165,29 @@ public class FileLockNodeManager extends NodeManager { ActiveMQServerLogger.LOGGER.obtainedLiveLock(); - setLive(); + return new ActivateCallback() { + @Override + public void preActivate() { + } + + @Override + public void activated() { + } + + @Override + public void deActivate() { + } + + @Override + public void activationComplete() { + try { + setLive(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; } @Override @@ -183,6 +206,13 @@ public class FileLockNodeManager extends NodeManager { } } + @Override + public void awaitLiveStatus() throws Exception { + while (getState() != LIVE) { + Thread.sleep(2000); + } + } + private void setLive() throws Exception { writeFileLockStatus(FileLockNodeManager.LIVE); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java index 726cb50003..48f16270af 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java @@ -22,6 +22,7 @@ import java.util.concurrent.Semaphore; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -90,16 +91,45 @@ public final class InVMNodeManager extends NodeManager { } } + @Override + public void awaitLiveStatus() throws Exception { + while (state != LIVE) { + Thread.sleep(10); + } + } + @Override public void startBackup() throws Exception { backupLock.acquire(); } @Override - public void startLiveNode() throws Exception { + public ActivateCallback startLiveNode() throws Exception { state = FAILING_BACK; liveLock.acquire(); - state = LIVE; + return new ActivateCallback() { + @Override + public void preActivate() { + } + + @Override + public void activated() { + } + + @Override + public void deActivate() { + } + + @Override + public void activationComplete() { + try { + state = LIVE; + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; } @Override @@ -110,8 +140,6 @@ public final class InVMNodeManager extends NodeManager { @Override public void crashLiveServer() throws Exception { - //overkill as already set to live - state = LIVE; liveLock.release(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 50c45e66f2..52d62605a6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -157,7 +157,7 @@ public class SharedNothingLiveActivation extends LiveActivation { Thread t = new Thread(new Runnable() { public void run() { try { - activeMQServer.getStorageManager().startReplication(replicationManager, activeMQServer.getPagingManager(), activeMQServer.getNodeID().toString(), isFailBackRequest && replicatedPolicy.isAllowAutoFailBack()); + activeMQServer.getStorageManager().startReplication(replicationManager, activeMQServer.getPagingManager(), activeMQServer.getNodeID().toString(), isFailBackRequest && replicatedPolicy.isAllowAutoFailBack(), replicatedPolicy.getInitialReplicationSyncTimeout()); clusterConnection.nodeAnnounced(System.currentTimeMillis(), activeMQServer.getNodeID().toString(), replicatedPolicy.getGroupName(), replicatedPolicy.getScaleDownGroupName(), pair, true); @@ -168,13 +168,7 @@ public class SharedNothingLiveActivation extends LiveActivation { BackupTopologyListener listener1 = new BackupTopologyListener(activeMQServer.getNodeID().toString()); clusterConnection.addClusterTopologyListener(listener1); if (listener1.waitForBackup()) { - try { - Thread.sleep(replicatedPolicy.getFailbackDelay()); - } - catch (InterruptedException e) { - // - } - //if we have to many backups kept or arent configured to restart just stop, otherwise restart as a backup + //if we have to many backups kept or are not configured to restart just stop, otherwise restart as a backup if (!replicatedPolicy.getReplicaPolicy().isRestartBackup() && activeMQServer.countNumberOfCopiedJournals() >= replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() && replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() >= 0) { activeMQServer.stop(true); ActiveMQServerLogger.LOGGER.stopReplicatedBackupAfterFailback(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java index e556b5d05c..0aee108391 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.core.server.impl; +import java.nio.channels.ClosedChannelException; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -30,9 +33,6 @@ import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.management.ManagementService; -import java.nio.channels.ClosedChannelException; -import java.util.concurrent.TimeUnit; - public final class SharedStoreBackupActivation extends Activation { //this is how we act as a backup @@ -191,38 +191,51 @@ public final class SharedStoreBackupActivation extends Activation { } private class FailbackChecker implements Runnable { + BackupTopologyListener backupListener; + + FailbackChecker() { + backupListener = new BackupTopologyListener(activeMQServer.getNodeID().toString()); + activeMQServer.getClusterManager().getDefaultConnection(null).addClusterTopologyListener(backupListener); + } private boolean restarting = false; public void run() { try { if (!restarting && activeMQServer.getNodeManager().isAwaitingFailback()) { - ActiveMQServerLogger.LOGGER.awaitFailBack(); - restarting = true; - Thread t = new Thread(new Runnable() { - public void run() { - try { - ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Stopping live node in favor of failback"); + if (backupListener.waitForBackup()) { + ActiveMQServerLogger.LOGGER.awaitFailBack(); + restarting = true; + Thread t = new Thread(new Runnable() { + public void run() { + try { + ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Stopping live node in favor of failback"); - activeMQServer.stop(true, false, true); - // We need to wait some time before we start the backup again - // otherwise we may eventually start before the live had a chance to get it - Thread.sleep(sharedStoreSlavePolicy.getFailbackDelay()); - synchronized (failbackCheckerGuard) { - if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup()) - return; + NodeManager nodeManager = activeMQServer.getNodeManager(); + activeMQServer.stop(true, false, true); - activeMQServer.setHAPolicy(sharedStoreSlavePolicy); - ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Starting backup node now after failback"); - activeMQServer.start(); + // ensure that the server to which we are failing back actually starts fully before we restart + nodeManager.start(); + nodeManager.awaitLiveStatus(); + nodeManager.stop(); + + synchronized (failbackCheckerGuard) { + if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup()) + return; + + activeMQServer.setHAPolicy(sharedStoreSlavePolicy); + ActiveMQServerLogger.LOGGER.debug(activeMQServer + "::Starting backup node now after failback"); + activeMQServer.start(); + } + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.serverRestartWarning(); + e.printStackTrace(); } } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.serverRestartWarning(); - } - } - }); - t.start(); + }); + t.start(); + } } } catch (Exception e) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java index 95c24ab7db..f48bb6c334 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java @@ -55,10 +55,9 @@ public final class SharedStoreLiveActivation extends LiveActivation { } activeMQServer.getBackupManager().start(); activeMQServer.getBackupManager().announceBackup(); - Thread.sleep(sharedStoreMasterPolicy.getFailbackDelay()); } - activeMQServer.getNodeManager().startLiveNode(); + activeMQServer.registerActivateCallback(activeMQServer.getNodeManager().startLiveNode()); if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED || activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) { return; diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 6f2583108a..5ab8b7680f 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1624,6 +1624,14 @@ + + + + The amount of time to wait for the replica to acknowledge it has received all the necessary data from + the replicating server at the final step of the initial replication synchronization process. + + + @@ -1681,7 +1689,16 @@ - if we have to start as a replicated server this is the delay to wait before fail-back occurs + DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back occurs + + + + + + + If we have to start as a replicated server this is the amount of time to wait for the replica to + acknowledge it has received all the necessary data from the replicating server at the final step + of the initial replication synchronization process. @@ -1736,7 +1753,7 @@ - delay to wait before fail-back occurs on (live's) restart + DEPRECATED: delay to wait before fail-back occurs on (live's) restart @@ -1764,7 +1781,7 @@ - delay to wait before fail-back occurs on (live's) restart + DEPRECATED: delay to wait before fail-back occurs on (live's) restart diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java index 9f5d2c52e7..7fd25f1c5f 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java @@ -121,6 +121,7 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase { assertEquals(replicatedPolicy.getGroupName(), "purple"); assertTrue(replicatedPolicy.isCheckForLiveServer()); assertEquals(replicatedPolicy.getClusterName(), "abcdefg"); + assertEquals(replicatedPolicy.getInitialReplicationSyncTimeout(), 9876); } finally { server.stop(); @@ -142,6 +143,8 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase { assertEquals(replicaPolicy.getMaxSavedReplicatedJournalsSize(), 22); assertEquals(replicaPolicy.getClusterName(), "33rrrrr"); assertFalse(replicaPolicy.isRestartBackup()); + assertTrue(replicaPolicy.isAllowFailback()); + assertEquals(replicaPolicy.getInitialReplicationSyncTimeout(), 9876); ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy(); assertNotNull(scaleDownPolicy); assertEquals(scaleDownPolicy.getGroupName(), "boo!"); @@ -219,7 +222,6 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase { HAPolicy haPolicy = server.getHAPolicy(); assertTrue(haPolicy instanceof SharedStoreMasterPolicy); SharedStoreMasterPolicy masterPolicy = (SharedStoreMasterPolicy) haPolicy; - assertEquals(masterPolicy.getFailbackDelay(), 3456); assertFalse(masterPolicy.isFailoverOnServerShutdown()); } finally { @@ -237,11 +239,10 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase { assertTrue(activation instanceof SharedStoreBackupActivation); HAPolicy haPolicy = server.getHAPolicy(); assertTrue(haPolicy instanceof SharedStoreSlavePolicy); - SharedStoreSlavePolicy replicaPolicy = (SharedStoreSlavePolicy) haPolicy; - assertEquals(replicaPolicy.getFailbackDelay(), 9876); - assertFalse(replicaPolicy.isFailoverOnServerShutdown()); - assertFalse(replicaPolicy.isRestartBackup()); - ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy(); + SharedStoreSlavePolicy sharedStoreSlavePolicy = (SharedStoreSlavePolicy) haPolicy; + assertFalse(sharedStoreSlavePolicy.isFailoverOnServerShutdown()); + assertFalse(sharedStoreSlavePolicy.isRestartBackup()); + ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy(); assertNotNull(scaleDownPolicy); assertEquals(scaleDownPolicy.getGroupName(), "boo!"); assertEquals(scaleDownPolicy.getDiscoveryGroup(), "wahey"); @@ -264,11 +265,10 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase { assertTrue(activation instanceof SharedStoreBackupActivation); HAPolicy haPolicy = server.getHAPolicy(); assertTrue(haPolicy instanceof SharedStoreSlavePolicy); - SharedStoreSlavePolicy replicaPolicy = (SharedStoreSlavePolicy) haPolicy; - assertEquals(replicaPolicy.getFailbackDelay(), 5678); - assertTrue(replicaPolicy.isFailoverOnServerShutdown()); - assertTrue(replicaPolicy.isRestartBackup()); - ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy(); + SharedStoreSlavePolicy sharedStoreSlavePolicy = (SharedStoreSlavePolicy) haPolicy; + assertTrue(sharedStoreSlavePolicy.isFailoverOnServerShutdown()); + assertTrue(sharedStoreSlavePolicy.isRestartBackup()); + ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy(); assertNotNull(scaleDownPolicy); assertEquals(scaleDownPolicy.getGroupName(), "boo!"); assertEquals(scaleDownPolicy.getDiscoveryGroup(), null); @@ -293,11 +293,10 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase { assertTrue(activation instanceof SharedStoreBackupActivation); HAPolicy haPolicy = server.getHAPolicy(); assertTrue(haPolicy instanceof SharedStoreSlavePolicy); - SharedStoreSlavePolicy replicaPolicy = (SharedStoreSlavePolicy) haPolicy; - assertEquals(replicaPolicy.getFailbackDelay(), 5678); - assertTrue(replicaPolicy.isFailoverOnServerShutdown()); - assertTrue(replicaPolicy.isRestartBackup()); - ScaleDownPolicy scaleDownPolicy = replicaPolicy.getScaleDownPolicy(); + SharedStoreSlavePolicy sharedStoreSlavePolicy = (SharedStoreSlavePolicy) haPolicy; + assertTrue(sharedStoreSlavePolicy.isFailoverOnServerShutdown()); + assertTrue(sharedStoreSlavePolicy.isRestartBackup()); + ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy(); assertNull(scaleDownPolicy); } finally { @@ -349,10 +348,8 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase { assertNotNull(livePolicy); assertFalse(livePolicy.isFailoverOnServerShutdown()); - assertEquals(livePolicy.getFailbackDelay(), 1234); SharedStoreSlavePolicy backupPolicy = (SharedStoreSlavePolicy) colocatedPolicy.getBackupPolicy(); assertNotNull(backupPolicy); - assertEquals(backupPolicy.getFailbackDelay(), 44); assertFalse(backupPolicy.isFailoverOnServerShutdown()); assertFalse(backupPolicy.isRestartBackup()); } diff --git a/artemis-server/src/test/resources/colocated-hapolicy-config2.xml b/artemis-server/src/test/resources/colocated-hapolicy-config2.xml index ff65b54449..ca13b65c01 100644 --- a/artemis-server/src/test/resources/colocated-hapolicy-config2.xml +++ b/artemis-server/src/test/resources/colocated-hapolicy-config2.xml @@ -28,19 +28,15 @@ false 33 - 1234 false - 44 false false - - c - + diff --git a/artemis-server/src/test/resources/replica-hapolicy-config.xml b/artemis-server/src/test/resources/replica-hapolicy-config.xml index 71836734f5..03983fc8d5 100644 --- a/artemis-server/src/test/resources/replica-hapolicy-config.xml +++ b/artemis-server/src/test/resources/replica-hapolicy-config.xml @@ -30,7 +30,7 @@ 33rrrrr false true - 9876 + 9876 boo! diff --git a/artemis-server/src/test/resources/replicated-hapolicy-config.xml b/artemis-server/src/test/resources/replicated-hapolicy-config.xml index d41931499c..8195e45379 100644 --- a/artemis-server/src/test/resources/replicated-hapolicy-config.xml +++ b/artemis-server/src/test/resources/replicated-hapolicy-config.xml @@ -26,6 +26,7 @@ purple true abcdefg + 9876 diff --git a/artemis-server/src/test/resources/shared-store-master-hapolicy-config.xml b/artemis-server/src/test/resources/shared-store-master-hapolicy-config.xml index cb55b42875..132cd00c92 100644 --- a/artemis-server/src/test/resources/shared-store-master-hapolicy-config.xml +++ b/artemis-server/src/test/resources/shared-store-master-hapolicy-config.xml @@ -22,11 +22,9 @@ - 3456 false - diff --git a/artemis-server/src/test/resources/shared-store-slave-hapolicy-config.xml b/artemis-server/src/test/resources/shared-store-slave-hapolicy-config.xml index 57dff39245..28c6051f89 100644 --- a/artemis-server/src/test/resources/shared-store-slave-hapolicy-config.xml +++ b/artemis-server/src/test/resources/shared-store-slave-hapolicy-config.xml @@ -26,7 +26,6 @@ true - 9876 false false diff --git a/artemis-server/src/test/resources/shared-store-slave-hapolicy-config2.xml b/artemis-server/src/test/resources/shared-store-slave-hapolicy-config2.xml index 3c1c19b5e0..57acd13f3d 100644 --- a/artemis-server/src/test/resources/shared-store-slave-hapolicy-config2.xml +++ b/artemis-server/src/test/resources/shared-store-slave-hapolicy-config2.xml @@ -22,7 +22,6 @@ - 5678 true true @@ -38,5 +37,4 @@ - diff --git a/artemis-server/src/test/resources/shared-store-slave-hapolicy-config3.xml b/artemis-server/src/test/resources/shared-store-slave-hapolicy-config3.xml index 1c62ae3f7f..991ea55057 100644 --- a/artemis-server/src/test/resources/shared-store-slave-hapolicy-config3.xml +++ b/artemis-server/src/test/resources/shared-store-slave-hapolicy-config3.xml @@ -22,12 +22,10 @@ - 5678 true true - diff --git a/docs/user-manual/en/ha.md b/docs/user-manual/en/ha.md index e0e032db31..1bfec9f95f 100644 --- a/docs/user-manual/en/ha.md +++ b/docs/user-manual/en/ha.md @@ -113,8 +113,13 @@ the connection speed. > **Note** > -> Synchronization occurs in parallel with current network traffic so -> this won't cause any blocking on current clients. +> In general, synchronization occurs in parallel with current network traffic so +> this won't cause any blocking on current clients. However, there is a critical +> moment at the end of this process where the replicating server must complete +> the synchronization and ensure the replica acknowledges this completion. This +> exchange between the replicating server and replica will block any journal +> related operations. The maximum length of time that this exchange will block +> is controlled by the `initial-replication-sync-timeout` configuration element. Replication will create a copy of the data at the backup. One issue to be aware of is: in case of a successful fail-over, the backup's data @@ -257,11 +262,14 @@ HA strategy Replication for `master`: `group-name` - Whether to check the cluster for a (live) server using our own server ID when starting up. This option is only necessary for performing 'fail-back' on replicating servers. + If set, backup servers will only pair with live servers with matching group-name. - `check-for-live-server` - If set, backup servers will only pair with live servers with matching group-name. + `initial-replication-sync-timeout` + The amount of time the replicating server will wait at the completion of the initial + replication process for the replica to acknowledge it has received all the necessary + data. The default is 30,000 milliseconds. Note: during this interval any + journal related operations will be blocked. @@ -309,8 +317,14 @@ HA strategy Replication for `slave`: failed over - `failback-delay` - delay to wait before fail-back occurs on (failed over live's) restart + `initial-replication-sync-timeout` + After failover and the slave has become live, this is + set on the new live server. It represents the amount of time + the replicating server will wait at the completion of the + initial replication process for the replica to acknowledge + it has received all the necessary data. The default is + 30,000 milliseconds. Note: during this interval any + journal related operations will be blocked. @@ -405,16 +419,10 @@ stop. This configuration would look like: true - 5000 - -The `failback-delay` configures how long the backup must wait after -automatically stopping before it restarts. This is to gives the live -server time to start and obtain its lock. - In replication HA mode you need to set an extra property `check-for-live-server` to `true` in the `master` configuration. If set to true, during start-up a live server will first search the cluster for @@ -491,13 +499,6 @@ HA strategy shared store for `master`: - - `failback-delay` - If a backup server is detected as being live, - via the lock file, then the live server will wait - announce itself as a backup and wait this amount - of time (in ms) before starting as a live - `failover-on-server-shutdown` If set to true then when this server is stopped @@ -512,7 +513,7 @@ HA strategy shared store for `master`: The following table lists all the `ha-policy` configuration elements for HA strategy Shared Store for `slave`: - +
@@ -539,17 +540,6 @@ HA strategy Shared Store for `slave`: places a request to take over its place. The use case is when the backup has failed over. - - - -
`failback-delay`After failover and the slave has become live, this is - set on the new live server. When starting If a backup server - is detected as being live, via the lock file, then the live - server will wait announce itself as a backup and wait this - amount of time (in ms) before starting as a live, however - this is unlikely since this backup has just stopped anyway. - It is also used as the delay after failback before this backup - will restart (if `allow-failback` is set to true.
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java index 07bcc13f7b..ee0bee600c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java @@ -49,7 +49,7 @@ public class NodeManagerAction { for (int action : work) { switch (action) { case START_LIVE: - nodeManager.startLiveNode(); + nodeManager.startLiveNode().activationComplete(); hasLiveLock = true; hasBackupLock = false; break; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailBackAutoTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailBackAutoTest.java index 6530b681d4..2fa0f1004c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailBackAutoTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailBackAutoTest.java @@ -249,11 +249,11 @@ public class FailBackAutoTest extends FailoverTestBase { TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setRestartBackup(true)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setRestartBackup(true)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); backupServer = createTestableServer(backupConfig); - liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(100)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector); + liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector); liveServer = createTestableServer(liveConfig); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverListenerTest.java index 03c08add22..2088748c78 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverListenerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverListenerTest.java @@ -187,11 +187,11 @@ public class FailoverListenerTest extends FailoverTestBase { TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); backupServer = createTestableServer(backupConfig); - liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(1000)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector); + liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName(), backupConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector); liveServer = createTestableServer(liveConfig); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java index 4dd5ad3c1d..1dfc9d62a4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java @@ -159,11 +159,11 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); backupServer = createTestableServer(backupConfig); - liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(1000)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector); + liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector); liveServer = createTestableServer(liveConfig); } @@ -191,7 +191,7 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { } protected void setupHAPolicyConfiguration() { - ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(0).setAllowFailBack(true).setFailbackDelay(5000); + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(0).setAllowFailBack(true); } protected final void adaptLiveConfigForReplicatedFailBack(TestableServer server) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java index d74cee933b..430e0ecf0b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java @@ -54,11 +54,11 @@ public class LiveToLiveFailoverTest extends FailoverTest { TransportConfiguration liveConnector0 = getConnectorTransportConfiguration(true, 0); TransportConfiguration liveConnector1 = getConnectorTransportConfiguration(true, 1); - backupConfig = super.createDefaultInVMConfig(1).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 1)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setScaleDownConfiguration(new ScaleDownConfiguration().addConnector(liveConnector1.getName())))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector1.getName(), liveConnector0.getName())); + backupConfig = super.createDefaultInVMConfig(1).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 1)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setScaleDownConfiguration(new ScaleDownConfiguration().addConnector(liveConnector1.getName())))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector1.getName(), liveConnector0.getName())); backupServer = createColocatedTestableServer(backupConfig, nodeManager1, nodeManager0, 1); - liveConfig = super.createDefaultInVMConfig(0).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 0)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setBackupRequestRetryInterval(1000).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setScaleDownConfiguration(new ScaleDownConfiguration()))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector0.getName(), liveConnector1.getName())); + liveConfig = super.createDefaultInVMConfig(0).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true, 0)).setHAPolicyConfiguration(new ColocatedPolicyConfiguration().setRequestBackup(true).setBackupRequestRetryInterval(1000).setLiveConfig(new SharedStoreMasterPolicyConfiguration()).setBackupConfig(new SharedStoreSlavePolicyConfiguration().setScaleDownConfiguration(new ScaleDownConfiguration()))).addConnectorConfiguration(liveConnector0.getName(), liveConnector0).addConnectorConfiguration(liveConnector1.getName(), liveConnector1).addClusterConfiguration(basicClusterConnectionConfig(liveConnector0.getName(), liveConnector1.getName())); liveServer = createColocatedTestableServer(liveConfig, nodeManager0, nodeManager1, 0); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/MultipleServerFailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/MultipleServerFailoverTestBase.java index 4c0a23e3b8..156400b0f2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/MultipleServerFailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/MultipleServerFailoverTestBase.java @@ -82,7 +82,6 @@ public abstract class MultipleServerFailoverTestBase extends ActiveMQTestBase { if (isSharedStore()) { haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration(); - ((SharedStoreMasterPolicyConfiguration) haPolicyConfiguration).setFailbackDelay(1000); } else { haPolicyConfiguration = new ReplicatedPolicyConfiguration(); @@ -128,11 +127,9 @@ public abstract class MultipleServerFailoverTestBase extends ActiveMQTestBase { if (isSharedStore()) { haPolicyConfiguration = new SharedStoreSlavePolicyConfiguration(); - ((SharedStoreSlavePolicyConfiguration) haPolicyConfiguration).setFailbackDelay(1000); } else { haPolicyConfiguration = new ReplicaPolicyConfiguration(); - ((ReplicaPolicyConfiguration) haPolicyConfiguration).setFailbackDelay(1000); if (getNodeGroupName() != null) { ((ReplicaPolicyConfiguration) haPolicyConfiguration).setGroupName(getNodeGroupName() + "-" + i); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java index a93335292f..d846cc5058 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java @@ -126,7 +126,7 @@ public class ReplicatedFailoverTest extends FailoverTest { protected void setupHAPolicyConfiguration() { if (isReplicatedFailbackTest) { ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); - ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true).setFailbackDelay(2000); + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true); } else { super.setupHAPolicyConfiguration(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SecurityFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SecurityFailoverTest.java index 4653a1e69e..e5af60ef5d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SecurityFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SecurityFailoverTest.java @@ -80,7 +80,7 @@ public class SecurityFailoverTest extends FailoverTest { TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setSecurityEnabled(true).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setSecurityEnabled(true).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); backupServer = createTestableServer(backupConfig); ActiveMQSecurityManagerImpl securityManager = installSecurity(backupServer); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreBackupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreBackupTest.java index 0b9c30fabc..4574adfbda 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreBackupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreBackupTest.java @@ -60,11 +60,11 @@ public class SharedStoreBackupTest extends FailoverTestBase { TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); System.out.println("backup config created - mnovak"); - backupConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailbackDelay(1000).setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false)).setRestartBackup(false)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false)).setRestartBackup(false)).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); backupServer = createTestableServer(backupConfig); - liveConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailbackDelay(1000).setFailoverOnServerShutdown(true)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector); + liveConfig = super.createDefaultConfig(false).clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true)).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector); liveServer = createTestableServer(liveConfig); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index 7b264402c4..6d1403525c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.replication.ReplicationEndpoint; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -171,7 +172,7 @@ public class BackupSyncDelay implements Interceptor { receivedUpToDate = true; assert onHold == null; onHold = packet; - PacketImpl response = new ReplicationResponseMessage(); + PacketImpl response = new ReplicationResponseMessageV2(true); channel.send(response); return; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java index 2845c9d115..cbdc050fb2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; +import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.core.cluster.DiscoveryEntry; @@ -180,12 +181,33 @@ public class DiscoveryBaseTest extends ActiveMQTestBase { public void awaitLiveNode() throws Exception { } + @Override + public void awaitLiveStatus() throws Exception { + } + @Override public void startBackup() throws Exception { } @Override - public void startLiveNode() throws Exception { + public ActivateCallback startLiveNode() throws Exception { + return new ActivateCallback() { + @Override + public void preActivate() { + } + + @Override + public void activated() { + } + + @Override + public void deActivate() { + } + + @Override + public void activationComplete() { + } + }; } @Override