From 5361e58354c5a38cd82d731c89c2adec57b37f81 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 20 Nov 2014 16:07:58 -0500 Subject: [PATCH] ACTIVEMQ6-44 - Internal error during UDP parsing https://issues.apache.org/jira/browse/ACTIVEMQ6-44 The DiscoveryGroup should be resilient to failures on the communication. We shouldn't kill the Loop if an exception happened during the read of the UDP messages. --- .../core/client/ActiveMQClientLogger.java | 2 +- .../activemq/core/cluster/DiscoveryGroup.java | 45 ++- .../discovery/DiscoveryBaseTest.java | 257 ++++++++++++++++++ .../discovery/DiscoveryStayAliveTest.java | 154 +++++++++++ .../integration/discovery/DiscoveryTest.java | 223 +-------------- 5 files changed, 443 insertions(+), 238 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryBaseTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryStayAliveTest.java diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/client/ActiveMQClientLogger.java b/activemq-core-client/src/main/java/org/apache/activemq/core/client/ActiveMQClientLogger.java index 08e3f49e4f..ec11ebc4a3 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/client/ActiveMQClientLogger.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/client/ActiveMQClientLogger.java @@ -355,7 +355,7 @@ public interface ActiveMQClientLogger extends BasicLogger @LogMessage(level = Logger.Level.ERROR) @Message(id = 214010, value = "Failed to receive datagram", format = Message.Format.MESSAGE_FORMAT) - void failedToReceiveDatagramInDiscovery(@Cause Exception e); + void failedToReceiveDatagramInDiscovery(@Cause Throwable e); @LogMessage(level = Logger.Level.ERROR) @Message(id = 214011, value = "Failed to call discovery listener", format = Message.Format.MESSAGE_FORMAT) diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/cluster/DiscoveryGroup.java b/activemq-core-client/src/main/java/org/apache/activemq/core/cluster/DiscoveryGroup.java index 1ee0662bef..b004850212 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/cluster/DiscoveryGroup.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/cluster/DiscoveryGroup.java @@ -35,12 +35,12 @@ import org.apache.activemq.utils.TypedProperties; /** * This class is used to search for members on the cluster through the opaque interface {@link BroadcastEndpoint}. - *

+ *

* There are two current implementations, and that's probably all we will ever need. - *

+ *

* We will probably keep both interfaces for a while as UDP is a simple solution requiring no extra dependencies which * is suitable for users looking for embedded solutions. - *

+ *

* Created 17 Nov 2008 13:21:45 * * @author Tim Fox @@ -124,6 +124,18 @@ public final class DiscoveryGroup implements ActiveMQComponent } } + /** + * This will start the DiscoveryRunnable and run it directly. + * This is useful for a test process where we need this execution blocking a thread. + */ + public void internalRunning() throws Exception + { + endpoint.openClient(); + started = true; + DiscoveryRunnable runnable = new DiscoveryRunnable(); + runnable.run(); + } + public void stop() { synchronized (this) @@ -152,11 +164,14 @@ public final class DiscoveryGroup implements ActiveMQComponent try { - thread.interrupt(); - thread.join(10000); - if (thread.isAlive()) + if (thread != null) { - ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery(); + thread.interrupt(); + thread.join(10000); + if (thread.isAlive()) + { + ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery(); + } } } catch (InterruptedException e) @@ -262,11 +277,11 @@ public final class DiscoveryGroup implements ActiveMQComponent { public void run() { - try - { - byte[] data = null; + byte[] data = null; - while (started) + while (started) + { + try { try { @@ -362,10 +377,10 @@ public final class DiscoveryGroup implements ActiveMQComponent waitLock.notifyAll(); } } - } - catch (Exception e) - { - ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e); + catch (Throwable e) + { + ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e); + } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryBaseTest.java new file mode 100644 index 0000000000..f2c027988c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryBaseTest.java @@ -0,0 +1,257 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.tests.integration.discovery; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration; +import org.apache.activemq.core.cluster.DiscoveryEntry; +import org.apache.activemq.core.cluster.DiscoveryGroup; +import org.apache.activemq.core.cluster.DiscoveryListener; +import org.apache.activemq.core.server.NodeManager; +import org.apache.activemq.core.server.cluster.BroadcastGroup; +import org.apache.activemq.core.server.cluster.impl.BroadcastGroupImpl; +import org.apache.activemq.core.server.management.NotificationService; +import org.apache.activemq.tests.integration.IntegrationTestLogger; +import org.apache.activemq.tests.util.UnitTestCase; +import org.apache.activemq.utils.UUIDGenerator; +import org.junit.Assert; + +/** + * @author Clebert Suconic + */ + +public class DiscoveryBaseTest extends UnitTestCase +{ + protected static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + + protected final String address1 = getUDPDiscoveryAddress(); + + protected final String address2 = getUDPDiscoveryAddress(1); + + protected final String address3 = getUDPDiscoveryAddress(2); + + + /** + * @param discoveryGroup + * @throws Exception + */ + protected static void verifyBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) throws Exception + { + broadcastGroup.broadcastConnectors(); + Assert.assertTrue("broadcast received", discoveryGroup.waitForBroadcast(2000)); + } + + /** + * @param discoveryGroup + * @throws Exception + */ + protected static void verifyNonBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) + throws Exception + { + broadcastGroup.broadcastConnectors(); + Assert.assertFalse("NO broadcast received", discoveryGroup.waitForBroadcast(2000)); + } + + + + protected TransportConfiguration generateTC() + { + return generateTC(""); + } + + protected TransportConfiguration generateTC(String debug) + { + String className = "org.foo.bar." + debug + "|" + UUIDGenerator.getInstance().generateStringUUID() + ""; + String name = UUIDGenerator.getInstance().generateStringUUID(); + Map params = new HashMap(); + params.put(UUIDGenerator.getInstance().generateStringUUID(), 123); + params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID()); + params.put(UUIDGenerator.getInstance().generateStringUUID(), true); + TransportConfiguration tc = new TransportConfiguration(className, params, name); + return tc; + } + + protected static class MyListener implements DiscoveryListener + { + volatile boolean called; + + public void connectorsChanged(List newConnectors) + { + called = true; + } + } + + protected static void assertEqualsDiscoveryEntries(List expected, List actual) + { + assertNotNull(actual); + + List sortedExpected = new ArrayList(expected); + Collections.sort(sortedExpected, new Comparator() + { + + public int compare(TransportConfiguration o1, TransportConfiguration o2) + { + return o2.toString().compareTo(o1.toString()); + } + }); + List sortedActual = new ArrayList(actual); + Collections.sort(sortedActual, new Comparator() + { + public int compare(DiscoveryEntry o1, DiscoveryEntry o2) + { + return o2.getConnector().toString().compareTo(o1.getConnector().toString()); + } + }); + if (sortedExpected.size() != sortedActual.size()) + { + dump(sortedExpected, sortedActual); + } + assertEquals(sortedExpected.size(), sortedActual.size()); + for (int i = 0; i < sortedExpected.size(); i++) + { + if (!sortedExpected.get(i).equals(sortedActual.get(i).getConnector())) + { + dump(sortedExpected, sortedActual); + } + assertEquals(sortedExpected.get(i), sortedActual.get(i).getConnector()); + } + } + + protected static void dump(List sortedExpected, List sortedActual) + { + System.out.println("wrong broadcasts received"); + System.out.println("expected"); + System.out.println("----------------------------"); + for (TransportConfiguration transportConfiguration : sortedExpected) + { + System.out.println("transportConfiguration = " + transportConfiguration); + } + System.out.println("----------------------------"); + System.out.println("actual"); + System.out.println("----------------------------"); + for (DiscoveryEntry discoveryEntry : sortedActual) + { + System.out.println("transportConfiguration = " + discoveryEntry.getConnector()); + } + System.out.println("----------------------------"); + } + + /** + * This method is here just to facilitate creating the Broadcaster for this test + */ + protected BroadcastGroupImpl newBroadcast(final String nodeID, + final String name, + final InetAddress localAddress, + int localPort, + final InetAddress groupAddress, + final int groupPort) throws Exception + { + return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress.getHostAddress()) + .setGroupPort(groupPort) + .setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : null) + .setLocalBindPort(localPort) + .createBroadcastEndpointFactory()); + } + + protected DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress, + final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception + { + return newDiscoveryGroup(nodeID, name, localBindAddress, groupAddress, groupPort, timeout, null); + } + + protected DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress, + final InetAddress groupAddress, final int groupPort, final long timeout, NotificationService notif) throws Exception + { + return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress.getHostAddress()) + .setGroupPort(groupPort) + .setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : null) + .createBroadcastEndpointFactory(), notif); + } + + protected final class FakeNodeManager extends NodeManager + { + + public FakeNodeManager(String nodeID) + { + super(false, null); + this.setNodeID(nodeID); + } + + @Override + public void awaitLiveNode() throws Exception + { + } + + @Override + public void startBackup() throws Exception + { + } + + @Override + public void startLiveNode() throws Exception + { + } + + @Override + public void pauseLiveServer() throws Exception + { + } + + @Override + public void crashLiveServer() throws Exception + { + } + + @Override + public void releaseBackup() throws Exception + { + } + + @Override + public SimpleString readNodeId() + { + return null; + } + + @Override + public boolean isAwaitingFailback() throws Exception + { + return false; + } + + @Override + public boolean isBackupLive() throws Exception + { + return false; + } + + @Override + public void interrupt() + { + } + } + + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryStayAliveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryStayAliveTest.java new file mode 100644 index 0000000000..f16681cf02 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryStayAliveTest.java @@ -0,0 +1,154 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.tests.integration.discovery; + +import java.net.InetAddress; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.ActiveMQBuffers; +import org.apache.activemq.api.core.BroadcastEndpoint; +import org.apache.activemq.api.core.BroadcastEndpointFactory; +import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration; +import org.apache.activemq.core.cluster.DiscoveryGroup; +import org.apache.activemq.core.server.cluster.impl.BroadcastGroupImpl; +import org.apache.activemq.tests.util.RandomUtil; +import org.apache.activemq.utils.ActiveMQThreadFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Clebert Suconic + */ + +public class DiscoveryStayAliveTest extends DiscoveryBaseTest +{ + + + ScheduledExecutorService scheduledExecutorService; + + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + scheduledExecutorService = new ScheduledThreadPoolExecutor(1, + new ActiveMQThreadFactory("ActiveMQ-scheduled-threads", + false, + Thread.currentThread().getContextClassLoader())); + + } + + public void tearDown() throws Exception + { + scheduledExecutorService.shutdown(); + super.tearDown(); + } + + @Test + public void testDiscoveryRunning() throws Throwable + { + final InetAddress groupAddress = InetAddress.getByName(address1); + final int groupPort = getUDPDiscoveryPort(); + final int timeout = 500; + + + final DiscoveryGroup dg = newDiscoveryGroup(RandomUtil.randomString(), + RandomUtil.randomString(), + null, + groupAddress, + groupPort, + timeout); + + final AtomicInteger errors = new AtomicInteger(0); + Thread t = new Thread() + { + public void run() + { + try + { + dg.internalRunning(); + } + catch (Throwable e) + { + e.printStackTrace(); + errors.incrementAndGet(); + } + + } + }; + t.start(); + + + BroadcastGroupImpl bg = new BroadcastGroupImpl(new FakeNodeManager("test-nodeID"), + RandomUtil.randomString(), + 1, scheduledExecutorService, new UDPBroadcastGroupConfiguration().setGroupAddress(address1). + setGroupPort(groupPort).createBroadcastEndpointFactory()); + + bg.start(); + + bg.addConnector(generateTC()); + + + for (int i = 0; i < 10; i++) + { + BroadcastEndpointFactory factoryEndpoint = new UDPBroadcastGroupConfiguration().setGroupAddress(address1). + setGroupPort(groupPort).createBroadcastEndpointFactory(); + sendBadData(factoryEndpoint); + + Thread.sleep(100); + assertTrue(t.isAlive()); + assertEquals(0, errors.get()); + } + + bg.stop(); + dg.stop(); + + t.join(5000); + + Assert.assertFalse(t.isAlive()); + + } + + + private static void sendBadData(BroadcastEndpointFactory factoryEndpoint) throws Exception + { + BroadcastEndpoint endpoint = factoryEndpoint.createBroadcastEndpoint(); + + + ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(500); + + buffer.writeString("This is a test1!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); + buffer.writeString("This is a test2!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); + + + byte[] bytes = new byte[buffer.writerIndex()]; + + buffer.readBytes(bytes); + + // messing up with the string!!! + for (int i = bytes.length - 10; i < bytes.length; i++) + { + bytes[i] = 0; + } + + + endpoint.openBroadcaster(); + + endpoint.broadcast(bytes); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryTest.java index 2407c6c480..7ba6d1850a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/discovery/DiscoveryTest.java @@ -15,22 +15,15 @@ package org.apache.activemq.tests.integration.discovery; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetAddress; import java.net.NetworkInterface; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.Enumeration; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.activemq.api.core.ActiveMQIllegalStateException; import org.apache.activemq.api.core.BroadcastEndpoint; import org.apache.activemq.api.core.BroadcastEndpointFactory; import org.apache.activemq.api.core.JGroupsBroadcastGroupConfiguration; @@ -40,17 +33,12 @@ import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration; import org.apache.activemq.api.core.management.CoreNotificationType; import org.apache.activemq.core.cluster.DiscoveryEntry; import org.apache.activemq.core.cluster.DiscoveryGroup; -import org.apache.activemq.core.cluster.DiscoveryListener; import org.apache.activemq.core.server.ActiveMQComponent; -import org.apache.activemq.core.server.NodeManager; import org.apache.activemq.core.server.cluster.BroadcastGroup; import org.apache.activemq.core.server.cluster.impl.BroadcastGroupImpl; import org.apache.activemq.core.server.management.Notification; -import org.apache.activemq.core.server.management.NotificationService; -import org.apache.activemq.tests.integration.IntegrationTestLogger; import org.apache.activemq.tests.integration.SimpleNotificationService; import org.apache.activemq.tests.util.RandomUtil; -import org.apache.activemq.tests.util.UnitTestCase; import org.apache.activemq.utils.UUIDGenerator; import org.junit.After; import org.junit.Assert; @@ -77,18 +65,10 @@ import org.junit.Test; * * @author Tim Fox */ -public class DiscoveryTest extends UnitTestCase +public class DiscoveryTest extends DiscoveryBaseTest { private static final String TEST_JGROUPS_CONF_FILE = "test-jgroups-file_ping.xml"; - private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; - - private final String address1 = getUDPDiscoveryAddress(); - - private final String address2 = getUDPDiscoveryAddress(1); - - private final String address3 = getUDPDiscoveryAddress(2); - BroadcastGroup bg = null, bg1 = null, bg2 = null, bg3 = null; DiscoveryGroup dg = null, dg1 = null, dg2 = null, dg3 = null; @@ -879,26 +859,6 @@ public class DiscoveryTest extends UnitTestCase Assert.assertFalse(listener3.called); } - /** - * @param discoveryGroup - * @throws Exception - */ - private static void verifyBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) throws Exception - { - broadcastGroup.broadcastConnectors(); - Assert.assertTrue("broadcast received", discoveryGroup.waitForBroadcast(2000)); - } - - /** - * @param discoveryGroup - * @throws Exception - */ - private static void verifyNonBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) throws Exception - { - broadcastGroup.broadcastConnectors(); - Assert.assertFalse("NO broadcast received", discoveryGroup.waitForBroadcast(2000)); - } - @Test public void testConnectorsUpdatedMultipleBroadcasters() throws Exception { @@ -1238,185 +1198,4 @@ public class DiscoveryTest extends UnitTestCase assertNotNull(object); assertTrue(object instanceof JGroupsBroadcastGroupConfiguration); } - - private TransportConfiguration generateTC(String debug) - { - String className = "org.foo.bar." + debug + "|" + UUIDGenerator.getInstance().generateStringUUID() + ""; - String name = UUIDGenerator.getInstance().generateStringUUID(); - Map params = new HashMap(); - params.put(UUIDGenerator.getInstance().generateStringUUID(), 123); - params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID()); - params.put(UUIDGenerator.getInstance().generateStringUUID(), true); - TransportConfiguration tc = new TransportConfiguration(className, params, name); - return tc; - } - - private TransportConfiguration generateTC() - { - return generateTC(""); - } - - private static class MyListener implements DiscoveryListener - { - volatile boolean called; - - public void connectorsChanged(List newConnectors) - { - called = true; - } - } - - private static void assertEqualsDiscoveryEntries(List expected, List actual) - { - assertNotNull(actual); - - List sortedExpected = new ArrayList(expected); - Collections.sort(sortedExpected, new Comparator() - { - - public int compare(TransportConfiguration o1, TransportConfiguration o2) - { - return o2.toString().compareTo(o1.toString()); - } - }); - List sortedActual = new ArrayList(actual); - Collections.sort(sortedActual, new Comparator() - { - public int compare(DiscoveryEntry o1, DiscoveryEntry o2) - { - return o2.getConnector().toString().compareTo(o1.getConnector().toString()); - } - }); - if (sortedExpected.size() != sortedActual.size()) - { - dump(sortedExpected, sortedActual); - } - assertEquals(sortedExpected.size(), sortedActual.size()); - for (int i = 0; i < sortedExpected.size(); i++) - { - if (!sortedExpected.get(i).equals(sortedActual.get(i).getConnector())) - { - dump(sortedExpected, sortedActual); - } - assertEquals(sortedExpected.get(i), sortedActual.get(i).getConnector()); - } - } - - private static void dump(List sortedExpected, List sortedActual) - { - System.out.println("wrong broadcasts received"); - System.out.println("expected"); - System.out.println("----------------------------"); - for (TransportConfiguration transportConfiguration : sortedExpected) - { - System.out.println("transportConfiguration = " + transportConfiguration); - } - System.out.println("----------------------------"); - System.out.println("actual"); - System.out.println("----------------------------"); - for (DiscoveryEntry discoveryEntry : sortedActual) - { - System.out.println("transportConfiguration = " + discoveryEntry.getConnector()); - } - System.out.println("----------------------------"); - } - - /** - * This method is here just to facilitate creating the Broadcaster for this test - */ - private BroadcastGroupImpl newBroadcast(final String nodeID, - final String name, - final InetAddress localAddress, - int localPort, - final InetAddress groupAddress, - final int groupPort) throws Exception - { - return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastGroupConfiguration() - .setGroupAddress(groupAddress.getHostAddress()) - .setGroupPort(groupPort) - .setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : null) - .setLocalBindPort(localPort) - .createBroadcastEndpointFactory()); - } - - private DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress, - final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception - { - return newDiscoveryGroup(nodeID, name, localBindAddress, groupAddress, groupPort, timeout, null); - } - - private DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress, - final InetAddress groupAddress, final int groupPort, final long timeout, NotificationService notif) throws Exception - { - return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastGroupConfiguration() - .setGroupAddress(groupAddress.getHostAddress()) - .setGroupPort(groupPort) - .setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : null) - .createBroadcastEndpointFactory(), notif); - } - - - private final class FakeNodeManager extends NodeManager - { - - public FakeNodeManager(String nodeID) - { - super(false, null); - this.setNodeID(nodeID); - } - - @Override - public void awaitLiveNode() throws Exception - { - } - - @Override - public void startBackup() throws Exception - { - } - - @Override - public void startLiveNode() throws Exception - { - } - - @Override - public void pauseLiveServer() throws Exception - { - } - - @Override - public void crashLiveServer() throws Exception - { - } - - @Override - public void releaseBackup() throws Exception - { - } - - @Override - public SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException - { - return null; - } - - @Override - public boolean isAwaitingFailback() throws Exception - { - return false; - } - - @Override - public boolean isBackupLive() throws Exception - { - return false; - } - - @Override - public void interrupt() - { - } - } - }