diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java new file mode 100644 index 0000000000..0451437ef6 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/RandomUtil.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.Random; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.SimpleString; + +public class RandomUtil { + // Constants ----------------------------------------------------- + + protected static final Random random = new Random(System.currentTimeMillis()); + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + public static String randomString() { + return java.util.UUID.randomUUID().toString(); + } + + public static SimpleString randomSimpleString() { + return new SimpleString(RandomUtil.randomString()); + } + + public static char randomChar() { + return RandomUtil.randomString().charAt(0); + } + + public static long randomLong() { + return RandomUtil.random.nextLong(); + } + + public static long randomPositiveLong() { + return Math.abs(RandomUtil.randomLong()); + } + + public static int randomInt() { + return RandomUtil.random.nextInt(); + } + + public static int randomPositiveInt() { + return Math.abs(RandomUtil.randomInt()); + } + + public static ActiveMQBuffer randomBuffer(final int size, final long... data) { + ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(size + 8 * data.length); + + for (long d : data) { + buffer.writeLong(d); + } + + for (int i = 0; i < size; i++) { + buffer.writeByte(randomByte()); + } + + return buffer; + } + + public static int randomInterval(final int min, final int max) { + return min + randomMax(max - min); + } + + public static int randomMax(final int max) { + int value = randomPositiveInt() % max; + + if (value == 0) { + value = max; + } + + return value; + } + + public static int randomPort() { + return RandomUtil.random.nextInt(65536); + } + + public static short randomShort() { + return (short) RandomUtil.random.nextInt(Short.MAX_VALUE); + } + + public static byte randomByte() { + return Integer.valueOf(RandomUtil.random.nextInt()).byteValue(); + } + + public static boolean randomBoolean() { + return RandomUtil.random.nextBoolean(); + } + + public static byte[] randomBytes() { + return RandomUtil.randomString().getBytes(); + } + + public static byte[] randomBytes(final int length) { + byte[] bytes = new byte[length]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = RandomUtil.randomByte(); + } + return bytes; + } + + public static double randomDouble() { + return RandomUtil.random.nextDouble(); + } + + public static float randomFloat() { + return RandomUtil.random.nextFloat(); + } + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/UDPBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/UDPBroadcastEndpointFactory.java index 193bd315e4..13c69850b3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/UDPBroadcastEndpointFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/UDPBroadcastEndpointFactory.java @@ -27,6 +27,7 @@ import java.net.MulticastSocket; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; +import org.apache.activemq.artemis.utils.RandomUtil; /** * The configuration used to determine how the server will broadcast members. @@ -35,7 +36,10 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; */ public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFactory { - private transient String localBindAddress = null; + // You can specify a property as a default. This is useful for testsuite running. + // for that reason we won't document this property as the proper way to do it is through configuration. + // these property names can change at any time, so don't use this on production systems + private transient String localBindAddress = getProperty(UDPBroadcastEndpointFactory.class.getName() + ".localBindAddress", null); private transient int localBindPort = -1; @@ -170,9 +174,23 @@ public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFacto } else { if (localAddress != null) { - ActiveMQClientLogger.LOGGER.broadcastGroupBindError(); + java.util.Random random = new java.util.Random(System.currentTimeMillis()); + + for (int i = 0; i < 100; i++) { + int nextPort = RandomUtil.randomInterval(3000, 4000); + try { + broadcastingSocket = new DatagramSocket(nextPort, localAddress); + ActiveMQClientLogger.LOGGER.broadcastGroupBindError(localAddress.toString() + ":" + nextPort); + break; + } + catch (Exception e) { + ActiveMQClientLogger.LOGGER.broadcastGroupBindErrorRetry(localAddress.toString() + ":" + nextPort, e); + } + } + } + if (broadcastingSocket == null) { + broadcastingSocket = new DatagramSocket(); } - broadcastingSocket = new DatagramSocket(); } open = true; @@ -231,13 +249,35 @@ public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFacto } private static boolean checkForPresence(String key, String value) { - try { - String tmp = System.getProperty(key); - return tmp != null && tmp.trim().toLowerCase().startsWith(value); - } - catch (Throwable t) { - return false; + String tmp = getProperty(key, null); + return tmp != null && tmp.trim().toLowerCase().startsWith(value); + } + + } + + private static String getProperty(String key, String defaultValue) { + try { + String tmp = System.getProperty(key); + if (tmp == null) { + tmp = defaultValue; } + return tmp; + } + catch (Throwable t) { + ActiveMQClientLogger.LOGGER.warn(t); + return defaultValue; + } + } + + private static int getIntProperty(String key, String defaultValue) { + String value = getProperty(key, defaultValue); + + try { + return Integer.parseInt(value); + } + catch (Throwable t) { + ActiveMQClientLogger.LOGGER.warn(t.getMessage(), t); + return Integer.parseInt(defaultValue); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 901797c3d7..f881191796 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -260,9 +260,9 @@ public interface ActiveMQClientLogger extends BasicLogger { void jvmAllocatedMoreMemory(Long totalMemory1, Long totalMemory2); @LogMessage(level = Logger.Level.WARN) - @Message(id = 212048, value = "local-bind-address specified for broadcast group but no local-bind-port specified so socket will NOT be bound to a local address/port", + @Message(id = 212048, value = "Random address ({0}) was already in use, trying another time", format = Message.Format.MESSAGE_FORMAT) - void broadcastGroupBindError(); + void broadcastGroupBindErrorRetry(String hostAndPort, @Cause Throwable t); @LogMessage(level = Logger.Level.WARN) @Message(id = 212049, @@ -309,6 +309,11 @@ public interface ActiveMQClientLogger extends BasicLogger { @Message(id = 212055, value = "Unable to close consumer", format = Message.Format.MESSAGE_FORMAT) void unableToCloseConsumer(@Cause Exception e); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 212056, value = "local-bind-address specified for broadcast group but no local-bind-port. Using random port for UDP Broadcast ({0})", + format = Message.Format.MESSAGE_FORMAT) + void broadcastGroupBindError(String hostAndPort); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT) void onMessageError(@Cause Throwable e); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index deb17dad82..bed47b7a21 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -816,7 +816,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } } while (retry); + } + synchronized (topologyArrayGuard) { // We always wait for the topology, as the server // will send a single element if not cluster // so clients can know the id of the server they are connected to @@ -824,7 +826,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis()) { // Now wait for the topology try { - wait(1000); + topologyArrayGuard.wait(1000); } catch (InterruptedException e) { throw new ActiveMQInterruptedException(e); @@ -847,7 +849,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return factory; } - } public boolean isHA() { @@ -1410,10 +1411,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery updateArraysAndPairs(); if (last) { - synchronized (this) { + synchronized (topologyArrayGuard) { receivedTopology = true; // Notify if waiting on getting topology - notifyAll(); + topologyArrayGuard.notifyAll(); } } } diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/util/RandomUtil.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/util/RandomUtil.java index 20a2957e39..d34cd0cfde 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/util/RandomUtil.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/util/RandomUtil.java @@ -17,128 +17,11 @@ package org.apache.activemq.artemis.tests.util; import javax.transaction.xa.Xid; -import java.util.Random; -import java.util.UUID; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; -public final class RandomUtil { - // Constants ----------------------------------------------------- - - private static final Random random = new Random(System.currentTimeMillis()); - - // Attributes ---------------------------------------------------- - - // Static -------------------------------------------------------- - - public static String randomString() { - return UUID.randomUUID().toString(); - } - - public static SimpleString randomSimpleString() { - return new SimpleString(RandomUtil.randomString()); - } - - public static char randomChar() { - return RandomUtil.randomString().charAt(0); - } - - public static long randomLong() { - return RandomUtil.random.nextLong(); - } - - public static long randomPositiveLong() { - return Math.abs(RandomUtil.randomLong()); - } - - public static int randomInt() { - return RandomUtil.random.nextInt(); - } - - public static int randomPositiveInt() { - return Math.abs(RandomUtil.randomInt()); - } - - public static ActiveMQBuffer randomBuffer(final int size, final long... data) { - ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(size + 8 * data.length); - - for (long d : data) { - buffer.writeLong(d); - } - - for (int i = 0; i < size; i++) { - buffer.writeByte(randomByte()); - } - - return buffer; - } - - public static int randomInterval(final int min, final int max) { - return min + randomMax(max - min); - } - - public static int randomMax(final int max) { - int value = randomPositiveInt() % max; - - if (value == 0) { - value = max; - } - - return value; - } - - public static int randomPort() { - return RandomUtil.random.nextInt(65536); - } - - public static short randomShort() { - return (short) RandomUtil.random.nextInt(Short.MAX_VALUE); - } - - public static byte randomByte() { - return Integer.valueOf(RandomUtil.random.nextInt()).byteValue(); - } - - public static boolean randomBoolean() { - return RandomUtil.random.nextBoolean(); - } - - public static byte[] randomBytes() { - return RandomUtil.randomString().getBytes(); - } - - public static byte[] randomBytes(final int length) { - byte[] bytes = new byte[length]; - for (int i = 0; i < bytes.length; i++) { - bytes[i] = RandomUtil.randomByte(); - } - return bytes; - } - - public static double randomDouble() { - return RandomUtil.random.nextDouble(); - } - - public static float randomFloat() { - return RandomUtil.random.nextFloat(); - } +public class RandomUtil extends org.apache.activemq.artemis.utils.RandomUtil { public static Xid randomXid() { return new XidImpl(RandomUtil.randomBytes(), RandomUtil.randomInt(), RandomUtil.randomBytes()); } - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- } diff --git a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java index ed01ce522c..d374756522 100644 --- a/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java +++ b/artemis-jms-client/src/test/java/org/apache/activemq/artemis/uri/ConnectionFactoryURITest.java @@ -263,21 +263,26 @@ public class ConnectionFactoryURITest { discoveryGroupConfiguration.setName("foo").setRefreshTimeout(12345).setDiscoveryInitialWaitTimeout(5678).setBroadcastEndpointFactory(endpoint); ActiveMQConnectionFactory connectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(discoveryGroupConfiguration, JMSFactoryType.CF); URI tcp = parser.createSchema("udp", connectionFactoryWithHA); - ActiveMQConnectionFactory factory = parser.newObject(tcp, null); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcp.toString()); DiscoveryGroupConfiguration dgc = factory.getDiscoveryGroupConfiguration(); Assert.assertNotNull(dgc); BroadcastEndpointFactory befc = dgc.getBroadcastEndpointFactory(); Assert.assertNotNull(befc); Assert.assertTrue(befc instanceof UDPBroadcastEndpointFactory); UDPBroadcastEndpointFactory ubgc = (UDPBroadcastEndpointFactory) befc; - Assert.assertEquals(ubgc.getGroupAddress(), "wahey"); - Assert.assertEquals(ubgc.getGroupPort(), 3333); + Assert.assertEquals("wahey", ubgc.getGroupAddress()); + Assert.assertEquals(3333, ubgc.getGroupPort()); + //these 2 are transient - Assert.assertEquals(ubgc.getLocalBindAddress(), null); - Assert.assertEquals(ubgc.getLocalBindPort(), -1); - Assert.assertEquals(dgc.getName(), "foo"); - Assert.assertEquals(dgc.getDiscoveryInitialWaitTimeout(), 5678); - Assert.assertEquals(dgc.getRefreshTimeout(), 12345); + // These will take the System.properties used on the testsuite, + // for that reason we take them as != instead of checking for null + Assert.assertNotEquals("uhuh", ubgc.getLocalBindAddress()); + Assert.assertNotEquals(555, ubgc.getLocalBindPort()); + + Assert.assertEquals("foo", dgc.getName()); + Assert.assertEquals(5678, dgc.getDiscoveryInitialWaitTimeout()); + Assert.assertEquals(12345, dgc.getRefreshTimeout()); BeanUtilsBean bean = new BeanUtilsBean(); checkEquals(bean, connectionFactoryWithHA, factory); diff --git a/artemis-native/bin/libartemis-native-64.so b/artemis-native/bin/libartemis-native-64.so index aec757ab4c..1e24db334f 100755 Binary files a/artemis-native/bin/libartemis-native-64.so and b/artemis-native/bin/libartemis-native-64.so differ diff --git a/pom.xml b/pom.xml index 07c45160a3..111dffabe1 100644 --- a/pom.xml +++ b/pom.xml @@ -94,7 +94,7 @@ -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties - -Djava.library.path=${activemq.basedir}/artemis-native/bin/ -Djgroups.bind_addr=localhost + -Djava.library.path=${activemq.basedir}/artemis-native/bin/ -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost -Djava.net.preferIPv4Stack=true ${project.basedir} diff --git a/tests/config/logging.properties.trace b/tests/config/logging.properties.trace index 02d5269485..7fe923f89d 100644 --- a/tests/config/logging.properties.trace +++ b/tests/config/logging.properties.trace @@ -22,37 +22,37 @@ loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests # Root logger level -logger.level=INFO +logger.level=TRACE # ActiveMQ Artemis logger levels logger.org.apache.activemq.artemis.core.server.level=TRACE -logger.org.apache.activemq.artemis.journal.level=INFO -logger.org.apache.activemq.artemis.utils.level=INFO -logger.org.apache.activemq.artemis.jms.level=INFO -logger.org.apache.activemq.artemis.ra.level=INFO -logger.org.apache.activemq.artemis.tests.unit.level=INFO -logger.org.apache.activemq.artemis.tests.integration.level=INFO -logger.org.apache.activemq.artemis.jms.tests.level=INFO +logger.org.apache.activemq.artemis.journal.level=TRACE +logger.org.apache.activemq.artemis.utils.level=TRACE +logger.org.apache.activemq.artemis.jms.level=TRACE +logger.org.apache.activemq.artemis.ra.level=TRACE +logger.org.apache.activemq.artemis.tests.unit.level=TRACE +logger.org.apache.activemq.artemis.tests.integration.level=TRACE +logger.org.apache.activemq.artemis.jms.tests.level=TRACE # Root logger handlers -logger.handlers=CONSOLE,TEST +logger.handlers=CONSOLE,TEST,FILE #logger.handlers=CONSOLE,FILE # Console handler configuration handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler handler.CONSOLE.properties=autoFlush -handler.CONSOLE.level=FINE +handler.CONSOLE.level=INFO handler.CONSOLE.autoFlush=true handler.CONSOLE.formatter=PATTERN # File handler configuration handler.FILE=org.jboss.logmanager.handlers.FileHandler -handler.FILE.level=FINE +handler.FILE.level=TRACE handler.FILE.properties=autoFlush,fileName handler.FILE.autoFlush=true handler.FILE.fileName=target/activemq.log handler.FILE.formatter=PATTERN -# Console handler configuration +# Test handler handler.TEST=org.apache.activemq.artemis.logs.AssertionLoggerHandler handler.TEST.level=TRACE handler.TEST.formatter=PATTERN diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java index 4faa93b9ea..32c2c3203a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java @@ -16,23 +16,20 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.distribution; -import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.junit.Assert; -import org.junit.Before; - -import org.junit.Test; - -import java.util.ArrayList; - import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.util.ArrayList; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java index 06bc14ef7b..2845c9d115 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java @@ -56,7 +56,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase { protected static void verifyBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) throws Exception { broadcastGroup.broadcastConnectors(); - Assert.assertTrue("broadcast received", discoveryGroup.waitForBroadcast(2000)); + Assert.assertTrue("broadcast not received", discoveryGroup.waitForBroadcast(2000)); } /** @@ -147,7 +147,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase { int localPort, final InetAddress groupAddress, final int groupPort) throws Exception { - return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : null).setLocalBindPort(localPort)); + return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : "localhost").setLocalBindPort(localPort)); } protected DiscoveryGroup newDiscoveryGroup(final String nodeID, @@ -166,7 +166,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase { final int groupPort, final long timeout, NotificationService notif) throws Exception { - return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : null), notif); + return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : "localhost"), notif); } protected final class FakeNodeManager extends NodeManager { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryStayAliveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryStayAliveTest.java index c3c864594e..8e51dbe546 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryStayAliveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryStayAliveTest.java @@ -30,10 +30,12 @@ import org.apache.activemq.artemis.core.cluster.DiscoveryGroup; import org.apache.activemq.artemis.core.server.cluster.impl.BroadcastGroupImpl; import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; +/** + * This is to make sure discovery works fine even when garbled data is sent + */ public class DiscoveryStayAliveTest extends DiscoveryBaseTest { ScheduledExecutorService scheduledExecutorService; @@ -57,7 +59,7 @@ public class DiscoveryStayAliveTest extends DiscoveryBaseTest { final int groupPort = getUDPDiscoveryPort(); final int timeout = 500; - final DiscoveryGroup dg = newDiscoveryGroup(RandomUtil.randomString(), RandomUtil.randomString(), null, groupAddress, groupPort, timeout); + final DiscoveryGroup dg = newDiscoveryGroup(RandomUtil.randomString(), RandomUtil.randomString(), InetAddress.getByName("localhost"), groupAddress, groupPort, timeout); final AtomicInteger errors = new AtomicInteger(0); Thread t = new Thread() { @@ -74,30 +76,45 @@ public class DiscoveryStayAliveTest extends DiscoveryBaseTest { }; t.start(); - BroadcastGroupImpl bg = new BroadcastGroupImpl(new FakeNodeManager("test-nodeID"), RandomUtil.randomString(), 1, scheduledExecutorService, new UDPBroadcastEndpointFactory().setGroupAddress(address1). - setGroupPort(groupPort)); + BroadcastGroupImpl bg = null; - bg.start(); + try { - bg.addConnector(generateTC()); + bg = new BroadcastGroupImpl(new FakeNodeManager("test-nodeID"), RandomUtil.randomString(), 1, scheduledExecutorService, new UDPBroadcastEndpointFactory().setGroupAddress(address1). + setGroupPort(groupPort)); - for (int i = 0; i < 10; i++) { - BroadcastEndpointFactory factoryEndpoint = new UDPBroadcastEndpointFactory().setGroupAddress(address1). - setGroupPort(groupPort); - sendBadData(factoryEndpoint); + bg.start(); + bg.addConnector(generateTC()); + + for (int i = 0; i < 10; i++) { + BroadcastEndpointFactory factoryEndpoint = new UDPBroadcastEndpointFactory().setGroupAddress(address1). + setGroupPort(groupPort).setLocalBindAddress("localhost"); + sendBadData(factoryEndpoint); + + } Thread.sleep(100); assertTrue(t.isAlive()); assertEquals(0, errors.get()); } + finally { - bg.stop(); - dg.stop(); + if (bg != null) { + bg.stop(); + } - t.join(5000); + if (dg != null) { + dg.stop(); + } - Assert.assertFalse(t.isAlive()); + t.join(1000); + // it will retry for a limited time only + for (int i = 0; t.isAlive() && i < 100; i++) { + t.interrupt(); + Thread.sleep(100); + } + } } private static void sendBadData(BroadcastEndpointFactory factoryEndpoint) throws Exception { @@ -120,5 +137,7 @@ public class DiscoveryStayAliveTest extends DiscoveryBaseTest { endpoint.openBroadcaster(); endpoint.broadcast(bytes); + + endpoint.close(true); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java index 8dc236e844..d6f3da4447 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java @@ -378,39 +378,11 @@ public class DiscoveryTest extends DiscoveryBaseTest { Enumeration networkInterfaces = NetworkInterface.getNetworkInterfaces(); - InetAddress localAddress = null; - - outer: - while (networkInterfaces.hasMoreElements()) { - NetworkInterface networkInterface = networkInterfaces.nextElement(); - if (networkInterface.isLoopback() || networkInterface.isVirtual() || - !networkInterface.isUp() || - !networkInterface.supportsMulticast()) { - continue; - } - - Enumeration en = networkInterface.getInetAddresses(); - - while (en.hasMoreElements()) { - InetAddress ia = en.nextElement(); - - if (ia.getAddress().length == 4) { - localAddress = ia; - - break outer; - } - } - } - - if (localAddress == null) { - log.warn("Can't find address to use"); - - return; - } + InetAddress localAddress = InetAddress.getLoopbackAddress(); log.info("Local address is " + localAddress); - bg = newBroadcast(nodeID, RandomUtil.randomString(), localAddress, 6552, groupAddress, groupPort); + bg = newBroadcast(nodeID, RandomUtil.randomString(), localAddress, -1, groupAddress, groupPort); bg.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/SimpleJNDIClientTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/SimpleJNDIClientTest.java index 26ea9ab873..e0e74a4e0e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/SimpleJNDIClientTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/SimpleJNDIClientTest.java @@ -280,7 +280,7 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase { Hashtable props = new Hashtable<>(); props.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName()); props.put("connectionFactory.myConnectionFactory", "udp://" + getUDPDiscoveryAddress() + ":" + getUDPDiscoveryPort() + "?" + - TransportConstants.LOCAL_ADDRESS_PROP_NAME + "=127.0.0.1&" + + TransportConstants.LOCAL_ADDRESS_PROP_NAME + "=Server1&" + TransportConstants.LOCAL_PORT_PROP_NAME + "=1198&" + ActiveMQInitialContextFactory.REFRESH_TIMEOUT + "=5000&" + ActiveMQInitialContextFactory.DISCOVERY_INITIAL_WAIT_TIMEOUT + "=6000"); @@ -294,8 +294,8 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase { UDPBroadcastEndpointFactory udpBroadcastEndpointFactory = (UDPBroadcastEndpointFactory) discoveryGroupConfiguration.getBroadcastEndpointFactory(); //these 2 are transient so are ignored - Assert.assertEquals(null, udpBroadcastEndpointFactory.getLocalBindAddress()); - Assert.assertEquals(-1, udpBroadcastEndpointFactory.getLocalBindPort()); + Assert.assertNotEquals("Server1", udpBroadcastEndpointFactory.getLocalBindAddress()); + Assert.assertNotEquals(1198, udpBroadcastEndpointFactory.getLocalBindPort()); Assert.assertEquals(getUDPDiscoveryAddress(), udpBroadcastEndpointFactory.getGroupAddress()); Assert.assertEquals(getUDPDiscoveryPort(), udpBroadcastEndpointFactory.getGroupPort()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ConnectionFactorySerializationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ConnectionFactorySerializationTest.java index f86d98d96c..b800dd0280 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ConnectionFactorySerializationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ConnectionFactorySerializationTest.java @@ -82,8 +82,8 @@ public class ConnectionFactorySerializationTest extends JMSTestBase { Assert.assertEquals(dgc.getRefreshTimeout(), 5000); Assert.assertTrue(dgc.getBroadcastEndpointFactory() instanceof UDPBroadcastEndpointFactory); UDPBroadcastEndpointFactory befc = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory(); - Assert.assertEquals(-1, befc.getLocalBindPort()); - Assert.assertEquals(null, befc.getLocalBindAddress()); + Assert.assertEquals(Integer.parseInt(System.getProperty("org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindPort", "-1")), befc.getLocalBindPort()); + Assert.assertEquals(System.getProperty("org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress"), befc.getLocalBindAddress()); Assert.assertEquals(1234, befc.getGroupPort()); Assert.assertEquals("1.2.3.4", befc.getGroupAddress()); }