This closes #895

This commit is contained in:
jbertram 2016-11-23 10:14:14 -06:00
commit a43c92a686
40 changed files with 1504 additions and 68 deletions

View File

@ -63,6 +63,11 @@ public class Create extends InputAbstract {
private static final Integer MQTT_PORT = 1883; private static final Integer MQTT_PORT = 1883;
/* **********************************************************************************
* Note for developers: These are tested at StreamClassPathTest on the unit test.
* This is to make sure maven or something else is not hiding these resources.
* ********************************************************************************** */
public static final String BIN_ARTEMIS_CMD = "bin/artemis.cmd"; public static final String BIN_ARTEMIS_CMD = "bin/artemis.cmd";
public static final String BIN_ARTEMIS_SERVICE_EXE = "bin/artemis-service.exe"; public static final String BIN_ARTEMIS_SERVICE_EXE = "bin/artemis-service.exe";
public static final String BIN_ARTEMIS_SERVICE_XML = "bin/artemis-service.xml"; public static final String BIN_ARTEMIS_SERVICE_XML = "bin/artemis-service.xml";
@ -90,6 +95,8 @@ public class Create extends InputAbstract {
public static final String ETC_HORNETQ_ACCEPTOR_TXT = "etc/hornetq-acceptor.txt"; public static final String ETC_HORNETQ_ACCEPTOR_TXT = "etc/hornetq-acceptor.txt";
public static final String ETC_MQTT_ACCEPTOR_TXT = "etc/mqtt-acceptor.txt"; public static final String ETC_MQTT_ACCEPTOR_TXT = "etc/mqtt-acceptor.txt";
public static final String ETC_STOMP_ACCEPTOR_TXT = "etc/stomp-acceptor.txt"; public static final String ETC_STOMP_ACCEPTOR_TXT = "etc/stomp-acceptor.txt";
public static final String ETC_PING_TXT = "etc/ping-settings.txt";
public static final String ETC_COMMENTED_PING_TXT = "etc/commented-ping-settings.txt";
@Arguments(description = "The instance directory to hold the broker's configuration and data. Path must be writable.", required = true) @Arguments(description = "The instance directory to hold the broker's configuration and data. Path must be writable.", required = true)
File directory; File directory;
@ -97,6 +104,9 @@ public class Create extends InputAbstract {
@Option(name = "--host", description = "The host name of the broker (Default: 0.0.0.0 or input if clustered)") @Option(name = "--host", description = "The host name of the broker (Default: 0.0.0.0 or input if clustered)")
String host; String host;
@Option(name = "--ping", description = "A comma separated string to be passed on to the broker config as network-check-list. The broker will shutdown when all these addresses are unreachable.")
String ping;
@Option(name = "--default-port", description = "The port number to use for the main 'artemis' acceptor (Default: 61616)") @Option(name = "--default-port", description = "The port number to use for the main 'artemis' acceptor (Default: 61616)")
int defaultPort = DEFAULT_PORT; int defaultPort = DEFAULT_PORT;
@ -512,7 +522,6 @@ public class Create extends InputAbstract {
} }
public Object run(ActionContext context) throws Exception { public Object run(ActionContext context) throws Exception {
if (forceLibaio && forceNIO) { if (forceLibaio && forceNIO) {
throw new RuntimeException("You can't specify --nio and --aio in the same execution."); throw new RuntimeException("You can't specify --nio and --aio in the same execution.");
} }
@ -535,6 +544,13 @@ public class Create extends InputAbstract {
filters.put("${persistence-enabled}", isDisablePersistence() ? "false" : "true"); filters.put("${persistence-enabled}", isDisablePersistence() ? "false" : "true");
if (ping != null && !ping.isEmpty()) {
filters.put("${ping}", ping);
filters.put("${ping-config.settings}", applyFilters(readTextFile(ETC_PING_TXT), filters));
} else {
filters.put("${ping-config.settings}", readTextFile(ETC_COMMENTED_PING_TXT));
}
if (replicated) { if (replicated) {
clustered = true; clustered = true;
filters.put("${replicated.settings}", applyFilters(readTextFile(ETC_REPLICATED_SETTINGS_TXT), filters)); filters.put("${replicated.settings}", applyFilters(readTextFile(ETC_REPLICATED_SETTINGS_TXT), filters));

View File

@ -151,7 +151,7 @@ public class Run extends LockAbstract {
} }
}, 500, 500); }, 500, 500);
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread("shutdown-hook") {
@Override @Override
public void run() { public void run() {
try { try {

View File

@ -50,8 +50,9 @@ under the License.
<journal-min-files>2</journal-min-files> <journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files> <journal-pool-files>-1</journal-pool-files>
${journal-buffer.settings}
${connector-config.settings} ${ping-config.settings}${journal-buffer.settings}${connector-config.settings}
<!-- how often we are looking for how many bytes are being used on the disk in ms --> <!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period> <disk-scan-period>5000</disk-scan-period>

View File

@ -0,0 +1,27 @@
<!--
You can specify the NIC you want to use to verify if the network
<network-check-NIC>theNickName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->

View File

@ -0,0 +1,26 @@
<!--
You can specify the NIC you want to use to verify if the network
<network-check-NIC>theNickName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<network-check-period>10000</network-check-period>
<network-check-timeout>1000</network-check-timeout>
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to check if your network is up.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, any successful ping will make the server OK to continue running -->
<network-check-list>${ping}</network-check-list>
<!-- use this to customize the ping used for ipv4 addresses -->
<network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command>
<!-- use this to customize the ping used for ipv6 addresses -->
<network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command>

View File

@ -47,6 +47,8 @@ import org.apache.activemq.artemis.cli.commands.user.RemoveUser;
import org.apache.activemq.artemis.cli.commands.user.ResetUser; import org.apache.activemq.artemis.cli.commands.user.ResetUser;
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@ -605,6 +607,29 @@ public class ArtemisTest {
} }
} }
@Test
public void testPing() throws Exception {
File instanceFolder = temporaryFolder.newFolder("pingTest");
setupAuth(instanceFolder);
String queues = "q1,t2";
String topics = "t1,t2";
// This is usually set when run from the command line via artemis.profile
Run.setEmbedded(true);
Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--queues", queues, "--topics", topics, "--no-autotune", "--require-login", "--ping", "127.0.0.1");
System.setProperty("artemis.instance", instanceFolder.getAbsolutePath());
FileConfiguration fc = new FileConfiguration();
FileDeploymentManager deploymentManager = new FileDeploymentManager(new File(instanceFolder, "./etc/broker.xml").toURI().toString());
deploymentManager.addDeployable(fc);
deploymentManager.readConfiguration();
Assert.assertEquals("127.0.0.1", fc.getNetworkCheckList());
}
private void testCli(String... args) { private void testCli(String... args) {
try { try {
Artemis.main(args); Artemis.main(args);

View File

@ -54,6 +54,9 @@ public class StreamClassPathTest {
openStream(Create.ETC_MQTT_ACCEPTOR_TXT); openStream(Create.ETC_MQTT_ACCEPTOR_TXT);
openStream(Create.ETC_HORNETQ_ACCEPTOR_TXT); openStream(Create.ETC_HORNETQ_ACCEPTOR_TXT);
openStream(Create.ETC_STOMP_ACCEPTOR_TXT); openStream(Create.ETC_STOMP_ACCEPTOR_TXT);
openStream(Create.ETC_PING_TXT);
openStream(Create.ETC_COMMENTED_PING_TXT);
} }
private void openStream(String source) throws Exception { private void openStream(String source) throws Exception {

View File

@ -17,12 +17,16 @@
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
/** /**
@ -31,7 +35,8 @@ import org.jboss.logging.Logger;
public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable { public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {
private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class); private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
private final ScheduledExecutorService scheduledExecutorService; private ScheduledExecutorService scheduledExecutorService;
private boolean startedOwnScheduler;
private long period; private long period;
private long millisecondsPeriod; private long millisecondsPeriod;
private TimeUnit timeUnit; private TimeUnit timeUnit;
@ -50,24 +55,41 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
boolean onDemand) { boolean onDemand) {
this.executor = executor; this.executor = executor;
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
if (this.scheduledExecutorService == null) {
throw new NullPointerException("scheduled Executor is null");
}
this.period = checkPeriod; this.period = checkPeriod;
this.timeUnit = timeUnit; this.timeUnit = timeUnit;
this.onDemand = onDemand; this.onDemand = onDemand;
} }
/**
* This is useful for cases where we want our own scheduler executor.
*
* @param checkPeriod
* @param timeUnit
* @param onDemand
*/
public ActiveMQScheduledComponent(long checkPeriod, TimeUnit timeUnit, boolean onDemand) {
this(null, null, checkPeriod, timeUnit, onDemand);
}
@Override @Override
public synchronized void start() { public synchronized void start() {
if (future != null) { if (future != null) {
// already started
return;
}
if (scheduledExecutorService == null) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1, getThreadFactory());
startedOwnScheduler = true;
}
if (onDemand) {
return; return;
} }
this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS); this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);
if (onDemand) {
return;
}
if (period >= 0) { if (period >= 0) {
future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, period, period, timeUnit); future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, period, period, timeUnit);
} else { } else {
@ -75,6 +97,20 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
} }
} }
protected ActiveMQThreadFactory getThreadFactory() {
return new ActiveMQThreadFactory(this.getClass().getSimpleName() + "-scheduled-threads", false, getThisClassLoader());
}
private ClassLoader getThisClassLoader() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
@Override
public ClassLoader run() {
return ActiveMQScheduledComponent.this.getClass().getClassLoader();
}
});
}
public void delay() { public void delay() {
int value = delayed.incrementAndGet(); int value = delayed.incrementAndGet();
if (value > 10) { if (value > 10) {
@ -109,12 +145,15 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
@Override @Override
public synchronized void stop() { public synchronized void stop() {
if (future == null) { if (future != null) {
return; // no big deal
}
future.cancel(false); future.cancel(false);
future = null; future = null;
}
if (startedOwnScheduler) {
this.scheduledExecutorService.shutdownNow();
scheduledExecutorService = null;
startedOwnScheduler = false;
}
} }
@ -154,7 +193,11 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
final Runnable runForScheduler = new Runnable() { final Runnable runForScheduler = new Runnable() {
@Override @Override
public void run() { public void run() {
if (executor != null) {
executor.execute(runForExecutor); executor.execute(runForExecutor);
} else {
runForExecutor.run();
}
} }
}; };

View File

@ -0,0 +1,371 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.URL;
import java.net.URLConnection;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.jboss.logging.Logger;
/**
* This will use {@link InetAddress#isReachable(int)} to determine if the network is alive.
* It will have a set of addresses, and if any address is reached the network will be considered alive.
*/
public class NetworkHealthCheck extends ActiveMQScheduledComponent {
private static final Logger logger = Logger.getLogger(NetworkHealthCheck.class);
private final Set<ActiveMQComponent> componentList = new ConcurrentHashSet<>();
private final Set<InetAddress> addresses = new ConcurrentHashSet<>();
private final Set<URL> urls = new ConcurrentHashSet<>();
private NetworkInterface networkInterface;
public static final String IPV6_DEFAULT_COMMAND = "ping6 -c 1 %2$s";
public static final String IPV4_DEFAULT_COMMAND = "ping -c 1 -t %d %s";
private String ipv4Command = IPV4_DEFAULT_COMMAND;
private String ipv6Command = IPV6_DEFAULT_COMMAND;
/**
* The timeout to be used on isReachable
*/
private int networkTimeout;
public NetworkHealthCheck() {
this(null, 1000, 1000);
}
public NetworkHealthCheck(String nicName, long checkPeriod, int networkTimeout) {
super(null, null, checkPeriod, TimeUnit.MILLISECONDS, false);
this.networkTimeout = networkTimeout;
this.setNICName(nicName);
}
public NetworkHealthCheck setNICName(String nicName) {
NetworkInterface netToUse;
try {
if (nicName != null) {
netToUse = NetworkInterface.getByName(nicName);
} else {
netToUse = null;
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
netToUse = null;
}
this.networkInterface = netToUse;
return this;
}
public String getNICName() {
if (networkInterface != null) {
return networkInterface.getName();
} else {
return null;
}
}
public NetworkHealthCheck parseAddressList(String addressList) {
if (addressList != null) {
String[] addresses = addressList.split(",");
for (String address : addresses) {
try {
this.addAddress(InetAddress.getByName(address));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
return this;
}
public NetworkHealthCheck parseURIList(String addressList) {
if (addressList != null) {
String[] addresses = addressList.split(",");
for (String address : addresses) {
try {
this.addURL(new URL(address));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
return this;
}
@Override
protected ActiveMQThreadFactory getThreadFactory() {
return new ActiveMQThreadFactory("NetworkChecker", "Network-Checker-", false, getThisClassLoader());
}
private ClassLoader getThisClassLoader() {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
@Override
public ClassLoader run() {
return NetworkHealthCheck.this.getClass().getClassLoader();
}
});
}
public int getNetworkTimeout() {
return networkTimeout;
}
@Override
public NetworkHealthCheck setPeriod(long period) {
super.setPeriod(period);
return this;
}
@Override
public NetworkHealthCheck setTimeUnit(TimeUnit timeUnit) {
super.setTimeUnit(timeUnit);
return this;
}
public NetworkHealthCheck setNetworkTimeout(int networkTimeout) {
this.networkTimeout = networkTimeout;
return this;
}
public NetworkHealthCheck addComponent(ActiveMQComponent component) {
componentList.add(component);
checkStart();
return this;
}
public NetworkHealthCheck clearComponents() {
componentList.clear();
return this;
}
public NetworkHealthCheck addAddress(InetAddress address) {
if (!check(address)) {
logger.warn("Ping Address " + address + " wasn't reacheable");
}
addresses.add(address);
checkStart();
return this;
}
public NetworkHealthCheck removeAddress(InetAddress address) {
addresses.remove(address);
return this;
}
public NetworkHealthCheck clearAddresses() {
addresses.clear();
return this;
}
public NetworkHealthCheck addURL(URL url) {
if (!check(url)) {
logger.warn("Ping url " + url + " wasn't reacheable");
}
urls.add(url);
checkStart();
return this;
}
public NetworkHealthCheck removeURL(URL url) {
urls.remove(url);
return this;
}
public NetworkHealthCheck clearURL() {
urls.clear();
return this;
}
public String getIpv4Command() {
return ipv4Command;
}
public NetworkHealthCheck setIpv4Command(String ipv4Command) {
this.ipv4Command = ipv4Command;
return this;
}
public String getIpv6Command() {
return ipv6Command;
}
public NetworkHealthCheck setIpv6Command(String ipv6Command) {
this.ipv6Command = ipv6Command;
return this;
}
private void checkStart() {
if (!isStarted() && (!addresses.isEmpty() || !urls.isEmpty()) && !componentList.isEmpty()) {
start();
}
}
@Override
public void run() {
boolean healthy = check();
if (healthy) {
for (ActiveMQComponent component : componentList) {
if (!component.isStarted()) {
try {
logger.info("Network is healthy, starting service " + component);
component.start();
} catch (Exception e) {
logger.warn("Error starting component " + component, e);
}
}
}
} else {
for (ActiveMQComponent component : componentList) {
if (component.isStarted()) {
try {
logger.info("Network is unhealthy, stopping service " + component);
component.stop();
} catch (Exception e) {
logger.warn("Error stopping component " + component, e);
}
}
}
}
}
public boolean check() {
boolean isEmpty = true;
for (InetAddress address : addresses) {
isEmpty = false;
if (check(address)) {
return true;
}
}
for (URL url : urls) {
isEmpty = false;
if (check(url)) {
return true;
}
}
// This should return true if no checks were done, on this case it's empty
// This is tested by {@link NetworkHe
return isEmpty;
}
public boolean check(InetAddress address) {
try {
if (address.isReachable(networkInterface, 0, networkTimeout)) {
if (logger.isTraceEnabled()) {
logger.tracef(address + " OK");
}
return true;
} else {
return purePing(address);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return false;
}
}
public boolean purePing(InetAddress address) throws IOException, InterruptedException {
long timeout = Math.max(1, TimeUnit.MILLISECONDS.toSeconds(networkTimeout));
// it did not work with a simple isReachable, it could be because there's no root access, so we will try ping executable
if (logger.isTraceEnabled()) {
logger.trace("purePing on canonical address " + address.getCanonicalHostName());
}
ProcessBuilder processBuilder;
if (address instanceof Inet6Address) {
processBuilder = buildProcess(ipv6Command, timeout, address.getCanonicalHostName());
} else {
processBuilder = buildProcess(ipv4Command, timeout, address.getCanonicalHostName());
}
Process pingProcess = processBuilder.start();
readStream(pingProcess.getInputStream(), false);
readStream(pingProcess.getErrorStream(), true);
return pingProcess.waitFor() == 0;
}
private ProcessBuilder buildProcess(String expressionCommand, long timeout, String host) {
String command = String.format(expressionCommand, timeout, host);
if (logger.isDebugEnabled()) {
logger.debug("executing ping:: " + command);
}
ProcessBuilder builder = new ProcessBuilder(command.split(" "));
return builder;
}
private void readStream(InputStream stream, boolean error) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
String inputLine;
while ((inputLine = reader.readLine()) != null) {
if (error) {
logger.warn(inputLine);
} else {
logger.trace(inputLine);
}
}
reader.close();
}
public boolean check(URL url) {
try {
URLConnection connection = url.openConnection();
connection.setReadTimeout(networkTimeout);
InputStream is = connection.getInputStream();
is.close();
return true;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return false;
}
}
}

View File

@ -36,6 +36,8 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
private final AccessControlContext acc; private final AccessControlContext acc;
private final String prefix;
/** /**
* Construct a new instance. The access control context of the calling thread will be the one used to create * Construct a new instance. The access control context of the calling thread will be the one used to create
* new threads if a security manager is installed. * new threads if a security manager is installed.
@ -45,8 +47,22 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
* @param tccl the context class loader of newly created threads * @param tccl the context class loader of newly created threads
*/ */
public ActiveMQThreadFactory(final String groupName, final boolean daemon, final ClassLoader tccl) { public ActiveMQThreadFactory(final String groupName, final boolean daemon, final ClassLoader tccl) {
this(groupName, "Thread-", daemon, tccl);
}
/**
* Construct a new instance. The access control context of the calling thread will be the one used to create
* new threads if a security manager is installed.
*
* @param groupName the name of the thread group to assign threads to by default
* @param daemon whether the created threads should be daemon threads
* @param tccl the context class loader of newly created threads
*/
public ActiveMQThreadFactory(final String groupName, String prefix, final boolean daemon, final ClassLoader tccl) {
group = new ThreadGroup(groupName + "-" + System.identityHashCode(this)); group = new ThreadGroup(groupName + "-" + System.identityHashCode(this));
this.prefix = prefix;
this.threadPriority = Thread.NORM_PRIORITY; this.threadPriority = Thread.NORM_PRIORITY;
this.tccl = tccl; this.tccl = tccl;
@ -81,7 +97,7 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
} }
private Thread createThread(final Runnable command) { private Thread createThread(final Runnable command) {
final Thread t = new Thread(group, command, "Thread-" + threadCount.getAndIncrement() + " (" + group.getName() + ")"); final Thread t = new Thread(group, command, prefix + threadCount.getAndIncrement() + " (" + group.getName() + ")");
t.setDaemon(daemon); t.setDaemon(daemon);
t.setPriority(threadPriority); t.setPriority(threadPriority);
t.setContextClassLoader(tccl); t.setContextClassLoader(tccl);

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.utils; package org.apache.activemq.artemis.utils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -77,4 +78,65 @@ public class ActiveMQScheduledComponentTest {
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5); Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
} }
@Test
public void testUsingOwnExecutors() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(10, TimeUnit.MILLISECONDS, false) {
@Override
public void run() {
latch.countDown();
}
};
local.start();
local.start(); // should be ok to call start again
try {
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
// re-scheduling the executor at a big interval..
// just to make sure it won't hung
local.setTimeUnit(TimeUnit.HOURS);
local.setPeriod(1);
} finally {
local.stop();
local.stop(); // should be ok to call stop again
}
}
@Test
public void testUsingOwnExecutorsOnDemand() throws Throwable {
final ReusableLatch latch = new ReusableLatch(1);
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(10, TimeUnit.MILLISECONDS, true) {
@Override
public void run() {
latch.countDown();
}
};
local.start();
local.start(); // should be ok to call start again
try {
Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));
local.delay();
Assert.assertTrue(latch.await(20, TimeUnit.MILLISECONDS));
latch.setCount(1);
Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));
// re-scheduling the executor at a big interval..
// just to make sure it won't hung
local.setTimeUnit(TimeUnit.HOURS);
local.setPeriod(1);
} finally {
local.stop();
local.stop(); // calling stop again should not be an issue.
}
}
} }

View File

@ -0,0 +1,216 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class NetworkHealthTest {
private static final InetAddress INVALID_ADDRESS;
private static String IPV6_LOCAL = "::1";
static {
InetAddress address = null;
try {
address = InetAddress.getByName("203.0.113.1");
} catch (Exception e) {
e.printStackTrace();
}
INVALID_ADDRESS = address;
}
Set<NetworkHealthCheck> list = new HashSet<>();
NetworkHealthCheck addCheck(NetworkHealthCheck check) {
list.add(check);
return check;
}
HttpServer httpServer;
final ReusableLatch latch = new ReusableLatch(1);
ActiveMQComponent component = new ActiveMQComponent() {
boolean started = true;
@Override
public void start() throws Exception {
started = true;
latch.countDown();
}
@Override
public void stop() throws Exception {
started = false;
latch.countDown();
}
@Override
public boolean isStarted() {
return started;
}
};
@Before
public void before() throws Exception {
latch.setCount(1);
}
private void startHTTPServer() throws IOException {
Assert.assertNull(httpServer);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8080);
httpServer = HttpServer.create(address, 100);
httpServer.start();
httpServer.createContext("/", new HttpHandler() {
@Override
public void handle(HttpExchange t) throws IOException {
String response = "<html><body><b>This is a unit test</b></body></html>";
t.sendResponseHeaders(200, response.length());
OutputStream os = t.getResponseBody();
os.write(response.getBytes());
os.close();
}
});
}
private void stopHTTPServer() {
if (httpServer != null) {
try {
httpServer.stop(0);
} catch (Throwable ignored) {
}
httpServer = null;
}
}
@After
public void after() {
stopHTTPServer();
for (NetworkHealthCheck check : this.list) {
check.stop();
}
}
@Test
public void testCheck6() throws Exception {
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100));
check.addComponent(component);
InetAddress address = InetAddress.getByName(IPV6_LOCAL);
Assert.assertTrue(address instanceof Inet6Address);
Assert.assertTrue(check.purePing(address));
Assert.assertTrue(check.check(address));
}
@Test
public void testPings() throws Exception {
doCheck("127.0.0.1");
}
private void doCheck(String localaddress) throws Exception {
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100));
check.addComponent(component);
// Any external IP, to make sure we would use a PING
InetAddress address = InetAddress.getByName(localaddress);
Assert.assertTrue(check.check(address));
Assert.assertTrue(check.purePing(address));
Assert.assertFalse(check.purePing(INVALID_ADDRESS));
}
@Test
public void testPingsIPV6() throws Exception {
doCheck(IPV6_LOCAL);
}
@Test
public void testCheckNoNodes() throws Exception {
NetworkHealthCheck check = addCheck(new NetworkHealthCheck());
Assert.assertTrue(check.check());
}
@Test
public void testCheckUsingHTTP() throws Exception {
startHTTPServer();
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 1000));
Assert.assertTrue(check.check(new URL("http://localhost:8080")));
stopHTTPServer();
Assert.assertFalse(check.check(new URL("http://localhost:8080")));
check.addComponent(component);
URL url = new URL("http://localhost:8080");
Assert.assertFalse(check.check(url));
startHTTPServer();
Assert.assertTrue(check.check(url));
check.addURL(url);
Assert.assertFalse(latch.await(500, TimeUnit.MILLISECONDS));
Assert.assertTrue(component.isStarted());
// stopping the web server should stop the component
stopHTTPServer();
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
Assert.assertFalse(component.isStarted());
latch.setCount(1);
startHTTPServer();
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
Assert.assertTrue(component.isStarted());
}
}

View File

@ -440,6 +440,16 @@ public final class ActiveMQDefaultConfiguration {
public static final String DEFAULT_SYSTEM_PROPERTY_PREFIX = "brokerconfig."; public static final String DEFAULT_SYSTEM_PROPERTY_PREFIX = "brokerconfig.";
public static String DEFAULT_NETWORK_CHECK_LIST = null;
public static String DEFAULT_NETWORK_CHECK_URL_LIST = null;
public static long DEFAULT_NETWORK_CHECK_PERIOD = 5000;
public static int DEFAULT_NETWORK_CHECK_TIMEOUT = 1000;
public static String DEFAULT_NETWORK_CHECK_NIC = null;
/** /**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/ */
@ -1181,4 +1191,25 @@ public final class ActiveMQDefaultConfiguration {
public static String getDefaultSystemPropertyPrefix() { public static String getDefaultSystemPropertyPrefix() {
return DEFAULT_SYSTEM_PROPERTY_PREFIX; return DEFAULT_SYSTEM_PROPERTY_PREFIX;
} }
public static String getDefaultNetworkCheckList() {
return DEFAULT_NETWORK_CHECK_LIST;
}
public static String getDefaultNetworkCheckURLList() {
return DEFAULT_NETWORK_CHECK_URL_LIST;
}
public static long getDefaultNetworkCheckPeriod() {
return DEFAULT_NETWORK_CHECK_PERIOD;
}
public static int getDefaultNetworkCheckTimeout() {
return DEFAULT_NETWORK_CHECK_TIMEOUT;
}
public static String getDefaultNetworkCheckNic() {
return DEFAULT_NETWORK_CHECK_NIC;
}
} }

View File

@ -1027,4 +1027,40 @@ public interface Configuration {
int getDiskScanPeriod(); int getDiskScanPeriod();
/** A comma separated list of IPs we could use to validate if the network is UP.
* In case of none of these Ips are reached (if configured) the server will be shutdown. */
Configuration setNetworkCheckList(String list);
String getNetworkCheckList();
/** A comma separated list of URIs we could use to validate if the network is UP.
* In case of none of these Ips are reached (if configured) the server will be shutdown.
* The difference from networkCheckList is that we will use HTTP to make this validation. */
Configuration setNetworkCheckURLList(String uris);
String getNetworkCheckURLList();
/** The interval on which we will perform network checks. */
Configuration setNetworkCheckPeriod(long period);
long getNetworkCheckPeriod();
/** Time in ms for how long we should wait for a ping to finish. */
Configuration setNetworkCheckTimeout(int timeout);
int getNetworkCheckTimeout();
/** The NIC name to be used on network checks */
Configuration setNetworCheckNIC(String nic);
String getNetworkCheckNIC();
String getNetworkCheckPingCommand();
Configuration setNetworkCheckPingCommand(String command);
String getNetworkCheckPing6Command();
Configuration setNetworkCheckPing6Command(String command);
} }

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ColocatedPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ColocatedPolicy;
@ -52,7 +53,7 @@ public final class ConfigurationUtils {
throw new ActiveMQIllegalStateException("Missing cluster-configuration for replication-clustername '" + replicationCluster + "'."); throw new ActiveMQIllegalStateException("Missing cluster-configuration for replication-clustername '" + replicationCluster + "'.");
} }
public static HAPolicy getHAPolicy(HAPolicyConfiguration conf) throws ActiveMQIllegalStateException { public static HAPolicy getHAPolicy(HAPolicyConfiguration conf, ActiveMQServer server) throws ActiveMQIllegalStateException {
if (conf == null) { if (conf == null) {
return new LiveOnlyPolicy(); return new LiveOnlyPolicy();
} }
@ -64,11 +65,11 @@ public final class ConfigurationUtils {
} }
case REPLICATED: { case REPLICATED: {
ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf; ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout()); return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(), pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck());
} }
case REPLICA: { case REPLICA: {
ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf; ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration())); return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(), pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(), getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck());
} }
case SHARED_STORE_MASTER: { case SHARED_STORE_MASTER: {
SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf; SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration) conf;
@ -85,22 +86,22 @@ public final class ConfigurationUtils {
HAPolicy livePolicy; HAPolicy livePolicy;
//if null default to colocated //if null default to colocated
if (liveConf == null) { if (liveConf == null) {
livePolicy = new ReplicatedPolicy(); livePolicy = new ReplicatedPolicy(server.getNetworkHealthCheck());
} else { } else {
livePolicy = getHAPolicy(liveConf); livePolicy = getHAPolicy(liveConf, server);
} }
HAPolicyConfiguration backupConf = pc.getBackupConfig(); HAPolicyConfiguration backupConf = pc.getBackupConfig();
BackupPolicy backupPolicy; BackupPolicy backupPolicy;
if (backupConf == null) { if (backupConf == null) {
if (livePolicy instanceof ReplicatedPolicy) { if (livePolicy instanceof ReplicatedPolicy) {
backupPolicy = new ReplicaPolicy(); backupPolicy = new ReplicaPolicy(server.getNetworkHealthCheck());
} else if (livePolicy instanceof SharedStoreMasterPolicy) { } else if (livePolicy instanceof SharedStoreMasterPolicy) {
backupPolicy = new SharedStoreSlavePolicy(); backupPolicy = new SharedStoreSlavePolicy();
} else { } else {
throw ActiveMQMessageBundle.BUNDLE.liveBackupMismatch(); throw ActiveMQMessageBundle.BUNDLE.liveBackupMismatch();
} }
} else { } else {
backupPolicy = (BackupPolicy) getHAPolicy(backupConf); backupPolicy = (BackupPolicy) getHAPolicy(backupConf, server);
} }
if ((livePolicy instanceof ReplicatedPolicy && !(backupPolicy instanceof ReplicaPolicy)) || (livePolicy instanceof SharedStoreMasterPolicy && !(backupPolicy instanceof SharedStoreSlavePolicy))) { if ((livePolicy instanceof ReplicatedPolicy && !(backupPolicy instanceof ReplicaPolicy)) || (livePolicy instanceof SharedStoreMasterPolicy && !(backupPolicy instanceof SharedStoreSlavePolicy))) {

View File

@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -261,6 +262,20 @@ public class ConfigurationImpl implements Configuration, Serializable {
private String systemPropertyPrefix = ActiveMQDefaultConfiguration.getDefaultSystemPropertyPrefix(); private String systemPropertyPrefix = ActiveMQDefaultConfiguration.getDefaultSystemPropertyPrefix();
private String networkCheckList = ActiveMQDefaultConfiguration.getDefaultNetworkCheckList();
private String networkURLList = ActiveMQDefaultConfiguration.getDefaultNetworkCheckURLList();
private long networkCheckPeriod = ActiveMQDefaultConfiguration.getDefaultNetworkCheckPeriod();
private int networkCheckTimeout = ActiveMQDefaultConfiguration.getDefaultNetworkCheckTimeout();
private String networkCheckNIC = ActiveMQDefaultConfiguration.getDefaultNetworkCheckNic();
private String networkCheckPingCommand = NetworkHealthCheck.IPV4_DEFAULT_COMMAND;
private String networkCheckPing6Command = NetworkHealthCheck.IPV6_DEFAULT_COMMAND;
/** /**
* Parent folder for all data folders. * Parent folder for all data folders.
*/ */
@ -279,7 +294,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return systemPropertyPrefix; return systemPropertyPrefix;
} }
@Override @Override
public Configuration parseSystemProperties() throws Exception { public Configuration parseSystemProperties() throws Exception {
parseSystemProperties(System.getProperties()); parseSystemProperties(System.getProperties());
@ -291,7 +305,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
Map<String, Object> beanProperties = new HashMap<>(); Map<String, Object> beanProperties = new HashMap<>();
for (Map.Entry<Object, Object> entry : properties.entrySet()) { for (Map.Entry<Object, Object> entry : properties.entrySet()) {
if (entry.getKey().toString().startsWith(systemPropertyPrefix)) { if (entry.getKey().toString().startsWith(systemPropertyPrefix)) {
String key = entry.getKey().toString().substring(systemPropertyPrefix.length()); String key = entry.getKey().toString().substring(systemPropertyPrefix.length());
@ -307,7 +320,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this; return this;
} }
@Override @Override
public boolean isClustered() { public boolean isClustered() {
return !getClusterConfigurations().isEmpty(); return !getClusterConfigurations().isEmpty();
@ -1874,6 +1886,89 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this; return this;
} }
@Override
public ConfigurationImpl setNetworkCheckList(String list) {
this.networkCheckList = list;
return this;
}
@Override
public String getNetworkCheckList() {
return networkCheckList;
}
@Override
public ConfigurationImpl setNetworkCheckURLList(String urls) {
this.networkURLList = urls;
return this;
}
@Override
public String getNetworkCheckURLList() {
return networkURLList;
}
/**
* The interval on which we will perform network checks.
*/
@Override
public ConfigurationImpl setNetworkCheckPeriod(long period) {
this.networkCheckPeriod = period;
return this;
}
@Override
public long getNetworkCheckPeriod() {
return this.networkCheckPeriod;
}
/**
* Time in ms for how long we should wait for a ping to finish.
*/
@Override
public ConfigurationImpl setNetworkCheckTimeout(int timeout) {
this.networkCheckTimeout = timeout;
return this;
}
@Override
public int getNetworkCheckTimeout() {
return this.networkCheckTimeout;
}
@Override
public Configuration setNetworCheckNIC(String nic) {
this.networkCheckNIC = nic;
return this;
}
@Override
public String getNetworkCheckNIC() {
return networkCheckNIC;
}
@Override
public String getNetworkCheckPingCommand() {
return networkCheckPingCommand;
}
@Override
public ConfigurationImpl setNetworkCheckPingCommand(String command) {
this.networkCheckPingCommand = command;
return this;
}
@Override
public String getNetworkCheckPing6Command() {
return networkCheckPing6Command;
}
@Override
public Configuration setNetworkCheckPing6Command(String command) {
this.networkCheckPing6Command = command;
return this;
}
/** /**
* It will find the right location of a subFolder, related to artemisInstance * It will find the right location of a subFolder, related to artemisInstance
*/ */

View File

@ -538,7 +538,21 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setMemoryWarningThreshold(getInteger(e, "memory-warning-threshold", config.getMemoryWarningThreshold(), Validators.PERCENTAGE)); config.setMemoryWarningThreshold(getInteger(e, "memory-warning-threshold", config.getMemoryWarningThreshold(), Validators.PERCENTAGE));
config.setMemoryMeasureInterval(getLong(e, "memory-measure-interval", config.getMemoryMeasureInterval(), Validators.MINUS_ONE_OR_GT_ZERO)); // in config.setMemoryMeasureInterval(getLong(e, "memory-measure-interval", config.getMemoryMeasureInterval(), Validators.MINUS_ONE_OR_GT_ZERO));
config.setNetworkCheckList(getString(e, "network-check-list", config.getNetworkCheckList(), Validators.NO_CHECK));
config.setNetworkCheckURLList(getString(e, "network-check-URL-list", config.getNetworkCheckURLList(), Validators.NO_CHECK));
config.setNetworkCheckPeriod(getLong(e, "network-check-period", config.getNetworkCheckPeriod(), Validators.GT_ZERO));
config.setNetworkCheckTimeout(getInteger(e, "network-check-timeout", config.getNetworkCheckTimeout(), Validators.GT_ZERO));
config.setNetworCheckNIC(getString(e, "network-check-NIC", config.getNetworkCheckNIC(), Validators.NO_CHECK));
config.setNetworkCheckPing6Command(getString(e, "network-check-ping6-command", config.getNetworkCheckPing6Command(), Validators.NO_CHECK));
config.setNetworkCheckPingCommand(getString(e, "network-check-ping-command", config.getNetworkCheckPingCommand(), Validators.NO_CHECK));
parseAddressSettings(e, config); parseAddressSettings(e, config);

View File

@ -87,9 +87,12 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void setContext(OperationContext context); void setContext(OperationContext context);
/** /**
* @param ioCriticalError is the server being stopped due to an IO critical error *
* @param ioCriticalError is the server being stopped due to an IO critical error.
* @param sendFailover this is to send the replication stopping in case of replication.
* @throws Exception
*/ */
void stop(boolean ioCriticalError) throws Exception; void stop(boolean ioCriticalError, boolean sendFailover) throws Exception;
// Message related operations // Message related operations

View File

@ -1437,7 +1437,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
@Override @Override
public void stop() throws Exception { public void stop() throws Exception {
stop(false); stop(false, true);
} }
@Override @Override
@ -1454,7 +1454,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
protected abstract void performCachedLargeMessageDeletes(); protected abstract void performCachedLargeMessageDeletes();
@Override @Override
public synchronized void stop(boolean ioCriticalError) throws Exception { public synchronized void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
if (!started) { if (!started) {
return; return;
} }

View File

@ -75,7 +75,7 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
} }
@Override @Override
public synchronized void stop(boolean ioCriticalError) throws Exception { public synchronized void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
if (!started) { if (!started) {
return; return;
} }

View File

@ -175,7 +175,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
@Override @Override
public void stop() throws Exception { public void stop() throws Exception {
stop(false); stop(false, true);
} }
public boolean isReplicated() { public boolean isReplicated() {
@ -193,7 +193,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
} }
@Override @Override
public synchronized void stop(boolean ioCriticalError) throws Exception { public synchronized void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
if (!started) { if (!started) {
return; return;
} }
@ -224,6 +224,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// and we want to ensure a stop here just in case // and we want to ensure a stop here just in case
ReplicationManager replicatorInUse = replicator; ReplicationManager replicatorInUse = replicator;
if (replicatorInUse != null) { if (replicatorInUse != null) {
if (sendFailover) {
final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER); final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
if (token != null) { if (token != null) {
try { try {
@ -232,6 +233,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// ignore it // ignore it
} }
} }
}
replicatorInUse.stop(); replicatorInUse.stop();
} }
bindingsJournal.stop(); bindingsJournal.stop();

View File

@ -507,7 +507,7 @@ public class NullStorageManager implements StorageManager {
} }
@Override @Override
public void stop(final boolean ioCriticalError) throws Exception { public void stop(final boolean ioCriticalError, boolean sendFailover) throws Exception {
} }
@Override @Override

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityAuth;
@ -92,10 +93,17 @@ public interface ActiveMQServer extends ActiveMQComponent {
ActiveMQSecurityManager getSecurityManager(); ActiveMQSecurityManager getSecurityManager();
NetworkHealthCheck getNetworkHealthCheck();
Version getVersion(); Version getVersion();
NodeManager getNodeManager(); NodeManager getNodeManager();
/**
* @return
*/
ReplicationEndpoint getReplicationEndpoint();
/** /**
* it will release hold a lock for the activation. * it will release hold a lock for the activation.
*/ */

View File

@ -1265,6 +1265,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222212, value = "Disk Full! Blocking message production on address ''{0}''. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT) @Message(id = 222212, value = "Disk Full! Blocking message production on address ''{0}''. Clients will report blocked.", format = Message.Format.MESSAGE_FORMAT)
void blockingDiskFull(SimpleString addressName); void blockingDiskFull(SimpleString addressName);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222213,
value = "There was an issue on the network, server is isolated!",
format = Message.Format.MESSAGE_FORMAT)
void serverIsolatedOnNetwork();
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
@ -40,7 +41,10 @@ public class ReplicaPolicy extends BackupPolicy {
private ReplicatedPolicy replicatedPolicy; private ReplicatedPolicy replicatedPolicy;
public ReplicaPolicy() { private final NetworkHealthCheck networkHealthCheck;
public ReplicaPolicy(final NetworkHealthCheck networkHealthCheck) {
this.networkHealthCheck = networkHealthCheck;
} }
public ReplicaPolicy(String clusterName, public ReplicaPolicy(String clusterName,
@ -49,7 +53,8 @@ public class ReplicaPolicy extends BackupPolicy {
boolean restartBackup, boolean restartBackup,
boolean allowFailback, boolean allowFailback,
long initialReplicationSyncTimeout, long initialReplicationSyncTimeout,
ScaleDownPolicy scaleDownPolicy) { ScaleDownPolicy scaleDownPolicy,
NetworkHealthCheck networkHealthCheck) {
this.clusterName = clusterName; this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.groupName = groupName; this.groupName = groupName;
@ -57,16 +62,19 @@ public class ReplicaPolicy extends BackupPolicy {
this.allowFailback = allowFailback; this.allowFailback = allowFailback;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.scaleDownPolicy = scaleDownPolicy; this.scaleDownPolicy = scaleDownPolicy;
this.networkHealthCheck = networkHealthCheck;
} }
public ReplicaPolicy(String clusterName, public ReplicaPolicy(String clusterName,
int maxSavedReplicatedJournalsSize, int maxSavedReplicatedJournalsSize,
String groupName, String groupName,
ReplicatedPolicy replicatedPolicy) { ReplicatedPolicy replicatedPolicy,
NetworkHealthCheck networkHealthCheck) {
this.clusterName = clusterName; this.clusterName = clusterName;
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
this.groupName = groupName; this.groupName = groupName;
this.replicatedPolicy = replicatedPolicy; this.replicatedPolicy = replicatedPolicy;
this.networkHealthCheck = networkHealthCheck;
} }
public String getClusterName() { public String getClusterName() {
@ -87,7 +95,7 @@ public class ReplicaPolicy extends BackupPolicy {
public ReplicatedPolicy getReplicatedPolicy() { public ReplicatedPolicy getReplicatedPolicy() {
if (replicatedPolicy == null) { if (replicatedPolicy == null) {
replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this); replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout, groupName, clusterName, this, networkHealthCheck);
} }
return replicatedPolicy; return replicatedPolicy;
} }
@ -162,7 +170,7 @@ public class ReplicaPolicy extends BackupPolicy {
boolean wasLive, boolean wasLive,
Map<String, Object> activationParams, Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception { ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception {
SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, shutdownOnCriticalIO, this); SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, shutdownOnCriticalIO, this, networkHealthCheck);
backupActivation.init(); backupActivation.init();
return backupActivation; return backupActivation;
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LiveActivation; import org.apache.activemq.artemis.core.server.impl.LiveActivation;
import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation; import org.apache.activemq.artemis.core.server.impl.SharedNothingLiveActivation;
@ -44,18 +45,23 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
* */ * */
private ReplicaPolicy replicaPolicy; private ReplicaPolicy replicaPolicy;
public ReplicatedPolicy() { private final NetworkHealthCheck networkHealthCheck;
replicaPolicy = new ReplicaPolicy(clusterName, -1, groupName, this);
public ReplicatedPolicy(NetworkHealthCheck networkHealthCheck) {
replicaPolicy = new ReplicaPolicy(clusterName, -1, groupName, this, networkHealthCheck);
this.networkHealthCheck = networkHealthCheck;
} }
public ReplicatedPolicy(boolean checkForLiveServer, public ReplicatedPolicy(boolean checkForLiveServer,
String groupName, String groupName,
String clusterName, String clusterName,
long initialReplicationSyncTimeout) { long initialReplicationSyncTimeout,
NetworkHealthCheck networkHealthCheck) {
this.checkForLiveServer = checkForLiveServer; this.checkForLiveServer = checkForLiveServer;
this.groupName = groupName; this.groupName = groupName;
this.clusterName = clusterName; this.clusterName = clusterName;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.networkHealthCheck = networkHealthCheck;
/* /*
* we create this with sensible defaults in case we start after a failover * we create this with sensible defaults in case we start after a failover
* */ * */
@ -66,13 +72,15 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
long initialReplicationSyncTimeout, long initialReplicationSyncTimeout,
String groupName, String groupName,
String clusterName, String clusterName,
ReplicaPolicy replicaPolicy) { ReplicaPolicy replicaPolicy,
NetworkHealthCheck networkHealthCheck) {
this.checkForLiveServer = checkForLiveServer; this.checkForLiveServer = checkForLiveServer;
this.clusterName = clusterName; this.clusterName = clusterName;
this.groupName = groupName; this.groupName = groupName;
this.allowAutoFailBack = allowAutoFailBack; this.allowAutoFailBack = allowAutoFailBack;
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.replicaPolicy = replicaPolicy; this.replicaPolicy = replicaPolicy;
this.networkHealthCheck = networkHealthCheck;
} }
public boolean isCheckForLiveServer() { public boolean isCheckForLiveServer() {
@ -114,7 +122,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public ReplicaPolicy getReplicaPolicy() { public ReplicaPolicy getReplicaPolicy() {
if (replicaPolicy == null) { if (replicaPolicy == null) {
replicaPolicy = new ReplicaPolicy(clusterName, -1, groupName, this); replicaPolicy = new ReplicaPolicy(clusterName, -1, groupName, this, networkHealthCheck);
} }
return replicaPolicy; return replicaPolicy;
} }

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener { public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener {
@ -52,6 +53,8 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
private CoreRemotingConnection connection; private CoreRemotingConnection connection;
private final NetworkHealthCheck networkHealthCheck;
/** /**
* This is a safety net in case the live sends the first {@link ReplicationLiveIsStoppingMessage} * This is a safety net in case the live sends the first {@link ReplicationLiveIsStoppingMessage}
* with code {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED} and crashes before sending the second with * with code {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED} and crashes before sending the second with
@ -63,11 +66,13 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
public SharedNothingBackupQuorum(StorageManager storageManager, public SharedNothingBackupQuorum(StorageManager storageManager,
NodeManager nodeManager, NodeManager nodeManager,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool,
NetworkHealthCheck networkHealthCheck) {
this.storageManager = storageManager; this.storageManager = storageManager;
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
this.latch = new CountDownLatch(1); this.latch = new CountDownLatch(1);
this.nodeManager = nodeManager; this.nodeManager = nodeManager;
this.networkHealthCheck = networkHealthCheck;
} }
private volatile BACKUP_ACTIVATION signal; private volatile BACKUP_ACTIVATION signal;
@ -90,6 +95,9 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
//we may get called via multiple paths so need to guard //we may get called via multiple paths so need to guard
synchronized (decisionGuard) { synchronized (decisionGuard) {
if (signal == BACKUP_ACTIVATION.FAIL_OVER) { if (signal == BACKUP_ACTIVATION.FAIL_OVER) {
if (networkHealthCheck != null && !networkHealthCheck.check()) {
signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
}
return; return;
} }
if (!isLiveDown()) { if (!isLiveDown()) {
@ -102,8 +110,14 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
ActiveMQServerLogger.LOGGER.errorReConnecting(e); ActiveMQServerLogger.LOGGER.errorReConnecting(e);
} }
} }
if (networkHealthCheck != null && networkHealthCheck.check()) {
// live is assumed to be down, backup fails-over // live is assumed to be down, backup fails-over
signal = BACKUP_ACTIVATION.FAIL_OVER; signal = BACKUP_ACTIVATION.FAIL_OVER;
} else {
ActiveMQServerLogger.LOGGER.serverIsolatedOnNetwork();
signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
}
} }
latch.countDown(); latch.countDown();
} }

View File

@ -93,6 +93,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl; import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
@ -111,6 +112,7 @@ import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MemoryManager; import org.apache.activemq.artemis.core.server.MemoryManager;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.PostQueueCreationCallback; import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback; import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback;
@ -240,6 +242,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
*/ */
private volatile ExecutorFactory ioExecutorFactory; private volatile ExecutorFactory ioExecutorFactory;
private final NetworkHealthCheck networkHealthCheck = new NetworkHealthCheck(ActiveMQDefaultConfiguration.getDefaultNetworkCheckNic(), ActiveMQDefaultConfiguration.getDefaultNetworkCheckPeriod(), ActiveMQDefaultConfiguration.getDefaultNetworkCheckTimeout());
private final HierarchicalRepository<Set<Role>> securityRepository; private final HierarchicalRepository<Set<Role>> securityRepository;
private volatile ResourceManager resourceManager; private volatile ResourceManager resourceManager;
@ -325,6 +329,28 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private final ConcurrentMap<String, AtomicInteger> connectedClientIds = new ConcurrentHashMap(); private final ConcurrentMap<String, AtomicInteger> connectedClientIds = new ConcurrentHashMap();
private final ActiveMQComponent networkCheckMonitor = new ActiveMQComponent() {
@Override
public void start() throws Exception {
internalStart();
}
@Override
public void stop() throws Exception {
internalStop();
}
@Override
public String toString() {
return ActiveMQServerImpl.this.toString();
}
@Override
public boolean isStarted() {
return ActiveMQServerImpl.this.isStarted();
}
};
// Constructors // Constructors
// --------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------
@ -405,6 +431,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return reloadManager; return reloadManager;
} }
@Override
public NetworkHealthCheck getNetworkHealthCheck() {
return networkHealthCheck;
}
// life-cycle methods // life-cycle methods
// ---------------------------------------------------------------- // ----------------------------------------------------------------
@ -430,6 +461,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override @Override
public final synchronized void start() throws Exception { public final synchronized void start() throws Exception {
SERVER_STATE originalState = state;
try {
internalStart();
} finally {
if (originalState == SERVER_STATE.STOPPED) {
networkHealthCheck.setTimeUnit(TimeUnit.MILLISECONDS).setPeriod(configuration.getNetworkCheckPeriod()).
setNetworkTimeout(configuration.getNetworkCheckTimeout()).
parseAddressList(configuration.getNetworkCheckList()).
parseURIList(configuration.getNetworkCheckURLList()).
setNICName(configuration.getNetworkCheckNIC()).
setIpv4Command(configuration.getNetworkCheckPingCommand()).
setIpv6Command(configuration.getNetworkCheckPing6Command());
networkHealthCheck.addComponent(networkCheckMonitor);
}
}
}
private void internalStart() throws Exception {
if (state != SERVER_STATE.STOPPED) { if (state != SERVER_STATE.STOPPED) {
logger.debug("Server already started!"); logger.debug("Server already started!");
return; return;
@ -442,7 +492,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
state = SERVER_STATE.STARTING; state = SERVER_STATE.STARTING;
if (haPolicy == null) { if (haPolicy == null) {
haPolicy = ConfigurationUtils.getHAPolicy(configuration.getHAPolicyConfiguration()); haPolicy = ConfigurationUtils.getHAPolicy(configuration.getHAPolicyConfiguration(), this);
} }
activationLatch.setCount(1); activationLatch.setCount(1);
@ -492,6 +542,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
} }
@Override
public ReplicationEndpoint getReplicationEndpoint() {
if (activation instanceof SharedNothingBackupActivation) {
return ((SharedNothingBackupActivation) activation).getReplicationEndpoint();
}
return null;
}
@Override @Override
public void unlockActivation() { public void unlockActivation() {
activationLock.release(); activationLock.release();
@ -611,6 +669,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override @Override
public final void stop() throws Exception { public final void stop() throws Exception {
try {
internalStop();
} finally {
networkHealthCheck.stop();
}
}
private void internalStop() throws Exception {
stop(false); stop(false);
} }
@ -774,7 +840,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
fileStoreMonitor = null; fileStoreMonitor = null;
} }
if (failoverOnServerShutdown) {
activation.sendLiveIsStopping(); activation.sendLiveIsStopping();
}
stopComponent(connectorsService); stopComponent(connectorsService);
@ -838,7 +906,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (storageManager != null) if (storageManager != null)
try { try {
storageManager.stop(criticalIOError); storageManager.stop(criticalIOError, failoverOnServerShutdown);
} catch (Throwable t) { } catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, storageManager.getClass().getName()); ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, storageManager.getClass().getName());
} }
@ -1847,7 +1915,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
this.executorFactory = new OrderedExecutorFactory(threadPool); this.executorFactory = new OrderedExecutorFactory(threadPool);
if (serviceRegistry.getIOExecutorService() != null) { if (serviceRegistry.getIOExecutorService() != null) {
this.ioExecutorFactory = new OrderedExecutorFactory(serviceRegistry.getIOExecutorService()); this.ioExecutorFactory = new OrderedExecutorFactory(serviceRegistry.getIOExecutorService());
} else { } else {

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LiveNodeLocator; import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterControl; import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
@ -71,6 +72,7 @@ public final class SharedNothingBackupActivation extends Activation {
ClusterControl clusterControl; ClusterControl clusterControl;
private boolean closed; private boolean closed;
private volatile boolean backupUpToDate = true; private volatile boolean backupUpToDate = true;
private final NetworkHealthCheck networkHealthCheck;
private final ReusableLatch backupSyncLatch = new ReusableLatch(0); private final ReusableLatch backupSyncLatch = new ReusableLatch(0);
@ -78,13 +80,15 @@ public final class SharedNothingBackupActivation extends Activation {
boolean attemptFailBack, boolean attemptFailBack,
Map<String, Object> activationParams, Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO, ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO,
ReplicaPolicy replicaPolicy) { ReplicaPolicy replicaPolicy,
NetworkHealthCheck networkHealthCheck) {
this.activeMQServer = activeMQServer; this.activeMQServer = activeMQServer;
this.attemptFailBack = attemptFailBack; this.attemptFailBack = attemptFailBack;
this.activationParams = activationParams; this.activationParams = activationParams;
this.shutdownOnCriticalIO = shutdownOnCriticalIO; this.shutdownOnCriticalIO = shutdownOnCriticalIO;
this.replicaPolicy = replicaPolicy; this.replicaPolicy = replicaPolicy;
backupSyncLatch.setCount(1); backupSyncLatch.setCount(1);
this.networkHealthCheck = networkHealthCheck;
} }
public void init() throws Exception { public void init() throws Exception {
@ -117,7 +121,7 @@ public final class SharedNothingBackupActivation extends Activation {
synchronized (this) { synchronized (this) {
if (closed) if (closed)
return; return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool()); backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck);
activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum); activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
} }
@ -269,9 +273,10 @@ public final class SharedNothingBackupActivation extends Activation {
public void run() { public void run() {
try { try {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Calling activeMQServer.stop()"); logger.trace("Calling activeMQServer.stop() and start() to restart the server");
} }
activeMQServer.stop(); activeMQServer.stop();
activeMQServer.start();
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer); ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer);
} }

View File

@ -871,6 +871,66 @@
</xsd:sequence> </xsd:sequence>
</xsd:complexType> </xsd:complexType>
</xsd:element> </xsd:element>
<xsd:element name="network-check-list" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
A comma separated list of IPs to be used to validate if the broker should be kept up
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="network-check-URL-list" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
A comma separated list of URLs to be used to validate if the broker should be kept up
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="network-check-period" type="xsd:long" default="10000" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
A frequency in milliseconds to how often we should check if the network is still up
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="network-check-timeout" type="xsd:long" default="1000" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
A timeout used in milliseconds to be used on the ping.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="network-check-NIC" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The network interface card name to be used to validate the address.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="network-check-ping-command" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The ping command used to ping IPV4 addresses.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="network-check-ping6-command" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The ping command used to ping IPV6 addresses.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
</xsd:complexType> </xsd:complexType>

View File

@ -106,6 +106,13 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(98765, conf.getConnectionTtlCheckInterval()); Assert.assertEquals(98765, conf.getConnectionTtlCheckInterval());
Assert.assertEquals(1234567, conf.getConfigurationFileRefreshPeriod()); Assert.assertEquals(1234567, conf.getConfigurationFileRefreshPeriod());
Assert.assertEquals("127.0.0.1", conf.getNetworkCheckList());
Assert.assertEquals("some-nick", conf.getNetworkCheckNIC());
Assert.assertEquals(123, conf.getNetworkCheckPeriod());
Assert.assertEquals(321, conf.getNetworkCheckTimeout());
Assert.assertEquals("ping-four", conf.getNetworkCheckPingCommand());
Assert.assertEquals("ping-six", conf.getNetworkCheckPing6Command());
Assert.assertEquals("largemessagesdir", conf.getLargeMessagesDirectory()); Assert.assertEquals("largemessagesdir", conf.getLargeMessagesDirectory());
Assert.assertEquals(95, conf.getMemoryWarningThreshold()); Assert.assertEquals(95, conf.getMemoryWarningThreshold());

View File

@ -229,7 +229,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
} }
@Override @Override
public void stop(boolean ioCriticalError) throws Exception { public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
} }

View File

@ -236,6 +236,13 @@
<memory-warning-threshold>95</memory-warning-threshold> <memory-warning-threshold>95</memory-warning-threshold>
<memory-measure-interval>54321</memory-measure-interval> <memory-measure-interval>54321</memory-measure-interval>
<large-messages-directory>largemessagesdir</large-messages-directory> <large-messages-directory>largemessagesdir</large-messages-directory>
<network-check-list>127.0.0.1</network-check-list>
<network-check-NIC>some-nick</network-check-NIC>
<network-check-period>123</network-check-period>
<network-check-timeout>321</network-check-timeout>
<network-check-URL-list>www.apache.org</network-check-URL-list>
<network-check-ping-command>ping-four</network-check-ping-command>
<network-check-ping6-command>ping-six</network-check-ping6-command>
<security-settings> <security-settings>
<security-setting match="a1"> <security-setting match="a1">
<permission type="createNonDurableQueue" roles="a1.1"/> <permission type="createNonDurableQueue" roles="a1.1"/>

View File

@ -20,6 +20,7 @@
* [Configuration Reload](config-reload.md) * [Configuration Reload](config-reload.md)
* [Detecting Dead Connections](connection-ttl.md) * [Detecting Dead Connections](connection-ttl.md)
* [Detecting Slow Consumers](slow-consumers.md) * [Detecting Slow Consumers](slow-consumers.md)
* [Avoiding Network Isolation](network-isolation.md)
* [Resource Manager Configuration](transaction-config.md) * [Resource Manager Configuration](transaction-config.md)
* [Flow Control](flow-control.md) * [Flow Control](flow-control.md)
* [Guarantees of sends and commits](send-guarantees.md) * [Guarantees of sends and commits](send-guarantees.md)

View File

@ -112,6 +112,11 @@ system-property-prefix | Prefix for replacing configuration settings using Bean
[transaction-timeout](transaction-config.md "Resource Manager Configuration") | how long (in ms) before a transaction can be removed from the resource manager after create time. Default=300000 [transaction-timeout](transaction-config.md "Resource Manager Configuration") | how long (in ms) before a transaction can be removed from the resource manager after create time. Default=300000
[transaction-timeout-scan-period](transaction-config.md "Resource Manager Configuration") | how often (in ms) to scan for timeout transactions. Default=1000 [transaction-timeout-scan-period](transaction-config.md "Resource Manager Configuration") | how often (in ms) to scan for timeout transactions. Default=1000
[wild-card-routing-enabled](wildcard-routing.md "Routing Messages With Wild Cards") | true means that the server supports wild card routing. Default=true [wild-card-routing-enabled](wildcard-routing.md "Routing Messages With Wild Cards") | true means that the server supports wild card routing. Default=true
[network-check-NIC](network-isolation.md) | The network internet card to be used on InetAddress.isReacheable
[network-check-URL](network-isolation.md) | The list of http URIs to be used to validate the network
[network-check-list](network-isolation.md) | The list of pings to be used on ping or InetAddress.isReacheable
[network-check-ping-command](network-isolation.md) | The command used to oping IPV4 addresses
[network-check-ping6-command](network-isolation.md) | The command used to oping IPV6 addresses
#address-setting type #address-setting type

View File

@ -0,0 +1,106 @@
# Network Isolation
In case the server is isolated, say for a network failure, the server will be isolated for its peers on a network of brokers. If you are playing with replication the backup may think the backup failed and you may endup with two live nodes, what is called the split brain.
# Pinging the network
You may configure one more addresses on the broker.xml that are part of your network topology, that will be pinged through the life cycle of the server.
The server will stop itself until the network is back on such case.
If you execute the create command passing a -ping argument, you will create a default xml that is ready to be used with network checks:
```
./artemis create /myDir/myServer --ping 10.0.0.1
```
This XML part will be added to your broker.xml:
```xml
<!--
You can specify the NIC you want to use to verify if the network
<network-check-NIC>theNickName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<network-check-period>10000</network-check-period>
<network-check-timeout>1000</network-check-timeout>
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to check if your network is up.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, any successful ping will make the server OK to continue running -->
<network-check-list>10.0.0.1</network-check-list>
<!-- use this to customize the ping used for ipv4 addresses -->
<network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command>
<!-- use this to customize the ping used for ipv addresses -->
<network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command>
```
Once you lose connectivity towards 10.0.0.1 on the given example
, you will see see this output at the server:
```
09:49:24,562 WARN [org.apache.activemq.artemis.core.server.NetworkHealthCheck] Ping Address /10.0.0.1 wasn't reacheable
09:49:36,577 INFO [org.apache.activemq.artemis.core.server.NetworkHealthCheck] Network is unhealthy, stopping service ActiveMQServerImpl::serverUUID=04fd5dd8-b18c-11e6-9efe-6a0001921ad0
09:49:36,625 INFO [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 1.6.0 [04fd5dd8-b18c-11e6-9efe-6a0001921ad0] stopped, uptime 14.787 seconds
09:50:00,653 WARN [org.apache.activemq.artemis.core.server.NetworkHealthCheck] ping: sendto: No route to host
09:50:10,656 WARN [org.apache.activemq.artemis.core.server.NetworkHealthCheck] Host is down: java.net.ConnectException: Host is down
at java.net.Inet6AddressImpl.isReachable0(Native Method) [rt.jar:1.8.0_73]
at java.net.Inet6AddressImpl.isReachable(Inet6AddressImpl.java:77) [rt.jar:1.8.0_73]
at java.net.InetAddress.isReachable(InetAddress.java:502) [rt.jar:1.8.0_73]
at org.apache.activemq.artemis.core.server.NetworkHealthCheck.check(NetworkHealthCheck.java:295) [artemis-commons-1.6.0-SNAPSHOT.jar:1.6.0-SNAPSHOT]
at org.apache.activemq.artemis.core.server.NetworkHealthCheck.check(NetworkHealthCheck.java:276) [artemis-commons-1.6.0-SNAPSHOT.jar:1.6.0-SNAPSHOT]
at org.apache.activemq.artemis.core.server.NetworkHealthCheck.run(NetworkHealthCheck.java:244) [artemis-commons-1.6.0-SNAPSHOT.jar:1.6.0-SNAPSHOT]
at org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent$2.run(ActiveMQScheduledComponent.java:189) [artemis-commons-1.6.0-SNAPSHOT.jar:1.6.0-SNAPSHOT]
at org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent$3.run(ActiveMQScheduledComponent.java:199) [artemis-commons-1.6.0-SNAPSHOT.jar:1.6.0-SNAPSHOT]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [rt.jar:1.8.0_73]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [rt.jar:1.8.0_73]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [rt.jar:1.8.0_73]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [rt.jar:1.8.0_73]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_73]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_73]
at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_73]
```
Once you re establish your network connections towards the configured check list:
```
09:53:23,461 INFO [org.apache.activemq.artemis.core.server.NetworkHealthCheck] Network is healthy, starting service ActiveMQServerImpl::
09:53:23,462 INFO [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=./data/journal,bindingsDirectory=./data/bindings,largeMessagesDirectory=./data/large-messages,pagingDirectory=./data/paging)
09:53:23,462 INFO [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
09:53:23,462 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
09:53:23,463 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
09:53:23,463 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
09:53:23,463 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
09:53:23,464 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
09:53:23,464 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
09:53:23,541 INFO [org.apache.activemq.artemis.core.server] AMQ221003: Deploying queue jms.queue.DLQ
09:53:23,541 INFO [org.apache.activemq.artemis.core.server] AMQ221003: Deploying queue jms.queue.ExpiryQueue
09:53:23,549 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started Acceptor at 0.0.0.0:61616 for protocols [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]
09:53:23,550 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started Acceptor at 0.0.0.0:5445 for protocols [HORNETQ,STOMP]
09:53:23,554 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started Acceptor at 0.0.0.0:5672 for protocols [AMQP]
09:53:23,555 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started Acceptor at 0.0.0.0:1883 for protocols [MQTT]
09:53:23,556 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started Acceptor at 0.0.0.0:61613 for protocols [STOMP]
09:53:23,556 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
09:53:23,556 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 1.6.0 [0.0.0.0, nodeID=04fd5dd8-b18c-11e6-9efe-6a0001921ad0]
```
# Warning
> Make sure you understand your network topology as this is meant to validate if your network.
> Using IPs that could eventually disappear or be partially visible may defeat the purpose.
> You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running

View File

@ -278,13 +278,18 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
} }
protected void crash(final ClientSession... sessions) throws Exception { protected void crash(final ClientSession... sessions) throws Exception {
liveServer.crash(sessions); this.crash(true, sessions);
} }
protected void crash(final boolean waitFailure, final ClientSession... sessions) throws Exception { protected void crash(final boolean waitFailure, final ClientSession... sessions) throws Exception {
liveServer.crash(waitFailure, sessions); this.crash(true, waitFailure, sessions);
} }
protected void crash(final boolean failover,
final boolean waitFailure,
final ClientSession... sessions) throws Exception {
liveServer.crash(failover, waitFailure, sessions);
}
// Private ------------------------------------------------------- // Private -------------------------------------------------------
// Inner classes ------------------------------------------------- // Inner classes -------------------------------------------------

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.junit.Assert;
import org.junit.Test;
public class NetworkIsolationReplicationTest extends FailoverTestBase {
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getNettyAcceptor(live, 1);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getNettyConnector(live, 1);
}
protected ClientSession createSession(ClientSessionFactory sf1,
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks) throws Exception {
return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks));
}
@Test
public void testDoNotActivateOnIsolation() throws Exception {
ServerLocator locator = getServerLocator();
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());
ClientSession session = createSession(sf, false, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check());
crash(false, true, session);
for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) {
Thread.sleep(10);
}
Assert.assertTrue(backupServer.isStarted());
Assert.assertFalse(backupServer.isActive());
liveServer.start();
for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) {
Thread.sleep(10);
}
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
// This will make sure the backup got synchronized after the network was activated again
Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted());
}
@Test
public void testLiveIsolated() throws Exception {
backupServer.stop();
liveServer.stop();
liveServer.getServer().getConfiguration().setNetworkCheckList("203.0.113.1").
setNetworkCheckPeriod(100).setNetworkCheckTimeout(100);
liveServer.start();
Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod());
liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS);
Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check());
long timeout = System.currentTimeMillis() + 30000;
while (liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}
Assert.assertFalse(liveServer.isStarted());
liveServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1"));
timeout = System.currentTimeMillis() + 30000;
while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}
Assert.assertTrue(liveServer.isStarted());
}
@Override
protected void createConfigs() throws Exception {
createReplicatedConfigs();
}
@Override
protected void crash(boolean failover, boolean waitFailure, ClientSession... sessions) throws Exception {
if (sessions.length > 0) {
for (ClientSession session : sessions) {
waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
}
} else {
waitForRemoteBackup(null, 5, true, backupServer.getServer());
}
super.crash(failover, waitFailure, sessions);
}
}

View File

@ -76,6 +76,11 @@ public class SameProcessActiveMQServer implements TestableServer {
@Override @Override
public CountDownLatch crash(boolean waitFailure, ClientSession... sessions) throws Exception { public CountDownLatch crash(boolean waitFailure, ClientSession... sessions) throws Exception {
return crash(true, waitFailure, sessions);
}
@Override
public CountDownLatch crash(boolean failover, boolean waitFailure, ClientSession... sessions) throws Exception {
CountDownLatch latch = new CountDownLatch(sessions.length); CountDownLatch latch = new CountDownLatch(sessions.length);
CountDownSessionFailureListener[] listeners = new CountDownSessionFailureListener[sessions.length]; CountDownSessionFailureListener[] listeners = new CountDownSessionFailureListener[sessions.length];
for (int i = 0; i < sessions.length; i++) { for (int i = 0; i < sessions.length; i++) {
@ -87,7 +92,7 @@ public class SameProcessActiveMQServer implements TestableServer {
clusterManager.flushExecutor(); clusterManager.flushExecutor();
clusterManager.clear(); clusterManager.clear();
Assert.assertTrue("server should be running!", server.isStarted()); Assert.assertTrue("server should be running!", server.isStarted());
server.stop(true); server.stop(failover);
if (waitFailure) { if (waitFailure) {
// Wait to be informed of failure // Wait to be informed of failure

View File

@ -36,6 +36,8 @@ public interface TestableServer extends ActiveMQComponent {
CountDownLatch crash(boolean waitFailure, ClientSession... sessions) throws Exception; CountDownLatch crash(boolean waitFailure, ClientSession... sessions) throws Exception;
CountDownLatch crash(boolean failover, boolean waitFailure, ClientSession... sessions) throws Exception;
boolean isActive(); boolean isActive();
void addInterceptor(Interceptor interceptor); void addInterceptor(Interceptor interceptor);