activationParams,
+ IOCriticalErrorListener shutdownOnCriticalIO) throws Exception {
+ return new ReplicationPrimaryActivation(server,
+ DistributedPrimitiveManager.newInstanceOf(
+ distributedManagerConfiguration.getClassName(),
+ distributedManagerConfiguration.getProperties()), this);
+ }
+
+ /**
+ * Always reports {@code false}.
+ */
+ @Override
+ public boolean isSharedStore() {
+ return false;
+ }
+
+ /**
+ * Always reports {@code false}.
+ */
+ @Override
+ public boolean isBackup() {
+ return false;
+ }
+
+ /**
+ * Always reports {@code true}.
+ */
+ @Override
+ public boolean isWaitForActivation() {
+ return true;
+ }
+
+ /**
+ * Always reports {@code false}.
+ */
+ @Override
+ public boolean canScaleDown() {
+ return false;
+ }
+
+ /**
+ * @return the {@code groupName} field, i.e. the same value as {@link #getGroupName()}
+ */
+ @Override
+ public String getBackupGroupName() {
+ return groupName;
+ }
+
+ /**
+ * @return always {@code null}
+ */
+ @Override
+ public String getScaleDownGroupName() {
+ return null;
+ }
+
+ /**
+ * @return always {@code null}
+ */
+ @Override
+ public String getScaleDownClustername() {
+ return null;
+ }
+
+ /**
+ * @return the value of the {@code checkForLiveServer} field
+ */
+ public boolean isCheckForLiveServer() {
+ return checkForLiveServer;
+ }
+
+ /**
+ * @return the value of the {@code allowAutoFailBack} field
+ */
+ public boolean isAllowAutoFailBack() {
+ return allowAutoFailBack;
+ }
+
+ /**
+ * @return the value of the {@code clusterName} field
+ */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ /**
+ * @return the value of the {@code initialReplicationSyncTimeout} field (time unit not visible here - TODO confirm)
+ */
+ public long getInitialReplicationSyncTimeout() {
+ return initialReplicationSyncTimeout;
+ }
+
+ /**
+ * @return the value of the {@code groupName} field
+ */
+ public String getGroupName() {
+ return groupName;
+ }
+
+ /**
+ * Always reports {@code false}.
+ */
+ @Override
+ public boolean useQuorumManager() {
+ return false;
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
index 2e4b0f7523..82b0a3f4ba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
@@ -28,11 +28,12 @@ import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LiveNodeLocator.BackupRegistrationListener;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.jboss.logging.Logger;
-public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener {
+public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener, BackupRegistrationListener {
private static final Logger LOGGER = Logger.getLogger(SharedNothingBackupQuorum.class);
@@ -236,13 +237,9 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
}
}
- public void notifyRegistrationFailed() {
- signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
- latch.countDown();
- }
-
- public void notifyAlreadyReplicating() {
- signal = BACKUP_ACTIVATION.ALREADY_REPLICATING;
+ /**
+ * Records the reason of a failed backup registration in {@code signal} and counts down {@code latch}.
+ *
+ * @param alreadyReplicating {@code true} selects {@code ALREADY_REPLICATING},
+ * {@code false} selects {@code FAILURE_REPLICATING}
+ */
+ @Override
+ public void onBackupRegistrationFailed(boolean alreadyReplicating) {
+ signal = alreadyReplicating ? BACKUP_ACTIVATION.ALREADY_REPLICATING : BACKUP_ACTIVATION.FAILURE_REPLICATING;
latch.countDown();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileMoveManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileMoveManager.java
index f29e4a1741..5e2c1cb441 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileMoveManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileMoveManager.java
@@ -36,6 +36,7 @@ public class FileMoveManager {
private static final Logger logger = Logger.getLogger(FileMoveManager.class);
private final File folder;
+ private final String[] prefixesToPreserve;
private int maxFolders;
public static final String PREFIX = "oldreplica.";
@@ -70,9 +71,10 @@ public class FileMoveManager {
this(folder, -1);
}
- public FileMoveManager(File folder, int maxFolders) {
+ /**
+ * @param folder folder stored in this manager
+ * @param maxFolders value stored in {@code maxFolders}
+ * @param prefixesToPreserve file-name prefixes stored for later {@code startsWith} checks; the array is copied,
+ * a {@code null} array is kept as {@code null}
+ */
+ public FileMoveManager(File folder, int maxFolders, String... prefixesToPreserve) {
this.folder = folder;
this.maxFolders = maxFolders;
+ this.prefixesToPreserve = prefixesToPreserve != null ? Arrays.copyOf(prefixesToPreserve, prefixesToPreserve.length) : null;
}
public int getMaxFolders() {
@@ -99,8 +101,23 @@ public class FileMoveManager {
ActiveMQServerLogger.LOGGER.backupDeletingData(folder.getPath());
for (String fileMove : files) {
File fileFrom = new File(folder, fileMove);
- logger.tracef("deleting %s", fileFrom);
- deleteTree(fileFrom);
+ if (prefixesToPreserve != null) {
+ boolean skip = false;
+ for (String prefixToPreserve : prefixesToPreserve) {
+ if (fileMove.startsWith(prefixToPreserve)) {
+ logger.tracef("skipping %s", fileFrom);
+ skip = true;
+ break;
+ }
+ }
+ if (!skip) {
+ logger.tracef("deleting %s", fileFrom);
+ deleteTree(fileFrom);
+ }
+ } else {
+ logger.tracef("deleting %s", fileFrom);
+ deleteTree(fileFrom);
+ }
}
} else {
// Since we will create one folder, we are already taking that one into consideration
@@ -113,8 +130,26 @@ public class FileMoveManager {
for (String fileMove : files) {
File fileFrom = new File(folder, fileMove);
File fileTo = new File(folderTo, fileMove);
- logger.tracef("doMove:: moving %s as %s", fileFrom, fileTo);
- Files.move(fileFrom.toPath(), fileTo.toPath());
+ if (prefixesToPreserve != null) {
+ boolean copy = false;
+ for (String prefixToPreserve : prefixesToPreserve) {
+ if (fileMove.startsWith(prefixToPreserve)) {
+ logger.tracef("skipping %s", fileFrom);
+ copy = true;
+ break;
+ }
+ }
+ if (copy) {
+ logger.tracef("copying %s to %s", fileFrom, fileTo);
+ Files.copy(fileFrom.toPath(), fileTo.toPath());
+ } else {
+ logger.tracef("doMove:: moving %s as %s", fileFrom, fileTo);
+ Files.move(fileFrom.toPath(), fileTo.toPath());
+ }
+ } else {
+ logger.tracef("doMove:: moving %s as %s", fileFrom, fileTo);
+ Files.move(fileFrom.toPath(), fileTo.toPath());
+ }
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/Activation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/Activation.java
index 0eab1ba30f..a055036ef8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/Activation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/Activation.java
@@ -110,4 +110,8 @@ public abstract class Activation implements Runnable {
public ReplicationManager getReplicationManager() {
return null;
}
+
+ /**
+ * Base implementation: always reports {@code false}.
+ */
+ public boolean isReplicaSync() {
+ return false;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 740c3f7b8d..cfd929199f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -109,7 +109,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
-import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
@@ -797,14 +796,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
- @Override
- public ReplicationEndpoint getReplicationEndpoint() {
- if (activation instanceof SharedNothingBackupActivation) {
- return ((SharedNothingBackupActivation) activation).getReplicationEndpoint();
- }
- return null;
- }
-
@Override
public void unlockActivation() {
activationLock.release();
@@ -921,7 +912,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return threadPool;
}
- public void setActivation(SharedNothingLiveActivation activation) {
+ /**
+ * Replaces the activation held by this server.
+ *
+ * @param activation the activation to store; any {@link Activation} is accepted
+ */
+ public void setActivation(Activation activation) {
this.activation = activation;
}
@@ -1145,19 +1136,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public boolean isReplicaSync() {
- if (activation instanceof SharedNothingLiveActivation) {
- ReplicationManager replicationManager = getReplicationManager();
-
- if (replicationManager == null) {
- return false;
- } else {
- return !replicationManager.isSynchronizing();
- }
- } else if (activation instanceof SharedNothingBackupActivation) {
- return ((SharedNothingBackupActivation) activation).isRemoteBackupUpToDate();
- } else {
- return false;
- }
+ // The removed instanceof chain answered false when no activation was set; keep that contract
+ // instead of throwing a NullPointerException. The field is read once so the check and the call
+ // see the same instance.
+ final Activation currentActivation = activation;
+ return currentActivation != null && currentActivation.isReplicaSync();
}
public void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting) {
@@ -3116,7 +3095,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository);
// This can't be created until node id is set
- clusterManager = new ClusterManager(executorFactory, this, postOffice, scheduledPool, managementService, configuration, nodeManager, haPolicy.isBackup());
+ clusterManager = new ClusterManager(executorFactory, this, postOffice, scheduledPool, managementService, configuration, nodeManager, haPolicy.useQuorumManager());
federationManager = new FederationManager(this);
@@ -4191,10 +4170,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* move any older data away and log a warning about it.
*/
void moveServerData(int maxSavedReplicated) throws IOException {
+ // same as the two-argument overload with preserveLockFiles disabled
+ moveServerData(maxSavedReplicated, false);
+ }
+
+ /**
+ * Runs {@link FileMoveManager#doMove()} on the bindings, journal, paging and large-messages locations.
+ *
+ * @param maxSavedReplicated passed to each {@link FileMoveManager} as its folder limit
+ * @param preserveLockFiles when {@code true}, the folder equal to the node manager lock location gets the
+ * lock-file name prefixes handed to its {@link FileMoveManager} as prefixes to preserve
+ * @throws IOException if a move fails
+ */
+ void moveServerData(int maxSavedReplicated, boolean preserveLockFiles) throws IOException {
File[] dataDirs = new File[]{configuration.getBindingsLocation(), configuration.getJournalLocation(), configuration.getPagingLocation(), configuration.getLargeMessagesLocation()};
for (File data : dataDirs) {
- FileMoveManager moveManager = new FileMoveManager(data, maxSavedReplicated);
+ // only the folder that holds the node manager lock files needs special treatment
+ final boolean holdsLockFiles = preserveLockFiles && data.equals(configuration.getNodeManagerLockLocation());
+ final String[] preservedPrefixes = holdsLockFiles ? new String[]{FileBasedNodeManager.SERVER_LOCK_NAME, "serverlock"} : null;
+ FileMoveManager moveManager = new FileMoveManager(data, maxSavedReplicated, preservedPrefixes);
moveManager.doMove();
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
index 015339aafe..de4b409942 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
@@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
-import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
/**
@@ -47,8 +46,9 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
private String nodeID;
- public AnyLiveNodeLocatorForReplication(SharedNothingBackupQuorum backupQuorum, ActiveMQServerImpl server, long retryReplicationWait) {
- super(backupQuorum);
+ /**
+ * @param backupRegistrationListener handed to the {@link LiveNodeLocator} constructor
+ * @param server server stored in this locator
+ * @param retryReplicationWait wait stored in this locator (presumably milliseconds - TODO confirm against its use)
+ */
+ public AnyLiveNodeLocatorForReplication(BackupRegistrationListener backupRegistrationListener,
+ ActiveMQServerImpl server, long retryReplicationWait) {
+ super(backupRegistrationListener);
this.server = server;
this.retryReplicationWait = retryReplicationWait;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ClusterTopologySearch.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ClusterTopologySearch.java
new file mode 100644
index 0000000000..a8ceef42b1
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ClusterTopologySearch.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.ConfigurationUtils;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.jboss.logging.Logger;
+
+/**
+ * This class contains some utils to allow a broker to check presence and role of another broker in the cluster.
+ */
+final class ClusterTopologySearch {
+
+   private ClusterTopologySearch() {
+
+   }
+
+   /**
+    * Determines whether there is a live server already running with nodeID.
+    * This search isn't filtering the caller broker transport and is meant to be used
+    * when the broker acceptors aren't running yet.
+    *
+    * @param clusterName         name used to look up the replication cluster connection configuration
+    * @param nodeId              node id to search for
+    * @param timeout             maximum time to wait for the topology search to complete
+    * @param unit                unit of {@code timeout}
+    * @param serverConfiguration configuration providing the cluster connections and the cluster credentials
+    * @return {@code true} if a topology member with {@code nodeId} was announced and a session could be opened
+    *         on its live connector; {@code false} otherwise, including when no cluster connection is configured,
+    *         connecting to the cluster fails or the wait ends without a match
+    * @throws ActiveMQException if the locator cannot be created, e.g. the configured discovery group is missing
+    */
+   public static boolean searchActiveLiveNodeId(String clusterName,
+                                                String nodeId,
+                                                long timeout,
+                                                TimeUnit unit,
+                                                Configuration serverConfiguration) throws ActiveMQException {
+      if (serverConfiguration.getClusterConfigurations().isEmpty()) {
+         return false;
+      }
+      final ClusterConnectionConfiguration clusterConnectionConfiguration = ConfigurationUtils.getReplicationClusterConfiguration(serverConfiguration, clusterName);
+
+      final LiveNodeIdListener liveNodeIdListener = new LiveNodeIdListener(nodeId, serverConfiguration.getClusterUser(), serverConfiguration.getClusterPassword());
+
+      try (ServerLocatorInternal locator = createLocator(serverConfiguration, clusterConnectionConfiguration)) {
+         // if would like to filter out a transport configuration:
+         // locator.setClusterTransportConfiguration(callerBrokerTransportConfiguration)
+         locator.addClusterTopologyListener(liveNodeIdListener);
+         locator.setReconnectAttempts(0);
+         try (ClientSessionFactoryInternal ignored = locator.connectNoWarnings()) {
+            return liveNodeIdListener.awaitNodePresent(timeout, unit);
+         } catch (Exception notConnected) {
+            // anything thrown while connecting or waiting (an interruption included) ends the search with "not found"
+            if (!(notConnected instanceof ActiveMQException) || ActiveMQExceptionType.INTERNAL_ERROR.equals(((ActiveMQException) notConnected).getType())) {
+               // report all exceptions that aren't ActiveMQException and all INTERNAL_ERRORs
+               ActiveMQServerLogger.LOGGER.failedConnectingToCluster(notConnected);
+            }
+            return false;
+         }
+      }
+   }
+
+   /**
+    * Topology listener that completes once the searched node id has been announced (or the {@code last}
+    * flag has been received) and remembers whether that node answered a connection attempt.
+    */
+   private static final class LiveNodeIdListener implements ClusterTopologyListener {
+
+      private static final Logger logger = Logger.getLogger(LiveNodeIdListener.class);
+      private final String nodeId;
+      private final String user;
+      private final String password;
+      private final CountDownLatch searchCompleted;
+      // volatile: written by the thread delivering topology events and read by the searching thread, which
+      // gets no happens-before edge from the latch when its timed wait expires
+      private volatile boolean isNodePresent = false;
+
+      LiveNodeIdListener(String nodeId, String user, String password) {
+         this.nodeId = nodeId;
+         this.user = user;
+         this.password = password;
+         this.searchCompleted = new CountDownLatch(1);
+      }
+
+      /**
+       * Marks the node as present when the announced member carries the searched node id and its live
+       * connector accepts a session; the search is completed when the searched node id has been seen
+       * or {@code last} is set.
+       */
+      @Override
+      public void nodeUP(TopologyMember topologyMember, boolean last) {
+         boolean isOurNodeId = nodeId != null && nodeId.equals(topologyMember.getNodeId());
+         if (isOurNodeId && isActive(topologyMember.getLive())) {
+            isNodePresent = true;
+         }
+         if (isOurNodeId || last) {
+            searchCompleted.countDown();
+         }
+      }
+
+      /**
+       * Waits up to the given time for the search to complete.
+       *
+       * @return whether the searched node has been found active so far; {@code false} if the wait expired first
+       * @throws InterruptedException if the waiting thread is interrupted
+       */
+      public boolean awaitNodePresent(long timeout, TimeUnit unit) throws InterruptedException {
+         searchCompleted.await(timeout, unit);
+         return isNodePresent;
+      }
+
+      /**
+       * In a cluster of replicated live/backup pairs if a backup crashes and then its live crashes the cluster will
+       * retain the topology information of the live such that when the live server restarts it will check the
+       * cluster to see if its nodeID is present (which it will be) and then it will activate as a backup rather than
+       * a live. To prevent this situation an additional check is necessary to see if the server with the matching
+       * nodeID is actually active or not which is done by attempting to make a connection to it.
+       *
+       * @param transportConfiguration connector of the server to probe
+       * @return {@code true} if a session could be created with the cluster credentials, {@code false} on any failure
+       */
+      private boolean isActive(TransportConfiguration transportConfiguration) {
+         try (ServerLocator serverLocator = ActiveMQClient.createServerLocator(false, transportConfiguration);
+              ClientSessionFactory clientSessionFactory = serverLocator.createSessionFactory();
+              ClientSession clientSession = clientSessionFactory.createSession(user, password, false, false, false, false, 0)) {
+            return true;
+         } catch (Exception e) {
+            logger.debug("isActive check failed", e);
+            return false;
+         }
+      }
+
+      @Override
+      public void nodeDown(long eventUID, String nodeID) {
+         // no-op
+      }
+   }
+
+   /**
+    * Creates an HA server locator for the given cluster connection: from its discovery group when one is
+    * named, from its static connectors otherwise.
+    *
+    * @throws ActiveMQException if the named discovery group is not present in {@code configuration}
+    */
+   private static ServerLocatorInternal createLocator(Configuration configuration,
+                                                      ClusterConnectionConfiguration config) throws ActiveMQException {
+      final ServerLocatorInternal locator;
+      if (config.getDiscoveryGroupName() != null) {
+         DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
+
+         if (dg == null) {
+            throw ActiveMQMessageBundle.BUNDLE.noDiscoveryGroupFound(null);
+         }
+         locator = (ServerLocatorInternal) ActiveMQClient.createServerLocatorWithHA(dg);
+      } else {
+         TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? configuration.getTransportConfigurations(config.getStaticConnectors()) : null;
+
+         locator = (ServerLocatorInternal) ActiveMQClient.createServerLocatorWithHA(tcConfigs);
+      }
+      return locator;
+   }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java
index cfbcb47d02..4a8b59f27d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java
@@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -27,19 +28,66 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
public abstract class FileBasedNodeManager extends NodeManager {
protected static final byte FIRST_TIME_START = '0';
public static final String SERVER_LOCK_NAME = "server.lock";
+ public static final String DATA_VERSION_NAME = "server.data.version";
private static final String ACCESS_MODE = "rw";
private final File directory;
protected FileChannel channel;
+ protected FileChannel dataVersionChannel;
public FileBasedNodeManager(boolean replicatedBackup, File directory) {
super(replicatedBackup);
this.directory = directory;
}
+   /**
+    * Lazily opens the channel backing the {@link #DATA_VERSION_NAME} file; does nothing when the channel is
+    * already open.
+    * <p>
+    * The file is opened with {@code CREATE} rather than {@code CREATE_NEW}: the latter fails with
+    * {@link java.nio.file.FileAlreadyExistsException} whenever the file is already there, which would make a
+    * previously written data version unreachable as soon as the channel has to be opened again.
+    *
+    * @throws IOException if the file cannot be opened or created
+    */
+   protected void useDataVersionChannel() throws IOException {
+      if (dataVersionChannel != null) {
+         return;
+      }
+      dataVersionChannel = FileChannel.open(newFile(DATA_VERSION_NAME).toPath(), READ, WRITE, java.nio.file.StandardOpenOption.CREATE);
+   }
+
+   /**
+    * Reads the data version stored as a big-endian {@code long} at offset 0 of the data version file.
+    *
+    * @return the stored version, or {@code 0} when a whole {@code long} could not be read
+    * @throws NodeManagerException if this manager is not started or the file cannot be accessed
+    */
+   @Override
+   public long readDataVersion() throws NodeManagerException {
+      if (!isStarted()) {
+         throw new NodeManagerException(new IllegalStateException("node manager must be started first"));
+      }
+      try {
+         useDataVersionChannel();
+         final ByteBuffer versionBytes = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
+         final int bytesRead = dataVersionChannel.read(versionBytes, 0);
+         if (bytesRead == Long.BYTES) {
+            // buffer is full: switch it to read mode and decode from its start
+            versionBytes.flip();
+            return versionBytes.getLong();
+         }
+         return 0;
+      } catch (IOException ioError) {
+         throw new NodeManagerException(ioError);
+      }
+   }
+
+   /**
+    * Stores {@code version} as a big-endian {@code long} at offset 0 of the data version file and forces the
+    * file content to storage.
+    *
+    * @param version the data version to persist
+    * @throws NodeManagerException if this manager is not started or the file cannot be written
+    */
+   @Override
+   public void writeDataVersion(long version) throws NodeManagerException {
+      if (!isStarted()) {
+         throw new NodeManagerException(new IllegalStateException("node manager must be started first"));
+      }
+      try {
+         useDataVersionChannel();
+         final ByteBuffer versionBytes = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
+         versionBytes.putLong(version);
+         // back to position 0 with all eight bytes pending for the positional write
+         versionBytes.flip();
+         dataVersionChannel.write(versionBytes, 0);
+         dataVersionChannel.force(false);
+      } catch (IOException ioError) {
+         throw new NodeManagerException(ioError);
+      }
+   }
+
/**
* Ensures existence of persistent information about the server's nodeID.
*
@@ -137,9 +185,20 @@ public abstract class FileBasedNodeManager extends NodeManager {
@Override
public synchronized void stop() throws Exception {
FileChannel channelCopy = channel;
- if (channelCopy != null)
- channelCopy.close();
- super.stop();
+ // Close order: lock channel, then data version channel, then super.stop(). The nested finally blocks
+ // make sure each later step still runs when an earlier close throws.
+ try {
+ if (channelCopy != null)
+ channelCopy.close();
+ } finally {
+ try {
+ // clear the field before closing so that useDataVersionChannel() opens a fresh channel next time
+ FileChannel dataVersionChannel = this.dataVersionChannel;
+ this.dataVersionChannel = null;
+ if (dataVersionChannel != null) {
+ dataVersionChannel.close();
+ }
+ } finally {
+ super.stop();
+ }
+ }
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeIdLocatorForReplication.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeIdLocatorForReplication.java
new file mode 100644
index 0000000000..58cb32a4b9
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeIdLocatorForReplication.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
+import org.apache.activemq.artemis.core.server.LiveNodeLocator;
+import org.apache.activemq.artemis.utils.ConcurrentUtil;
+
+/**
+ * It looks for a live server in the cluster with a specific NodeID
+ */
+public class NamedLiveNodeIdLocatorForReplication extends LiveNodeLocator {
+
+   private final Lock lock = new ReentrantLock();
+   private final Condition condition = lock.newCondition();
+   private final String nodeID;
+   // wait between two retries of an untimed locateNode(), in milliseconds
+   private final long retryReplicationWait;
+   // live/backup connector pairs announced for nodeID and not yet tried; guarded by lock
+   private final Queue<Pair<TransportConfiguration, TransportConfiguration>> liveConfigurations = new LinkedList<>();
+   // pairs whose registration failed; fed back into liveConfigurations by the untimed locateNode(); guarded by lock
+   private final ArrayList<Pair<TransportConfiguration, TransportConfiguration>> triedConfigurations = new ArrayList<>();
+   private boolean found;
+
+   /**
+    * @param nodeID                     node id of the live server to look for
+    * @param backupRegistrationListener handed to the {@link LiveNodeLocator} constructor
+    * @param retryReplicationWait       milliseconds to wait between retries while locating without a timeout
+    */
+   public NamedLiveNodeIdLocatorForReplication(String nodeID,
+                                               BackupRegistrationListener backupRegistrationListener,
+                                               long retryReplicationWait) {
+      super(backupRegistrationListener);
+      this.nodeID = nodeID;
+      this.retryReplicationWait = retryReplicationWait;
+   }
+
+   /**
+    * Waits, without a timeout, until a live configuration is available.
+    */
+   @Override
+   public void locateNode() throws ActiveMQException {
+      locateNode(-1L);
+   }
+
+   /**
+    * Returns immediately when a live configuration is already queued. Otherwise waits once for up to
+    * {@code timeout}, or, when {@code timeout} is {@code -1}, keeps waiting in {@code retryReplicationWait}
+    * slices, putting the already tried configurations back in the queue after each slice.
+    * An interruption ends the wait silently.
+    *
+    * @param timeout maximum wait handed to {@code ConcurrentUtil.await}, or {@code -1} to wait until a
+    *                configuration is available
+    */
+   @Override
+   public void locateNode(long timeout) throws ActiveMQException {
+      // acquire outside the try: if lock() itself failed, the finally must not call unlock()
+      lock.lock();
+      try {
+         if (liveConfigurations.isEmpty()) {
+            try {
+               if (timeout != -1L) {
+                  ConcurrentUtil.await(condition, timeout);
+               } else {
+                  while (liveConfigurations.isEmpty()) {
+                     condition.await(retryReplicationWait, TimeUnit.MILLISECONDS);
+                     liveConfigurations.addAll(triedConfigurations);
+                     triedConfigurations.clear();
+                  }
+               }
+            } catch (InterruptedException e) {
+               //ignore
+            }
+         }
+      } finally {
+         lock.unlock();
+      }
+   }
+
+   /**
+    * Queues the connector pair of an announced member that carries the searched node id and has a live
+    * connector, then wakes up a thread waiting in {@link #locateNode(long)}.
+    */
+   @Override
+   public void nodeUP(TopologyMember topologyMember, boolean last) {
+      lock.lock();
+      try {
+         if (nodeID.equals(topologyMember.getNodeId()) && topologyMember.getLive() != null) {
+            Pair<TransportConfiguration, TransportConfiguration> liveConfiguration = new Pair<>(topologyMember.getLive(), topologyMember.getBackup());
+            if (!liveConfigurations.contains(liveConfiguration)) {
+               liveConfigurations.add(liveConfiguration);
+            }
+            found = true;
+            condition.signal();
+         }
+      } finally {
+         lock.unlock();
+      }
+   }
+
+   @Override
+   public void nodeDown(long eventUID, String nodeID) {
+      //no op
+   }
+
+   /**
+    * @return the searched node id once a matching member has been announced, {@code null} before that
+    */
+   @Override
+   public String getNodeID() {
+      return found ? nodeID : null;
+   }
+
+   /**
+    * @return the next connector pair to try, or {@code null} when none is queued
+    */
+   @Override
+   public Pair<TransportConfiguration, TransportConfiguration> getLiveConfiguration() {
+      return liveConfigurations.peek();
+   }
+
+   /**
+    * Moves the configuration that has just been tried to the tried list, so that an untimed
+    * {@link #locateNode(long)} can offer it again later, then notifies the registration listener through
+    * the superclass.
+    */
+   @Override
+   public void notifyRegistrationFailed(boolean alreadyReplicating) {
+      lock.lock();
+      try {
+         final Pair<TransportConfiguration, TransportConfiguration> failed = liveConfigurations.poll();
+         // nothing was queued: remembering a null entry would later end the retry loop without a real configuration
+         if (failed != null) {
+            triedConfigurations.add(failed);
+         }
+         super.notifyRegistrationFailed(alreadyReplicating);
+      } finally {
+         lock.unlock();
+      }
+   }
+}
+
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
index 624808d1f0..c4775d2758 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
@@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
-import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
/**
@@ -48,8 +47,10 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
private String nodeID;
- public NamedLiveNodeLocatorForReplication(String backupGroupName, SharedNothingBackupQuorum quorumManager, long retryReplicationWait) {
- super(quorumManager);
+ /**
+ * @param backupGroupName group name stored in this locator
+ * @param backupRegistrationListener handed to the {@link LiveNodeLocator} constructor
+ * @param retryReplicationWait wait stored in this locator (presumably milliseconds - TODO confirm against its use)
+ */
+ public NamedLiveNodeLocatorForReplication(String backupGroupName,
+ BackupRegistrationListener backupRegistrationListener,
+ long retryReplicationWait) {
+ super(backupRegistrationListener);
this.backupGroupName = backupGroupName;
this.retryReplicationWait = retryReplicationWait;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java
new file mode 100644
index 0000000000..4441bf0018
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LiveNodeLocator;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
+import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolicy;
+import org.apache.activemq.artemis.quorum.DistributedLock;
+import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
+import org.apache.activemq.artemis.quorum.UnavailableStateException;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.core.server.impl.ReplicationObserver.ReplicationFailure;
+
+/**
+ * This activation can be used by a primary while trying to fail-back ie {@code failback == true} or
+ * by a natural-born backup ie {@code failback == false}.
+ */
+public final class ReplicationBackupActivation extends Activation implements DistributedPrimitiveManager.UnavailableManagerListener {
+
+ private static final Logger LOGGER = Logger.getLogger(ReplicationBackupActivation.class);
+
+ private final boolean wasLive;
+ private final ReplicationBackupPolicy policy;
+ private final ActiveMQServerImpl activeMQServer;
+ // This field is != null iff this node is a primary during a fail-back ie acting as a backup in order to become live again.
+ private final String expectedNodeID;
+ @GuardedBy("this")
+ private boolean closed;
+ private final DistributedPrimitiveManager distributedManager;
+ // Used for monitoring purposes
+ private volatile ReplicationObserver replicationObserver;
+ // Used for testing purposes
+ private volatile ReplicationEndpoint replicationEndpoint;
+ // Used for testing purposes: when set, it is invoked with each newly created ReplicationEndpoint
+ private Consumer<ReplicationEndpoint> onReplicationEndpointCreation;
+ // Used to arbiter one-shot server stop/restart
+ private final AtomicBoolean stopping;
+
+ public ReplicationBackupActivation(final ActiveMQServerImpl activeMQServer,
+ final boolean wasLive,
+ final DistributedPrimitiveManager distributedManager,
+ final ReplicationBackupPolicy policy) {
+ this.wasLive = wasLive;
+ this.activeMQServer = activeMQServer;
+ if (policy.isTryFailback()) {
+ final SimpleString serverNodeID = activeMQServer.getNodeID();
+ if (serverNodeID == null || serverNodeID.isEmpty()) {
+ throw new IllegalStateException("A failback activation must be biased around a specific NodeID");
+ }
+ this.expectedNodeID = serverNodeID.toString();
+ } else {
+ this.expectedNodeID = null;
+ }
+ this.distributedManager = distributedManager;
+ this.policy = policy;
+ this.replicationObserver = null;
+ this.replicationEndpoint = null;
+ this.stopping = new AtomicBoolean(false);
+ }
+
+ /**
+ * used for testing purposes.
+ */
+ public DistributedPrimitiveManager getDistributedManager() {
+ return distributedManager;
+ }
+
+ @Override
+ public void onUnavailableManagerEvent() {
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ }
+ LOGGER.info("Unavailable quorum service detected: try restart server");
+ asyncRestartServer(activeMQServer, true);
+ }
+
+ /**
+ * This util class exists because {@link LiveNodeLocator} needs a {@link LiveNodeLocator.BackupRegistrationListener}
+ * to forward backup registration failure events: this is used to switch on/off backup registration event listening
+ * on an existing locator.
+ */
+ private static final class RegistrationFailureForwarder implements LiveNodeLocator.BackupRegistrationListener, AutoCloseable {
+
+ private static final LiveNodeLocator.BackupRegistrationListener NOOP_LISTENER = ignore -> {
+ };
+ private volatile LiveNodeLocator.BackupRegistrationListener listener = NOOP_LISTENER;
+
+ public RegistrationFailureForwarder to(LiveNodeLocator.BackupRegistrationListener listener) {
+ this.listener = listener;
+ return this;
+ }
+
+ @Override
+ public void onBackupRegistrationFailed(boolean alreadyReplicating) {
+ listener.onBackupRegistrationFailed(alreadyReplicating);
+ }
+
+ @Override
+ public void close() {
+ listener = NOOP_LISTENER;
+ }
+ }
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ }
+ try {
+ LOGGER.info("Trying to reach majority of quorum service nodes");
+ distributedManager.start();
+ LOGGER.info("Quorum service available: starting broker");
+ distributedManager.addUnavailableManagerListener(this);
+ // Stop the previous node manager and create a new one with NodeManager::replicatedBackup == true:
+ // NodeManager::start skips setting up the lock file with NodeID, until NodeManager::stopBackup is called.
+ activeMQServer.resetNodeManager();
+ activeMQServer.getNodeManager().stop();
+ // A primary needs to preserve NodeID across runs
+ activeMQServer.moveServerData(policy.getMaxSavedReplicatedJournalsSize(), policy.isTryFailback());
+ activeMQServer.getNodeManager().start();
+ if (!activeMQServer.initialisePart1(false)) {
+ return;
+ }
+ synchronized (this) {
+ if (closed)
+ return;
+ }
+ final ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
+ clusterController.awaitConnectionToReplicationCluster();
+ activeMQServer.getBackupManager().start();
+ ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(),
+ activeMQServer.getNodeManager().getNodeId());
+ activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
+ final DistributedLock liveLock = replicateAndFailover(clusterController);
+ if (liveLock == null) { // no live lock has been acquired: this node must not become live
+ return;
+ }
+ startAsLive(liveLock);
+ } catch (Exception e) {
+ if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted()) {
+ // do not log these errors if the server is being stopped.
+ return;
+ }
+ ActiveMQServerLogger.LOGGER.initializationError(e);
+ }
+ }
+
+ private void startAsLive(final DistributedLock liveLock) throws Exception {
+ activeMQServer.setHAPolicy(policy.getLivePolicy());
+
+ synchronized (activeMQServer) {
+ if (!activeMQServer.isStarted()) {
+ liveLock.close();
+ return;
+ }
+ ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
+ // stopBackup is going to write the NodeID previously set on the NodeManager,
+ // because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true.
+ activeMQServer.getNodeManager().stopBackup();
+ activeMQServer.getStorageManager().start();
+ activeMQServer.getBackupManager().activated();
+ // IMPORTANT:
+ // we're setting this activation JUST because it would allow the server to use its
+ // getActivationChannelHandler to handle replication
+ final ReplicationPrimaryActivation primaryActivation = new ReplicationPrimaryActivation(activeMQServer, distributedManager, policy.getLivePolicy());
+ liveLock.addListener(primaryActivation);
+ activeMQServer.setActivation(primaryActivation);
+ activeMQServer.initialisePart2(false);
+ // calling primaryActivation.onUnavailableLockEvent() if !isHeldByCaller is necessary in case the lock became unavailable
+ // before liveLock.addListener: just throwing an exception won't stop the broker.
+ final boolean stillLive;
+ try {
+ stillLive = liveLock.isHeldByCaller();
+ } catch (UnavailableStateException e) {
+ LOGGER.warn(e);
+ primaryActivation.onUnavailableLockEvent();
+ throw new ActiveMQIllegalStateException("This server cannot check its role as a live: activation is failed");
+ }
+ if (!stillLive) {
+ primaryActivation.onUnavailableLockEvent();
+ throw new ActiveMQIllegalStateException("This server is not live anymore: activation is failed");
+ }
+ if (activeMQServer.getIdentity() != null) {
+ ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
+ } else {
+ ActiveMQServerLogger.LOGGER.serverIsLive();
+ }
+ activeMQServer.completeActivation(true);
+ }
+ }
+
+ private LiveNodeLocator createLiveNodeLocator(final LiveNodeLocator.BackupRegistrationListener registrationListener) {
+ if (expectedNodeID != null) {
+ assert policy.isTryFailback();
+ return new NamedLiveNodeIdLocatorForReplication(expectedNodeID, registrationListener, policy.getRetryReplicationWait());
+ }
+ return policy.getGroupName() == null ?
+ new AnyLiveNodeLocatorForReplication(registrationListener, activeMQServer, policy.getRetryReplicationWait()) :
+ new NamedLiveNodeLocatorForReplication(policy.getGroupName(), registrationListener, policy.getRetryReplicationWait());
+ }
+
+ private DistributedLock replicateAndFailover(final ClusterController clusterController) throws ActiveMQException, InterruptedException {
+ final RegistrationFailureForwarder registrationFailureForwarder = new RegistrationFailureForwarder();
+ // node locator isn't stateless and contains a live-list of candidate nodes to connect to, hence
+ // it MUST be reused for each replicateLive attempt
+ final LiveNodeLocator nodeLocator = createLiveNodeLocator(registrationFailureForwarder);
+ clusterController.addClusterTopologyListenerForReplication(nodeLocator);
+ try {
+ while (true) {
+ synchronized (this) {
+ if (closed) {
+ return null;
+ }
+ }
+ final ReplicationFailure failure = replicateLive(clusterController, nodeLocator, registrationFailureForwarder);
+ if (failure == null) { // no connection to a live was established on this attempt: wait and retry
+ Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
+ continue;
+ }
+ if (!activeMQServer.isStarted()) {
+ return null;
+ }
+ LOGGER.debugf("ReplicationFailure = %s", failure);
+ boolean voluntaryFailOver = false;
+ switch (failure) {
+ case VoluntaryFailOver:
+ voluntaryFailOver = true; // intentional fall-through: both failover kinds try to acquire the live lock
+ case NonVoluntaryFailover:
+ final DistributedLock liveLock = tryAcquireLiveLock();
+ // from now on we're meant to stop:
+ // - due to failover
+ // - due to restart/stop
+ assert stopping.get();
+ if (liveLock != null) {
+ return liveLock;
+ }
+ boolean restart = true;
+ if (voluntaryFailOver && isFirstFailbackAttempt()) {
+ restart = false;
+ LOGGER.error("Failed to fail-back: stopping broker based on quorum results");
+ } else {
+ ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();
+ }
+ // let's ignore the stopping flag here, we're in control of it
+ asyncRestartServer(activeMQServer, restart, false);
+ return null;
+ case RegistrationError:
+ LOGGER.error("Stopping broker because of critical registration error");
+ asyncRestartServer(activeMQServer, false);
+ return null;
+ case AlreadyReplicating:
+ // can just retry here, data should be clean and nodeLocator
+ // should remove the live node that has answered this
+ LOGGER.info("Live broker was already replicating: retry sync with another live");
+ continue;
+ case ClosedObserver:
+ return null;
+ case BackupNotInSync:
+ LOGGER.info("Replication failure while initial sync not yet completed: restart as backup");
+ asyncRestartServer(activeMQServer, true);
+ return null;
+ case WrongNodeId:
+ LOGGER.error("Stopping broker because of wrong node ID communication from live: maybe a misbehaving live?");
+ asyncRestartServer(activeMQServer, false);
+ return null;
+ default:
+ throw new AssertionError("Unsupported failure " + failure);
+ }
+ }
+ } finally {
+ silentExecution("Errored on cluster topology listener for replication cleanup", () -> clusterController.removeClusterTopologyListenerForReplication(nodeLocator));
+ }
+ }
+
+ /**
+ * {@code wasLive} is {@code true} only while transitioning from primary to backup.
+ * If a natural born backup becomes live and allows failback, while transitioning back to backup again
+ * {@code wasLive} is still {@code false}.
+ * The check on {@link ReplicationBackupPolicy#isTryFailback()} is redundant but still useful for correctness.
+ *
+ * In case of fail-back, any event that's going to restart this broker as backup (eg quorum service unavailable
+ * or some replication failures) will cause {@code wasLive} to be {@code false}, because the HA policy set isn't
+ * a primary anymore.
+ */
+ private boolean isFirstFailbackAttempt() {
+ return wasLive && policy.isTryFailback();
+ }
+
+ private DistributedLock tryAcquireLiveLock() throws InterruptedException { // returns the acquired live lock, or null on any failure
+ // disable quorum service unavailability handling and just treat this imperatively
+ if (!stopping.compareAndSet(false, true)) {
+ // already unavailable quorum service: fail fast
+ return null;
+ }
+ distributedManager.removeUnavailableManagerListener(this);
+ assert activeMQServer.getNodeManager().getNodeId() != null;
+ final String liveID = activeMQServer.getNodeManager().getNodeId().toString();
+ final int voteRetries = policy.getVoteRetries();
+ final long maxAttempts = voteRetries >= 0 ? (voteRetries + 1L) : -1; // 1L: int addition would overflow for Integer.MAX_VALUE
+ if (maxAttempts == -1) {
+ LOGGER.error("It's not safe to retry an infinite amount of time to acquire a live lock: please consider setting a vote-retries value");
+ }
+ final long voteRetryWait = policy.getVoteRetryWait();
+ final DistributedLock liveLock = getLock(distributedManager, liveID);
+ if (liveLock == null) {
+ return null;
+ }
+ for (long attempt = 0; maxAttempts < 0 || attempt < maxAttempts; attempt++) { // a negative maxAttempts means unbounded retries
+ try {
+ if (liveLock.tryLock(voteRetryWait, TimeUnit.MILLISECONDS)) {
+ LOGGER.debugf("%s live lock acquired after %d attempts.", liveID, (attempt + 1));
+ return liveLock;
+ }
+ } catch (UnavailableStateException e) {
+ LOGGER.warnf(e, "Failed to acquire live lock %s because of unavailable quorum service: stop trying", liveID);
+ distributedManager.stop();
+ return null;
+ }
+ }
+ LOGGER.warnf("Failed to acquire live lock %s after %d tries", liveID, maxAttempts);
+ distributedManager.stop();
+ return null;
+ }
+
+ private DistributedLock getLock(final DistributedPrimitiveManager manager,
+ final String lockId) throws InterruptedException {
+ if (!manager.isStarted()) {
+ return null;
+ }
+ try {
+ return manager.getDistributedLock(lockId);
+ } catch (ExecutionException e) {
+ LOGGER.warnf(e, "Errored while getting lock %s", lockId);
+ return null;
+ } catch (TimeoutException te) {
+ LOGGER.warnf(te, "Timeout while getting lock %s", lockId);
+ return null;
+ }
+ }
+
+ private ReplicationObserver replicationObserver() {
+ if (policy.isTryFailback()) {
+ return ReplicationObserver.failbackObserver(activeMQServer.getNodeManager(), activeMQServer.getBackupManager(), activeMQServer.getScheduledPool(), expectedNodeID);
+ }
+ return ReplicationObserver.failoverObserver(activeMQServer.getNodeManager(), activeMQServer.getBackupManager(), activeMQServer.getScheduledPool());
+ }
+
+ private ReplicationFailure replicateLive(final ClusterController clusterController,
+ final LiveNodeLocator liveLocator,
+ final RegistrationFailureForwarder registrationFailureForwarder) throws ActiveMQException {
+ try (ReplicationObserver replicationObserver = replicationObserver();
+ RegistrationFailureForwarder ignored = registrationFailureForwarder.to(replicationObserver)) {
+ this.replicationObserver = replicationObserver;
+ clusterController.addClusterTopologyListener(replicationObserver);
+ // ReplicationError notifies backup registration failures to live locator -> forwarder -> observer
+ final ReplicationError replicationError = new ReplicationError(liveLocator);
+ clusterController.addIncomingInterceptorForReplication(replicationError);
+ try {
+ final ClusterControl liveControl = tryLocateAndConnectToLive(liveLocator, clusterController);
+ if (liveControl == null) {
+ return null;
+ }
+ try {
+ final ReplicationEndpoint replicationEndpoint = tryAuthorizeAndAsyncRegisterAsBackupToLive(liveControl, replicationObserver);
+ if (replicationEndpoint == null) {
+ return ReplicationFailure.RegistrationError;
+ }
+ this.replicationEndpoint = replicationEndpoint;
+ assert replicationEndpoint != null;
+ try {
+ return replicationObserver.awaitReplicationFailure();
+ } finally {
+ this.replicationEndpoint = null;
+ ActiveMQServerImpl.stopComponent(replicationEndpoint);
+ closeChannelOf(replicationEndpoint);
+ }
+ } finally {
+ silentExecution("Errored on live control close", liveControl::close);
+ }
+ } finally {
+ silentExecution("Errored on cluster topology listener cleanup", () -> clusterController.removeClusterTopologyListener(replicationObserver));
+ silentExecution("Errored while removing incoming interceptor for replication", () -> clusterController.removeIncomingInterceptorForReplication(replicationError));
+ }
+ } finally {
+ this.replicationObserver = null;
+ }
+ }
+
+ private static void silentExecution(String debugErrorMessage, Runnable task) {
+ try {
+ task.run();
+ } catch (Throwable ignore) {
+ LOGGER.debug(debugErrorMessage, ignore);
+ }
+ }
+
+ private static void closeChannelOf(final ReplicationEndpoint replicationEndpoint) {
+ if (replicationEndpoint == null) {
+ return;
+ }
+ if (replicationEndpoint.getChannel() != null) {
+ silentExecution("Errored while closing replication endpoint channel", () -> replicationEndpoint.getChannel().close());
+ replicationEndpoint.setChannel(null);
+ }
+ }
+
+ private boolean asyncRestartServer(final ActiveMQServer server, boolean restart) {
+ return asyncRestartServer(server, restart, true);
+ }
+
+ private boolean asyncRestartServer(final ActiveMQServer server, boolean restart, boolean checkStopping) {
+ if (checkStopping) {
+ if (!stopping.compareAndSet(false, true)) {
+ return false;
+ }
+ }
+ new Thread(() -> {
+ if (server.getState() != ActiveMQServer.SERVER_STATE.STOPPED && server.getState() != ActiveMQServer.SERVER_STATE.STOPPING) {
+ try {
+ server.stop(!restart);
+ if (restart) {
+ server.start();
+ }
+ } catch (Exception e) {
+ if (restart) {
+ ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, server);
+ } else {
+ ActiveMQServerLogger.LOGGER.errorStoppingServer(e);
+ }
+ }
+ }
+ }).start();
+ return true;
+ }
+
+ private ClusterControl tryLocateAndConnectToLive(final LiveNodeLocator liveLocator,
+ final ClusterController clusterController) throws ActiveMQException {
+ liveLocator.locateNode();
+ final Pair<TransportConfiguration, TransportConfiguration> possibleLive = liveLocator.getLiveConfiguration();
+ final String nodeID = liveLocator.getNodeID();
+ if (nodeID == null) {
+ throw new RuntimeException("Could not establish the connection with any live");
+ }
+ if (!policy.isTryFailback()) {
+ assert expectedNodeID == null;
+ activeMQServer.getNodeManager().setNodeID(nodeID);
+ } else {
+ assert expectedNodeID.equals(nodeID);
+ }
+ if (possibleLive == null) {
+ return null;
+ }
+ final ClusterControl liveControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getA()); // first candidate
+ if (liveControl != null) {
+ return liveControl;
+ }
+ return tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getB()); // second candidate; may still be null
+ }
+
+ private static ClusterControl tryConnectToNodeInReplicatedCluster(final ClusterController clusterController,
+ final TransportConfiguration tc) {
+ try {
+ if (tc != null) {
+ return clusterController.connectToNodeInReplicatedCluster(tc);
+ }
+ } catch (Exception e) {
+ LOGGER.debug(e.getMessage(), e);
+ }
+ return null;
+ }
+
+ @Override
+ public void close(final boolean permanently, final boolean restarting) throws Exception {
+ synchronized (this) {
+ closed = true;
+ final ReplicationObserver replicationObserver = this.replicationObserver;
+ if (replicationObserver != null) {
+ replicationObserver.close();
+ }
+ }
+ // we have to check as the server policy may have changed
+ try {
+ if (activeMQServer.getHAPolicy().isBackup()) {
+ // To avoid a NPE caused by the stop
+ final NodeManager nodeManager = activeMQServer.getNodeManager();
+
+ activeMQServer.interruptActivationThread(nodeManager);
+
+ if (nodeManager != null) {
+ nodeManager.stopBackup();
+ }
+ }
+ } finally {
+ // this one needs to happen after interrupting the activation thread
+ // in order to unblock distributedManager::start
+ distributedManager.stop();
+ }
+ }
+
+ @Override
+ public void preStorageClose() throws Exception {
+ // TODO replication endpoint close?
+ }
+
+ private ReplicationEndpoint tryAuthorizeAndAsyncRegisterAsBackupToLive(final ClusterControl liveControl,
+ final ReplicationObserver liveObserver) {
+ ReplicationEndpoint replicationEndpoint = null;
+ try {
+ liveControl.getSessionFactory().setReconnectAttempts(1);
+ liveObserver.listenConnectionFailuresOf(liveControl.getSessionFactory());
+ liveControl.authorize();
+ replicationEndpoint = new ReplicationEndpoint(activeMQServer, policy.isTryFailback(), liveObserver);
+ final Consumer<ReplicationEndpoint> onReplicationEndpointCreation = this.onReplicationEndpointCreation; // test-only hook, may be null
+ if (onReplicationEndpointCreation != null) {
+ onReplicationEndpointCreation.accept(replicationEndpoint);
+ }
+ replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor());
+ connectToReplicationEndpoint(liveControl, replicationEndpoint);
+ replicationEndpoint.start();
+ liveControl.announceReplicatingBackupToLive(policy.isTryFailback(), policy.getClusterName());
+ return replicationEndpoint;
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.replicationStartProblem(e);
+ ActiveMQServerImpl.stopComponent(replicationEndpoint); // replicationEndpoint is still null if the failure happened before its creation
+ closeChannelOf(replicationEndpoint);
+ return null;
+ }
+ }
+
+ private static boolean connectToReplicationEndpoint(final ClusterControl liveControl,
+ final ReplicationEndpoint replicationEndpoint) {
+ final Channel replicationChannel = liveControl.createReplicationChannel();
+ replicationChannel.setHandler(replicationEndpoint);
+ replicationEndpoint.setChannel(replicationChannel);
+ return true;
+ }
+
+ @Override
+ public boolean isReplicaSync() {
+ // NOTE: this method is just for monitoring purposes, not suitable to perform logic!
+ // During a failover this backup won't have any active liveObserver and will report `false`!!
+ final ReplicationObserver liveObserver = this.replicationObserver;
+ if (liveObserver == null) {
+ return false;
+ }
+ return liveObserver.isBackupUpToDate();
+ }
+
+ public ReplicationEndpoint getReplicationEndpoint() {
+ return replicationEndpoint;
+ }
+
+ /**
+ * This must be used just for testing purposes: the (non-null) consumer receives each newly created endpoint.
+ */
+ public void spyReplicationEndpointCreation(Consumer<ReplicationEndpoint> onReplicationEndpointCreation) {
+ Objects.requireNonNull(onReplicationEndpointCreation);
+ this.onReplicationEndpointCreation = onReplicationEndpointCreation;
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java
new file mode 100644
index 0000000000..be2737c26c
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationObserver.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
+import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
+import org.apache.activemq.artemis.core.server.LiveNodeLocator.BackupRegistrationListener;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.cluster.BackupManager;
+import org.jboss.logging.Logger;
+
+final class ReplicationObserver implements ClusterTopologyListener, SessionFailureListener, BackupRegistrationListener, ReplicationEndpoint.ReplicationEndpointEventListener, AutoCloseable {
+
+ private static final Logger LOGGER = Logger.getLogger(ReplicationObserver.class);
+
+ public enum ReplicationFailure {
+ VoluntaryFailOver, BackupNotInSync, NonVoluntaryFailover, RegistrationError, AlreadyReplicating, ClosedObserver, WrongNodeId;
+ }
+
+ private final NodeManager nodeManager;
+ private final BackupManager backupManager;
+ private final ScheduledExecutorService scheduledPool;
+ private final boolean failback;
+ private final String expectedNodeID;
+ private final CompletableFuture<ReplicationFailure> replicationFailure; // completed at most once with the failure outcome
+
+ @GuardedBy("this")
+ private ClientSessionFactoryInternal sessionFactory;
+ @GuardedBy("this")
+ private CoreRemotingConnection connection;
+ @GuardedBy("this")
+ private ScheduledFuture<?> forcedFailover;
+
+ private volatile String liveID;
+ private volatile boolean backupUpToDate;
+ private volatile boolean closed;
+
+ /**
+ * This is a safety net in case the live sends the first {@link ReplicationLiveIsStoppingMessage}
+ * with code {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED} and crashes before sending the second with
+ * {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#FAIL_OVER}.
+ *
+ * If the second message does not come within this deadline (expressed in seconds), we fail over anyway.
+ */
+ public static final int WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG = 60;
+
+ private ReplicationObserver(final NodeManager nodeManager,
+ final BackupManager backupManager,
+ final ScheduledExecutorService scheduledPool,
+ final boolean failback,
+ final String expectedNodeID) {
+ this.nodeManager = nodeManager;
+ this.backupManager = backupManager;
+ this.scheduledPool = scheduledPool;
+ this.failback = failback;
+ this.expectedNodeID = expectedNodeID;
+ this.replicationFailure = new CompletableFuture<>();
+
+ this.sessionFactory = null;
+ this.connection = null;
+ this.forcedFailover = null;
+
+ this.liveID = null;
+ this.backupUpToDate = false;
+ this.closed = false;
+ }
+
+ public static ReplicationObserver failbackObserver(final NodeManager nodeManager,
+ final BackupManager backupManager,
+ final ScheduledExecutorService scheduledPool,
+ final String expectedNodeID) {
+ Objects.requireNonNull(expectedNodeID);
+ return new ReplicationObserver(nodeManager, backupManager, scheduledPool, true, expectedNodeID);
+ }
+
+ public static ReplicationObserver failoverObserver(final NodeManager nodeManager,
+ final BackupManager backupManager,
+ final ScheduledExecutorService scheduledPool) {
+ return new ReplicationObserver(nodeManager, backupManager, scheduledPool, false, null);
+ }
+
+ private void onLiveDown(boolean voluntaryFailover) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ synchronized (this) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ stopForcedFailoverAfterDelay();
+ unlistenConnectionFailures();
+ if (!isRemoteBackupUpToDate()) {
+ replicationFailure.complete(ReplicationFailure.BackupNotInSync);
+ } else if (voluntaryFailover) {
+ replicationFailure.complete(ReplicationFailure.VoluntaryFailOver);
+ } else {
+ replicationFailure.complete(ReplicationFailure.NonVoluntaryFailover);
+ }
+ }
+ }
+
+ @Override
+ public void nodeDown(long eventUID, String nodeID) {
+ // ignore it during a failback:
+ // a failing slave closes all connections but the one used for replication,
+ // triggering a nodeDown before the restarted master receives a STOP_CALLED from it.
+ // This can make the master fire a useless quorum vote during a normal failback.
+ if (failback) {
+ return;
+ }
+ if (nodeID.equals(liveID)) {
+ onLiveDown(false);
+ }
+ }
+
+ @Override
+ public void nodeUP(TopologyMember member, boolean last) {
+ }
+
+ /**
+ * if the connection to our replicated live goes down then decide on an action
+ */
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+ onLiveDown(false);
+ }
+
+ @Override
+ public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
+ connectionFailed(me, failedOver);
+ }
+
+ @Override
+ public void beforeReconnect(ActiveMQException exception) {
+ //noop
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ unlistenConnectionFailures();
+ closed = true;
+ replicationFailure.complete(ReplicationFailure.ClosedObserver);
+ }
+ }
+
+ /**
+ * @param liveSessionFactory the session factory used to connect to the live server
+ */
+ public synchronized void listenConnectionFailuresOf(final ClientSessionFactoryInternal liveSessionFactory) {
+ if (closed) {
+ throw new IllegalStateException("the observer is closed: cannot listen to any failures");
+ }
+ if (sessionFactory != null || connection != null) {
+ throw new IllegalStateException("this observer is already listening to other session factory failures");
+ }
+ this.sessionFactory = liveSessionFactory;
+ //belts and braces, there are circumstances where the connection listener doesn't get called but the session does.
+ this.sessionFactory.addFailureListener(this);
+ connection = (CoreRemotingConnection) liveSessionFactory.getConnection();
+ connection.addFailureListener(this);
+ }
+
+ public synchronized void unlistenConnectionFailures() {
+ if (connection != null) {
+ connection.removeFailureListener(this);
+ connection = null;
+ }
+ if (sessionFactory != null) {
+ sessionFactory.removeFailureListener(this);
+ sessionFactory = null;
+ }
+ }
+
+ @Override
+ public void onBackupRegistrationFailed(boolean alreadyReplicating) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ synchronized (this) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ stopForcedFailoverAfterDelay();
+ unlistenConnectionFailures();
+ replicationFailure.complete(alreadyReplicating ? ReplicationFailure.AlreadyReplicating : ReplicationFailure.RegistrationError);
+ }
+ }
+
+ public ReplicationFailure awaitReplicationFailure() {
+ try {
+ return replicationFailure.get();
+ } catch (Throwable e) {
+ return ReplicationFailure.ClosedObserver;
+ }
+ }
+
+ private synchronized void scheduleForcedFailoverAfterDelay() {
+ if (forcedFailover != null) {
+ return;
+ }
+ forcedFailover = scheduledPool.schedule(() -> onLiveDown(false), WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG, TimeUnit.SECONDS);
+ }
+
+ private synchronized void stopForcedFailoverAfterDelay() {
+ if (forcedFailover == null) {
+ return;
+ }
+ forcedFailover.cancel(false);
+ forcedFailover = null;
+ }
+
+ @Override
+ public void onRemoteBackupUpToDate() {
+ if (backupUpToDate || closed || replicationFailure.isDone()) {
+ return;
+ }
+ synchronized (this) {
+ if (backupUpToDate || closed || replicationFailure.isDone()) {
+ return;
+ }
+ assert liveID != null;
+ backupManager.announceBackup();
+ backupUpToDate = true;
+ }
+ }
+
+ public boolean isBackupUpToDate() {
+ return backupUpToDate;
+ }
+
+ public String getLiveID() {
+ return liveID;
+ }
+
+ private boolean validateNodeId(String nodeID) {
+ if (nodeID == null) {
+ return false;
+ }
+ final String existingNodeId = this.liveID;
+ if (existingNodeId == null) {
+ if (!failback) {
+ return true;
+ }
+ return nodeID.equals(expectedNodeID);
+ }
+ return existingNodeId.equals(nodeID);
+ }
+
+ @Override
+ public void onLiveNodeId(String nodeId) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ final String existingNodeId = this.liveID;
+ if (existingNodeId != null && existingNodeId.equals(nodeId)) {
+ return;
+ }
+ synchronized (this) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ if (!validateNodeId(nodeId)) {
+ stopForcedFailoverAfterDelay();
+ unlistenConnectionFailures();
+ replicationFailure.complete(ReplicationFailure.WrongNodeId);
+ } else if (liveID == null) {
+ liveID = nodeId;
+ nodeManager.setNodeID(nodeId);
+ }
+ }
+ }
+
+ public boolean isRemoteBackupUpToDate() {
+ return backupUpToDate;
+ }
+
+ @Override
+ public void onLiveStopping(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ synchronized (this) {
+ if (closed || replicationFailure.isDone()) {
+ return;
+ }
+ switch (finalMessage) {
+ case STOP_CALLED:
+ scheduleForcedFailoverAfterDelay();
+ break;
+ case FAIL_OVER:
+ onLiveDown(true);
+ break;
+ default:
+ LOGGER.errorf("unsupported LiveStopping type: %s", finalMessage);
+ }
+ }
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java
new file mode 100644
index 0000000000..285f516220
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.remoting.server.RemotingService;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
+import org.apache.activemq.artemis.quorum.DistributedLock;
+import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
+import org.apache.activemq.artemis.quorum.UnavailableStateException;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.core.server.impl.ClusterTopologySearch.searchActiveLiveNodeId;
+
+/**
+ * {@link #run()} is only going to be executed by a natural-born primary, at its first start.
+ * Neither a failover nor a failback is going to use {@link #run()}: they only rely on {@link #getActivationChannelHandler(Channel, Acceptor)}.
+ */
+public class ReplicationPrimaryActivation extends LiveActivation implements DistributedLock.UnavailableLockListener {
+
+ private static final Logger LOGGER = Logger.getLogger(ReplicationPrimaryActivation.class);
+ private static final long DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS = 20_000; // timeout handed to distributedManager.start(...)
+ private static final long BLOCKING_CALLS_TIMEOUT_MILLIS = 5_000; // timeout run() uses to search a live and to acquire the live lock
+
+ private final ReplicationPrimaryPolicy policy;
+
+ private final ActiveMQServerImpl activeMQServer;
+
+ @GuardedBy("replicationLock")
+ private ReplicationManager replicationManager; // null while no backup is replicating
+
+ private final Object replicationLock;
+
+ private final DistributedPrimitiveManager distributedManager;
+
+ private volatile boolean stoppingServer; // set once by asyncStopServer(), never reset by this class
+
+ public ReplicationPrimaryActivation(final ActiveMQServerImpl activeMQServer,
+ final DistributedPrimitiveManager distributedManager,
+ final ReplicationPrimaryPolicy policy) {
+ this.distributedManager = distributedManager;
+ this.policy = policy;
+ this.activeMQServer = activeMQServer;
+ this.replicationLock = new Object(); // private monitor guarding replicationManager
+ }
+
+ /**
+ * Exposes the {@link DistributedPrimitiveManager} received by the constructor: used for testing purposes.
+ */
+ public DistributedPrimitiveManager getDistributedManager() {
+ return distributedManager;
+ }
+
+ @Override
+ public void freezeConnections(RemotingService remotingService) {
+ final ReplicationManager currentManager = getReplicationManager();
+ if (remotingService == null) {
+ // nothing to freeze
+ return;
+ }
+ // while replicating, the backup transport connection is handed to freeze(...), otherwise null is
+ remotingService.freeze(null, currentManager == null ? null : currentManager.getBackupTransportConnection());
+ }
+
+ @Override
+ public void run() {
+ try {
+ // read the identity of this broker from its node manager
+ final NodeManager nodeManager = activeMQServer.getNodeManager();
+
+ final String nodeId = nodeManager.readNodeId().toString();
+
+ final long dataVersion = nodeManager.readDataVersion();
+ // NOTE(review): dataVersion isn't read anywhere else in this method
+ final DistributedLock liveLock = searchLiveOrAcquireLiveLock(nodeId, BLOCKING_CALLS_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ // null: a live with the same NodeID is already there and the HA policy has been switched to the backup one
+ if (liveLock == null) {
+ return;
+ }
+
+ activeMQServer.initialisePart1(false);
+
+ activeMQServer.initialisePart2(false);
+
+ // must be registered before checking the caller
+ liveLock.addListener(this);
+
+ // This control is placed here because initialisePart2 is going to load the journal that
+ // could pause the JVM for enough time to lose lock ownership
+ if (!liveLock.isHeldByCaller()) {
+ throw new IllegalStateException("This broker isn't live anymore, probably due to application pauses eg GC, OS etc: failing now");
+ }
+
+ activeMQServer.completeActivation(true);
+ // the identity, if configured, is part of the "server is live" log
+ if (activeMQServer.getIdentity() != null) {
+ ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
+ } else {
+ ActiveMQServerLogger.LOGGER.serverIsLive();
+ }
+ } catch (Exception e) {
+ // async stop it, we don't need to await this to complete
+ distributedManager.stop();
+ ActiveMQServerLogger.LOGGER.initializationError(e);
+ activeMQServer.callActivationFailureListeners(e); // any failure above ends up notifying the activation failure listeners
+ }
+ }
+
+ private DistributedLock searchLiveOrAcquireLiveLock(final String nodeId,
+ final long blockingCallTimeout,
+ final TimeUnit unit) throws ActiveMQException, InterruptedException {
+ if (policy.isCheckForLiveServer()) { // optional search, driven by the policy
+ LOGGER.infof("Searching a live server with NodeID = %s", nodeId);
+ if (searchActiveLiveNodeId(policy.getClusterName(), nodeId, blockingCallTimeout, unit, activeMQServer.getConfiguration())) {
+ LOGGER.infof("Found a live server with NodeID = %s: restarting as backup", nodeId);
+ activeMQServer.setHAPolicy(policy.getBackupPolicy());
+ return null; // no lock: the caller has to give up activating as live
+ }
+ }
+ startDistributedPrimitiveManager(); // throws if the manager cannot be started in time
+ return acquireDistributeLock(getDistributeLock(nodeId), blockingCallTimeout, unit); // never null here: failures are thrown
+ }
+
+ private void startDistributedPrimitiveManager() throws InterruptedException, ActiveMQException {
+ LOGGER.infof("Trying to reach the majority of quorum nodes in %d ms.", DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS);
+ try {
+ if (distributedManager.start(DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ return; // started within the timeout
+ }
+ } catch (InterruptedException ie) {
+ throw ie; // interruption is propagated as is
+ } catch (Throwable t) {
+ LOGGER.debug(t); // any other failure is only logged here and reported below as a generic ActiveMQException
+ }
+ assert !distributedManager.isStarted();
+ throw new ActiveMQException("Cannot reach the majority of quorum nodes");
+ }
+
+ private DistributedLock getDistributeLock(final String nodeId) throws InterruptedException, ActiveMQException { // lock id == node id
+ try {
+ return distributedManager.getDistributedLock(nodeId);
+ } catch (Throwable t) {
+ try {
+ distributedManager.stop(); // without a lock instance the manager is of no use
+ } catch (Throwable ignore) {
+ // best-effort cleanup: the failure worth reporting is the original one, ie t
+ }
+ if (t instanceof InterruptedException) {
+ throw (InterruptedException) t; // interruption is propagated as is
+ }
+ throw (ActiveMQException) new ActiveMQException("Cannot obtain a live lock instance").initCause(t); // keep the root cause
+ }
+ }
+
+ private DistributedLock acquireDistributeLock(final DistributedLock liveLock,
+ final long acquireLockTimeout,
+ final TimeUnit unit) throws InterruptedException, ActiveMQException {
+ try {
+ if (liveLock.tryLock(acquireLockTimeout, unit)) {
+ return liveLock; // acquired: handed back to the caller
+ }
+ } catch (UnavailableStateException e) {
+ LOGGER.debug(e); // handled like a failed acquisition
+ }
+ try {
+ distributedManager.stop(); // lock not acquired: the manager gets stopped before failing
+ } catch (Throwable ignore) {
+ // don't care
+ }
+ throw new ActiveMQException("Failed to become live");
+ }
+
+ @Override
+ public ChannelHandler getActivationChannelHandler(final Channel channel, final Acceptor acceptorUsed) {
+ if (stoppingServer) {
+ return null; // no handler once asyncStopServer() has been requested
+ }
+ return packet -> {
+ if (packet.getType() == PacketImpl.BACKUP_REGISTRATION) { // any other packet type is ignored by this handler
+ onBackupRegistration(channel, acceptorUsed, (BackupRegistrationMessage) packet);
+ }
+ };
+ }
+
+ private void onBackupRegistration(final Channel channel,
+ final Acceptor acceptorUsed,
+ final BackupRegistrationMessage msg) {
+ try {
+ startAsyncReplication(channel.getConnection(), acceptorUsed.getClusterConnection(), msg.getConnector(), msg.isFailBackRequest());
+ } catch (ActiveMQAlreadyReplicatingException are) {
+ channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.ALREADY_REPLICATING)); // a replication manager is already in place
+ } catch (ActiveMQException e) {
+ LOGGER.debug("Failed to process backup registration packet", e);
+ channel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION)); // any other failure is reported back as EXCEPTION
+ }
+ }
+
+ private void startAsyncReplication(final CoreRemotingConnection remotingConnection,
+ final ClusterConnection clusterConnection,
+ final TransportConfiguration backupTransport,
+ final boolean isFailBackRequest) throws ActiveMQException {
+ synchronized (replicationLock) {
+ if (replicationManager != null) {
+ throw new ActiveMQAlreadyReplicatingException(); // only one replication at a time
+ }
+ if (!activeMQServer.isStarted()) {
+ throw new ActiveMQIllegalStateException();
+ }
+ final ReplicationFailureListener listener = new ReplicationFailureListener(); // same listener for both close and failure of the connection
+ remotingConnection.addCloseListener(listener);
+ remotingConnection.addFailureListener(listener);
+ final ReplicationManager replicationManager = new ReplicationManager(activeMQServer, remotingConnection, clusterConnection.getCallTimeout(), policy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory());
+ this.replicationManager = replicationManager; // NOTE(review): published before start(): if start() throws, the field stays set
+ replicationManager.start();
+ final Thread replicatingThread = new Thread(() -> replicate(replicationManager, clusterConnection, isFailBackRequest, backupTransport)); // the replication itself runs on a dedicated thread
+ replicatingThread.setName("async-replication-thread");
+ replicatingThread.start();
+ }
+ }
+
+ private void replicate(final ReplicationManager replicationManager,
+ final ClusterConnection clusterConnection,
+ final boolean isFailBackRequest,
+ final TransportConfiguration backupTransport) {
+ try {
+ final String nodeID = activeMQServer.getNodeID().toString();
+ activeMQServer.getStorageManager().startReplication(replicationManager, activeMQServer.getPagingManager(), nodeID, isFailBackRequest && policy.isAllowAutoFailBack(), policy.getInitialReplicationSyncTimeout());
+ // backupTransport travels as the second element of the announced connector pair
+ clusterConnection.nodeAnnounced(System.currentTimeMillis(), nodeID, policy.getGroupName(), policy.getScaleDownGroupName(), new Pair<>(null, backupTransport), true);
+ // a failback request is honoured only if the policy allows auto failback
+ if (isFailBackRequest && policy.isAllowAutoFailBack()) {
+ awaitBackupAnnouncementOnFailbackRequest(clusterConnection);
+ }
+ } catch (Exception e) {
+ if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STARTED) {
+ /*
+ * The reasoning here is that the exception was either caused by (1) the
+ * (interaction with) the backup, or (2) by an IO Error at the storage. If (1), we
+ * can swallow the exception and ignore the replication request. If (2) the live
+ * will crash shortly.
+ */
+ ActiveMQServerLogger.LOGGER.errorStartingReplication(e);
+ }
+ try {
+ ActiveMQServerImpl.stopComponent(replicationManager);
+ } catch (Exception amqe) {
+ ActiveMQServerLogger.LOGGER.errorStoppingReplication(amqe);
+ } finally {
+ synchronized (replicationLock) {
+ this.replicationManager = null; // cleared even if the stop failed, so that startAsyncReplication(...) can be attempted again
+ }
+ }
+ }
+ }
+
+ /**
+ * Handles awaiting the backup announcement before trying to failover.
+ * This broker is a backup broker, acting as a live and ready to restart as a backup.
+ */
+ private void awaitBackupAnnouncementOnFailbackRequest(ClusterConnection clusterConnection) throws Exception {
+ final String nodeID = activeMQServer.getNodeID().toString();
+ final BackupTopologyListener topologyListener = new BackupTopologyListener(nodeID, clusterConnection.getConnector());
+ clusterConnection.addClusterTopologyListener(topologyListener);
+ try {
+ if (topologyListener.waitForBackup()) {
+ restartAsBackupAfterFailback(); // backup announced: this broker can restart as a backup
+ } else {
+ ActiveMQServerLogger.LOGGER.failbackMissedBackupAnnouncement(); // a missed announcement is only logged
+ }
+ } finally {
+ clusterConnection.removeClusterTopologyListener(topologyListener); // the listener never outlives this call
+ }
+ }
+
+ /**
+ * If {@link #asyncStopServer()} happens before this call, the restart just won't happen.
+ * If {@link #asyncStopServer()} happens after this call, it will make the server stop right after being restarted.
+ */
+ private void restartAsBackupAfterFailback() throws Exception {
+ if (stoppingServer) { // unsynchronized fast path, re-checked under the monitor below
+ return;
+ }
+ synchronized (this) { // same monitor used by asyncStopServer()
+ if (stoppingServer) {
+ return;
+ }
+ distributedManager.stop();
+ activeMQServer.fail(true);
+ ActiveMQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback();
+ activeMQServer.setHAPolicy(policy.getBackupPolicy()); // the next start is going to use the backup policy
+ activeMQServer.start();
+ }
+ }
+
+ private void asyncStopServer() {
+ if (stoppingServer) { // unsynchronized fast path, re-checked under the monitor below
+ return;
+ }
+ synchronized (this) { // same monitor used by restartAsBackupAfterFailback()
+ if (stoppingServer) {
+ return;
+ }
+ stoppingServer = true; // from now on no other stop is going to be spawned
+ new Thread(() -> {
+ try {
+ activeMQServer.stop(); // performed on a dedicated thread: the caller isn't blocked
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer);
+ }
+ }).start();
+ }
+ }
+
+ @Override
+ public void onUnavailableLockEvent() {
+ LOGGER.error("Quorum UNAVAILABLE: async stopping broker.");
+ asyncStopServer(); // invoked through the listener that run() registers on the live lock
+ }
+
+ private final class ReplicationFailureListener implements FailureListener, CloseListener {
+ // whatever happens to the replication connection, close or failure, is handled the same way
+ @Override
+ public void connectionClosed() {
+ onReplicationConnectionClose();
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+ onReplicationConnectionClose();
+ }
+
+ @Override
+ public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
+ onReplicationConnectionClose();
+ }
+ }
+
+ private void onReplicationConnectionClose() {
+ ExecutorService executorService = activeMQServer.getThreadPool();
+ if (executorService != null) { // without a thread pool nothing is done
+ synchronized (replicationLock) {
+ if (replicationManager == null) {
+ return; // nothing is replicating: no task is submitted
+ }
+ }
+ executorService.execute(() -> {
+ synchronized (replicationLock) {
+ if (replicationManager == null) { // re-checked: it could have been cleared before this task ran
+ return;
+ }
+ // this is going to stop the replication manager
+ activeMQServer.getStorageManager().stopReplication();
+ assert !replicationManager.isStarted();
+ replicationManager = null;
+ }
+ });
+ }
+ }
+
+ @Override
+ public void close(boolean permanently, boolean restarting) throws Exception {
+ synchronized (replicationLock) {
+ replicationManager = null; // only the reference is dropped here
+ }
+ distributedManager.stop();
+ // To avoid a NPE caused by the stop
+ final NodeManager nodeManager = activeMQServer.getNodeManager();
+ if (nodeManager != null) {
+ if (permanently) {
+ nodeManager.crashLiveServer();
+ } else {
+ nodeManager.pauseLiveServer();
+ }
+ }
+ }
+
+ @Override
+ public void sendLiveIsStopping() {
+ final ReplicationManager replicationManager = getReplicationManager();
+ if (replicationManager == null) {
+ return; // no backup is replicating: nobody to notify
+ }
+ replicationManager.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.STOP_CALLED);
+ // this pool gets a 'hard' shutdown, no need to manage the Future of this Runnable.
+ activeMQServer.getScheduledPool().schedule(replicationManager::clearReplicationTokens, 30, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public ReplicationManager getReplicationManager() {
+ synchronized (replicationLock) { // the field is guarded by replicationLock
+ return replicationManager; // can be null: callers in this class check for it
+ }
+ }
+
+ @Override
+ public boolean isReplicaSync() {
+ // read the manager once: it's the value this whole check is based on
+ final ReplicationManager currentManager = getReplicationManager();
+ // the replica is in sync only if replication is set up and
+ // the replication manager isn't synchronizing anymore
+ return currentManager != null && !currentManager.isSynchronizing();
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 0249cdfe53..3876185803 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint.ReplicationEndpointEventListener;
import org.apache.activemq.artemis.core.server.ActivationParams;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -54,7 +55,7 @@ import static org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothi
import static org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAIL_OVER;
import static org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.STOP;
-public final class SharedNothingBackupActivation extends Activation {
+public final class SharedNothingBackupActivation extends Activation implements ReplicationEndpointEventListener {
private static final Logger logger = Logger.getLogger(SharedNothingBackupActivation.class);
@@ -96,7 +97,7 @@ public final class SharedNothingBackupActivation extends Activation {
assert replicationEndpoint == null;
activeMQServer.resetNodeManager();
backupUpToDate = false;
- replicationEndpoint = new ReplicationEndpoint(activeMQServer, ioCriticalErrorListener, attemptFailBack, this);
+ replicationEndpoint = new ReplicationEndpoint(activeMQServer, attemptFailBack, this);
}
@Override
@@ -156,9 +157,6 @@ public final class SharedNothingBackupActivation extends Activation {
logger.debug("Starting backup manager");
activeMQServer.getBackupManager().start();
- logger.debug("Set backup Quorum");
- replicationEndpoint.setBackupQuorum(backupQuorum);
-
replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor());
EndpointConnector endpointConnector = new EndpointConnector();
@@ -461,7 +459,13 @@ public final class SharedNothingBackupActivation extends Activation {
return backupUpToDate;
}
- public void setRemoteBackupUpToDate() {
+ @Override
+ public void onLiveNodeId(String nodeId) {
+ backupQuorum.liveIDSet(nodeId); // the received node id is just forwarded to the backup quorum
+ }
+
+ @Override
+ public void onRemoteBackupUpToDate() {
activeMQServer.getBackupManager().announceBackup();
backupUpToDate = true;
backupSyncLatch.countDown();
@@ -470,7 +474,8 @@ public final class SharedNothingBackupActivation extends Activation {
/**
* @throws ActiveMQException
*/
- public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws ActiveMQException {
+ @Override
+ public void onLiveStopping(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws ActiveMQException {
if (logger.isTraceEnabled()) {
logger.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" +
backupUpToDate);
@@ -526,4 +531,9 @@ public final class SharedNothingBackupActivation extends Activation {
return replicationEndpoint;
}
}
+
+ @Override
+ public boolean isReplicaSync() {
+ return isRemoteBackupUpToDate(); // plain delegation: no other state is consulted
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index 9de4be057b..f876a7604b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -462,4 +462,13 @@ public class SharedNothingLiveActivation extends LiveActivation {
private TransportConfiguration[] connectorNameListToArray(final List connectorNames) {
return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames);
}
+
+ @Override
+ public boolean isReplicaSync() {
+ // read the manager once: it's the value this whole check is based on
+ final ReplicationManager currentManager = getReplicationManager();
+ // the replica is in sync only if replication is set up and
+ // the replication manager isn't synchronizing anymore
+ return currentManager != null && !currentManager.isSynchronizing();
+ }
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 9dc5b5c0fc..9e79a54b7f 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2605,7 +2605,7 @@
-
+
A key-value pair option for the DataSource
@@ -2682,7 +2682,7 @@
-
+
@@ -2726,6 +2726,36 @@
+
+
+
+
+
+ The distributed-primitive-manager class name
+
+
+
+
+
+
+ A list of options for the distributed-primitive-manager
+
+
+
+
+
+
+
+ A key-value pair option for the distributed-primitive-manager
+
+
+
+
+
+
+
+
+
@@ -2749,6 +2779,20 @@
+
+
+
+ A primary server configured to replicate.
+
+
+
+
+
+
+ A backup server configured to replicate.
+
+
+
@@ -3119,6 +3163,155 @@
+
+
+
+
+
+ The manager used to manage the distributed locks used for this type of replication.
+
+
+
+
+
+
+ used for replication, if set, (remote) backup servers will only pair with live servers with matching
+ group-name
+
+
+
+
+
+
+ Name of the cluster configuration to use for replication. This setting is only necessary in case you
+ configure multiple cluster connections. It is used by replicating backups and by live servers that
+ may attempt fail-back.
+
+
+
+
+
+
+ Whether to check the cluster for a (live) server using our own server ID when starting
+ up. This option is only necessary for performing 'fail-back' on replicating
+ servers. Strictly speaking this setting only applies to live servers and not to
+ backups.
+
+
+
+
+
+
+ The amount of time to wait for the replica to acknowledge it has received all the necessary data from
+ the replicating server at the final step of the initial replication synchronization process.
+
+
+
+
+
+
+ If we start as a replica and lose connection to the master, how many times should we attempt to vote
+ for quorum before restarting
+
+
+
+
+
+
+ How long to wait (in milliseconds) between each vote
+
+
+
+
+
+
+ If we start as a replica how long to wait (in milliseconds) before trying to replicate again after failing to find a replica
+
+
+
+
+
+
+
+
+
+
+
+ The manager used to manage the distributed locks used for this type of replication.
+
+
+
+
+
+
+ used for replication, if set, (remote) backup servers will only pair with live servers with matching
+ group-name
+
+
+
+
+
+
+ Name of the cluster configuration to use for replication. This setting is only necessary in case you
+ configure multiple cluster connections. It is used by replicating backups and by live servers that
+ may attempt fail-back.
+
+
+
+
+
+
+ This specifies how many times a replicated backup server can restart after moving its files on start.
+ Once there are this number of backup journal files the server will stop permanently after it fails
+ back.
+
+
+
+
+
+
+ Whether a server will automatically stop when another server places a request to take over
+ its place. The use case is when a regular server stops and its backup takes over its
+ duties, later the main server restarts and requests the server (the former backup) to
+ stop operating.
+
+
+
+
+
+
+ If we have to start as a replicated server this is the amount of time to wait for the replica to
+ acknowledge it has received all the necessary data from the replicating server at the final step
+ of the initial replication synchronization process.
+
+
+
+
+
+
+ If we lose connection to the master, how many times should we attempt to vote for quorum before restarting
+
+
+
+
+
+
+ How long to wait (in milliseconds) between each vote
+
+
+
+
+
+
+ How long to wait (in milliseconds) before trying to replicate again after failing to find a replica
+
+
+
+
+
+
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java
index 425ff7b244..4b37a76125 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java
@@ -17,7 +17,12 @@
package org.apache.activemq.artemis.core.config.impl;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
@@ -27,6 +32,8 @@ import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.LiveOnlyPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolicy;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
@@ -35,11 +42,19 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ColocatedActivation;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation;
+import org.apache.activemq.artemis.core.server.impl.ReplicationBackupActivation;
+import org.apache.activemq.artemis.core.server.impl.ReplicationPrimaryActivation;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation;
import org.apache.activemq.artemis.core.server.impl.SharedStoreBackupActivation;
import org.apache.activemq.artemis.core.server.impl.SharedStoreLiveActivation;
+import org.apache.activemq.artemis.quorum.DistributedLock;
+import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
+import org.apache.activemq.artemis.quorum.MutableLong;
+import org.apache.activemq.artemis.quorum.UnavailableStateException;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.IsInstanceOf;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -124,6 +139,248 @@ public class HAPolicyConfigurationTest extends ActiveMQTestBase {
liveOnlyTest("live-only-hapolicy-config5.xml");
}
+ public static class FakeDistributedPrimitiveManager implements DistributedPrimitiveManager {
+ // in-memory fake: it keeps the received config and serves at most one lock, with no remote interaction
+ private final Map config;
+ private boolean started;
+ private DistributedLock lock;
+
+ public FakeDistributedPrimitiveManager(Map config) {
+ this.config = config;
+ this.started = false;
+ }
+
+ public Map getConfig() {
+ return config; // the very same instance received by the constructor
+ }
+
+ @Override
+ public void addUnavailableManagerListener(UnavailableManagerListener listener) {
+ // no op
+ }
+
+ @Override
+ public void removeUnavailableManagerListener(UnavailableManagerListener listener) {
+ // no op
+ }
+
+ @Override
+ public boolean start(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
+ started = true; // timeout and unit are ignored: the start always succeeds
+ return true;
+ }
+
+ @Override
+ public void start() throws InterruptedException, ExecutionException {
+ started = true;
+ }
+
+ @Override
+ public boolean isStarted() {
+ return started;
+ }
+
+ @Override
+ public void stop() {
+ started = false;
+ if (lock != null) {
+ lock.close(); // closing releases the fake lock
+ }
+ lock = null; // a later getDistributedLock(...) creates a fresh lock
+ }
+
+ @Override
+ public DistributedLock getDistributedLock(String lockId) {
+ if (!started) {
+ throw new IllegalStateException("need to start first");
+ }
+ if (lock == null) { // lazily created, then reused for the same lockId
+ lock = new DistributedLock() {
+ // plain flag: the lock is held or it isn't, no owner is tracked
+ private boolean held;
+
+ @Override
+ public String getLockId() {
+ return lockId;
+ }
+
+ @Override
+ public boolean isHeldByCaller() throws UnavailableStateException {
+ return held;
+ }
+
+ @Override
+ public boolean tryLock() throws UnavailableStateException, InterruptedException {
+ if (held) {
+ return false; // not reentrant: a second attempt fails while held
+ }
+ held = true;
+ return true;
+ }
+
+ @Override
+ public void unlock() throws UnavailableStateException {
+ held = false;
+ }
+
+ @Override
+ public void addListener(UnavailableLockListener listener) {
+ // no op
+ }
+
+ @Override
+ public void removeListener(UnavailableLockListener listener) {
+ // no op
+ }
+
+ @Override
+ public void close() {
+ held = false;
+ }
+ };
+ } else if (!lock.getLockId().equals(lockId)) { // only a single lock id is supported at a time
+ throw new IllegalStateException("This shouldn't happen");
+ }
+ return lock;
+ }
+
+ @Override
+ public MutableLong getMutableLong(String mutableLongId) throws InterruptedException, ExecutionException, TimeoutException {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public void close() {
+ stop();
+ }
+ }
+
+ private static void validateManagerConfig(Map config) {
+ assertEquals("127.0.0.1:6666", config.get("connect-string"));
+ assertEquals("16000", config.get("session-ms"));
+ assertEquals("2000", config.get("connection-ms"));
+ assertEquals("2", config.get("retries"));
+ assertEquals("2000", config.get("retries-ms"));
+ assertEquals("test", config.get("namespace"));
+ assertEquals("10", config.get("session-percent"));
+ assertEquals(7, config.size()); // nothing else beyond the 7 entries checked above
+ }
+
+ @Test
+ public void PrimaryReplicationTest() throws Exception {
+ Configuration configuration = createConfiguration("primary-hapolicy-config.xml");
+ ActiveMQServerImpl server = new ActiveMQServerImpl(configuration);
+ try {
+ server.start();
+ Activation activation = server.getActivation();
+ assertTrue(activation instanceof ReplicationPrimaryActivation);
+ HAPolicy haPolicy = server.getHAPolicy();
+ assertTrue(haPolicy instanceof ReplicationPrimaryPolicy);
+ ReplicationPrimaryPolicy policy = (ReplicationPrimaryPolicy) haPolicy;
+ assertFalse(policy.isAllowAutoFailBack());
+ assertEquals(9876, policy.getInitialReplicationSyncTimeout());
+ assertFalse(policy.canScaleDown());
+ assertFalse(policy.isBackup());
+ assertFalse(policy.isSharedStore());
+ assertTrue(policy.isCheckForLiveServer());
+ assertTrue(policy.isWaitForActivation());
+ assertEquals("purple", policy.getGroupName());
+ assertEquals("purple", policy.getBackupGroupName());
+ assertEquals("abcdefg", policy.getClusterName());
+ assertFalse(policy.useQuorumManager());
+ // check failback companion backup policy
+ ReplicationBackupPolicy failbackPolicy = policy.getBackupPolicy();
+ assertNotNull(failbackPolicy);
+ assertSame(policy, failbackPolicy.getLivePolicy());
+ assertEquals(policy.getGroupName(), failbackPolicy.getGroupName());
+ assertEquals(policy.getBackupGroupName(), failbackPolicy.getBackupGroupName());
+ assertEquals(policy.getClusterName(), failbackPolicy.getClusterName());
+ assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize(), failbackPolicy.getMaxSavedReplicatedJournalsSize()); // expected value first
+ assertEquals(1, failbackPolicy.getVoteRetries());
+ assertEquals(1000, failbackPolicy.getVoteRetryWait());
+ assertTrue(failbackPolicy.isTryFailback());
+ assertTrue(failbackPolicy.isBackup());
+ assertFalse(failbackPolicy.isSharedStore());
+ assertTrue(failbackPolicy.isWaitForActivation());
+ assertFalse(failbackPolicy.useQuorumManager());
+ assertEquals(12345, failbackPolicy.getRetryReplicationWait());
+ // check scale-down properties
+ assertFalse(failbackPolicy.canScaleDown());
+ assertNull(failbackPolicy.getScaleDownClustername());
+ assertNull(failbackPolicy.getScaleDownGroupName());
+ // validate manager
+ DistributedPrimitiveManager manager = ((ReplicationPrimaryActivation) activation).getDistributedManager();
+ assertNotNull(manager);
+ assertEquals(FakeDistributedPrimitiveManager.class.getName(), manager.getClass().getName());
+ MatcherAssert.assertThat(manager, IsInstanceOf.instanceOf(FakeDistributedPrimitiveManager.class));
+ FakeDistributedPrimitiveManager forwardingManager = (FakeDistributedPrimitiveManager) manager;
+ // validate manager config
+ validateManagerConfig(forwardingManager.getConfig());
+ } finally {
+ server.stop(); // always stop the broker, even if an assertion has failed
+ }
+ }
+
+ /**
+ * Boots a broker from {@code backup-hapolicy-config.xml} and checks the pluggable-quorum
+ * backup side: activation and policy types, the values exposed by the backup policy, the
+ * companion primary policy used on fail-over, and the configured fake distributed
+ * primitive manager together with its properties.
+ *
+ * @throws Exception if the configuration cannot be parsed or the server fails to start/stop
+ */
+ @Test
+ public void BackupReplicationTest() throws Exception {
+ Configuration brokerConfig = createConfiguration("backup-hapolicy-config.xml");
+ ActiveMQServerImpl broker = new ActiveMQServerImpl(brokerConfig);
+ try {
+ broker.start();
+ Activation serverActivation = broker.getActivation();
+ assertTrue(serverActivation instanceof ReplicationBackupActivation);
+ HAPolicy configuredPolicy = broker.getHAPolicy();
+ assertTrue(configuredPolicy instanceof ReplicationBackupPolicy);
+ ReplicationBackupPolicy backupPolicy = (ReplicationBackupPolicy) configuredPolicy;
+ // values exposed by the backup policy itself
+ assertEquals("tiddles", backupPolicy.getGroupName());
+ assertEquals("tiddles", backupPolicy.getBackupGroupName());
+ assertEquals("33rrrrr", backupPolicy.getClusterName());
+ assertEquals(22, backupPolicy.getMaxSavedReplicatedJournalsSize());
+ assertEquals(1, backupPolicy.getVoteRetries());
+ assertEquals(1000, backupPolicy.getVoteRetryWait());
+ assertFalse(backupPolicy.isTryFailback());
+ assertTrue(backupPolicy.isBackup());
+ assertFalse(backupPolicy.isSharedStore());
+ assertTrue(backupPolicy.isWaitForActivation());
+ assertFalse(backupPolicy.useQuorumManager());
+ assertEquals(12345, backupPolicy.getRetryReplicationWait());
+ // scale-down is not available on the backup policy
+ assertFalse(backupPolicy.canScaleDown());
+ assertNull(backupPolicy.getScaleDownClustername());
+ assertNull(backupPolicy.getScaleDownGroupName());
+ // companion live policy used once the backup fails over
+ ReplicationPrimaryPolicy companionPrimary = backupPolicy.getLivePolicy();
+ assertNotNull(companionPrimary);
+ assertSame(backupPolicy, companionPrimary.getBackupPolicy());
+ assertFalse(companionPrimary.isAllowAutoFailBack());
+ assertEquals(9876, companionPrimary.getInitialReplicationSyncTimeout());
+ assertFalse(companionPrimary.canScaleDown());
+ assertFalse(companionPrimary.isBackup());
+ assertFalse(companionPrimary.isSharedStore());
+ assertFalse(companionPrimary.isCheckForLiveServer());
+ assertTrue(companionPrimary.isWaitForActivation());
+ assertEquals(backupPolicy.getGroupName(), companionPrimary.getGroupName());
+ assertEquals(backupPolicy.getClusterName(), companionPrimary.getClusterName());
+ assertEquals(backupPolicy.getBackupGroupName(), companionPrimary.getBackupGroupName());
+ assertFalse(companionPrimary.useQuorumManager());
+ // scale-down is not available on the companion live policy either
+ assertFalse(companionPrimary.canScaleDown());
+ assertNull(companionPrimary.getScaleDownClustername());
+ assertNull(companionPrimary.getScaleDownGroupName());
+ // the distributed manager must be the fake one named in the configuration
+ ReplicationBackupActivation backupActivation = (ReplicationBackupActivation) serverActivation;
+ DistributedPrimitiveManager quorumManager = backupActivation.getDistributedManager();
+ assertNotNull(quorumManager);
+ assertEquals(FakeDistributedPrimitiveManager.class.getName(), quorumManager.getClass().getName());
+ MatcherAssert.assertThat(quorumManager, IsInstanceOf.instanceOf(FakeDistributedPrimitiveManager.class));
+ // and it must have received the expected properties
+ validateManagerConfig(((FakeDistributedPrimitiveManager) quorumManager).getConfig());
+ } finally {
+ broker.stop();
+ }
+ }
+
@Test
public void ReplicatedTest() throws Exception {
Configuration configuration = createConfiguration("replicated-hapolicy-config.xml");
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index adfceeed84..5a16f3e5f6 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -113,6 +113,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -129,6 +130,7 @@ import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation;
+import org.apache.activemq.artemis.core.server.impl.ReplicationBackupActivation;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -1384,6 +1386,8 @@ public abstract class ActiveMQTestBase extends Assert {
if (isReplicated) {
if (activation instanceof SharedNothingBackupActivation) {
isRemoteUpToDate = backup.isReplicaSync();
+ } else if (activation instanceof ReplicationBackupActivation) {
+ isRemoteUpToDate = backup.isReplicaSync();
} else {
//we may have already failed over and changed the Activation
if (actualServer.isStarted()) {
@@ -2517,6 +2521,17 @@ public abstract class ActiveMQTestBase extends Assert {
return !hadToInterrupt;
}
+ /**
+ * Looks up the replication endpoint held by the current activation of {@code server}.
+ *
+ * @param server the server whose activation is inspected
+ * @return the endpoint of a {@link SharedNothingBackupActivation} or of a
+ * {@link ReplicationBackupActivation}; {@code null} for any other activation
+ */
+ protected static ReplicationEndpoint getReplicationEndpoint(ActiveMQServer server) {
+ final Activation current = server.getActivation();
+ final ReplicationEndpoint endpoint;
+ if (current instanceof SharedNothingBackupActivation) {
+ endpoint = ((SharedNothingBackupActivation) current).getReplicationEndpoint();
+ } else if (current instanceof ReplicationBackupActivation) {
+ endpoint = ((ReplicationBackupActivation) current).getReplicationEndpoint();
+ } else {
+ // no backup activation in place: there is no endpoint to expose
+ endpoint = null;
+ }
+ return endpoint;
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
diff --git a/artemis-server/src/test/resources/backup-hapolicy-config.xml b/artemis-server/src/test/resources/backup-hapolicy-config.xml
new file mode 100644
index 0000000000..be55236676
--- /dev/null
+++ b/artemis-server/src/test/resources/backup-hapolicy-config.xml
@@ -0,0 +1,54 @@
+
+
+
+
+
+
+
+
+
+ tiddles
+ 22
+ 33rrrrr
+ 9876
+ 12345
+ 1
+ 1000
+ false
+
+
+ org.apache.activemq.artemis.core.config.impl.HAPolicyConfigurationTest$FakeDistributedPrimitiveManager
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/artemis-server/src/test/resources/primary-hapolicy-config.xml b/artemis-server/src/test/resources/primary-hapolicy-config.xml
new file mode 100644
index 0000000000..5b88bcd80f
--- /dev/null
+++ b/artemis-server/src/test/resources/primary-hapolicy-config.xml
@@ -0,0 +1,52 @@
+
+
+
+
+
+
+
+ purple
+ abcdefg
+ 9876
+ 12345
+ true
+ 1
+ 1000
+
+
+ org.apache.activemq.artemis.core.config.impl.HAPolicyConfigurationTest$FakeDistributedPrimitiveManager
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docs/user-manual/en/ha.md b/docs/user-manual/en/ha.md
index b2f61d5d8b..96045e95e4 100644
--- a/docs/user-manual/en/ha.md
+++ b/docs/user-manual/en/ha.md
@@ -98,6 +98,36 @@ or
```
+*Replication* also allows configuring 2 new roles to enable *pluggable quorum* provider configuration, by using:
+```xml
+
+
+
+
+
+```
+to configure the classic *master* role, and
+```xml
+
+
+
+
+
+```
+for the classic *slave* one.
+
+If *replication* is configured using these new roles, some additional elements are required to complete the configuration, as detailed later.
+
+### IMPORTANT NOTE ON PLUGGABLE QUORUM VOTE FEATURE
+
+This feature is still **EXPERIMENTAL** and not meant to be run in production yet.
+
+It means:
+- its configuration can change until declared as **officially stable**
+- it has yet to solve an inherent data misalignment issue with replication (it can happen with `classic` replication as well)
+
+More info about this issue is available on [ARTEMIS-3340](https://issues.apache.org/jira/browse/ARTEMIS-3340).
+
### Data Replication
When using replication, the live and the backup servers do not share the
@@ -199,16 +229,26 @@ Much like in the shared-store case, when the live server stops or
crashes, its replicating backup will become active and take over its
duties. Specifically, the backup will become active when it loses
connection to its live server. This can be problematic because this can
-also happen because of a temporary network problem. In order to address
-this issue, the backup will try to determine whether it still can
+also happen because of a temporary network problem.
+
+This issue is solved in 2 different ways depending on which replication roles are configured:
+- **classic replication** (`master`/`slave` roles): backup will try to determine whether it still can
connect to the other servers in the cluster. If it can connect to more
than half the servers, it will become active, if more than half the
servers also disappeared with the live, the backup will wait and try
reconnecting with the live. This avoids a split brain situation.
+- **pluggable quorum vote replication** (`primary`/`backup` roles): backup relies on a pluggable quorum provider
+ (configurable via `manager` xml element) to detect if there's any active live.
+
+> ***NOTE***
+>
+> A backup in the **pluggable quorum vote replication** still needs to carefully configure
+> [connection-ttl](connection-ttl.md) in order to promptly issue a request to become live to the quorum service
+> before failing-over.
#### Configuration
-To configure the live and backup servers to be a replicating pair,
+To configure a classic replication's live and backup servers to be a replicating pair,
configure the live server in ' `broker.xml` to have:
```xml
@@ -235,6 +275,30 @@ The backup server must be similarly configured but as a `slave`
```
+To configure a pluggable quorum replication's primary and backup instead:
+
+```xml
+
+
+
+
+
+...
+
+
+ ...
+
+
+```
+and
+```xml
+
+
+
+
+
+```
+
#### All Replication Configuration
The following table lists all the `ha-policy` configuration elements for
@@ -308,6 +372,142 @@ replica to acknowledge it has received all the necessary data. The
default is 30,000 milliseconds. **Note:** during this interval any
journal related operations will be blocked.
+#### Pluggable Quorum Vote Replication configurations
+Pluggable Quorum Vote replication configuration options are a bit different
+from classic replication, mostly because of its customizable nature.
+
+[Apache curator](https://curator.apache.org/) is used by the default quorum provider.
+
+Below are some example configurations to show how it works.
+
+For `primary`:
+```xml
+
+
+
+
+ org.apache.activemq.artemis.quorum.zookeeper.CuratorDistributedPrimitiveManager
+
+
+
+
+ true
+
+
+
+```
+And `backup`:
+```xml
+
+
+
+
+ org.apache.activemq.artemis.quorum.zookeeper.CuratorDistributedPrimitiveManager
+
+
+
+
+ true
+
+
+
+```
+The configuration of `class-name` as follows
+```xml
+org.apache.activemq.artemis.quorum.zookeeper.CuratorDistributedPrimitiveManager
+```
+isn't really needed, because Apache Curator is the default provider, but has been shown for completeness.
+
+The `properties` element, instead
+```xml
+
+
+
+```
+can specify a list of `property` elements in the form of key-value pairs, depending on the ones
+accepted by the specified `class-name` provider.
+
+Apache Curator's provider allows configuring these properties:
+
+- [`connect-string`](https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#connectString(java.lang.String)): (no default)
+- [`session-ms`](https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#sessionTimeoutMs(int)): (default is 18000 ms)
+- [`session-percent`](https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#simulatedSessionExpirationPercent(int)): (default is 33); should be <= default,
+ see https://cwiki.apache.org/confluence/display/CURATOR/TN14 for more info
+- [`connection-ms`](https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#connectionTimeoutMs(int)): (default is 8000 ms)
+- [`retries`](https://curator.apache.org/apidocs/org/apache/curator/retry/RetryNTimes.html#%3Cinit%3E(int,int)): (default is 1)
+- [`retries-ms`](https://curator.apache.org/apidocs/org/apache/curator/retry/RetryNTimes.html#%3Cinit%3E(int,int)): (default is 1000 ms)
+- [`namespace`](https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#namespace(java.lang.String)): (no default)
+
+Configuration of the [Apache Zookeeper](https://zookeeper.apache.org/) nodes is left to the user, but there are a few
+**suggestions to improve the reliability of the quorum service**:
+- broker `session-ms` must be `>= 2 * server tick time` and `<= 20 * server tick time` as per
+ [Zookeeper 3.6.3 admin guide](https://zookeeper.apache.org/doc/r3.6.3/zookeeperAdmin.html): it directly impacts how fast a backup
+ can failover to an isolated/killed/unresponsive live; the higher, the slower.
+- GC on broker machine should allow keeping GC pauses within 1/3 of `session-ms` in order to let the Zookeeper heartbeat protocol
+work reliably: if it's not possible, better increase `session-ms` accepting a slower failover
+- Zookeeper must have enough resources to keep GC (and OS) pauses much smaller than server tick time: please consider carefully if
+ broker and Zookeeper node should share the same physical machine, depending on the expected load of the broker
+- network isolation protection requires configuring >=3 Zookeeper nodes
+
+#### *Important*: Notes on pluggable quorum replication configuration
+
+The first `classic` replication configuration that won't apply to the pluggable quorum replication
+is `vote-on-replication-failure`, and configuring it produces a startup error: pluggable quorum replication
+always behaves like `vote-on-replication-failure` `true`, i.e. shutting down a live broker (and its JVM) in case of quorum loss.
+
+The second deprecated `classic` replication configuration is `quorum-vote-wait`: given that the pluggable quorum vote replication
+requires backup to have an always-on reliable quorum service, there's no need to specify the timeout to reach
+the majority of quorum nodes. A backup remains inactive (i.e. JVM still up, console too, unable to sync with live, to failover, etc.)
+until the majority of quorum nodes is reachable again, re-activating when that happens.
+
+The only exception is with primary failing-back to an existing live backup using `<allow-failback>true</allow-failback>`:
+if the quorum service isn't immediately available the primary (and its JVM) just stops, allowing fail-fast failing-back.
+
+There are a few *semantic differences* in other existing properties:
+- `vote-retry-wait`: in `classic` replication it means how long to wait between each quorum vote try, while with pluggable quorum replication
+ it means how long to keep requesting to failover on each attempt
+- `vote-retries`: differently from `classic`, the number of vote attempts is `1 + vote-retries` (with classic it is just `vote-retries`).
+ Setting `0` means no retries, leaving backup to still perform an initial attempt.
+
+**Notes on replication configuration with [Apache curator](https://curator.apache.org/) quorum provider**
+
+As said some paragraphs above, `session-ms` affects the failover duration: a backup can
+failover after `session-ms` expires or, if the live broker voluntarily gives up its role
+(e.g. during a fail-back/manual broker stop), it happens immediately.
+
+For the former case (session expiration with live no longer present), the backup broker can detect an unresponsive live by using:
+1. cluster connection PINGs (affected by [connection-ttl](connection-ttl.md) tuning)
+2. closed TCP connection notification (depends on TCP configuration and networking stack/topology)
+
+These 2 cases have 2 different failover durations depending on different factors:
+1. `connection-ttl` affects how much time of the expiring `session-ms` is used to just detect a missing live broker: the higher `connection-ttl`,
+ the slower it reacts; backup can attempt to failover for the remaining `session-ms - connection-ttl`
+2. `session-ms` expiration is immediately detected: backup must try to failover for >=`session-ms` to be sure to catch
+ the session expiration and complete failover
+
+The previous comments are meant to suggest to the careful reader that the minimum time to attempt to failover
+cannot be below the full `session-ms` expiration.
+In short, it means
+```
+ total failover attempt time > session-ms
+```
+with
+```
+ total failover attempt time = vote-retry-wait * (vote-retries + 1)
+```
+and by consequence:
+```
+ vote-retry-wait * (vote-retries + 1) > session-ms
+```
+For example with `session-ms = 18000 ms`, safe values for failover timeout are:
+```xml
+ 11
+ 2000
+```
+Because `(11 + 1) * 2000 = 24000 ms`, which is bigger than `18000 ms`.
+
+There's no risk that a backup broker will stop attempting to failover too early, losing its chance to become live.
+
### Shared Store
When using a shared store, both live and backup servers share the *same*
@@ -406,8 +606,32 @@ stop. This configuration would look like:
```
-In replication HA mode you need to set an extra property
-`check-for-live-server` to `true` in the `master` configuration. If set
+The same configuration option can be set for both replications, classic:
+```xml
+
+
+
+ true
+
+
+
+```
+and with pluggable quorum provider:
+```xml
+
+
+
+
+
+
+ true
+
+
+
+```
+
+In both replication HA modes you need to set an extra property
+`check-for-live-server` to `true` in the `master`/`primary` configuration. If set
to true, during start-up a live server will first search the cluster for
another server using its nodeID. If it finds one, it will contact this
server and try to "fail-back". Since this is a remote replication
@@ -418,7 +642,7 @@ to shutdown for it to take over. This is necessary because otherwise the
live server has no means to know whether there was a fail-over or not,
and if there was if the server that took its duties is still running or
not. To configure this option at your `broker.xml`
-configuration file as follows:
+configuration file as follows, for classic replication:
```xml
@@ -430,6 +654,29 @@ configuration file as follows:
```
+And pluggable quorum replication:
+
+```xml
+
+
+
+
+
+
+ true
+
+
+
+```
+
+The key difference from classic replication is that if `master` cannot reach any
+live server with its same nodeID, it's going straight to become live, while `primary`
+requests it from the quorum provider, searching again for any existing live if
+the quorum provider is not available (e.g. connectivity loss, consensus absence) or
+if there's another live broker with the same nodeID alive, in an endless loop.
+
+In short: a started `primary` cannot become live without consensus.
+
> **Warning**
>
> Be aware that if you restart a live server while after failover has
diff --git a/pom.xml b/pom.xml
index 8d76bbbc22..5e59e4999c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,8 @@
artemis-distribution
tests
artemis-features
+ artemis-quorum-api
+ artemis-quorum-ri
ActiveMQ Artemis Parent
@@ -105,6 +107,9 @@
3.11.2
2.1.2
4.1.66.Final
+ 5.1.0
+
+ 3.6.3
2.0.40.Final
@@ -851,6 +856,32 @@
jakarta.security.auth.message-api
${jakarta.security.auth.message-api.version}