ARTEMIS-1112: Added wait-for-activation option to shared-store-master config

Added a wait-for-activation option to shared-store master HA policies.
This option is enabled by default to ensure unchanged server startup behavior.

If this option is enabled, ActiveMQServer.start() with a shared-store master server will not return
before the server has been activated.
If this options is disabled, start() will return after a background activation thread has been started.
The caller can use waitForActivation() to wait until server is activated, or just check the current activation status.
This commit is contained in:
Bernd Gutjahr 2017-05-03 14:46:47 +02:00 committed by Clebert Suconic
parent cdaae1578b
commit 6017e305d9
16 changed files with 216 additions and 24 deletions

View File

@ -395,6 +395,9 @@ public final class ActiveMQDefaultConfiguration {
// Will this backup server come live on a normal server shutdown // Will this backup server come live on a normal server shutdown
private static boolean DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN = false; private static boolean DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN = false;
// Will a shared-store master startup wait for activation
private static boolean DEFAULT_WAIT_FOR_ACTIVATION = true;
// Will the broker populate the message with the name of the validated user // Will the broker populate the message with the name of the validated user
private static boolean DEFAULT_POPULATE_VALIDATED_USER = false; private static boolean DEFAULT_POPULATE_VALIDATED_USER = false;
@ -1113,6 +1116,13 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN; return DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
} }
/**
* Will a shared-store master startup wait for activation
*/
public static boolean isDefaultWaitForActivation() {
return DEFAULT_WAIT_FOR_ACTIVATION;
}
/** /**
* Will the broker populate the message with the name of the validated user * Will the broker populate the message with the name of the validated user
*/ */

View File

@ -80,7 +80,7 @@ public final class ConfigurationUtils {
} }
case SHARED_STORE_MASTER: { case SHARED_STORE_MASTER: {
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf; SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;
return new SharedStoreMasterPolicy(pc.isFailoverOnServerShutdown()); return new SharedStoreMasterPolicy(pc.isFailoverOnServerShutdown(), pc.isWaitForActivation());
} }
case SHARED_STORE_SLAVE: { case SHARED_STORE_SLAVE: {
SharedStoreSlavePolicyConfiguration pc = (SharedStoreSlavePolicyConfiguration) conf; SharedStoreSlavePolicyConfiguration pc = (SharedStoreSlavePolicyConfiguration) conf;

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfiguration { public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfiguration {
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
private boolean waitForActivation = ActiveMQDefaultConfiguration.isDefaultWaitForActivation();
public SharedStoreMasterPolicyConfiguration() { public SharedStoreMasterPolicyConfiguration() {
} }
@ -49,4 +50,13 @@ public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfigurati
this.failoverOnServerShutdown = failoverOnServerShutdown; this.failoverOnServerShutdown = failoverOnServerShutdown;
return this; return this;
} }
public boolean isWaitForActivation() {
return waitForActivation;
}
public SharedStoreMasterPolicyConfiguration setWaitForActivation(Boolean waitForActivation) {
this.waitForActivation = waitForActivation;
return this;
}
} }

View File

@ -1241,6 +1241,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
SharedStoreMasterPolicyConfiguration configuration = new SharedStoreMasterPolicyConfiguration(); SharedStoreMasterPolicyConfiguration configuration = new SharedStoreMasterPolicyConfiguration();
configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown())); configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown()));
configuration.setWaitForActivation(getBoolean(policyNode, "wait-for-activation", configuration.isWaitForActivation()));
return configuration; return configuration;
} }

View File

@ -40,6 +40,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -998,8 +999,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void errorCompletingCallbackOnReplicationManager(@Cause Throwable e); void errorCompletingCallbackOnReplicationManager(@Cause Throwable e);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 222158, value = "{0} backup activation thread did not finish.", format = Message.Format.MESSAGE_FORMAT) @Message(id = 222158, value = "{0} activation thread did not finish.", format = Message.Format.MESSAGE_FORMAT)
void backupActivationDidntFinish(ActiveMQServer server); void activationDidntFinish(ActiveMQServer server);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 222159, value = "unable to send notification when broadcast group is stopped", format = Message.Format.MESSAGE_FORMAT) @Message(id = 222159, value = "unable to send notification when broadcast group is stopped", format = Message.Format.MESSAGE_FORMAT)
@ -1209,9 +1210,9 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 222201, @Message(id = 222201,
value = "Timed out waiting for backup activation to exit", value = "Timed out waiting for activation to exit",
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void backupActivationTimeout(); void activationTimeout();
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 222202, @Message(id = 222202,

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -39,6 +40,10 @@ public interface HAPolicy<T extends Activation> {
boolean isBackup(); boolean isBackup();
default boolean isWaitForActivation() {
return ActiveMQDefaultConfiguration.isDefaultWaitForActivation();
}
boolean canScaleDown(); boolean canScaleDown();
/* /*

View File

@ -26,14 +26,16 @@ import org.apache.activemq.artemis.core.server.impl.SharedStoreLiveActivation;
public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> { public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
private boolean waitForActivation = ActiveMQDefaultConfiguration.isDefaultWaitForActivation();
private SharedStoreSlavePolicy sharedStoreSlavePolicy; private SharedStoreSlavePolicy sharedStoreSlavePolicy;
public SharedStoreMasterPolicy() { public SharedStoreMasterPolicy() {
} }
public SharedStoreMasterPolicy(boolean failoverOnServerShutdown) { public SharedStoreMasterPolicy(boolean failoverOnServerShutdown, boolean waitForActivation) {
this.failoverOnServerShutdown = failoverOnServerShutdown; this.failoverOnServerShutdown = failoverOnServerShutdown;
this.waitForActivation = waitForActivation;
} }
@Deprecated @Deprecated
@ -53,6 +55,15 @@ public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
this.failoverOnServerShutdown = failoverOnServerShutdown; this.failoverOnServerShutdown = failoverOnServerShutdown;
} }
@Override
public boolean isWaitForActivation() {
return waitForActivation;
}
public void setWaitForActivation(boolean waitForActivation) {
this.waitForActivation = waitForActivation;
}
public SharedStoreSlavePolicy getSharedStoreSlavePolicy() { public SharedStoreSlavePolicy getSharedStoreSlavePolicy() {
return sharedStoreSlavePolicy; return sharedStoreSlavePolicy;
} }

View File

@ -29,6 +29,8 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback(); private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
private boolean isWaitForActivation = ActiveMQDefaultConfiguration.isDefaultWaitForActivation();
//this is how we act once we have failed over //this is how we act once we have failed over
private SharedStoreMasterPolicy sharedStoreMasterPolicy; private SharedStoreMasterPolicy sharedStoreMasterPolicy;
@ -64,7 +66,7 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
public SharedStoreMasterPolicy getSharedStoreMasterPolicy() { public SharedStoreMasterPolicy getSharedStoreMasterPolicy() {
if (sharedStoreMasterPolicy == null) { if (sharedStoreMasterPolicy == null) {
sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown); sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown, isWaitForActivation);
} }
return sharedStoreMasterPolicy; return sharedStoreMasterPolicy;
} }
@ -91,6 +93,10 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
this.allowAutoFailBack = allowAutoFailBack; this.allowAutoFailBack = allowAutoFailBack;
} }
public void setIsWaitForActivation(boolean isWaitForActivation) {
this.isWaitForActivation = isWaitForActivation;
}
@Override @Override
public Activation createActivation(ActiveMQServerImpl server, public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive, boolean wasLive,

View File

@ -306,7 +306,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Used to identify the server on tests... useful on debugging testcases // Used to identify the server on tests... useful on debugging testcases
private String identity; private String identity;
private Thread backupActivationThread; private Thread activationThread;
private Activation activation; private Activation activation;
@ -514,7 +514,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (!haPolicy.isBackup()) { if (!haPolicy.isBackup()) {
activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO); activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
activation.run(); if (haPolicy.isWaitForActivation()) {
activation.run();
} else {
if (logger.isTraceEnabled()) {
logger.trace("starting activation");
}
activationThread = new ActivationThread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this));
activationThread.start();
}
} }
// The activation on fail-back may change the value of isBackup, for that reason we are // The activation on fail-back may change the value of isBackup, for that reason we are
// checking again here // checking again here
@ -528,8 +536,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("starting backupActivation"); logger.trace("starting backupActivation");
} }
backupActivationThread = new ActivationThread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this)); activationThread = new ActivationThread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this));
backupActivationThread.start(); activationThread.start();
} else { } else {
ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(), identity != null ? identity : ""); ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(), identity != null ? identity : "");
} }
@ -583,24 +591,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return state; return state;
} }
public void interrupBackupThread(NodeManager nodeManagerInUse) throws InterruptedException { public void interruptActivationThread(NodeManager nodeManagerInUse) throws InterruptedException {
long timeout = 30000; long timeout = 30000;
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout) { while (activationThread.isAlive() && System.currentTimeMillis() - start < timeout) {
if (nodeManagerInUse != null) { if (nodeManagerInUse != null) {
nodeManagerInUse.interrupt(); nodeManagerInUse.interrupt();
} }
backupActivationThread.interrupt(); activationThread.interrupt();
backupActivationThread.join(1000); activationThread.join(1000);
} }
if (System.currentTimeMillis() - start >= timeout) { if (System.currentTimeMillis() - start >= timeout) {
ActiveMQServerLogger.LOGGER.backupActivationTimeout(); ActiveMQServerLogger.LOGGER.activationTimeout();
threadDump(); threadDump();
} }
} }
@ -1021,16 +1029,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
} }
if (backupActivationThread != null) { if (activationThread != null) {
try { try {
backupActivationThread.join(30000); activationThread.join(30000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(backupActivationThread.getClass().getName()); ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(activationThread.getClass().getName());
} }
if (backupActivationThread.isAlive()) { if (activationThread.isAlive()) {
ActiveMQServerLogger.LOGGER.backupActivationDidntFinish(this); ActiveMQServerLogger.LOGGER.activationDidntFinish(this);
backupActivationThread.interrupt(); activationThread.interrupt();
} }
} }

View File

@ -362,7 +362,7 @@ public final class SharedNothingBackupActivation extends Activation {
// To avoid a NPE cause by the stop // To avoid a NPE cause by the stop
NodeManager nodeManagerInUse = activeMQServer.getNodeManager(); NodeManager nodeManagerInUse = activeMQServer.getNodeManager();
activeMQServer.interrupBackupThread(nodeManagerInUse); activeMQServer.interruptActivationThread(nodeManagerInUse);
if (nodeManagerInUse != null) { if (nodeManagerInUse != null) {
nodeManagerInUse.stopBackup(); nodeManagerInUse.stopBackup();

View File

@ -141,7 +141,7 @@ public final class SharedStoreBackupActivation extends Activation {
//we need to check as the servers policy may have changed //we need to check as the servers policy may have changed
if (activeMQServer.getHAPolicy().isBackup()) { if (activeMQServer.getHAPolicy().isBackup()) {
activeMQServer.interrupBackupThread(nodeManagerInUse); activeMQServer.interruptActivationThread(nodeManagerInUse);
if (nodeManagerInUse != null) { if (nodeManagerInUse != null) {
nodeManagerInUse.stopBackup(); nodeManagerInUse.stopBackup();

View File

@ -58,6 +58,10 @@ public final class SharedStoreLiveActivation extends LiveActivation {
logger.debug("announcing backup to the former live" + this); logger.debug("announcing backup to the former live" + this);
} }
activeMQServer.getBackupManager().start(); activeMQServer.getBackupManager().start();
if (!sharedStoreMasterPolicy.isWaitForActivation())
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
activeMQServer.getBackupManager().announceBackup(); activeMQServer.getBackupManager().announceBackup();
} }

View File

@ -2159,6 +2159,13 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="wait-for-activation" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Will the master startup wait until it is activated
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
</xsd:complexType> </xsd:complexType>
<xsd:complexType name="sharedStoreSlavePolicyType"> <xsd:complexType name="sharedStoreSlavePolicyType">

View File

@ -24,7 +24,9 @@ import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser; import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
@ -102,6 +104,23 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
assertEquals('*', wildCard.getSingleWord()); assertEquals('*', wildCard.getSingleWord());
} }
@Test
public void testParsingHaSharedStoreWaitForActivation() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
String configStr = firstPart + "<ha-policy><shared-store><master><wait-for-activation>false</wait-for-activation></master></shared-store></ha-policy>" + lastPart;
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
Configuration config = parser.parseMainConfig(input);
HAPolicyConfiguration haConfig = config.getHAPolicyConfiguration();
assertTrue(haConfig instanceof SharedStoreMasterPolicyConfiguration);
SharedStoreMasterPolicyConfiguration masterConfig = (SharedStoreMasterPolicyConfiguration) haConfig;
assertFalse(masterConfig.isWaitForActivation());
}
@Test @Test
public void testParsingDefaultServerConfig() throws Exception { public void testParsingDefaultServerConfig() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser(); FileConfigurationParser parser = new FileConfigurationParser();

View File

@ -505,6 +505,12 @@ HA strategy shared store for `master`:
Note that if false you want failover to occur the you Note that if false you want failover to occur the you
can use the the management API as explained at [Management](management.md)</td> can use the the management API as explained at [Management](management.md)</td>
</tr> </tr>
<tr>
<td>`wait-for-activation`</td>
<td>If set to true then server startup will wait until it is activated.
If set to false then server startup will be done in the background.
Default is true.</td>
</tr>
</tbody> </tbody>
</table> </table>

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.junit.Before;
import org.junit.Test;
public class SharedStoreDontWaitForActivationTest extends ClusterTestBase {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
setupServers();
}
private void setupServers() throws Exception {
// Two live servers with same shared storage, using a shared lock file
// 1. configure 0 as backup of one to share the same node manager and file
// storage locations
setupBackupServer(0, 1, isFileStorage(), true, isNetty());
setupLiveServer(1, isFileStorage(), true, isNetty(), false);
// now reconfigure the HA policy for both servers to master with automatic
// failover and wait-for-activation disabled.
setupSharedStoreMasterPolicy(0);
setupSharedStoreMasterPolicy(1);
// configure cluster for bother servers
setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
}
private void setupSharedStoreMasterPolicy(int node) {
ActiveMQServer server = getServer(node);
SharedStoreMasterPolicyConfiguration liveConfiguration = new SharedStoreMasterPolicyConfiguration();
liveConfiguration.setFailoverOnServerShutdown(true);
liveConfiguration.setWaitForActivation(false);
Configuration config = server.getConfiguration();
config.setHAPolicyConfiguration(liveConfiguration);
}
private boolean isNetty() {
return true;
}
@Test
public void startupLiveAndBackups() throws Exception {
ActiveMQServer server0 = getServer(0);
ActiveMQServer server1 = getServer(1);
server0.start();
// server 0 is live
assertTrue(server1.waitForActivation(5, TimeUnit.SECONDS));
server1.start();
// server 1 is backup
assertFalse(server1.waitForActivation(1, TimeUnit.SECONDS));
setupSessionFactory(0, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
server0.stop();
// now server 1 becomes live
assertTrue(server1.waitForActivation(5, TimeUnit.SECONDS));
server0.start();
// after restart, server 0 becomes backup
assertFalse(server0.waitForActivation(1, TimeUnit.SECONDS));
server1.stop();
// now server 0 becomes live again
assertTrue(server0.waitForActivation(5, TimeUnit.SECONDS));
server1.start();
// after restart, server 1 becomes backup again
assertFalse(server1.waitForActivation(1, TimeUnit.SECONDS));
}
}