This commit is contained in:
jbertram 2014-11-21 14:43:22 -06:00
commit d59417340f
5 changed files with 443 additions and 238 deletions

View File

@ -355,7 +355,7 @@ public interface ActiveMQClientLogger extends BasicLogger
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214010, value = "Failed to receive datagram", format = Message.Format.MESSAGE_FORMAT)
void failedToReceiveDatagramInDiscovery(@Cause Exception e);
void failedToReceiveDatagramInDiscovery(@Cause Throwable e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214011, value = "Failed to call discovery listener", format = Message.Format.MESSAGE_FORMAT)

View File

@ -35,12 +35,12 @@ import org.apache.activemq.utils.TypedProperties;
/**
* This class is used to search for members on the cluster through the opaque interface {@link BroadcastEndpoint}.
* <p>
* <p/>
* There are two current implementations, and that's probably all we will ever need.
* <p>
* <p/>
* We will probably keep both interfaces for a while as UDP is a simple solution requiring no extra dependencies which
* is suitable for users looking for embedded solutions.
* <p>
* <p/>
* Created 17 Nov 2008 13:21:45
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
@ -124,6 +124,18 @@ public final class DiscoveryGroup implements ActiveMQComponent
}
}
/**
* This will start the DiscoveryRunnable and run it directly.
* This is useful for a test process where we need this execution blocking a thread.
*/
public void internalRunning() throws Exception
{
endpoint.openClient();
started = true;
DiscoveryRunnable runnable = new DiscoveryRunnable();
runnable.run();
}
public void stop()
{
synchronized (this)
@ -152,11 +164,14 @@ public final class DiscoveryGroup implements ActiveMQComponent
try
{
thread.interrupt();
thread.join(10000);
if (thread.isAlive())
if (thread != null)
{
ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery();
thread.interrupt();
thread.join(10000);
if (thread.isAlive())
{
ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery();
}
}
}
catch (InterruptedException e)
@ -262,11 +277,11 @@ public final class DiscoveryGroup implements ActiveMQComponent
{
public void run()
{
try
{
byte[] data = null;
byte[] data = null;
while (started)
while (started)
{
try
{
try
{
@ -362,10 +377,10 @@ public final class DiscoveryGroup implements ActiveMQComponent
waitLock.notifyAll();
}
}
}
catch (Exception e)
{
ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e);
catch (Throwable e)
{
ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e);
}
}
}

View File

@ -0,0 +1,257 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.apache.activemq.tests.integration.discovery;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
import org.apache.activemq.core.cluster.DiscoveryEntry;
import org.apache.activemq.core.cluster.DiscoveryGroup;
import org.apache.activemq.core.cluster.DiscoveryListener;
import org.apache.activemq.core.server.NodeManager;
import org.apache.activemq.core.server.cluster.BroadcastGroup;
import org.apache.activemq.core.server.cluster.impl.BroadcastGroupImpl;
import org.apache.activemq.core.server.management.NotificationService;
import org.apache.activemq.tests.integration.IntegrationTestLogger;
import org.apache.activemq.tests.util.UnitTestCase;
import org.apache.activemq.utils.UUIDGenerator;
import org.junit.Assert;
/**
* @author Clebert Suconic
*/
public class DiscoveryBaseTest extends UnitTestCase
{
protected static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
protected final String address1 = getUDPDiscoveryAddress();
protected final String address2 = getUDPDiscoveryAddress(1);
protected final String address3 = getUDPDiscoveryAddress(2);
/**
* @param discoveryGroup
* @throws Exception
*/
protected static void verifyBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) throws Exception
{
broadcastGroup.broadcastConnectors();
Assert.assertTrue("broadcast received", discoveryGroup.waitForBroadcast(2000));
}
/**
* @param discoveryGroup
* @throws Exception
*/
protected static void verifyNonBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup)
throws Exception
{
broadcastGroup.broadcastConnectors();
Assert.assertFalse("NO broadcast received", discoveryGroup.waitForBroadcast(2000));
}
protected TransportConfiguration generateTC()
{
return generateTC("");
}
protected TransportConfiguration generateTC(String debug)
{
String className = "org.foo.bar." + debug + "|" + UUIDGenerator.getInstance().generateStringUUID() + "";
String name = UUIDGenerator.getInstance().generateStringUUID();
Map<String, Object> params = new HashMap<String, Object>();
params.put(UUIDGenerator.getInstance().generateStringUUID(), 123);
params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID());
params.put(UUIDGenerator.getInstance().generateStringUUID(), true);
TransportConfiguration tc = new TransportConfiguration(className, params, name);
return tc;
}
protected static class MyListener implements DiscoveryListener
{
volatile boolean called;
public void connectorsChanged(List<DiscoveryEntry> newConnectors)
{
called = true;
}
}
protected static void assertEqualsDiscoveryEntries(List<TransportConfiguration> expected, List<DiscoveryEntry> actual)
{
assertNotNull(actual);
List<TransportConfiguration> sortedExpected = new ArrayList<TransportConfiguration>(expected);
Collections.sort(sortedExpected, new Comparator<TransportConfiguration>()
{
public int compare(TransportConfiguration o1, TransportConfiguration o2)
{
return o2.toString().compareTo(o1.toString());
}
});
List<DiscoveryEntry> sortedActual = new ArrayList<DiscoveryEntry>(actual);
Collections.sort(sortedActual, new Comparator<DiscoveryEntry>()
{
public int compare(DiscoveryEntry o1, DiscoveryEntry o2)
{
return o2.getConnector().toString().compareTo(o1.getConnector().toString());
}
});
if (sortedExpected.size() != sortedActual.size())
{
dump(sortedExpected, sortedActual);
}
assertEquals(sortedExpected.size(), sortedActual.size());
for (int i = 0; i < sortedExpected.size(); i++)
{
if (!sortedExpected.get(i).equals(sortedActual.get(i).getConnector()))
{
dump(sortedExpected, sortedActual);
}
assertEquals(sortedExpected.get(i), sortedActual.get(i).getConnector());
}
}
protected static void dump(List<TransportConfiguration> sortedExpected, List<DiscoveryEntry> sortedActual)
{
System.out.println("wrong broadcasts received");
System.out.println("expected");
System.out.println("----------------------------");
for (TransportConfiguration transportConfiguration : sortedExpected)
{
System.out.println("transportConfiguration = " + transportConfiguration);
}
System.out.println("----------------------------");
System.out.println("actual");
System.out.println("----------------------------");
for (DiscoveryEntry discoveryEntry : sortedActual)
{
System.out.println("transportConfiguration = " + discoveryEntry.getConnector());
}
System.out.println("----------------------------");
}
/**
* This method is here just to facilitate creating the Broadcaster for this test
*/
protected BroadcastGroupImpl newBroadcast(final String nodeID,
final String name,
final InetAddress localAddress,
int localPort,
final InetAddress groupAddress,
final int groupPort) throws Exception
{
return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastGroupConfiguration()
.setGroupAddress(groupAddress.getHostAddress())
.setGroupPort(groupPort)
.setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : null)
.setLocalBindPort(localPort)
.createBroadcastEndpointFactory());
}
protected DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress,
final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
{
return newDiscoveryGroup(nodeID, name, localBindAddress, groupAddress, groupPort, timeout, null);
}
protected DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress,
final InetAddress groupAddress, final int groupPort, final long timeout, NotificationService notif) throws Exception
{
return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastGroupConfiguration()
.setGroupAddress(groupAddress.getHostAddress())
.setGroupPort(groupPort)
.setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : null)
.createBroadcastEndpointFactory(), notif);
}
protected final class FakeNodeManager extends NodeManager
{
public FakeNodeManager(String nodeID)
{
super(false, null);
this.setNodeID(nodeID);
}
@Override
public void awaitLiveNode() throws Exception
{
}
@Override
public void startBackup() throws Exception
{
}
@Override
public void startLiveNode() throws Exception
{
}
@Override
public void pauseLiveServer() throws Exception
{
}
@Override
public void crashLiveServer() throws Exception
{
}
@Override
public void releaseBackup() throws Exception
{
}
@Override
public SimpleString readNodeId()
{
return null;
}
@Override
public boolean isAwaitingFailback() throws Exception
{
return false;
}
@Override
public boolean isBackupLive() throws Exception
{
return false;
}
@Override
public void interrupt()
{
}
}
}

View File

@ -0,0 +1,154 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.apache.activemq.tests.integration.discovery;
import java.net.InetAddress;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.ActiveMQBuffers;
import org.apache.activemq.api.core.BroadcastEndpoint;
import org.apache.activemq.api.core.BroadcastEndpointFactory;
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
import org.apache.activemq.core.cluster.DiscoveryGroup;
import org.apache.activemq.core.server.cluster.impl.BroadcastGroupImpl;
import org.apache.activemq.tests.util.RandomUtil;
import org.apache.activemq.utils.ActiveMQThreadFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author Clebert Suconic
*/
public class DiscoveryStayAliveTest extends DiscoveryBaseTest
{
ScheduledExecutorService scheduledExecutorService;
@Override
@Before
public void setUp() throws Exception
{
super.setUp();
scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
new ActiveMQThreadFactory("ActiveMQ-scheduled-threads",
false,
Thread.currentThread().getContextClassLoader()));
}
public void tearDown() throws Exception
{
scheduledExecutorService.shutdown();
super.tearDown();
}
@Test
public void testDiscoveryRunning() throws Throwable
{
final InetAddress groupAddress = InetAddress.getByName(address1);
final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
final DiscoveryGroup dg = newDiscoveryGroup(RandomUtil.randomString(),
RandomUtil.randomString(),
null,
groupAddress,
groupPort,
timeout);
final AtomicInteger errors = new AtomicInteger(0);
Thread t = new Thread()
{
public void run()
{
try
{
dg.internalRunning();
}
catch (Throwable e)
{
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t.start();
BroadcastGroupImpl bg = new BroadcastGroupImpl(new FakeNodeManager("test-nodeID"),
RandomUtil.randomString(),
1, scheduledExecutorService, new UDPBroadcastGroupConfiguration().setGroupAddress(address1).
setGroupPort(groupPort).createBroadcastEndpointFactory());
bg.start();
bg.addConnector(generateTC());
for (int i = 0; i < 10; i++)
{
BroadcastEndpointFactory factoryEndpoint = new UDPBroadcastGroupConfiguration().setGroupAddress(address1).
setGroupPort(groupPort).createBroadcastEndpointFactory();
sendBadData(factoryEndpoint);
Thread.sleep(100);
assertTrue(t.isAlive());
assertEquals(0, errors.get());
}
bg.stop();
dg.stop();
t.join(5000);
Assert.assertFalse(t.isAlive());
}
private static void sendBadData(BroadcastEndpointFactory factoryEndpoint) throws Exception
{
BroadcastEndpoint endpoint = factoryEndpoint.createBroadcastEndpoint();
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(500);
buffer.writeString("This is a test1!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
buffer.writeString("This is a test2!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
byte[] bytes = new byte[buffer.writerIndex()];
buffer.readBytes(bytes);
// messing up with the string!!!
for (int i = bytes.length - 10; i < bytes.length; i++)
{
bytes[i] = 0;
}
endpoint.openBroadcaster();
endpoint.broadcast(bytes);
}
}

View File

@ -15,22 +15,15 @@ package org.apache.activemq.tests.integration.discovery;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.api.core.BroadcastEndpoint;
import org.apache.activemq.api.core.BroadcastEndpointFactory;
import org.apache.activemq.api.core.JGroupsBroadcastGroupConfiguration;
@ -40,17 +33,12 @@ import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
import org.apache.activemq.api.core.management.CoreNotificationType;
import org.apache.activemq.core.cluster.DiscoveryEntry;
import org.apache.activemq.core.cluster.DiscoveryGroup;
import org.apache.activemq.core.cluster.DiscoveryListener;
import org.apache.activemq.core.server.ActiveMQComponent;
import org.apache.activemq.core.server.NodeManager;
import org.apache.activemq.core.server.cluster.BroadcastGroup;
import org.apache.activemq.core.server.cluster.impl.BroadcastGroupImpl;
import org.apache.activemq.core.server.management.Notification;
import org.apache.activemq.core.server.management.NotificationService;
import org.apache.activemq.tests.integration.IntegrationTestLogger;
import org.apache.activemq.tests.integration.SimpleNotificationService;
import org.apache.activemq.tests.util.RandomUtil;
import org.apache.activemq.tests.util.UnitTestCase;
import org.apache.activemq.utils.UUIDGenerator;
import org.junit.After;
import org.junit.Assert;
@ -77,18 +65,10 @@ import org.junit.Test;
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
public class DiscoveryTest extends UnitTestCase
public class DiscoveryTest extends DiscoveryBaseTest
{
private static final String TEST_JGROUPS_CONF_FILE = "test-jgroups-file_ping.xml";
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
private final String address1 = getUDPDiscoveryAddress();
private final String address2 = getUDPDiscoveryAddress(1);
private final String address3 = getUDPDiscoveryAddress(2);
BroadcastGroup bg = null, bg1 = null, bg2 = null, bg3 = null;
DiscoveryGroup dg = null, dg1 = null, dg2 = null, dg3 = null;
@ -879,26 +859,6 @@ public class DiscoveryTest extends UnitTestCase
Assert.assertFalse(listener3.called);
}
/**
* @param discoveryGroup
* @throws Exception
*/
private static void verifyBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) throws Exception
{
broadcastGroup.broadcastConnectors();
Assert.assertTrue("broadcast received", discoveryGroup.waitForBroadcast(2000));
}
/**
* @param discoveryGroup
* @throws Exception
*/
private static void verifyNonBroadcast(BroadcastGroup broadcastGroup, DiscoveryGroup discoveryGroup) throws Exception
{
broadcastGroup.broadcastConnectors();
Assert.assertFalse("NO broadcast received", discoveryGroup.waitForBroadcast(2000));
}
@Test
public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
{
@ -1238,185 +1198,4 @@ public class DiscoveryTest extends UnitTestCase
assertNotNull(object);
assertTrue(object instanceof JGroupsBroadcastGroupConfiguration);
}
private TransportConfiguration generateTC(String debug)
{
String className = "org.foo.bar." + debug + "|" + UUIDGenerator.getInstance().generateStringUUID() + "";
String name = UUIDGenerator.getInstance().generateStringUUID();
Map<String, Object> params = new HashMap<String, Object>();
params.put(UUIDGenerator.getInstance().generateStringUUID(), 123);
params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID());
params.put(UUIDGenerator.getInstance().generateStringUUID(), true);
TransportConfiguration tc = new TransportConfiguration(className, params, name);
return tc;
}
private TransportConfiguration generateTC()
{
return generateTC("");
}
private static class MyListener implements DiscoveryListener
{
volatile boolean called;
public void connectorsChanged(List<DiscoveryEntry> newConnectors)
{
called = true;
}
}
private static void assertEqualsDiscoveryEntries(List<TransportConfiguration> expected, List<DiscoveryEntry> actual)
{
assertNotNull(actual);
List<TransportConfiguration> sortedExpected = new ArrayList<TransportConfiguration>(expected);
Collections.sort(sortedExpected, new Comparator<TransportConfiguration>()
{
public int compare(TransportConfiguration o1, TransportConfiguration o2)
{
return o2.toString().compareTo(o1.toString());
}
});
List<DiscoveryEntry> sortedActual = new ArrayList<DiscoveryEntry>(actual);
Collections.sort(sortedActual, new Comparator<DiscoveryEntry>()
{
public int compare(DiscoveryEntry o1, DiscoveryEntry o2)
{
return o2.getConnector().toString().compareTo(o1.getConnector().toString());
}
});
if (sortedExpected.size() != sortedActual.size())
{
dump(sortedExpected, sortedActual);
}
assertEquals(sortedExpected.size(), sortedActual.size());
for (int i = 0; i < sortedExpected.size(); i++)
{
if (!sortedExpected.get(i).equals(sortedActual.get(i).getConnector()))
{
dump(sortedExpected, sortedActual);
}
assertEquals(sortedExpected.get(i), sortedActual.get(i).getConnector());
}
}
private static void dump(List<TransportConfiguration> sortedExpected, List<DiscoveryEntry> sortedActual)
{
System.out.println("wrong broadcasts received");
System.out.println("expected");
System.out.println("----------------------------");
for (TransportConfiguration transportConfiguration : sortedExpected)
{
System.out.println("transportConfiguration = " + transportConfiguration);
}
System.out.println("----------------------------");
System.out.println("actual");
System.out.println("----------------------------");
for (DiscoveryEntry discoveryEntry : sortedActual)
{
System.out.println("transportConfiguration = " + discoveryEntry.getConnector());
}
System.out.println("----------------------------");
}
/**
* This method is here just to facilitate creating the Broadcaster for this test
*/
private BroadcastGroupImpl newBroadcast(final String nodeID,
final String name,
final InetAddress localAddress,
int localPort,
final InetAddress groupAddress,
final int groupPort) throws Exception
{
return new BroadcastGroupImpl(new FakeNodeManager(nodeID), name, 0, null, new UDPBroadcastGroupConfiguration()
.setGroupAddress(groupAddress.getHostAddress())
.setGroupPort(groupPort)
.setLocalBindAddress(localAddress != null ? localAddress.getHostAddress() : null)
.setLocalBindPort(localPort)
.createBroadcastEndpointFactory());
}
private DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress,
final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
{
return newDiscoveryGroup(nodeID, name, localBindAddress, groupAddress, groupPort, timeout, null);
}
private DiscoveryGroup newDiscoveryGroup(final String nodeID, final String name, final InetAddress localBindAddress,
final InetAddress groupAddress, final int groupPort, final long timeout, NotificationService notif) throws Exception
{
return new DiscoveryGroup(nodeID, name, timeout, new UDPBroadcastGroupConfiguration()
.setGroupAddress(groupAddress.getHostAddress())
.setGroupPort(groupPort)
.setLocalBindAddress(localBindAddress != null ? localBindAddress.getHostAddress() : null)
.createBroadcastEndpointFactory(), notif);
}
private final class FakeNodeManager extends NodeManager
{
public FakeNodeManager(String nodeID)
{
super(false, null);
this.setNodeID(nodeID);
}
@Override
public void awaitLiveNode() throws Exception
{
}
@Override
public void startBackup() throws Exception
{
}
@Override
public void startLiveNode() throws Exception
{
}
@Override
public void pauseLiveServer() throws Exception
{
}
@Override
public void crashLiveServer() throws Exception
{
}
@Override
public void releaseBackup() throws Exception
{
}
@Override
public SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException
{
return null;
}
@Override
public boolean isAwaitingFailback() throws Exception
{
return false;
}
@Override
public boolean isBackupLive() throws Exception
{
return false;
}
@Override
public void interrupt()
{
}
}
}