This closes #152 UDP improvement and fixes

This commit is contained in:
Clebert Suconic 2015-09-03 18:09:19 -04:00
commit dbd8bdbd8f
15 changed files with 275 additions and 215 deletions

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.Random;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
public class RandomUtil {
// Constants -----------------------------------------------------
protected static final Random random = new Random(System.currentTimeMillis());
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
public static String randomString() {
return java.util.UUID.randomUUID().toString();
}
public static SimpleString randomSimpleString() {
return new SimpleString(RandomUtil.randomString());
}
public static char randomChar() {
return RandomUtil.randomString().charAt(0);
}
public static long randomLong() {
return RandomUtil.random.nextLong();
}
public static long randomPositiveLong() {
return Math.abs(RandomUtil.randomLong());
}
public static int randomInt() {
return RandomUtil.random.nextInt();
}
public static int randomPositiveInt() {
return Math.abs(RandomUtil.randomInt());
}
public static ActiveMQBuffer randomBuffer(final int size, final long... data) {
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(size + 8 * data.length);
for (long d : data) {
buffer.writeLong(d);
}
for (int i = 0; i < size; i++) {
buffer.writeByte(randomByte());
}
return buffer;
}
public static int randomInterval(final int min, final int max) {
return min + randomMax(max - min);
}
public static int randomMax(final int max) {
int value = randomPositiveInt() % max;
if (value == 0) {
value = max;
}
return value;
}
public static int randomPort() {
return RandomUtil.random.nextInt(65536);
}
public static short randomShort() {
return (short) RandomUtil.random.nextInt(Short.MAX_VALUE);
}
public static byte randomByte() {
return Integer.valueOf(RandomUtil.random.nextInt()).byteValue();
}
public static boolean randomBoolean() {
return RandomUtil.random.nextBoolean();
}
public static byte[] randomBytes() {
return RandomUtil.randomString().getBytes();
}
public static byte[] randomBytes(final int length) {
byte[] bytes = new byte[length];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = RandomUtil.randomByte();
}
return bytes;
}
public static double randomDouble() {
return RandomUtil.random.nextDouble();
}
public static float randomFloat() {
return RandomUtil.random.nextFloat();
}
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}

View File

@ -27,6 +27,7 @@ import java.net.MulticastSocket;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.utils.RandomUtil;
/** /**
* The configuration used to determine how the server will broadcast members. * The configuration used to determine how the server will broadcast members.
@ -35,7 +36,10 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
*/ */
public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFactory { public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFactory {
private transient String localBindAddress = null; // You can specify a property as a default. This is useful for testsuite running.
// for that reason we won't document this property as the proper way to do it is through configuration.
// these property names can change at any time, so don't use this on production systems
private transient String localBindAddress = getProperty(UDPBroadcastEndpointFactory.class.getName() + ".localBindAddress", null);
private transient int localBindPort = -1; private transient int localBindPort = -1;
@ -170,10 +174,24 @@ public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFacto
} }
else { else {
if (localAddress != null) { if (localAddress != null) {
ActiveMQClientLogger.LOGGER.broadcastGroupBindError(); java.util.Random random = new java.util.Random(System.currentTimeMillis());
for (int i = 0; i < 100; i++) {
int nextPort = RandomUtil.randomInterval(3000, 4000);
try {
broadcastingSocket = new DatagramSocket(nextPort, localAddress);
ActiveMQClientLogger.LOGGER.broadcastGroupBindError(localAddress.toString() + ":" + nextPort);
break;
} }
catch (Exception e) {
ActiveMQClientLogger.LOGGER.broadcastGroupBindErrorRetry(localAddress.toString() + ":" + nextPort, e);
}
}
}
if (broadcastingSocket == null) {
broadcastingSocket = new DatagramSocket(); broadcastingSocket = new DatagramSocket();
} }
}
open = true; open = true;
} }
@ -231,13 +249,35 @@ public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFacto
} }
private static boolean checkForPresence(String key, String value) { private static boolean checkForPresence(String key, String value) {
try { String tmp = getProperty(key, null);
String tmp = System.getProperty(key);
return tmp != null && tmp.trim().toLowerCase().startsWith(value); return tmp != null && tmp.trim().toLowerCase().startsWith(value);
} }
catch (Throwable t) {
return false;
} }
private static String getProperty(String key, String defaultValue) {
try {
String tmp = System.getProperty(key);
if (tmp == null) {
tmp = defaultValue;
}
return tmp;
}
catch (Throwable t) {
ActiveMQClientLogger.LOGGER.warn(t);
return defaultValue;
}
}
private static int getIntProperty(String key, String defaultValue) {
String value = getProperty(key, defaultValue);
try {
return Integer.parseInt(value);
}
catch (Throwable t) {
ActiveMQClientLogger.LOGGER.warn(t.getMessage(), t);
return Integer.parseInt(defaultValue);
} }
} }

View File

@ -260,9 +260,9 @@ public interface ActiveMQClientLogger extends BasicLogger {
void jvmAllocatedMoreMemory(Long totalMemory1, Long totalMemory2); void jvmAllocatedMoreMemory(Long totalMemory1, Long totalMemory2);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 212048, value = "local-bind-address specified for broadcast group but no local-bind-port specified so socket will NOT be bound to a local address/port", @Message(id = 212048, value = "Random address ({0}) was already in use, trying another time",
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void broadcastGroupBindError(); void broadcastGroupBindErrorRetry(String hostAndPort, @Cause Throwable t);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.WARN)
@Message(id = 212049, @Message(id = 212049,
@ -309,6 +309,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
@Message(id = 212055, value = "Unable to close consumer", format = Message.Format.MESSAGE_FORMAT) @Message(id = 212055, value = "Unable to close consumer", format = Message.Format.MESSAGE_FORMAT)
void unableToCloseConsumer(@Cause Exception e); void unableToCloseConsumer(@Cause Exception e);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 212056, value = "local-bind-address specified for broadcast group but no local-bind-port. Using random port for UDP Broadcast ({0})",
format = Message.Format.MESSAGE_FORMAT)
void broadcastGroupBindError(String hostAndPort);
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT) @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
void onMessageError(@Cause Throwable e); void onMessageError(@Cause Throwable e);

View File

@ -816,7 +816,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} }
} }
} while (retry); } while (retry);
}
synchronized (topologyArrayGuard) {
// We always wait for the topology, as the server // We always wait for the topology, as the server
// will send a single element if not cluster // will send a single element if not cluster
// so clients can know the id of the server they are connected to // so clients can know the id of the server they are connected to
@ -824,7 +826,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis()) { while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis()) {
// Now wait for the topology // Now wait for the topology
try { try {
wait(1000); topologyArrayGuard.wait(1000);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e); throw new ActiveMQInterruptedException(e);
@ -847,7 +849,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return factory; return factory;
} }
} }
public boolean isHA() { public boolean isHA() {
@ -1410,10 +1411,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
updateArraysAndPairs(); updateArraysAndPairs();
if (last) { if (last) {
synchronized (this) { synchronized (topologyArrayGuard) {
receivedTopology = true; receivedTopology = true;
// Notify if waiting on getting topology // Notify if waiting on getting topology
notifyAll(); topologyArrayGuard.notifyAll();
} }
} }
} }

View File

@ -17,128 +17,11 @@
package org.apache.activemq.artemis.tests.util; package org.apache.activemq.artemis.tests.util;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.Random;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
public final class RandomUtil { public class RandomUtil extends org.apache.activemq.artemis.utils.RandomUtil {
// Constants -----------------------------------------------------
private static final Random random = new Random(System.currentTimeMillis());
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
public static String randomString() {
return UUID.randomUUID().toString();
}
public static SimpleString randomSimpleString() {
return new SimpleString(RandomUtil.randomString());
}
public static char randomChar() {
return RandomUtil.randomString().charAt(0);
}
public static long randomLong() {
return RandomUtil.random.nextLong();
}
public static long randomPositiveLong() {
return Math.abs(RandomUtil.randomLong());
}
public static int randomInt() {
return RandomUtil.random.nextInt();
}
public static int randomPositiveInt() {
return Math.abs(RandomUtil.randomInt());
}
public static ActiveMQBuffer randomBuffer(final int size, final long... data) {
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(size + 8 * data.length);
for (long d : data) {
buffer.writeLong(d);
}
for (int i = 0; i < size; i++) {
buffer.writeByte(randomByte());
}
return buffer;
}
public static int randomInterval(final int min, final int max) {
return min + randomMax(max - min);
}
public static int randomMax(final int max) {
int value = randomPositiveInt() % max;
if (value == 0) {
value = max;
}
return value;
}
public static int randomPort() {
return RandomUtil.random.nextInt(65536);
}
public static short randomShort() {
return (short) RandomUtil.random.nextInt(Short.MAX_VALUE);
}
public static byte randomByte() {
return Integer.valueOf(RandomUtil.random.nextInt()).byteValue();
}
public static boolean randomBoolean() {
return RandomUtil.random.nextBoolean();
}
public static byte[] randomBytes() {
return RandomUtil.randomString().getBytes();
}
public static byte[] randomBytes(final int length) {
byte[] bytes = new byte[length];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = RandomUtil.randomByte();
}
return bytes;
}
public static double randomDouble() {
return RandomUtil.random.nextDouble();
}
public static float randomFloat() {
return RandomUtil.random.nextFloat();
}
public static Xid randomXid() { public static Xid randomXid() {
return new XidImpl(RandomUtil.randomBytes(), RandomUtil.randomInt(), RandomUtil.randomBytes()); return new XidImpl(RandomUtil.randomBytes(), RandomUtil.randomInt(), RandomUtil.randomBytes());
} }
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
} }

View File

@ -263,21 +263,26 @@ public class ConnectionFactoryURITest {
discoveryGroupConfiguration.setName("foo").setRefreshTimeout(12345).setDiscoveryInitialWaitTimeout(5678).setBroadcastEndpointFactory(endpoint); discoveryGroupConfiguration.setName("foo").setRefreshTimeout(12345).setDiscoveryInitialWaitTimeout(5678).setBroadcastEndpointFactory(endpoint);
ActiveMQConnectionFactory connectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(discoveryGroupConfiguration, JMSFactoryType.CF); ActiveMQConnectionFactory connectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(discoveryGroupConfiguration, JMSFactoryType.CF);
URI tcp = parser.createSchema("udp", connectionFactoryWithHA); URI tcp = parser.createSchema("udp", connectionFactoryWithHA);
ActiveMQConnectionFactory factory = parser.newObject(tcp, null);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcp.toString());
DiscoveryGroupConfiguration dgc = factory.getDiscoveryGroupConfiguration(); DiscoveryGroupConfiguration dgc = factory.getDiscoveryGroupConfiguration();
Assert.assertNotNull(dgc); Assert.assertNotNull(dgc);
BroadcastEndpointFactory befc = dgc.getBroadcastEndpointFactory(); BroadcastEndpointFactory befc = dgc.getBroadcastEndpointFactory();
Assert.assertNotNull(befc); Assert.assertNotNull(befc);
Assert.assertTrue(befc instanceof UDPBroadcastEndpointFactory); Assert.assertTrue(befc instanceof UDPBroadcastEndpointFactory);
UDPBroadcastEndpointFactory ubgc = (UDPBroadcastEndpointFactory) befc; UDPBroadcastEndpointFactory ubgc = (UDPBroadcastEndpointFactory) befc;
Assert.assertEquals(ubgc.getGroupAddress(), "wahey"); Assert.assertEquals("wahey", ubgc.getGroupAddress());
Assert.assertEquals(ubgc.getGroupPort(), 3333); Assert.assertEquals(3333, ubgc.getGroupPort());
//these 2 are transient //these 2 are transient
Assert.assertEquals(ubgc.getLocalBindAddress(), null); // These will take the System.properties used on the testsuite,
Assert.assertEquals(ubgc.getLocalBindPort(), -1); // for that reason we take them as != instead of checking for null
Assert.assertEquals(dgc.getName(), "foo"); Assert.assertNotEquals("uhuh", ubgc.getLocalBindAddress());
Assert.assertEquals(dgc.getDiscoveryInitialWaitTimeout(), 5678); Assert.assertNotEquals(555, ubgc.getLocalBindPort());
Assert.assertEquals(dgc.getRefreshTimeout(), 12345);
Assert.assertEquals("foo", dgc.getName());
Assert.assertEquals(5678, dgc.getDiscoveryInitialWaitTimeout());
Assert.assertEquals(12345, dgc.getRefreshTimeout());
BeanUtilsBean bean = new BeanUtilsBean(); BeanUtilsBean bean = new BeanUtilsBean();
checkEquals(bean, connectionFactoryWithHA, factory); checkEquals(bean, connectionFactoryWithHA, factory);

View File

@ -94,7 +94,7 @@
<activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager <activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager
-Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties -Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties
-Djava.library.path=${activemq.basedir}/artemis-native/bin/ -Djgroups.bind_addr=localhost -Djava.library.path=${activemq.basedir}/artemis-native/bin/ -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost
-Djava.net.preferIPv4Stack=true -Djava.net.preferIPv4Stack=true
</activemq-surefire-argline> </activemq-surefire-argline>
<activemq.basedir>${project.basedir}</activemq.basedir> <activemq.basedir>${project.basedir}</activemq.basedir>

View File

@ -22,37 +22,37 @@
loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests
# Root logger level # Root logger level
logger.level=INFO logger.level=TRACE
# ActiveMQ Artemis logger levels # ActiveMQ Artemis logger levels
logger.org.apache.activemq.artemis.core.server.level=TRACE logger.org.apache.activemq.artemis.core.server.level=TRACE
logger.org.apache.activemq.artemis.journal.level=INFO logger.org.apache.activemq.artemis.journal.level=TRACE
logger.org.apache.activemq.artemis.utils.level=INFO logger.org.apache.activemq.artemis.utils.level=TRACE
logger.org.apache.activemq.artemis.jms.level=INFO logger.org.apache.activemq.artemis.jms.level=TRACE
logger.org.apache.activemq.artemis.ra.level=INFO logger.org.apache.activemq.artemis.ra.level=TRACE
logger.org.apache.activemq.artemis.tests.unit.level=INFO logger.org.apache.activemq.artemis.tests.unit.level=TRACE
logger.org.apache.activemq.artemis.tests.integration.level=INFO logger.org.apache.activemq.artemis.tests.integration.level=TRACE
logger.org.apache.activemq.artemis.jms.tests.level=INFO logger.org.apache.activemq.artemis.jms.tests.level=TRACE
# Root logger handlers # Root logger handlers
logger.handlers=CONSOLE,TEST logger.handlers=CONSOLE,TEST,FILE
#logger.handlers=CONSOLE,FILE #logger.handlers=CONSOLE,FILE
# Console handler configuration # Console handler configuration
handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
handler.CONSOLE.properties=autoFlush handler.CONSOLE.properties=autoFlush
handler.CONSOLE.level=FINE handler.CONSOLE.level=INFO
handler.CONSOLE.autoFlush=true handler.CONSOLE.autoFlush=true
handler.CONSOLE.formatter=PATTERN handler.CONSOLE.formatter=PATTERN
# File handler configuration # File handler configuration
handler.FILE=org.jboss.logmanager.handlers.FileHandler handler.FILE=org.jboss.logmanager.handlers.FileHandler
handler.FILE.level=FINE handler.FILE.level=TRACE
handler.FILE.properties=autoFlush,fileName handler.FILE.properties=autoFlush,fileName
handler.FILE.autoFlush=true handler.FILE.autoFlush=true
handler.FILE.fileName=target/activemq.log handler.FILE.fileName=target/activemq.log
handler.FILE.formatter=PATTERN handler.FILE.formatter=PATTERN
# Console handler configuration # Test handler
handler.TEST=org.apache.activemq.artemis.logs.AssertionLoggerHandler handler.TEST=org.apache.activemq.artemis.logs.AssertionLoggerHandler
handler.TEST.level=TRACE handler.TEST.level=TRACE
handler.TEST.formatter=PATTERN handler.TEST.formatter=PATTERN

View File

@ -16,23 +16,20 @@
*/ */
package org.apache.activemq.artemis.tests.integration.cluster.distribution; package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.ArrayList;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase { public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase {

View File

@ -56,7 +56,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
protected static void verifyBroadcast(BroadcastGroup broadcastGroup, protected static void verifyBroadcast(BroadcastGroup broadcastGroup,
DiscoveryGroup discoveryGroup) throws Exception { DiscoveryGroup discoveryGroup) throws Exception {
broadcastGroup.broadcastConnectors(); broadcastGroup.broadcastConnectors();
Assert.assertTrue("broadcast received", discoveryGroup.waitForBroadcast(2000)); Assert.assertTrue("broadcast not received", discoveryGroup.waitForBroadcast(2000));
} }
/** /**
@ -147,7 +147,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
int localPort, int localPort,
final InetAddress groupAddress, final InetAddress groupAddress,
final int groupPort) throws Exception { final int groupPort) throws Exception {
return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : null).setLocalBindPort(localPort)); return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : "localhost").setLocalBindPort(localPort));
} }
protected DiscoveryGroup newDiscoveryGroup(final String nodeID, protected DiscoveryGroup newDiscoveryGroup(final String nodeID,
@ -166,7 +166,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
final int groupPort, final int groupPort,
final long timeout, final long timeout,
NotificationService notif) throws Exception { NotificationService notif) throws Exception {
return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : null), notif); return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : "localhost"), notif);
} }
protected final class FakeNodeManager extends NodeManager { protected final class FakeNodeManager extends NodeManager {

View File

@ -30,10 +30,12 @@ import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.server.cluster.impl.BroadcastGroupImpl; import org.apache.activemq.artemis.core.server.cluster.impl.BroadcastGroupImpl;
import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/**
* This is to make sure discovery works fine even when garbled data is sent
*/
public class DiscoveryStayAliveTest extends DiscoveryBaseTest { public class DiscoveryStayAliveTest extends DiscoveryBaseTest {
ScheduledExecutorService scheduledExecutorService; ScheduledExecutorService scheduledExecutorService;
@ -57,7 +59,7 @@ public class DiscoveryStayAliveTest extends DiscoveryBaseTest {
final int groupPort = getUDPDiscoveryPort(); final int groupPort = getUDPDiscoveryPort();
final int timeout = 500; final int timeout = 500;
final DiscoveryGroup dg = newDiscoveryGroup(RandomUtil.randomString(), RandomUtil.randomString(), null, groupAddress, groupPort, timeout); final DiscoveryGroup dg = newDiscoveryGroup(RandomUtil.randomString(), RandomUtil.randomString(), InetAddress.getByName("localhost"), groupAddress, groupPort, timeout);
final AtomicInteger errors = new AtomicInteger(0); final AtomicInteger errors = new AtomicInteger(0);
Thread t = new Thread() { Thread t = new Thread() {
@ -74,7 +76,11 @@ public class DiscoveryStayAliveTest extends DiscoveryBaseTest {
}; };
t.start(); t.start();
BroadcastGroupImpl bg = new BroadcastGroupImpl(new FakeNodeManager("test-nodeID"), RandomUtil.randomString(), 1, scheduledExecutorService, new UDPBroadcastEndpointFactory().setGroupAddress(address1). BroadcastGroupImpl bg = null;
try {
bg = new BroadcastGroupImpl(new FakeNodeManager("test-nodeID"), RandomUtil.randomString(), 1, scheduledExecutorService, new UDPBroadcastEndpointFactory().setGroupAddress(address1).
setGroupPort(groupPort)); setGroupPort(groupPort));
bg.start(); bg.start();
@ -83,21 +89,32 @@ public class DiscoveryStayAliveTest extends DiscoveryBaseTest {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
BroadcastEndpointFactory factoryEndpoint = new UDPBroadcastEndpointFactory().setGroupAddress(address1). BroadcastEndpointFactory factoryEndpoint = new UDPBroadcastEndpointFactory().setGroupAddress(address1).
setGroupPort(groupPort); setGroupPort(groupPort).setLocalBindAddress("localhost");
sendBadData(factoryEndpoint); sendBadData(factoryEndpoint);
}
Thread.sleep(100); Thread.sleep(100);
assertTrue(t.isAlive()); assertTrue(t.isAlive());
assertEquals(0, errors.get()); assertEquals(0, errors.get());
} }
finally {
if (bg != null) {
bg.stop(); bg.stop();
}
if (dg != null) {
dg.stop(); dg.stop();
}
t.join(5000); t.join(1000);
Assert.assertFalse(t.isAlive());
// it will retry for a limited time only
for (int i = 0; t.isAlive() && i < 100; i++) {
t.interrupt();
Thread.sleep(100);
}
}
} }
private static void sendBadData(BroadcastEndpointFactory factoryEndpoint) throws Exception { private static void sendBadData(BroadcastEndpointFactory factoryEndpoint) throws Exception {
@ -120,5 +137,7 @@ public class DiscoveryStayAliveTest extends DiscoveryBaseTest {
endpoint.openBroadcaster(); endpoint.openBroadcaster();
endpoint.broadcast(bytes); endpoint.broadcast(bytes);
endpoint.close(true);
} }
} }

View File

@ -378,39 +378,11 @@ public class DiscoveryTest extends DiscoveryBaseTest {
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces(); Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
InetAddress localAddress = null; InetAddress localAddress = InetAddress.getLoopbackAddress();
outer:
while (networkInterfaces.hasMoreElements()) {
NetworkInterface networkInterface = networkInterfaces.nextElement();
if (networkInterface.isLoopback() || networkInterface.isVirtual() ||
!networkInterface.isUp() ||
!networkInterface.supportsMulticast()) {
continue;
}
Enumeration<InetAddress> en = networkInterface.getInetAddresses();
while (en.hasMoreElements()) {
InetAddress ia = en.nextElement();
if (ia.getAddress().length == 4) {
localAddress = ia;
break outer;
}
}
}
if (localAddress == null) {
log.warn("Can't find address to use");
return;
}
log.info("Local address is " + localAddress); log.info("Local address is " + localAddress);
bg = newBroadcast(nodeID, RandomUtil.randomString(), localAddress, 6552, groupAddress, groupPort); bg = newBroadcast(nodeID, RandomUtil.randomString(), localAddress, -1, groupAddress, groupPort);
bg.start(); bg.start();

View File

@ -280,7 +280,7 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
Hashtable props = new Hashtable<>(); Hashtable props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName()); props.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName());
props.put("connectionFactory.myConnectionFactory", "udp://" + getUDPDiscoveryAddress() + ":" + getUDPDiscoveryPort() + "?" + props.put("connectionFactory.myConnectionFactory", "udp://" + getUDPDiscoveryAddress() + ":" + getUDPDiscoveryPort() + "?" +
TransportConstants.LOCAL_ADDRESS_PROP_NAME + "=127.0.0.1&" + TransportConstants.LOCAL_ADDRESS_PROP_NAME + "=Server1&" +
TransportConstants.LOCAL_PORT_PROP_NAME + "=1198&" + TransportConstants.LOCAL_PORT_PROP_NAME + "=1198&" +
ActiveMQInitialContextFactory.REFRESH_TIMEOUT + "=5000&" + ActiveMQInitialContextFactory.REFRESH_TIMEOUT + "=5000&" +
ActiveMQInitialContextFactory.DISCOVERY_INITIAL_WAIT_TIMEOUT + "=6000"); ActiveMQInitialContextFactory.DISCOVERY_INITIAL_WAIT_TIMEOUT + "=6000");
@ -294,8 +294,8 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
UDPBroadcastEndpointFactory udpBroadcastEndpointFactory = (UDPBroadcastEndpointFactory) discoveryGroupConfiguration.getBroadcastEndpointFactory(); UDPBroadcastEndpointFactory udpBroadcastEndpointFactory = (UDPBroadcastEndpointFactory) discoveryGroupConfiguration.getBroadcastEndpointFactory();
//these 2 are transient so are ignored //these 2 are transient so are ignored
Assert.assertEquals(null, udpBroadcastEndpointFactory.getLocalBindAddress()); Assert.assertNotEquals("Server1", udpBroadcastEndpointFactory.getLocalBindAddress());
Assert.assertEquals(-1, udpBroadcastEndpointFactory.getLocalBindPort()); Assert.assertNotEquals(1198, udpBroadcastEndpointFactory.getLocalBindPort());
Assert.assertEquals(getUDPDiscoveryAddress(), udpBroadcastEndpointFactory.getGroupAddress()); Assert.assertEquals(getUDPDiscoveryAddress(), udpBroadcastEndpointFactory.getGroupAddress());
Assert.assertEquals(getUDPDiscoveryPort(), udpBroadcastEndpointFactory.getGroupPort()); Assert.assertEquals(getUDPDiscoveryPort(), udpBroadcastEndpointFactory.getGroupPort());
} }

View File

@ -82,8 +82,8 @@ public class ConnectionFactorySerializationTest extends JMSTestBase {
Assert.assertEquals(dgc.getRefreshTimeout(), 5000); Assert.assertEquals(dgc.getRefreshTimeout(), 5000);
Assert.assertTrue(dgc.getBroadcastEndpointFactory() instanceof UDPBroadcastEndpointFactory); Assert.assertTrue(dgc.getBroadcastEndpointFactory() instanceof UDPBroadcastEndpointFactory);
UDPBroadcastEndpointFactory befc = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory(); UDPBroadcastEndpointFactory befc = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory();
Assert.assertEquals(-1, befc.getLocalBindPort()); Assert.assertEquals(Integer.parseInt(System.getProperty("org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindPort", "-1")), befc.getLocalBindPort());
Assert.assertEquals(null, befc.getLocalBindAddress()); Assert.assertEquals(System.getProperty("org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress"), befc.getLocalBindAddress());
Assert.assertEquals(1234, befc.getGroupPort()); Assert.assertEquals(1234, befc.getGroupPort());
Assert.assertEquals("1.2.3.4", befc.getGroupAddress()); Assert.assertEquals("1.2.3.4", befc.getGroupAddress());
} }