This closes #899

This commit is contained in:
Clebert Suconic 2016-11-29 15:24:06 -05:00
commit f820e01249
5 changed files with 174 additions and 51 deletions

View File

@ -31,6 +31,7 @@ import java.security.PrivilegedAction;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.jboss.logging.Logger;
@ -56,6 +57,9 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
private String ipv6Command = IPV6_DEFAULT_COMMAND;
// To be used on tests. As we use the loopback as a valid address on tests.
private boolean ignoreLoopback = false;
/**
* The timeout to be used on isReachable
*/
@ -88,6 +92,23 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
return this;
}
public boolean isIgnoreLoopback() {
return ignoreLoopback;
}
public NetworkHealthCheck setIgnoreLoopback(boolean ignoreLoopback) {
this.ignoreLoopback = ignoreLoopback;
return this;
}
public Set<InetAddress> getAddresses() {
return addresses;
}
public Set<URL> getUrls() {
return urls;
}
public String getNICName() {
if (networkInterface != null) {
return networkInterface.getName();
@ -101,10 +122,12 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
String[] addresses = addressList.split(",");
for (String address : addresses) {
try {
this.addAddress(InetAddress.getByName(address));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (!address.trim().isEmpty()) {
try {
this.addAddress(InetAddress.getByName(address.trim()));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
}
@ -117,10 +140,12 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
String[] addresses = addressList.split(",");
for (String address : addresses) {
try {
this.addURL(new URL(address));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (!address.trim().isEmpty()) {
try {
this.addURL(new URL(address.trim()));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
}
@ -180,9 +205,13 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
if (!check(address)) {
logger.warn("Ping Address " + address + " wasn't reacheable");
}
addresses.add(address);
checkStart();
if (!ignoreLoopback && address.isLoopbackAddress()) {
ActiveMQUtilLogger.LOGGER.addressloopback(address.toString());
} else {
addresses.add(address);
checkStart();
}
return this;
}

View File

@ -48,4 +48,9 @@ public interface ActiveMQUtilLogger extends BasicLogger {
@Message(id = 202000, value = "Missing privileges to set Thread Context Class Loader on Thread Factory. Using current Thread Context Class Loader",
format = Message.Format.MESSAGE_FORMAT)
void missingPrivsForClassloader();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 202001, value = "{0} is a loopback address and will be discarded.",
format = Message.Format.MESSAGE_FORMAT)
void addressloopback(String address);
}

View File

@ -78,6 +78,32 @@ public class ActiveMQScheduledComponentTest {
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
}
@Test
public void testAccumulationOwnPool() throws Exception {
final AtomicInteger count = new AtomicInteger(0);
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(100, TimeUnit.MILLISECONDS, false) {
@Override
public void run() {
if (count.get() == 0) {
try {
Thread.sleep(500);
} catch (Exception e) {
}
}
count.incrementAndGet();
}
};
local.start();
Thread.sleep(1000);
local.stop();
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5 && count.get() > 0);
}
@Test
public void testUsingOwnExecutors() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);

View File

@ -57,7 +57,7 @@ public class NetworkHealthTest {
NetworkHealthCheck addCheck(NetworkHealthCheck check) {
list.add(check);
return check;
return check.setIgnoreLoopback(true);
}
HttpServer httpServer;
@ -137,7 +137,26 @@ public class NetworkHealthTest {
Assert.assertTrue(check.purePing(address));
Assert.assertTrue(check.check(address));
}
@Test
public void testParseSpaces() throws Exception {
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100));
// using two addresses for URI and localhost
check.parseAddressList("localhost, , 127.0.0.2").parseURIList("http://www.redhat.com, , http://www.apache.org");
Assert.assertEquals(2, check.getAddresses().size());
Assert.assertEquals(2, check.getUrls().size());
}
@Test
public void testParseLogger() throws Exception {
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100));
// using two addresses for URI and localhost
check.parseAddressList("localhost, , 127.0.0.2").parseURIList("http://www.redhat.com, , http://www.apache.org");
Assert.assertEquals(2, check.getAddresses().size());
Assert.assertEquals(2, check.getUrls().size());
}
@Test

View File

@ -24,12 +24,24 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class NetworkIsolationReplicationTest extends FailoverTestBase {
private static final Logger logger = Logger.getLogger(NetworkIsolationReplicationTest.class);
@Before
@Override
public void setUp() throws Exception {
this.startBackupServer = false;
super.setUp();
}
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getNettyAcceptor(live, 1);
@ -49,37 +61,59 @@ public class NetworkIsolationReplicationTest extends FailoverTestBase {
@Test
public void testDoNotActivateOnIsolation() throws Exception {
ServerLocator locator = getServerLocator();
AssertionLoggerHandler.startCapture();
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
try {
ServerLocator locator = getServerLocator();
ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());
// this block here is just to validate if ignoring loopback addresses logic is in place
{
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1"));
ClientSession session = createSession(sf, false, true, true);
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ202001"));
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
AssertionLoggerHandler.clear();
Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check());
backupServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress(InetAddress.getByName("127.0.0.1"));
crash(false, true, session);
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ202001"));
for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) {
Thread.sleep(10);
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
}
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
backupServer.getServer().start();
ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());
ClientSession session = createSession(sf, false, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check());
crash(false, true, session);
for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) {
Thread.sleep(10);
}
Assert.assertTrue(backupServer.isStarted());
Assert.assertFalse(backupServer.isActive());
liveServer.start();
for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) {
Thread.sleep(10);
}
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
// This will make sure the backup got synchronized after the network was activated again
Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted());
} finally {
AssertionLoggerHandler.stopCapture();
}
Assert.assertTrue(backupServer.isStarted());
Assert.assertFalse(backupServer.isActive());
liveServer.start();
for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) {
Thread.sleep(10);
}
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
// This will make sure the backup got synchronized after the network was activated again
Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted());
}
@Test
@ -90,29 +124,39 @@ public class NetworkIsolationReplicationTest extends FailoverTestBase {
liveServer.getServer().getConfiguration().setNetworkCheckList("203.0.113.1").
setNetworkCheckPeriod(100).setNetworkCheckTimeout(100);
liveServer.start();
try {
Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod());
liveServer.start();
liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS);
Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod());
Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check());
liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS);
long timeout = System.currentTimeMillis() + 30000;
while (liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check());
long timeout = System.currentTimeMillis() + 30000;
while (liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}
Assert.assertFalse(liveServer.isStarted());
liveServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress(InetAddress.getByName("127.0.0.1"));
timeout = System.currentTimeMillis() + 30000;
while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}
Assert.assertTrue(liveServer.isStarted());
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
throw e;
} finally {
liveServer.getServer().stop();
backupServer.getServer().stop();
}
Assert.assertFalse(liveServer.isStarted());
liveServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1"));
timeout = System.currentTimeMillis() + 30000;
while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}
Assert.assertTrue(liveServer.isStarted());
}
@Override