From 43634c098b1815bc6abe220b6e21b76bdbcb7856 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 28 Nov 2016 10:38:34 -0500 Subject: [PATCH] ARTEMIS-863 parsing spaces properly on network health addresses and avoiding loopback on configuration --- .../core/server/NetworkHealthCheck.java | 49 +++++-- .../artemis/logs/ActiveMQUtilLogger.java | 5 + .../utils/ActiveMQScheduledComponentTest.java | 26 ++++ .../artemis/utils/NetworkHealthTest.java | 21 ++- .../NetworkIsolationReplicationTest.java | 124 ++++++++++++------ 5 files changed, 174 insertions(+), 51 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java index 8b7770dad0..ec98aad314 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java @@ -31,6 +31,7 @@ import java.security.PrivilegedAction; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.logs.ActiveMQUtilLogger; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.jboss.logging.Logger; @@ -56,6 +57,9 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent { private String ipv6Command = IPV6_DEFAULT_COMMAND; + // To be used on tests. As we use the loopback as a valid address on tests. + private boolean ignoreLoopback = false; + /** * The timeout to be used on isReachable */ @@ -88,6 +92,23 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent { return this; } + public boolean isIgnoreLoopback() { + return ignoreLoopback; + } + + public NetworkHealthCheck setIgnoreLoopback(boolean ignoreLoopback) { + this.ignoreLoopback = ignoreLoopback; + return this; + } + + public Set getAddresses() { + return addresses; + } + + public Set getUrls() { + return urls; + } + public String getNICName() { if (networkInterface != null) { return networkInterface.getName(); @@ -101,10 +122,12 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent { String[] addresses = addressList.split(","); for (String address : addresses) { - try { - this.addAddress(InetAddress.getByName(address)); - } catch (Exception e) { - logger.warn(e.getMessage(), e); + if (!address.trim().isEmpty()) { + try { + this.addAddress(InetAddress.getByName(address.trim())); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } } } } @@ -117,10 +140,12 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent { String[] addresses = addressList.split(","); for (String address : addresses) { - try { - this.addURL(new URL(address)); - } catch (Exception e) { - logger.warn(e.getMessage(), e); + if (!address.trim().isEmpty()) { + try { + this.addURL(new URL(address.trim())); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } } } } @@ -180,9 +205,13 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent { if (!check(address)) { logger.warn("Ping Address " + address + " wasn't reacheable"); } - addresses.add(address); - checkStart(); + if (!ignoreLoopback && address.isLoopbackAddress()) { + ActiveMQUtilLogger.LOGGER.addressloopback(address.toString()); + } else { + addresses.add(address); + checkStart(); + } return this; } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java index b26d30dc26..ab285f9d32 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java @@ -48,4 +48,9 @@ public interface ActiveMQUtilLogger extends BasicLogger { @Message(id = 202000, value = "Missing privileges to set Thread Context Class Loader on Thread Factory. Using current Thread Context Class Loader", format = Message.Format.MESSAGE_FORMAT) void missingPrivsForClassloader(); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 202001, value = "{0} is a loopback address and will be discarded.", + format = Message.Format.MESSAGE_FORMAT) + void addressloopback(String address); } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java index 0e64cae4e3..2fcfb1b45a 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java @@ -78,6 +78,32 @@ public class ActiveMQScheduledComponentTest { Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5); } + @Test + public void testAccumulationOwnPool() throws Exception { + final AtomicInteger count = new AtomicInteger(0); + + final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(100, TimeUnit.MILLISECONDS, false) { + @Override + public void run() { + if (count.get() == 0) { + try { + Thread.sleep(500); + } catch (Exception e) { + } + } + count.incrementAndGet(); + } + }; + + local.start(); + + Thread.sleep(1000); + + local.stop(); + + Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5 && count.get() > 0); + } + @Test public void testUsingOwnExecutors() throws Exception { final CountDownLatch latch = new CountDownLatch(1); diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/NetworkHealthTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/NetworkHealthTest.java index 76499124d3..6534457934 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/NetworkHealthTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/NetworkHealthTest.java @@ -57,7 +57,7 @@ public class NetworkHealthTest { NetworkHealthCheck addCheck(NetworkHealthCheck check) { list.add(check); - return check; + return check.setIgnoreLoopback(true); } HttpServer httpServer; @@ -137,7 +137,26 @@ public class NetworkHealthTest { Assert.assertTrue(check.purePing(address)); Assert.assertTrue(check.check(address)); + } + @Test + public void testParseSpaces() throws Exception { + NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100)); + + // using two addresses for URI and localhost + check.parseAddressList("localhost, , 127.0.0.2").parseURIList("http://www.redhat.com, , http://www.apache.org"); + Assert.assertEquals(2, check.getAddresses().size()); + Assert.assertEquals(2, check.getUrls().size()); + } + + @Test + public void testParseLogger() throws Exception { + NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100)); + + // using two addresses for URI and localhost + check.parseAddressList("localhost, , 127.0.0.2").parseURIList("http://www.redhat.com, , http://www.apache.org"); + Assert.assertEquals(2, check.getAddresses().size()); + Assert.assertEquals(2, check.getUrls().size()); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkIsolationReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkIsolationReplicationTest.java index 9b42279c3d..e179c2187c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkIsolationReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkIsolationReplicationTest.java @@ -24,12 +24,24 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.jboss.logging.Logger; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class NetworkIsolationReplicationTest extends FailoverTestBase { + private static final Logger logger = Logger.getLogger(NetworkIsolationReplicationTest.class); + + @Before + @Override + public void setUp() throws Exception { + this.startBackupServer = false; + super.setUp(); + } + @Override protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { return TransportConfigurationUtils.getNettyAcceptor(live, 1); @@ -49,37 +61,59 @@ public class NetworkIsolationReplicationTest extends FailoverTestBase { @Test public void testDoNotActivateOnIsolation() throws Exception { - ServerLocator locator = getServerLocator(); + AssertionLoggerHandler.startCapture(); - backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1")); + try { + ServerLocator locator = getServerLocator(); - ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory()); + // this block here is just to validate if ignoring loopback addresses logic is in place + { + backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1")); - ClientSession session = createSession(sf, false, true, true); + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ202001")); - session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true); + AssertionLoggerHandler.clear(); - Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check()); + backupServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress(InetAddress.getByName("127.0.0.1")); - crash(false, true, session); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ202001")); - for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) { - Thread.sleep(10); + backupServer.getServer().getNetworkHealthCheck().clearAddresses(); + } + + backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1")); + backupServer.getServer().start(); + + ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory()); + + ClientSession session = createSession(sf, false, true, true); + + session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true); + + Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check()); + + crash(false, true, session); + + for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) { + Thread.sleep(10); + } + + Assert.assertTrue(backupServer.isStarted()); + Assert.assertFalse(backupServer.isActive()); + + liveServer.start(); + + for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) { + Thread.sleep(10); + } + + backupServer.getServer().getNetworkHealthCheck().clearAddresses(); + + // This will make sure the backup got synchronized after the network was activated again + Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted()); + } finally { + AssertionLoggerHandler.stopCapture(); } - - Assert.assertTrue(backupServer.isStarted()); - Assert.assertFalse(backupServer.isActive()); - - liveServer.start(); - - for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) { - Thread.sleep(10); - } - - backupServer.getServer().getNetworkHealthCheck().clearAddresses(); - - // This will make sure the backup got synchronized after the network was activated again - Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted()); } @Test @@ -90,29 +124,39 @@ public class NetworkIsolationReplicationTest extends FailoverTestBase { liveServer.getServer().getConfiguration().setNetworkCheckList("203.0.113.1"). setNetworkCheckPeriod(100).setNetworkCheckTimeout(100); - liveServer.start(); + try { - Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod()); + liveServer.start(); - liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS); + Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod()); - Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check()); + liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS); - long timeout = System.currentTimeMillis() + 30000; - while (liveServer.isStarted() && System.currentTimeMillis() < timeout) { - Thread.sleep(100); + Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check()); + + long timeout = System.currentTimeMillis() + 30000; + while (liveServer.isStarted() && System.currentTimeMillis() < timeout) { + Thread.sleep(100); + } + + Assert.assertFalse(liveServer.isStarted()); + + liveServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress(InetAddress.getByName("127.0.0.1")); + + timeout = System.currentTimeMillis() + 30000; + while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) { + Thread.sleep(100); + } + + Assert.assertTrue(liveServer.isStarted()); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + throw e; + } finally { + liveServer.getServer().stop(); + backupServer.getServer().stop(); } - Assert.assertFalse(liveServer.isStarted()); - - liveServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1")); - - timeout = System.currentTimeMillis() + 30000; - while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) { - Thread.sleep(100); - } - - Assert.assertTrue(liveServer.isStarted()); } @Override