ACTIVEMQ6-44 - Internal error during UDP parsing
https://issues.apache.org/jira/browse/ACTIVEMQ6-44 The DiscoveryGroup should be resilient to failures on the communication. We shouldn't kill the Loop if an exception happened during the read of the UDP messages.
This commit is contained in:
parent
49a33ae32e
commit
5361e58354
|
@ -355,7 +355,7 @@ public interface ActiveMQClientLogger extends BasicLogger
|
|||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 214010, value = "Failed to receive datagram", format = Message.Format.MESSAGE_FORMAT)
|
||||
void failedToReceiveDatagramInDiscovery(@Cause Exception e);
|
||||
void failedToReceiveDatagramInDiscovery(@Cause Throwable e);
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 214011, value = "Failed to call discovery listener", format = Message.Format.MESSAGE_FORMAT)
|
||||
|
|
|
@ -35,12 +35,12 @@ import org.apache.activemq.utils.TypedProperties;
|
|||
|
||||
/**
|
||||
* This class is used to search for members on the cluster through the opaque interface {@link BroadcastEndpoint}.
|
||||
* <p>
|
||||
* <p/>
|
||||
* There are two current implementations, and that's probably all we will ever need.
|
||||
* <p>
|
||||
* <p/>
|
||||
* We will probably keep both interfaces for a while as UDP is a simple solution requiring no extra dependencies which
|
||||
* is suitable for users looking for embedded solutions.
|
||||
* <p>
|
||||
* <p/>
|
||||
* Created 17 Nov 2008 13:21:45
|
||||
*
|
||||
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
|
||||
|
@ -124,6 +124,18 @@ public final class DiscoveryGroup implements ActiveMQComponent
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This will start the DiscoveryRunnable and run it directly.
|
||||
* This is useful for a test process where we need this execution blocking a thread.
|
||||
*/
|
||||
public void internalRunning() throws Exception
|
||||
{
|
||||
endpoint.openClient();
|
||||
started = true;
|
||||
DiscoveryRunnable runnable = new DiscoveryRunnable();
|
||||
runnable.run();
|
||||
}
|
||||
|
||||
public void stop()
|
||||
{
|
||||
synchronized (this)
|
||||
|
@ -152,11 +164,14 @@ public final class DiscoveryGroup implements ActiveMQComponent
|
|||
|
||||
try
|
||||
{
|
||||
thread.interrupt();
|
||||
thread.join(10000);
|
||||
if (thread.isAlive())
|
||||
if (thread != null)
|
||||
{
|
||||
ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery();
|
||||
thread.interrupt();
|
||||
thread.join(10000);
|
||||
if (thread.isAlive())
|
||||
{
|
||||
ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
|
@ -262,11 +277,11 @@ public final class DiscoveryGroup implements ActiveMQComponent
|
|||
{
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
byte[] data = null;
|
||||
byte[] data = null;
|
||||
|
||||
while (started)
|
||||
while (started)
|
||||
{
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -362,10 +377,10 @@ public final class DiscoveryGroup implements ActiveMQComponent
|
|||
waitLock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e);
|
||||
catch (Throwable e)
|
||||
{
|
||||
ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,257 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.tests.integration.discovery;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.api.core.SimpleString;
|
||||
import org.apache.activemq.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
|
||||
import org.apache.activemq.core.cluster.DiscoveryEntry;
|
||||
import org.apache.activemq.core.cluster.DiscoveryGroup;
|
||||
import org.apache.activemq.core.cluster.DiscoveryListener;
|
||||
import org.apache.activemq.core.server.NodeManager;
|
||||
import org.apache.activemq.core.server.cluster.BroadcastGroup;
|
||||
import org.apache.activemq.core.server.cluster.impl.BroadcastGroupImpl;
|
||||
import org.apache.activemq.core.server.management.NotificationService;
|
||||
import org.apache.activemq.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.tests.util.UnitTestCase;
|
||||
import org.apache.activemq.utils.UUIDGenerator;
|
||||
import org.junit.Assert;
|
||||
|
||||
/**
|
||||
* @author Clebert Suconic
|
||||
*/
|
||||
|
||||
public class DiscoveryBaseTest extends UnitTestCase
|
||||
{
|
||||
protected static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
protected final String address1 = getUDPDiscoveryAddress();
|
||||
|
||||
protected final String address2 = getUDPDiscoveryAddress(1);
|
||||
|
||||
protected final String address3 = getUDPDiscoveryAddress(2);
|
||||
|
||||
|
||||
/**
|
||||
* @param discoveryGroup
|
||||
* @throws Exception
|
||||
*/
|
||||
protected static void verifyBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) throws Exception
|
||||
{
|
||||
broadcastGroup.broadcastConnectors();
|
||||
Assert.assertTrue("broadcast received", discoveryGroup.waitForBroadcast(2000));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param discoveryGroup
|
||||
* @throws Exception
|
||||
*/
|
||||
protected static void verifyNonBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup)
|
||||
throws Exception
|
||||
{
|
||||
broadcastGroup.broadcastConnectors();
|
||||
Assert.assertFalse("NO broadcast received", discoveryGroup.waitForBroadcast(2000));
|
||||
}
|
||||
|
||||
|
||||
|
||||
protected TransportConfiguration generateTC()
|
||||
{
|
||||
return generateTC("");
|
||||
}
|
||||
|
||||
protected TransportConfiguration generateTC(String debug)
|
||||
{
|
||||
String className = "org.foo.bar." + debug + "|" + UUIDGenerator.getInstance().generateStringUUID() + "";
|
||||
String name = UUIDGenerator.getInstance().generateStringUUID();
|
||||
Map<String, Object> params = new HashMap<String, Object>();
|
||||
params.put(UUIDGenerator.getInstance().generateStringUUID(), 123);
|
||||
params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID());
|
||||
params.put(UUIDGenerator.getInstance().generateStringUUID(), true);
|
||||
TransportConfiguration tc = new TransportConfiguration(className, params, name);
|
||||
return tc;
|
||||
}
|
||||
|
||||
protected static class MyListener implements DiscoveryListener
|
||||
{
|
||||
volatile boolean called;
|
||||
|
||||
public void connectorsChanged(List<DiscoveryEntry> newConnectors)
|
||||
{
|
||||
called = true;
|
||||
}
|
||||
}
|
||||
|
||||
protected static void assertEqualsDiscoveryEntries(List<TransportConfiguration> expected, List<DiscoveryEntry> actual)
|
||||
{
|
||||
assertNotNull(actual);
|
||||
|
||||
List<TransportConfiguration> sortedExpected = new ArrayList<TransportConfiguration>(expected);
|
||||
Collections.sort(sortedExpected, new Comparator<TransportConfiguration>()
|
||||
{
|
||||
|
||||
public int compare(TransportConfiguration o1, TransportConfiguration o2)
|
||||
{
|
||||
return o2.toString().compareTo(o1.toString());
|
||||
}
|
||||
});
|
||||
List<DiscoveryEntry> sortedActual = new ArrayList<DiscoveryEntry>(actual);
|
||||
Collections.sort(sortedActual, new Comparator<DiscoveryEntry>()
|
||||
{
|
||||
public int compare(DiscoveryEntry o1, DiscoveryEntry o2)
|
||||
{
|
||||
return o2.getConnector().toString().compareTo(o1.getConnector().toString());
|
||||
}
|
||||
});
|
||||
if (sortedExpected.size() != sortedActual.size())
|
||||
{
|
||||
dump(sortedExpected, sortedActual);
|
||||
}
|
||||
assertEquals(sortedExpected.size(), sortedActual.size());
|
||||
for (int i = 0; i < sortedExpected.size(); i++)
|
||||
{
|
||||
if (!sortedExpected.get(i).equals(sortedActual.get(i).getConnector()))
|
||||
{
|
||||
dump(sortedExpected, sortedActual);
|
||||
}
|
||||
assertEquals(sortedExpected.get(i), sortedActual.get(i).getConnector());
|
||||
}
|
||||
}
|
||||
|
||||
protected static void dump(List<TransportConfiguration> sortedExpected, List<DiscoveryEntry> sortedActual)
|
||||
{
|
||||
System.out.println("wrong broadcasts received");
|
||||
System.out.println("expected");
|
||||
System.out.println("----------------------------");
|
||||
for (TransportConfiguration transportConfiguration : sortedExpected)
|
||||
{
|
||||
System.out.println("transportConfiguration = " + transportConfiguration);
|
||||
}
|
||||
System.out.println("----------------------------");
|
||||
System.out.println("actual");
|
||||
System.out.println("----------------------------");
|
||||
for (DiscoveryEntry discoveryEntry : sortedActual)
|
||||
{
|
||||
System.out.println("transportConfiguration = " + discoveryEntry.getConnector());
|
||||
}
|
||||
System.out.println("----------------------------");
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is here just to facilitate creating the Broadcaster for this test
|
||||
*/
|
||||
protected BroadcastGroupImpl newBroadcast(final String nodeID,
|
||||
final String name,
|
||||
final InetAddress localAddress,
|
||||
int localPort,
|
||||
final InetAddress groupAddress,
|
||||
final int groupPort) throws Exception
|
||||
{
|
||||
return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastGroupConfiguration()
|
||||
.setGroupAddress(groupAddress.getHostAddress())
|
||||
.setGroupPort(groupPort)
|
||||
.setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : null)
|
||||
.setLocalBindPort(localPort)
|
||||
.createBroadcastEndpointFactory());
|
||||
}
|
||||
|
||||
protected DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress,
|
||||
final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
|
||||
{
|
||||
return newDiscoveryGroup(nodeID, name, localBindAddress, groupAddress, groupPort, timeout, null);
|
||||
}
|
||||
|
||||
protected DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress,
|
||||
final InetAddress groupAddress, final int groupPort, final long timeout, NotificationService notif) throws Exception
|
||||
{
|
||||
return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastGroupConfiguration()
|
||||
.setGroupAddress(groupAddress.getHostAddress())
|
||||
.setGroupPort(groupPort)
|
||||
.setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : null)
|
||||
.createBroadcastEndpointFactory(), notif);
|
||||
}
|
||||
|
||||
protected final class FakeNodeManager extends NodeManager
|
||||
{
|
||||
|
||||
public FakeNodeManager(String nodeID)
|
||||
{
|
||||
super(false, null);
|
||||
this.setNodeID(nodeID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitLiveNode() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startBackup() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startLiveNode() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pauseLiveServer() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void crashLiveServer() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseBackup() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString readNodeId()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAwaitingFailback() throws Exception
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBackupLive() throws Exception
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void interrupt()
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,154 @@
|
|||
/*
|
||||
* Copyright 2005-2014 Red Hat, Inc.
|
||||
* Red Hat licenses this file to you under the Apache License, version
|
||||
* 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
* implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.tests.integration.discovery;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.api.core.BroadcastEndpoint;
|
||||
import org.apache.activemq.api.core.BroadcastEndpointFactory;
|
||||
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
|
||||
import org.apache.activemq.core.cluster.DiscoveryGroup;
|
||||
import org.apache.activemq.core.server.cluster.impl.BroadcastGroupImpl;
|
||||
import org.apache.activemq.tests.util.RandomUtil;
|
||||
import org.apache.activemq.utils.ActiveMQThreadFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* @author Clebert Suconic
|
||||
*/
|
||||
|
||||
public class DiscoveryStayAliveTest extends DiscoveryBaseTest
|
||||
{
|
||||
|
||||
|
||||
ScheduledExecutorService scheduledExecutorService;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
super.setUp();
|
||||
scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
|
||||
new ActiveMQThreadFactory("ActiveMQ-scheduled-threads",
|
||||
false,
|
||||
Thread.currentThread().getContextClassLoader()));
|
||||
|
||||
}
|
||||
|
||||
public void tearDown() throws Exception
|
||||
{
|
||||
scheduledExecutorService.shutdown();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiscoveryRunning() throws Throwable
|
||||
{
|
||||
final InetAddress groupAddress = InetAddress.getByName(address1);
|
||||
final int groupPort = getUDPDiscoveryPort();
|
||||
final int timeout = 500;
|
||||
|
||||
|
||||
final DiscoveryGroup dg = newDiscoveryGroup(RandomUtil.randomString(),
|
||||
RandomUtil.randomString(),
|
||||
null,
|
||||
groupAddress,
|
||||
groupPort,
|
||||
timeout);
|
||||
|
||||
final AtomicInteger errors = new AtomicInteger(0);
|
||||
Thread t = new Thread()
|
||||
{
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
dg.internalRunning();
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
errors.incrementAndGet();
|
||||
}
|
||||
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
|
||||
|
||||
BroadcastGroupImpl bg = new BroadcastGroupImpl(new FakeNodeManager("test-nodeID"),
|
||||
RandomUtil.randomString(),
|
||||
1, scheduledExecutorService, new UDPBroadcastGroupConfiguration().setGroupAddress(address1).
|
||||
setGroupPort(groupPort).createBroadcastEndpointFactory());
|
||||
|
||||
bg.start();
|
||||
|
||||
bg.addConnector(generateTC());
|
||||
|
||||
|
||||
for (int i = 0; i < 10; i++)
|
||||
{
|
||||
BroadcastEndpointFactory factoryEndpoint = new UDPBroadcastGroupConfiguration().setGroupAddress(address1).
|
||||
setGroupPort(groupPort).createBroadcastEndpointFactory();
|
||||
sendBadData(factoryEndpoint);
|
||||
|
||||
Thread.sleep(100);
|
||||
assertTrue(t.isAlive());
|
||||
assertEquals(0, errors.get());
|
||||
}
|
||||
|
||||
bg.stop();
|
||||
dg.stop();
|
||||
|
||||
t.join(5000);
|
||||
|
||||
Assert.assertFalse(t.isAlive());
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static void sendBadData(BroadcastEndpointFactory factoryEndpoint) throws Exception
|
||||
{
|
||||
BroadcastEndpoint endpoint = factoryEndpoint.createBroadcastEndpoint();
|
||||
|
||||
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(500);
|
||||
|
||||
buffer.writeString("This is a test1!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
|
||||
buffer.writeString("This is a test2!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
|
||||
|
||||
|
||||
byte[] bytes = new byte[buffer.writerIndex()];
|
||||
|
||||
buffer.readBytes(bytes);
|
||||
|
||||
// messing up with the string!!!
|
||||
for (int i = bytes.length - 10; i < bytes.length; i++)
|
||||
{
|
||||
bytes[i] = 0;
|
||||
}
|
||||
|
||||
|
||||
endpoint.openBroadcaster();
|
||||
|
||||
endpoint.broadcast(bytes);
|
||||
}
|
||||
}
|
|
@ -15,22 +15,15 @@ package org.apache.activemq.tests.integration.discovery;
|
|||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.api.core.ActiveMQIllegalStateException;
|
||||
import org.apache.activemq.api.core.BroadcastEndpoint;
|
||||
import org.apache.activemq.api.core.BroadcastEndpointFactory;
|
||||
import org.apache.activemq.api.core.JGroupsBroadcastGroupConfiguration;
|
||||
|
@ -40,17 +33,12 @@ import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
|
|||
import org.apache.activemq.api.core.management.CoreNotificationType;
|
||||
import org.apache.activemq.core.cluster.DiscoveryEntry;
|
||||
import org.apache.activemq.core.cluster.DiscoveryGroup;
|
||||
import org.apache.activemq.core.cluster.DiscoveryListener;
|
||||
import org.apache.activemq.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.core.server.NodeManager;
|
||||
import org.apache.activemq.core.server.cluster.BroadcastGroup;
|
||||
import org.apache.activemq.core.server.cluster.impl.BroadcastGroupImpl;
|
||||
import org.apache.activemq.core.server.management.Notification;
|
||||
import org.apache.activemq.core.server.management.NotificationService;
|
||||
import org.apache.activemq.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.tests.integration.SimpleNotificationService;
|
||||
import org.apache.activemq.tests.util.RandomUtil;
|
||||
import org.apache.activemq.tests.util.UnitTestCase;
|
||||
import org.apache.activemq.utils.UUIDGenerator;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -77,18 +65,10 @@ import org.junit.Test;
|
|||
*
|
||||
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
|
||||
*/
|
||||
public class DiscoveryTest extends UnitTestCase
|
||||
public class DiscoveryTest extends DiscoveryBaseTest
|
||||
{
|
||||
private static final String TEST_JGROUPS_CONF_FILE = "test-jgroups-file_ping.xml";
|
||||
|
||||
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
private final String address1 = getUDPDiscoveryAddress();
|
||||
|
||||
private final String address2 = getUDPDiscoveryAddress(1);
|
||||
|
||||
private final String address3 = getUDPDiscoveryAddress(2);
|
||||
|
||||
BroadcastGroup bg = null, bg1 = null, bg2 = null, bg3 = null;
|
||||
DiscoveryGroup dg = null, dg1 = null, dg2 = null, dg3 = null;
|
||||
|
||||
|
@ -879,26 +859,6 @@ public class DiscoveryTest extends UnitTestCase
|
|||
Assert.assertFalse(listener3.called);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param discoveryGroup
|
||||
* @throws Exception
|
||||
*/
|
||||
private static void verifyBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) throws Exception
|
||||
{
|
||||
broadcastGroup.broadcastConnectors();
|
||||
Assert.assertTrue("broadcast received", discoveryGroup.waitForBroadcast(2000));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param discoveryGroup
|
||||
* @throws Exception
|
||||
*/
|
||||
private static void verifyNonBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) throws Exception
|
||||
{
|
||||
broadcastGroup.broadcastConnectors();
|
||||
Assert.assertFalse("NO broadcast received", discoveryGroup.waitForBroadcast(2000));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
|
||||
{
|
||||
|
@ -1238,185 +1198,4 @@ public class DiscoveryTest extends UnitTestCase
|
|||
assertNotNull(object);
|
||||
assertTrue(object instanceof JGroupsBroadcastGroupConfiguration);
|
||||
}
|
||||
|
||||
private TransportConfiguration generateTC(String debug)
|
||||
{
|
||||
String className = "org.foo.bar." + debug + "|" + UUIDGenerator.getInstance().generateStringUUID() + "";
|
||||
String name = UUIDGenerator.getInstance().generateStringUUID();
|
||||
Map<String, Object> params = new HashMap<String, Object>();
|
||||
params.put(UUIDGenerator.getInstance().generateStringUUID(), 123);
|
||||
params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID());
|
||||
params.put(UUIDGenerator.getInstance().generateStringUUID(), true);
|
||||
TransportConfiguration tc = new TransportConfiguration(className, params, name);
|
||||
return tc;
|
||||
}
|
||||
|
||||
private TransportConfiguration generateTC()
|
||||
{
|
||||
return generateTC("");
|
||||
}
|
||||
|
||||
private static class MyListener implements DiscoveryListener
|
||||
{
|
||||
volatile boolean called;
|
||||
|
||||
public void connectorsChanged(List<DiscoveryEntry> newConnectors)
|
||||
{
|
||||
called = true;
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertEqualsDiscoveryEntries(List<TransportConfiguration> expected, List<DiscoveryEntry> actual)
|
||||
{
|
||||
assertNotNull(actual);
|
||||
|
||||
List<TransportConfiguration> sortedExpected = new ArrayList<TransportConfiguration>(expected);
|
||||
Collections.sort(sortedExpected, new Comparator<TransportConfiguration>()
|
||||
{
|
||||
|
||||
public int compare(TransportConfiguration o1, TransportConfiguration o2)
|
||||
{
|
||||
return o2.toString().compareTo(o1.toString());
|
||||
}
|
||||
});
|
||||
List<DiscoveryEntry> sortedActual = new ArrayList<DiscoveryEntry>(actual);
|
||||
Collections.sort(sortedActual, new Comparator<DiscoveryEntry>()
|
||||
{
|
||||
public int compare(DiscoveryEntry o1, DiscoveryEntry o2)
|
||||
{
|
||||
return o2.getConnector().toString().compareTo(o1.getConnector().toString());
|
||||
}
|
||||
});
|
||||
if (sortedExpected.size() != sortedActual.size())
|
||||
{
|
||||
dump(sortedExpected, sortedActual);
|
||||
}
|
||||
assertEquals(sortedExpected.size(), sortedActual.size());
|
||||
for (int i = 0; i < sortedExpected.size(); i++)
|
||||
{
|
||||
if (!sortedExpected.get(i).equals(sortedActual.get(i).getConnector()))
|
||||
{
|
||||
dump(sortedExpected, sortedActual);
|
||||
}
|
||||
assertEquals(sortedExpected.get(i), sortedActual.get(i).getConnector());
|
||||
}
|
||||
}
|
||||
|
||||
private static void dump(List<TransportConfiguration> sortedExpected, List<DiscoveryEntry> sortedActual)
|
||||
{
|
||||
System.out.println("wrong broadcasts received");
|
||||
System.out.println("expected");
|
||||
System.out.println("----------------------------");
|
||||
for (TransportConfiguration transportConfiguration : sortedExpected)
|
||||
{
|
||||
System.out.println("transportConfiguration = " + transportConfiguration);
|
||||
}
|
||||
System.out.println("----------------------------");
|
||||
System.out.println("actual");
|
||||
System.out.println("----------------------------");
|
||||
for (DiscoveryEntry discoveryEntry : sortedActual)
|
||||
{
|
||||
System.out.println("transportConfiguration = " + discoveryEntry.getConnector());
|
||||
}
|
||||
System.out.println("----------------------------");
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is here just to facilitate creating the Broadcaster for this test
|
||||
*/
|
||||
private BroadcastGroupImpl newBroadcast(final String nodeID,
|
||||
final String name,
|
||||
final InetAddress localAddress,
|
||||
int localPort,
|
||||
final InetAddress groupAddress,
|
||||
final int groupPort) throws Exception
|
||||
{
|
||||
return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastGroupConfiguration()
|
||||
.setGroupAddress(groupAddress.getHostAddress())
|
||||
.setGroupPort(groupPort)
|
||||
.setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : null)
|
||||
.setLocalBindPort(localPort)
|
||||
.createBroadcastEndpointFactory());
|
||||
}
|
||||
|
||||
private DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress,
|
||||
final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
|
||||
{
|
||||
return newDiscoveryGroup(nodeID, name, localBindAddress, groupAddress, groupPort, timeout, null);
|
||||
}
|
||||
|
||||
private DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress,
|
||||
final InetAddress groupAddress, final int groupPort, final long timeout, NotificationService notif) throws Exception
|
||||
{
|
||||
return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastGroupConfiguration()
|
||||
.setGroupAddress(groupAddress.getHostAddress())
|
||||
.setGroupPort(groupPort)
|
||||
.setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : null)
|
||||
.createBroadcastEndpointFactory(), notif);
|
||||
}
|
||||
|
||||
|
||||
private final class FakeNodeManager extends NodeManager
|
||||
{
|
||||
|
||||
public FakeNodeManager(String nodeID)
|
||||
{
|
||||
super(false, null);
|
||||
this.setNodeID(nodeID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitLiveNode() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startBackup() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startLiveNode() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pauseLiveServer() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void crashLiveServer() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseBackup() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAwaitingFailback() throws Exception
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBackupLive() throws Exception
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void interrupt()
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue