This commit is contained in:
Clebert Suconic 2017-05-03 16:40:16 -04:00
commit 2d7dbc501c
16 changed files with 216 additions and 24 deletions

View File

@ -395,6 +395,9 @@ public final class ActiveMQDefaultConfiguration {
// Will this backup server come live on a normal server shutdown
private static boolean DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN = false;
// Will a shared-store master startup wait for activation
private static boolean DEFAULT_WAIT_FOR_ACTIVATION = true;
// Will the broker populate the message with the name of the validated user
private static boolean DEFAULT_POPULATE_VALIDATED_USER = false;
@ -1113,6 +1116,13 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
}
/**
* Will a shared-store master startup wait for activation
*/
public static boolean isDefaultWaitForActivation() {
return DEFAULT_WAIT_FOR_ACTIVATION;
}
/**
* Will the broker populate the message with the name of the validated user
*/

View File

@ -80,7 +80,7 @@ public final class ConfigurationUtils {
}
case SHARED_STORE_MASTER: {
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;
return new SharedStoreMasterPolicy(pc.isFailoverOnServerShutdown());
return new SharedStoreMasterPolicy(pc.isFailoverOnServerShutdown(), pc.isWaitForActivation());
}
case SHARED_STORE_SLAVE: {
SharedStoreSlavePolicyConfiguration pc = (SharedStoreSlavePolicyConfiguration) conf;

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfiguration {
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
private boolean waitForActivation = ActiveMQDefaultConfiguration.isDefaultWaitForActivation();
public SharedStoreMasterPolicyConfiguration() {
}
@ -49,4 +50,13 @@ public class SharedStoreMasterPolicyConfiguration implements HAPolicyConfigurati
this.failoverOnServerShutdown = failoverOnServerShutdown;
return this;
}
public boolean isWaitForActivation() {
return waitForActivation;
}
public SharedStoreMasterPolicyConfiguration setWaitForActivation(Boolean waitForActivation) {
this.waitForActivation = waitForActivation;
return this;
}
}

View File

@ -1241,6 +1241,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
SharedStoreMasterPolicyConfiguration configuration = new SharedStoreMasterPolicyConfiguration();
configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown()));
configuration.setWaitForActivation(getBoolean(policyNode, "wait-for-activation", configuration.isWaitForActivation()));
return configuration;
}

View File

@ -40,6 +40,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import io.netty.channel.Channel;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -998,8 +999,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void errorCompletingCallbackOnReplicationManager(@Cause Throwable e);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222158, value = "{0} backup activation thread did not finish.", format = Message.Format.MESSAGE_FORMAT)
void backupActivationDidntFinish(ActiveMQServer server);
@Message(id = 222158, value = "{0} activation thread did not finish.", format = Message.Format.MESSAGE_FORMAT)
void activationDidntFinish(ActiveMQServer server);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222159, value = "unable to send notification when broadcast group is stopped", format = Message.Format.MESSAGE_FORMAT)
@ -1209,9 +1210,9 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222201,
value = "Timed out waiting for backup activation to exit",
value = "Timed out waiting for activation to exit",
format = Message.Format.MESSAGE_FORMAT)
void backupActivationTimeout();
void activationTimeout();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222202,

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -39,6 +40,10 @@ public interface HAPolicy<T extends Activation> {
boolean isBackup();
default boolean isWaitForActivation() {
return ActiveMQDefaultConfiguration.isDefaultWaitForActivation();
}
boolean canScaleDown();
/*

View File

@ -26,14 +26,16 @@ import org.apache.activemq.artemis.core.server.impl.SharedStoreLiveActivation;
public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
private boolean waitForActivation = ActiveMQDefaultConfiguration.isDefaultWaitForActivation();
private SharedStoreSlavePolicy sharedStoreSlavePolicy;
public SharedStoreMasterPolicy() {
}
public SharedStoreMasterPolicy(boolean failoverOnServerShutdown) {
public SharedStoreMasterPolicy(boolean failoverOnServerShutdown, boolean waitForActivation) {
this.failoverOnServerShutdown = failoverOnServerShutdown;
this.waitForActivation = waitForActivation;
}
@Deprecated
@ -53,6 +55,15 @@ public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
this.failoverOnServerShutdown = failoverOnServerShutdown;
}
@Override
public boolean isWaitForActivation() {
return waitForActivation;
}
public void setWaitForActivation(boolean waitForActivation) {
this.waitForActivation = waitForActivation;
}
public SharedStoreSlavePolicy getSharedStoreSlavePolicy() {
return sharedStoreSlavePolicy;
}

View File

@ -29,6 +29,8 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
private boolean isWaitForActivation = ActiveMQDefaultConfiguration.isDefaultWaitForActivation();
//this is how we act once we have failed over
private SharedStoreMasterPolicy sharedStoreMasterPolicy;
@ -64,7 +66,7 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
public SharedStoreMasterPolicy getSharedStoreMasterPolicy() {
if (sharedStoreMasterPolicy == null) {
sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown);
sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown, isWaitForActivation);
}
return sharedStoreMasterPolicy;
}
@ -91,6 +93,10 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
this.allowAutoFailBack = allowAutoFailBack;
}
public void setIsWaitForActivation(boolean isWaitForActivation) {
this.isWaitForActivation = isWaitForActivation;
}
@Override
public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive,

View File

@ -306,7 +306,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Used to identify the server on tests... useful on debugging testcases
private String identity;
private Thread backupActivationThread;
private Thread activationThread;
private Activation activation;
@ -514,7 +514,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (!haPolicy.isBackup()) {
activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
activation.run();
if (haPolicy.isWaitForActivation()) {
activation.run();
} else {
if (logger.isTraceEnabled()) {
logger.trace("starting activation");
}
activationThread = new ActivationThread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this));
activationThread.start();
}
}
// The activation on fail-back may change the value of isBackup, for that reason we are
// checking again here
@ -528,8 +536,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (logger.isTraceEnabled()) {
logger.trace("starting backupActivation");
}
backupActivationThread = new ActivationThread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this));
backupActivationThread.start();
activationThread = new ActivationThread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this));
activationThread.start();
} else {
ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(), identity != null ? identity : "");
}
@ -583,24 +591,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return state;
}
public void interrupBackupThread(NodeManager nodeManagerInUse) throws InterruptedException {
public void interruptActivationThread(NodeManager nodeManagerInUse) throws InterruptedException {
long timeout = 30000;
long start = System.currentTimeMillis();
while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout) {
while (activationThread.isAlive() && System.currentTimeMillis() - start < timeout) {
if (nodeManagerInUse != null) {
nodeManagerInUse.interrupt();
}
backupActivationThread.interrupt();
activationThread.interrupt();
backupActivationThread.join(1000);
activationThread.join(1000);
}
if (System.currentTimeMillis() - start >= timeout) {
ActiveMQServerLogger.LOGGER.backupActivationTimeout();
ActiveMQServerLogger.LOGGER.activationTimeout();
threadDump();
}
}
@ -1021,16 +1029,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
if (backupActivationThread != null) {
if (activationThread != null) {
try {
backupActivationThread.join(30000);
activationThread.join(30000);
} catch (InterruptedException e) {
ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(backupActivationThread.getClass().getName());
ActiveMQServerLogger.LOGGER.interruptWhilstStoppingComponent(activationThread.getClass().getName());
}
if (backupActivationThread.isAlive()) {
ActiveMQServerLogger.LOGGER.backupActivationDidntFinish(this);
backupActivationThread.interrupt();
if (activationThread.isAlive()) {
ActiveMQServerLogger.LOGGER.activationDidntFinish(this);
activationThread.interrupt();
}
}

View File

@ -362,7 +362,7 @@ public final class SharedNothingBackupActivation extends Activation {
// To avoid a NPE cause by the stop
NodeManager nodeManagerInUse = activeMQServer.getNodeManager();
activeMQServer.interrupBackupThread(nodeManagerInUse);
activeMQServer.interruptActivationThread(nodeManagerInUse);
if (nodeManagerInUse != null) {
nodeManagerInUse.stopBackup();

View File

@ -141,7 +141,7 @@ public final class SharedStoreBackupActivation extends Activation {
//we need to check as the servers policy may have changed
if (activeMQServer.getHAPolicy().isBackup()) {
activeMQServer.interrupBackupThread(nodeManagerInUse);
activeMQServer.interruptActivationThread(nodeManagerInUse);
if (nodeManagerInUse != null) {
nodeManagerInUse.stopBackup();

View File

@ -58,6 +58,10 @@ public final class SharedStoreLiveActivation extends LiveActivation {
logger.debug("announcing backup to the former live" + this);
}
activeMQServer.getBackupManager().start();
if (!sharedStoreMasterPolicy.isWaitForActivation())
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
activeMQServer.getBackupManager().announceBackup();
}

View File

@ -2159,6 +2159,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="wait-for-activation" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Will the master startup wait until it is activated
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
</xsd:complexType>
<xsd:complexType name="sharedStoreSlavePolicyType">

View File

@ -24,7 +24,9 @@ import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
@ -102,6 +104,23 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
assertEquals('*', wildCard.getSingleWord());
}
@Test
public void testParsingHaSharedStoreWaitForActivation() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
String configStr = firstPart + "<ha-policy><shared-store><master><wait-for-activation>false</wait-for-activation></master></shared-store></ha-policy>" + lastPart;
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
Configuration config = parser.parseMainConfig(input);
HAPolicyConfiguration haConfig = config.getHAPolicyConfiguration();
assertTrue(haConfig instanceof SharedStoreMasterPolicyConfiguration);
SharedStoreMasterPolicyConfiguration masterConfig = (SharedStoreMasterPolicyConfiguration) haConfig;
assertFalse(masterConfig.isWaitForActivation());
}
@Test
public void testParsingDefaultServerConfig() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();

View File

@ -505,6 +505,12 @@ HA strategy shared store for `master`:
Note that if false you want failover to occur the you
can use the the management API as explained at [Management](management.md)</td>
</tr>
<tr>
<td>`wait-for-activation`</td>
<td>If set to true then server startup will wait until it is activated.
If set to false then server startup will be done in the background.
Default is true.</td>
</tr>
</tbody>
</table>

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.junit.Before;
import org.junit.Test;
public class SharedStoreDontWaitForActivationTest extends ClusterTestBase {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
setupServers();
}
private void setupServers() throws Exception {
// Two live servers with same shared storage, using a shared lock file
// 1. configure 0 as backup of one to share the same node manager and file
// storage locations
setupBackupServer(0, 1, isFileStorage(), true, isNetty());
setupLiveServer(1, isFileStorage(), true, isNetty(), false);
// now reconfigure the HA policy for both servers to master with automatic
// failover and wait-for-activation disabled.
setupSharedStoreMasterPolicy(0);
setupSharedStoreMasterPolicy(1);
// configure cluster for bother servers
setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
}
private void setupSharedStoreMasterPolicy(int node) {
ActiveMQServer server = getServer(node);
SharedStoreMasterPolicyConfiguration liveConfiguration = new SharedStoreMasterPolicyConfiguration();
liveConfiguration.setFailoverOnServerShutdown(true);
liveConfiguration.setWaitForActivation(false);
Configuration config = server.getConfiguration();
config.setHAPolicyConfiguration(liveConfiguration);
}
private boolean isNetty() {
return true;
}
@Test
public void startupLiveAndBackups() throws Exception {
ActiveMQServer server0 = getServer(0);
ActiveMQServer server1 = getServer(1);
server0.start();
// server 0 is live
assertTrue(server1.waitForActivation(5, TimeUnit.SECONDS));
server1.start();
// server 1 is backup
assertFalse(server1.waitForActivation(1, TimeUnit.SECONDS));
setupSessionFactory(0, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
server0.stop();
// now server 1 becomes live
assertTrue(server1.waitForActivation(5, TimeUnit.SECONDS));
server0.start();
// after restart, server 0 becomes backup
assertFalse(server0.waitForActivation(1, TimeUnit.SECONDS));
server1.stop();
// now server 0 becomes live again
assertTrue(server0.waitForActivation(5, TimeUnit.SECONDS));
server1.start();
// after restart, server 1 becomes backup again
assertFalse(server1.waitForActivation(1, TimeUnit.SECONDS));
}
}