ARTEMIS-863 parsing spaces properly on network health addresses and avoiding loopback on configuration
This commit is contained in:
parent
5a9647cd76
commit
43634c098b
|
@ -31,6 +31,7 @@ import java.security.PrivilegedAction;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||
import org.jboss.logging.Logger;
|
||||
|
@ -56,6 +57,9 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
|
|||
|
||||
private String ipv6Command = IPV6_DEFAULT_COMMAND;
|
||||
|
||||
// To be used on tests. As we use the loopback as a valid address on tests.
|
||||
private boolean ignoreLoopback = false;
|
||||
|
||||
/**
|
||||
* The timeout to be used on isReachable
|
||||
*/
|
||||
|
@ -88,6 +92,23 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
|
|||
return this;
|
||||
}
|
||||
|
||||
public boolean isIgnoreLoopback() {
|
||||
return ignoreLoopback;
|
||||
}
|
||||
|
||||
public NetworkHealthCheck setIgnoreLoopback(boolean ignoreLoopback) {
|
||||
this.ignoreLoopback = ignoreLoopback;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Set<InetAddress> getAddresses() {
|
||||
return addresses;
|
||||
}
|
||||
|
||||
public Set<URL> getUrls() {
|
||||
return urls;
|
||||
}
|
||||
|
||||
public String getNICName() {
|
||||
if (networkInterface != null) {
|
||||
return networkInterface.getName();
|
||||
|
@ -101,10 +122,12 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
|
|||
String[] addresses = addressList.split(",");
|
||||
|
||||
for (String address : addresses) {
|
||||
try {
|
||||
this.addAddress(InetAddress.getByName(address));
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
if (!address.trim().isEmpty()) {
|
||||
try {
|
||||
this.addAddress(InetAddress.getByName(address.trim()));
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -117,10 +140,12 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
|
|||
String[] addresses = addressList.split(",");
|
||||
|
||||
for (String address : addresses) {
|
||||
try {
|
||||
this.addURL(new URL(address));
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
if (!address.trim().isEmpty()) {
|
||||
try {
|
||||
this.addURL(new URL(address.trim()));
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -180,9 +205,13 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {
|
|||
if (!check(address)) {
|
||||
logger.warn("Ping Address " + address + " wasn't reacheable");
|
||||
}
|
||||
addresses.add(address);
|
||||
|
||||
checkStart();
|
||||
if (!ignoreLoopback && address.isLoopbackAddress()) {
|
||||
ActiveMQUtilLogger.LOGGER.addressloopback(address.toString());
|
||||
} else {
|
||||
addresses.add(address);
|
||||
checkStart();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -48,4 +48,9 @@ public interface ActiveMQUtilLogger extends BasicLogger {
|
|||
@Message(id = 202000, value = "Missing privileges to set Thread Context Class Loader on Thread Factory. Using current Thread Context Class Loader",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void missingPrivsForClassloader();
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 202001, value = "{0} is a loopback address and will be discarded.",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void addressloopback(String address);
|
||||
}
|
||||
|
|
|
@ -78,6 +78,32 @@ public class ActiveMQScheduledComponentTest {
|
|||
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAccumulationOwnPool() throws Exception {
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(100, TimeUnit.MILLISECONDS, false) {
|
||||
@Override
|
||||
public void run() {
|
||||
if (count.get() == 0) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
count.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
||||
local.start();
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
local.stop();
|
||||
|
||||
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5 && count.get() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUsingOwnExecutors() throws Exception {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
|
|
@ -57,7 +57,7 @@ public class NetworkHealthTest {
|
|||
|
||||
NetworkHealthCheck addCheck(NetworkHealthCheck check) {
|
||||
list.add(check);
|
||||
return check;
|
||||
return check.setIgnoreLoopback(true);
|
||||
}
|
||||
|
||||
HttpServer httpServer;
|
||||
|
@ -137,7 +137,26 @@ public class NetworkHealthTest {
|
|||
Assert.assertTrue(check.purePing(address));
|
||||
|
||||
Assert.assertTrue(check.check(address));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseSpaces() throws Exception {
|
||||
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100));
|
||||
|
||||
// using two addresses for URI and localhost
|
||||
check.parseAddressList("localhost, , 127.0.0.2").parseURIList("http://www.redhat.com, , http://www.apache.org");
|
||||
Assert.assertEquals(2, check.getAddresses().size());
|
||||
Assert.assertEquals(2, check.getUrls().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseLogger() throws Exception {
|
||||
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100));
|
||||
|
||||
// using two addresses for URI and localhost
|
||||
check.parseAddressList("localhost, , 127.0.0.2").parseURIList("http://www.redhat.com, , http://www.apache.org");
|
||||
Assert.assertEquals(2, check.getAddresses().size());
|
||||
Assert.assertEquals(2, check.getUrls().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -24,12 +24,24 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class NetworkIsolationReplicationTest extends FailoverTestBase {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(NetworkIsolationReplicationTest.class);
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
this.startBackupServer = false;
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
|
||||
return TransportConfigurationUtils.getNettyAcceptor(live, 1);
|
||||
|
@ -49,37 +61,59 @@ public class NetworkIsolationReplicationTest extends FailoverTestBase {
|
|||
|
||||
@Test
|
||||
public void testDoNotActivateOnIsolation() throws Exception {
|
||||
ServerLocator locator = getServerLocator();
|
||||
AssertionLoggerHandler.startCapture();
|
||||
|
||||
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
|
||||
try {
|
||||
ServerLocator locator = getServerLocator();
|
||||
|
||||
ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());
|
||||
// this block here is just to validate if ignoring loopback addresses logic is in place
|
||||
{
|
||||
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1"));
|
||||
|
||||
ClientSession session = createSession(sf, false, true, true);
|
||||
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ202001"));
|
||||
|
||||
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
|
||||
AssertionLoggerHandler.clear();
|
||||
|
||||
Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check());
|
||||
backupServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress(InetAddress.getByName("127.0.0.1"));
|
||||
|
||||
crash(false, true, session);
|
||||
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ202001"));
|
||||
|
||||
for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) {
|
||||
Thread.sleep(10);
|
||||
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
|
||||
}
|
||||
|
||||
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
|
||||
backupServer.getServer().start();
|
||||
|
||||
ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());
|
||||
|
||||
ClientSession session = createSession(sf, false, true, true);
|
||||
|
||||
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
|
||||
|
||||
Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check());
|
||||
|
||||
crash(false, true, session);
|
||||
|
||||
for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
Assert.assertTrue(backupServer.isStarted());
|
||||
Assert.assertFalse(backupServer.isActive());
|
||||
|
||||
liveServer.start();
|
||||
|
||||
for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
|
||||
|
||||
// This will make sure the backup got synchronized after the network was activated again
|
||||
Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted());
|
||||
} finally {
|
||||
AssertionLoggerHandler.stopCapture();
|
||||
}
|
||||
|
||||
Assert.assertTrue(backupServer.isStarted());
|
||||
Assert.assertFalse(backupServer.isActive());
|
||||
|
||||
liveServer.start();
|
||||
|
||||
for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
|
||||
|
||||
// This will make sure the backup got synchronized after the network was activated again
|
||||
Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -90,29 +124,39 @@ public class NetworkIsolationReplicationTest extends FailoverTestBase {
|
|||
liveServer.getServer().getConfiguration().setNetworkCheckList("203.0.113.1").
|
||||
setNetworkCheckPeriod(100).setNetworkCheckTimeout(100);
|
||||
|
||||
liveServer.start();
|
||||
try {
|
||||
|
||||
Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod());
|
||||
liveServer.start();
|
||||
|
||||
liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS);
|
||||
Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod());
|
||||
|
||||
Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check());
|
||||
liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS);
|
||||
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
while (liveServer.isStarted() && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(100);
|
||||
Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check());
|
||||
|
||||
long timeout = System.currentTimeMillis() + 30000;
|
||||
while (liveServer.isStarted() && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
Assert.assertFalse(liveServer.isStarted());
|
||||
|
||||
liveServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress(InetAddress.getByName("127.0.0.1"));
|
||||
|
||||
timeout = System.currentTimeMillis() + 30000;
|
||||
while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
Assert.assertTrue(liveServer.isStarted());
|
||||
} catch (Throwable e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
throw e;
|
||||
} finally {
|
||||
liveServer.getServer().stop();
|
||||
backupServer.getServer().stop();
|
||||
}
|
||||
|
||||
Assert.assertFalse(liveServer.isStarted());
|
||||
|
||||
liveServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1"));
|
||||
|
||||
timeout = System.currentTimeMillis() + 30000;
|
||||
while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
Assert.assertTrue(liveServer.isStarted());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue