binding UDP to localhost so the testsuite will work on environments where UDP is not available due to firewal constraints

The server would need to have loopback routes for UDP for this to work.
This commit is contained in:
Clebert Suconic 2015-09-02 18:45:00 -04:00
parent b7c7c42d22
commit ab618d295d
13 changed files with 270 additions and 211 deletions

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.Random;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
public class RandomUtil {
// Constants -----------------------------------------------------
protected static final Random random = new Random(System.currentTimeMillis());
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
public static String randomString() {
return java.util.UUID.randomUUID().toString();
}
public static SimpleString randomSimpleString() {
return new SimpleString(RandomUtil.randomString());
}
public static char randomChar() {
return RandomUtil.randomString().charAt(0);
}
public static long randomLong() {
return RandomUtil.random.nextLong();
}
public static long randomPositiveLong() {
return Math.abs(RandomUtil.randomLong());
}
public static int randomInt() {
return RandomUtil.random.nextInt();
}
public static int randomPositiveInt() {
return Math.abs(RandomUtil.randomInt());
}
public static ActiveMQBuffer randomBuffer(final int size, final long... data) {
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(size + 8 * data.length);
for (long d : data) {
buffer.writeLong(d);
}
for (int i = 0; i < size; i++) {
buffer.writeByte(randomByte());
}
return buffer;
}
public static int randomInterval(final int min, final int max) {
return min + randomMax(max - min);
}
public static int randomMax(final int max) {
int value = randomPositiveInt() % max;
if (value == 0) {
value = max;
}
return value;
}
public static int randomPort() {
return RandomUtil.random.nextInt(65536);
}
public static short randomShort() {
return (short) RandomUtil.random.nextInt(Short.MAX_VALUE);
}
public static byte randomByte() {
return Integer.valueOf(RandomUtil.random.nextInt()).byteValue();
}
public static boolean randomBoolean() {
return RandomUtil.random.nextBoolean();
}
public static byte[] randomBytes() {
return RandomUtil.randomString().getBytes();
}
public static byte[] randomBytes(final int length) {
byte[] bytes = new byte[length];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = RandomUtil.randomByte();
}
return bytes;
}
public static double randomDouble() {
return RandomUtil.random.nextDouble();
}
public static float randomFloat() {
return RandomUtil.random.nextFloat();
}
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}

View File

@ -27,6 +27,7 @@ import java.net.MulticastSocket;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.utils.RandomUtil;
/** /**
* The configuration used to determine how the server will broadcast members. * The configuration used to determine how the server will broadcast members.
@ -35,7 +36,10 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
*/ */
public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFactory { public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFactory {
private transient String localBindAddress = null; // You can specify a property as a default. This is useful for testsuite running.
// for that reason we won't document this property as the proper way to do it is through configuration.
// these property names can change at any time, so don't use this on production systems
private transient String localBindAddress = getProperty(UDPBroadcastEndpointFactory.class.getName() + ".localBindAddress", null);
private transient int localBindPort = -1; private transient int localBindPort = -1;
@ -170,9 +174,23 @@ public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFacto
} }
else { else {
if (localAddress != null) { if (localAddress != null) {
ActiveMQClientLogger.LOGGER.broadcastGroupBindError(); java.util.Random random = new java.util.Random(System.currentTimeMillis());
for (int i = 0; i < 100; i++) {
int nextPort = RandomUtil.randomInterval(3000, 4000);
try {
broadcastingSocket = new DatagramSocket(nextPort, localAddress);
ActiveMQClientLogger.LOGGER.broadcastGroupBindError(localAddress.toString() + ":" + nextPort);
break;
}
catch (Exception e) {
ActiveMQClientLogger.LOGGER.broadcastGroupBindErrorRetry(localAddress.toString() + ":" + nextPort, e);
}
}
}
if (broadcastingSocket == null) {
broadcastingSocket = new DatagramSocket();
} }
broadcastingSocket = new DatagramSocket();
} }
open = true; open = true;
@ -231,13 +249,35 @@ public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFacto
} }
private static boolean checkForPresence(String key, String value) { private static boolean checkForPresence(String key, String value) {
try { String tmp = getProperty(key, null);
String tmp = System.getProperty(key); return tmp != null && tmp.trim().toLowerCase().startsWith(value);
return tmp != null && tmp.trim().toLowerCase().startsWith(value); }
}
catch (Throwable t) { }
return false;
private static String getProperty(String key, String defaultValue) {
try {
String tmp = System.getProperty(key);
if (tmp == null) {
tmp = defaultValue;
} }
return tmp;
}
catch (Throwable t) {
ActiveMQClientLogger.LOGGER.warn(t);
return defaultValue;
}
}
private static int getIntProperty(String key, String defaultValue) {
String value = getProperty(key, defaultValue);
try {
return Integer.parseInt(value);
}
catch (Throwable t) {
ActiveMQClientLogger.LOGGER.warn(t.getMessage(), t);
return Integer.parseInt(defaultValue);
} }
} }

View File

@ -260,9 +260,9 @@ public interface ActiveMQClientLogger extends BasicLogger {
void jvmAllocatedMoreMemory(Long totalMemory1, Long totalMemory2); void jvmAllocatedMoreMemory(Long totalMemory1, Long totalMemory2);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 212048, value = "local-bind-address specified for broadcast group but no local-bind-port specified so socket will NOT be bound to a local address/port", @Message(id = 212048, value = "Random address ({0}) was already in use, trying another time",
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void broadcastGroupBindError(); void broadcastGroupBindErrorRetry(String hostAndPort, @Cause Throwable t);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 212049, @Message(id = 212049,
@ -309,6 +309,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
@Message(id = 212055, value = "Unable to close consumer", format = Message.Format.MESSAGE_FORMAT) @Message(id = 212055, value = "Unable to close consumer", format = Message.Format.MESSAGE_FORMAT)
void unableToCloseConsumer(@Cause Exception e); void unableToCloseConsumer(@Cause Exception e);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 212056, value = "local-bind-address specified for broadcast group but no local-bind-port. Using random port for UDP Broadcast ({0})",
format = Message.Format.MESSAGE_FORMAT)
void broadcastGroupBindError(String hostAndPort);
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT) @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
void onMessageError(@Cause Throwable e); void onMessageError(@Cause Throwable e);

View File

@ -17,128 +17,11 @@
package org.apache.activemq.artemis.tests.util; package org.apache.activemq.artemis.tests.util;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.Random;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
public final class RandomUtil { public class RandomUtil extends org.apache.activemq.artemis.utils.RandomUtil {
// Constants -----------------------------------------------------
private static final Random random = new Random(System.currentTimeMillis());
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
public static String randomString() {
return UUID.randomUUID().toString();
}
public static SimpleString randomSimpleString() {
return new SimpleString(RandomUtil.randomString());
}
public static char randomChar() {
return RandomUtil.randomString().charAt(0);
}
public static long randomLong() {
return RandomUtil.random.nextLong();
}
public static long randomPositiveLong() {
return Math.abs(RandomUtil.randomLong());
}
public static int randomInt() {
return RandomUtil.random.nextInt();
}
public static int randomPositiveInt() {
return Math.abs(RandomUtil.randomInt());
}
public static ActiveMQBuffer randomBuffer(final int size, final long... data) {
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(size + 8 * data.length);
for (long d : data) {
buffer.writeLong(d);
}
for (int i = 0; i < size; i++) {
buffer.writeByte(randomByte());
}
return buffer;
}
public static int randomInterval(final int min, final int max) {
return min + randomMax(max - min);
}
public static int randomMax(final int max) {
int value = randomPositiveInt() % max;
if (value == 0) {
value = max;
}
return value;
}
public static int randomPort() {
return RandomUtil.random.nextInt(65536);
}
public static short randomShort() {
return (short) RandomUtil.random.nextInt(Short.MAX_VALUE);
}
public static byte randomByte() {
return Integer.valueOf(RandomUtil.random.nextInt()).byteValue();
}
public static boolean randomBoolean() {
return RandomUtil.random.nextBoolean();
}
public static byte[] randomBytes() {
return RandomUtil.randomString().getBytes();
}
public static byte[] randomBytes(final int length) {
byte[] bytes = new byte[length];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = RandomUtil.randomByte();
}
return bytes;
}
public static double randomDouble() {
return RandomUtil.random.nextDouble();
}
public static float randomFloat() {
return RandomUtil.random.nextFloat();
}
public static Xid randomXid() { public static Xid randomXid() {
return new XidImpl(RandomUtil.randomBytes(), RandomUtil.randomInt(), RandomUtil.randomBytes()); return new XidImpl(RandomUtil.randomBytes(), RandomUtil.randomInt(), RandomUtil.randomBytes());
} }
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
} }

View File

@ -263,21 +263,26 @@ public class ConnectionFactoryURITest {
discoveryGroupConfiguration.setName("foo").setRefreshTimeout(12345).setDiscoveryInitialWaitTimeout(5678).setBroadcastEndpointFactory(endpoint); discoveryGroupConfiguration.setName("foo").setRefreshTimeout(12345).setDiscoveryInitialWaitTimeout(5678).setBroadcastEndpointFactory(endpoint);
ActiveMQConnectionFactory connectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(discoveryGroupConfiguration, JMSFactoryType.CF); ActiveMQConnectionFactory connectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(discoveryGroupConfiguration, JMSFactoryType.CF);
URI tcp = parser.createSchema("udp", connectionFactoryWithHA); URI tcp = parser.createSchema("udp", connectionFactoryWithHA);
ActiveMQConnectionFactory factory = parser.newObject(tcp, null);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcp.toString());
DiscoveryGroupConfiguration dgc = factory.getDiscoveryGroupConfiguration(); DiscoveryGroupConfiguration dgc = factory.getDiscoveryGroupConfiguration();
Assert.assertNotNull(dgc); Assert.assertNotNull(dgc);
BroadcastEndpointFactory befc = dgc.getBroadcastEndpointFactory(); BroadcastEndpointFactory befc = dgc.getBroadcastEndpointFactory();
Assert.assertNotNull(befc); Assert.assertNotNull(befc);
Assert.assertTrue(befc instanceof UDPBroadcastEndpointFactory); Assert.assertTrue(befc instanceof UDPBroadcastEndpointFactory);
UDPBroadcastEndpointFactory ubgc = (UDPBroadcastEndpointFactory) befc; UDPBroadcastEndpointFactory ubgc = (UDPBroadcastEndpointFactory) befc;
Assert.assertEquals(ubgc.getGroupAddress(), "wahey"); Assert.assertEquals("wahey", ubgc.getGroupAddress());
Assert.assertEquals(ubgc.getGroupPort(), 3333); Assert.assertEquals(3333, ubgc.getGroupPort());
//these 2 are transient //these 2 are transient
Assert.assertEquals(ubgc.getLocalBindAddress(), null); // These will take the System.properties used on the testsuite,
Assert.assertEquals(ubgc.getLocalBindPort(), -1); // for that reason we take them as != instead of checking for null
Assert.assertEquals(dgc.getName(), "foo"); Assert.assertNotEquals("uhuh", ubgc.getLocalBindAddress());
Assert.assertEquals(dgc.getDiscoveryInitialWaitTimeout(), 5678); Assert.assertNotEquals(555, ubgc.getLocalBindPort());
Assert.assertEquals(dgc.getRefreshTimeout(), 12345);
Assert.assertEquals("foo", dgc.getName());
Assert.assertEquals(5678, dgc.getDiscoveryInitialWaitTimeout());
Assert.assertEquals(12345, dgc.getRefreshTimeout());
BeanUtilsBean bean = new BeanUtilsBean(); BeanUtilsBean bean = new BeanUtilsBean();
checkEquals(bean, connectionFactoryWithHA, factory); checkEquals(bean, connectionFactoryWithHA, factory);

View File

@ -94,7 +94,7 @@
<activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager <activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager
-Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties -Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties
-Djava.library.path=${activemq.basedir}/artemis-native/bin/ -Djgroups.bind_addr=localhost -Djava.library.path=${activemq.basedir}/artemis-native/bin/ -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost
-Djava.net.preferIPv4Stack=true -Djava.net.preferIPv4Stack=true
</activemq-surefire-argline> </activemq-surefire-argline>
<activemq.basedir>${project.basedir}</activemq.basedir> <activemq.basedir>${project.basedir}</activemq.basedir>

View File

@ -22,37 +22,37 @@
loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests
# Root logger level # Root logger level
logger.level=INFO logger.level=TRACE
# ActiveMQ Artemis logger levels # ActiveMQ Artemis logger levels
logger.org.apache.activemq.artemis.core.server.level=TRACE logger.org.apache.activemq.artemis.core.server.level=TRACE
logger.org.apache.activemq.artemis.journal.level=INFO logger.org.apache.activemq.artemis.journal.level=TRACE
logger.org.apache.activemq.artemis.utils.level=INFO logger.org.apache.activemq.artemis.utils.level=TRACE
logger.org.apache.activemq.artemis.jms.level=INFO logger.org.apache.activemq.artemis.jms.level=TRACE
logger.org.apache.activemq.artemis.ra.level=INFO logger.org.apache.activemq.artemis.ra.level=TRACE
logger.org.apache.activemq.artemis.tests.unit.level=INFO logger.org.apache.activemq.artemis.tests.unit.level=TRACE
logger.org.apache.activemq.artemis.tests.integration.level=INFO logger.org.apache.activemq.artemis.tests.integration.level=TRACE
logger.org.apache.activemq.artemis.jms.tests.level=INFO logger.org.apache.activemq.artemis.jms.tests.level=TRACE
# Root logger handlers # Root logger handlers
logger.handlers=CONSOLE,TEST logger.handlers=CONSOLE,TEST,FILE
#logger.handlers=CONSOLE,FILE #logger.handlers=CONSOLE,FILE
# Console handler configuration # Console handler configuration
handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
handler.CONSOLE.properties=autoFlush handler.CONSOLE.properties=autoFlush
handler.CONSOLE.level=FINE handler.CONSOLE.level=INFO
handler.CONSOLE.autoFlush=true handler.CONSOLE.autoFlush=true
handler.CONSOLE.formatter=PATTERN handler.CONSOLE.formatter=PATTERN
# File handler configuration # File handler configuration
handler.FILE=org.jboss.logmanager.handlers.FileHandler handler.FILE=org.jboss.logmanager.handlers.FileHandler
handler.FILE.level=FINE handler.FILE.level=TRACE
handler.FILE.properties=autoFlush,fileName handler.FILE.properties=autoFlush,fileName
handler.FILE.autoFlush=true handler.FILE.autoFlush=true
handler.FILE.fileName=target/activemq.log handler.FILE.fileName=target/activemq.log
handler.FILE.formatter=PATTERN handler.FILE.formatter=PATTERN
# Console handler configuration # Test handler
handler.TEST=org.apache.activemq.artemis.logs.AssertionLoggerHandler handler.TEST=org.apache.activemq.artemis.logs.AssertionLoggerHandler
handler.TEST.level=TRACE handler.TEST.level=TRACE
handler.TEST.formatter=PATTERN handler.TEST.formatter=PATTERN

View File

@ -16,23 +16,20 @@
*/ */
package org.apache.activemq.artemis.tests.integration.cluster.distribution; package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.ArrayList;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase { public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase {

View File

@ -56,7 +56,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
protected static void verifyBroadcast(BroadcastGroup broadcastGroup, protected static void verifyBroadcast(BroadcastGroup broadcastGroup,
DiscoveryGroup discoveryGroup) throws Exception { DiscoveryGroup discoveryGroup) throws Exception {
broadcastGroup.broadcastConnectors(); broadcastGroup.broadcastConnectors();
Assert.assertTrue("broadcast received", discoveryGroup.waitForBroadcast(2000)); Assert.assertTrue("broadcast not received", discoveryGroup.waitForBroadcast(2000));
} }
/** /**
@ -147,7 +147,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
int localPort, int localPort,
final InetAddress groupAddress, final InetAddress groupAddress,
final int groupPort) throws Exception { final int groupPort) throws Exception {
return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : null).setLocalBindPort(localPort)); return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : "localhost").setLocalBindPort(localPort));
} }
protected DiscoveryGroup newDiscoveryGroup(final String nodeID, protected DiscoveryGroup newDiscoveryGroup(final String nodeID,
@ -166,7 +166,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
final int groupPort, final int groupPort,
final long timeout, final long timeout,
NotificationService notif) throws Exception { NotificationService notif) throws Exception {
return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : null), notif); return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : "localhost"), notif);
} }
protected final class FakeNodeManager extends NodeManager { protected final class FakeNodeManager extends NodeManager {

View File

@ -30,10 +30,12 @@ import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.server.cluster.impl.BroadcastGroupImpl; import org.apache.activemq.artemis.core.server.cluster.impl.BroadcastGroupImpl;
import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/**
* This is to make sure discovery works fine even when garbled data is sent
*/
public class DiscoveryStayAliveTest extends DiscoveryBaseTest { public class DiscoveryStayAliveTest extends DiscoveryBaseTest {
ScheduledExecutorService scheduledExecutorService; ScheduledExecutorService scheduledExecutorService;
@ -57,7 +59,7 @@ public class DiscoveryStayAliveTest extends DiscoveryBaseTest {
final int groupPort = getUDPDiscoveryPort(); final int groupPort = getUDPDiscoveryPort();
final int timeout = 500; final int timeout = 500;
final DiscoveryGroup dg = newDiscoveryGroup(RandomUtil.randomString(), RandomUtil.randomString(), null, groupAddress, groupPort, timeout); final DiscoveryGroup dg = newDiscoveryGroup(RandomUtil.randomString(), RandomUtil.randomString(), InetAddress.getByName("localhost"), groupAddress, groupPort, timeout);
final AtomicInteger errors = new AtomicInteger(0); final AtomicInteger errors = new AtomicInteger(0);
Thread t = new Thread() { Thread t = new Thread() {
@ -74,30 +76,45 @@ public class DiscoveryStayAliveTest extends DiscoveryBaseTest {
}; };
t.start(); t.start();
BroadcastGroupImpl bg = new BroadcastGroupImpl(new FakeNodeManager("test-nodeID"), RandomUtil.randomString(), 1, scheduledExecutorService, new UDPBroadcastEndpointFactory().setGroupAddress(address1). BroadcastGroupImpl bg = null;
setGroupPort(groupPort));
bg.start(); try {
bg.addConnector(generateTC()); bg = new BroadcastGroupImpl(new FakeNodeManager("test-nodeID"), RandomUtil.randomString(), 1, scheduledExecutorService, new UDPBroadcastEndpointFactory().setGroupAddress(address1).
setGroupPort(groupPort));
for (int i = 0; i < 10; i++) { bg.start();
BroadcastEndpointFactory factoryEndpoint = new UDPBroadcastEndpointFactory().setGroupAddress(address1).
setGroupPort(groupPort);
sendBadData(factoryEndpoint);
bg.addConnector(generateTC());
for (int i = 0; i < 10; i++) {
BroadcastEndpointFactory factoryEndpoint = new UDPBroadcastEndpointFactory().setGroupAddress(address1).
setGroupPort(groupPort).setLocalBindAddress("localhost");
sendBadData(factoryEndpoint);
}
Thread.sleep(100); Thread.sleep(100);
assertTrue(t.isAlive()); assertTrue(t.isAlive());
assertEquals(0, errors.get()); assertEquals(0, errors.get());
} }
finally {
bg.stop(); if (bg != null) {
dg.stop(); bg.stop();
}
t.join(5000); if (dg != null) {
dg.stop();
}
Assert.assertFalse(t.isAlive()); t.join(1000);
// it will retry for a limited time only
for (int i = 0; t.isAlive() && i < 100; i++) {
t.interrupt();
Thread.sleep(100);
}
}
} }
private static void sendBadData(BroadcastEndpointFactory factoryEndpoint) throws Exception { private static void sendBadData(BroadcastEndpointFactory factoryEndpoint) throws Exception {
@ -120,5 +137,7 @@ public class DiscoveryStayAliveTest extends DiscoveryBaseTest {
endpoint.openBroadcaster(); endpoint.openBroadcaster();
endpoint.broadcast(bytes); endpoint.broadcast(bytes);
endpoint.close(true);
} }
} }

View File

@ -378,39 +378,11 @@ public class DiscoveryTest extends DiscoveryBaseTest {
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces(); Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
InetAddress localAddress = null; InetAddress localAddress = InetAddress.getLoopbackAddress();
outer:
while (networkInterfaces.hasMoreElements()) {
NetworkInterface networkInterface = networkInterfaces.nextElement();
if (networkInterface.isLoopback() || networkInterface.isVirtual() ||
!networkInterface.isUp() ||
!networkInterface.supportsMulticast()) {
continue;
}
Enumeration<InetAddress> en = networkInterface.getInetAddresses();
while (en.hasMoreElements()) {
InetAddress ia = en.nextElement();
if (ia.getAddress().length == 4) {
localAddress = ia;
break outer;
}
}
}
if (localAddress == null) {
log.warn("Can't find address to use");
return;
}
log.info("Local address is " + localAddress); log.info("Local address is " + localAddress);
bg = newBroadcast(nodeID, RandomUtil.randomString(), localAddress, 6552, groupAddress, groupPort); bg = newBroadcast(nodeID, RandomUtil.randomString(), localAddress, -1, groupAddress, groupPort);
bg.start(); bg.start();

View File

@ -280,7 +280,7 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
Hashtable props = new Hashtable<>(); Hashtable props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName()); props.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName());
props.put("connectionFactory.myConnectionFactory", "udp://" + getUDPDiscoveryAddress() + ":" + getUDPDiscoveryPort() + "?" + props.put("connectionFactory.myConnectionFactory", "udp://" + getUDPDiscoveryAddress() + ":" + getUDPDiscoveryPort() + "?" +
TransportConstants.LOCAL_ADDRESS_PROP_NAME + "=127.0.0.1&" + TransportConstants.LOCAL_ADDRESS_PROP_NAME + "=Server1&" +
TransportConstants.LOCAL_PORT_PROP_NAME + "=1198&" + TransportConstants.LOCAL_PORT_PROP_NAME + "=1198&" +
ActiveMQInitialContextFactory.REFRESH_TIMEOUT + "=5000&" + ActiveMQInitialContextFactory.REFRESH_TIMEOUT + "=5000&" +
ActiveMQInitialContextFactory.DISCOVERY_INITIAL_WAIT_TIMEOUT + "=6000"); ActiveMQInitialContextFactory.DISCOVERY_INITIAL_WAIT_TIMEOUT + "=6000");
@ -294,8 +294,8 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
UDPBroadcastEndpointFactory udpBroadcastEndpointFactory = (UDPBroadcastEndpointFactory) discoveryGroupConfiguration.getBroadcastEndpointFactory(); UDPBroadcastEndpointFactory udpBroadcastEndpointFactory = (UDPBroadcastEndpointFactory) discoveryGroupConfiguration.getBroadcastEndpointFactory();
//these 2 are transient so are ignored //these 2 are transient so are ignored
Assert.assertEquals(null, udpBroadcastEndpointFactory.getLocalBindAddress()); Assert.assertNotEquals("Server1", udpBroadcastEndpointFactory.getLocalBindAddress());
Assert.assertEquals(-1, udpBroadcastEndpointFactory.getLocalBindPort()); Assert.assertNotEquals(1198, udpBroadcastEndpointFactory.getLocalBindPort());
Assert.assertEquals(getUDPDiscoveryAddress(), udpBroadcastEndpointFactory.getGroupAddress()); Assert.assertEquals(getUDPDiscoveryAddress(), udpBroadcastEndpointFactory.getGroupAddress());
Assert.assertEquals(getUDPDiscoveryPort(), udpBroadcastEndpointFactory.getGroupPort()); Assert.assertEquals(getUDPDiscoveryPort(), udpBroadcastEndpointFactory.getGroupPort());
} }

View File

@ -82,8 +82,8 @@ public class ConnectionFactorySerializationTest extends JMSTestBase {
Assert.assertEquals(dgc.getRefreshTimeout(), 5000); Assert.assertEquals(dgc.getRefreshTimeout(), 5000);
Assert.assertTrue(dgc.getBroadcastEndpointFactory() instanceof UDPBroadcastEndpointFactory); Assert.assertTrue(dgc.getBroadcastEndpointFactory() instanceof UDPBroadcastEndpointFactory);
UDPBroadcastEndpointFactory befc = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory(); UDPBroadcastEndpointFactory befc = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory();
Assert.assertEquals(-1, befc.getLocalBindPort()); Assert.assertEquals(Integer.parseInt(System.getProperty("org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindPort", "-1")), befc.getLocalBindPort());
Assert.assertEquals(null, befc.getLocalBindAddress()); Assert.assertEquals(System.getProperty("org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress"), befc.getLocalBindAddress());
Assert.assertEquals(1234, befc.getGroupPort()); Assert.assertEquals(1234, befc.getGroupPort());
Assert.assertEquals("1.2.3.4", befc.getGroupAddress()); Assert.assertEquals("1.2.3.4", befc.getGroupAddress());
} }