ARTEMIS-2637 Making UDP client discovery resilient

In case a hardware fault, a firewall, or anything else makes the UDP connection go deaf,
we will now reopen the connection in an attempt to get past the problem.

This also improves locking around the DiscoveryGroup initial connection.
This commit is contained in:
Clebert Suconic 2020-02-27 17:59:39 -05:00
parent 34f2fc8e88
commit 0cac669840
4 changed files with 311 additions and 28 deletions

View File

@ -420,6 +420,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void unableToCheckEpollAvailabilitynoClass();
/**
 * Logs, at WARN level, that the initial broadcast from the cluster was not received in time.
 *
 * @param retry    value substituted for {0} in the message ("Retry {0} of {1}")
 * @param maxretry value substituted for {1} in the message
 */
@LogMessage(level = Logger.Level.WARN)
@Message(id = 212077, value = "Timed out waiting to receive initial broadcast from cluster. Retry {0} of {1}",
format = Message.Format.MESSAGE_FORMAT)
void broadcastTimeout(int retry, int maxretry);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
void onMessageError(@Cause Throwable e);

View File

@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.Interceptor;
@ -63,10 +64,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.remoting.Connector;
import org.apache.activemq.artemis.uri.ServerLocatorParser;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.*;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores;
@ -122,6 +120,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private boolean compressLargeMessage;
/** This specifies serverLocator.connect was used,
* which means it's a cluster connection.
* We should not use retries */
private volatile transient boolean disableDiscoveryRetries = false;
// if the system should shutdown the pool when shutting down
private transient boolean shutdownPool;
@ -133,6 +136,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private transient ConnectionLoadBalancingPolicy loadBalancingPolicy;
private final Object discoveryGroupGuardian = new Object();
// Settable attributes:
private boolean cacheLargeMessagesClient;
@ -211,6 +216,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private boolean useTopologyForLoadBalancing;
/**
 * For tests only.
 *
 * @return the value currently held in the {@code discoveryGroup} field.
 *         NOTE(review): the field appears to be assigned only when a discovery group
 *         configuration is present, so this may return {@code null} - confirm.
 */
public DiscoveryGroup getDiscoveryGroup() {
return discoveryGroup;
}
private final Exception traceException = new Exception();
public static synchronized void clearThreadPools() {
@ -230,7 +240,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, ClientSessionFactoryImpl.class.getClassLoader());
return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, ServerLocatorImpl.class.getClassLoader());
}
});
@ -299,13 +309,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
instantiateLoadBalancingPolicy();
if (discoveryGroupConfiguration != null) {
discoveryGroup = createDiscoveryGroup(nodeID, discoveryGroupConfiguration);
startDiscovery();
discoveryGroup.registerListener(this);
discoveryGroup.start();
}
} catch (Exception e) {
state = null;
throw ActiveMQClientMessageBundle.BUNDLE.failedToInitialiseSessionFactory(e);
@ -313,6 +318,20 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
/**
 * Creates a new {@link DiscoveryGroup} from {@code discoveryGroupConfiguration}, registers this
 * locator as its listener and starts it. Does nothing when no discovery configuration is set.
 *
 * @throws ActiveMQException an {@link ActiveMQInternalErrorException} wrapping whatever was
 *                           thrown while creating, registering on, or starting the group
 */
private void startDiscovery() throws ActiveMQException {
   if (discoveryGroupConfiguration == null) {
      // nothing to discover with
      return;
   }

   try {
      discoveryGroup = createDiscoveryGroup(nodeID, discoveryGroupConfiguration);
      discoveryGroup.registerListener(this);
      discoveryGroup.start();
   } catch (Exception cause) {
      throw new ActiveMQInternalErrorException(cause.getMessage(), cause);
   }
}
private static DiscoveryGroup createDiscoveryGroup(String nodeID,
DiscoveryGroupConfiguration config) throws Exception {
return new DiscoveryGroup(nodeID, config.getName(), config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null);
@ -633,6 +652,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
private ClientSessionFactoryInternal connect(final boolean skipWarnings) throws ActiveMQException {
// if we used connect, we should control UDP reconnections at a different path.
// and this belongs to a cluster connection, not client
disableDiscoveryRetries = true;
ClientSessionFactoryInternal returnFactory = null;
synchronized (this) {
@ -752,14 +774,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
flushTopology();
if (this.getNumInitialConnectors() == 0 && discoveryGroup != null) {
// Wait for an initial broadcast to give us at least one node in the cluster
long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
boolean ok = discoveryGroup.waitForBroadcast(timeout);
if (!ok) {
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast();
}
if (discoveryGroupConfiguration != null) {
executeDiscovery();
}
ClientSessionFactoryInternal factory = null;
@ -826,6 +842,77 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return factory;
}
/**
 * Runs {@link #checkOnDiscovery()} until it succeeds or the retry budget is used up.
 *
 * A failed check is retried only while {@code initialConnectAttempts} is positive, the number of
 * retries made so far is below it, and {@code disableDiscoveryRetries} is not set. Each retry is
 * announced through {@code ActiveMQClientLogger.LOGGER.broadcastTimeout}.
 *
 * @throws ActiveMQException the "connection timed out in initial broadcast" exception from the
 *                           message bundle when a check fails and no retry is allowed, or
 *                           whatever {@link #checkOnDiscovery()} itself throws
 */
private void executeDiscovery() throws ActiveMQException {
   int attempt = 0;

   while (!checkOnDiscovery()) {
      // the counter only advances when retries are configured at all (short-circuit on purpose)
      final boolean withinBudget = initialConnectAttempts > 0 && attempt++ < initialConnectAttempts;

      if (!withinBudget || disableDiscoveryRetries) {
         throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast();
      }

      ActiveMQClientLogger.LOGGER.broadcastTimeout(attempt, initialConnectAttempts);
   }
}
/**
 * Waits, under {@code discoveryGroupGuardian}, for the discovery group to receive a broadcast.
 *
 * The wait is skipped when initial connectors are already known or when there is no discovery
 * configuration. If the wait times out and {@code disableDiscoveryRetries} is not set, the
 * current discovery group is stopped and a new one is started before returning.
 *
 * @return {@code true} when no wait was needed or a broadcast arrived in time; {@code false}
 *         when the wait timed out
 * @throws ActiveMQException an {@link ActiveMQInternalErrorException} wrapping anything thrown
 *                           while waiting for, stopping, or restarting discovery
 */
private boolean checkOnDiscovery() throws ActiveMQException {
   synchronized (discoveryGroupGuardian) {
      // when many threads queue up here, only the first performs the actual discovery;
      // the following ones will find getNumInitialConnectors() > 0 and leave right away
      if (this.getNumInitialConnectors() != 0 || discoveryGroupConfiguration == null) {
         return true;
      }

      try {
         final long waitMillis = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
         if (discoveryGroup.waitForBroadcast(waitMillis)) {
            return true;
         }

         if (logger.isDebugEnabled()) {
            logger.debug(ThreadDumpUtil.threadDump("Discovery timeout, printing thread dump"));
         }

         // disableDiscoveryRetries == true means this is a Bridge or a Cluster Connection Bridge,
         // which has its own retry mechanism, so the UDP restart is skipped for those
         if (!disableDiscoveryRetries) {
            if (discoveryGroup != null) {
               discoveryGroup.stop();
            }

            logger.debug("Restarting discovery");
            startDiscovery();
         }

         return false;
      } catch (Exception failure) {
         throw new ActiveMQInternalErrorException(failure.getMessage(), failure);
      }
   }
}
public void flushTopology() {
if (updateArrayActor != null) {
updateArrayActor.flush(10, TimeUnit.SECONDS);
@ -1682,8 +1769,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
while (!isClosed()) {
retryNumber++;
for (Connector conn : connectors) {
if (logger.isDebugEnabled()) {
logger.debug(this + "::Submitting connect towards " + conn);
if (logger.isTraceEnabled()) {
logger.trace(this + "::Submitting connect towards " + conn);
}
ClientSessionFactory csf = conn.tryConnect();
@ -1717,8 +1804,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
});
if (logger.isDebugEnabled()) {
logger.debug("Returning " + csf +
if (logger.isTraceEnabled()) {
logger.trace("Returning " + csf +
" after " +
retryNumber +
" retries on StaticConnector " +
@ -1740,7 +1827,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} catch (RejectedExecutionException e) {
if (isClosed() || skipWarnings)
return null;
logger.debug("Rejected execution", e);
logger.trace("Rejected execution", e);
throw e;
} catch (Exception e) {
if (isClosed() || skipWarnings)
@ -1809,7 +1896,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
public ClientSessionFactory tryConnect() throws ActiveMQException {
if (logger.isDebugEnabled()) {
logger.debug(this + "::Trying to connect to " + factory);
logger.trace(this + "::Trying to connect to " + factory);
}
try {
ClientSessionFactoryInternal factoryToUse = factory;
@ -1824,7 +1911,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
return factoryToUse;
} catch (ActiveMQException e) {
logger.debug(this + "::Exception on establish connector initial connection", e);
logger.trace(this + "::Exception on establish connector initial connection", e);
return null;
}
}

View File

@ -16,12 +16,15 @@
*/
package org.apache.activemq.artemis.core.cluster;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@ -35,6 +38,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
@ -102,13 +106,22 @@ public final class DiscoveryGroup implements ActiveMQComponent {
return;
}
if (logger.isDebugEnabled()) logger.debug("Starting Discovery Group for " + name);
endpoint.openClient();
started = true;
thread = new Thread(new DiscoveryRunnable(), "activemq-discovery-group-thread-" + name);
ThreadFactory tfactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@Override
public ThreadFactory run() {
return new ActiveMQThreadFactory("DiscoveryGroup-" + System.identityHashCode(this), "activemq-discovery-group-thread-" + name, true, DiscoveryGroup.class.getClassLoader());
}
});
thread.setDaemon(true);
thread = tfactory.newThread(new DiscoveryRunnable());
if (logger.isDebugEnabled()) logger.debug("Starting daemon thread");
thread.start();
@ -136,6 +149,10 @@ public final class DiscoveryGroup implements ActiveMQComponent {
@Override
public void stop() {
if (logger.isDebugEnabled()) {
logger.debug("Stopping discovery. There's an exception just as a trace where it happened", new Exception("trace"));
}
synchronized (this) {
if (!started) {
return;
@ -150,6 +167,9 @@ public final class DiscoveryGroup implements ActiveMQComponent {
try {
endpoint.close(false);
if (logger.isDebugEnabled()) {
logger.debug("endpoing closed");
}
} catch (Exception e1) {
ActiveMQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1);
}
@ -167,6 +187,7 @@ public final class DiscoveryGroup implements ActiveMQComponent {
}
thread = null;
received = false;
if (notificationService != null) {
TypedProperties props = new TypedProperties();
@ -255,8 +276,16 @@ public final class DiscoveryGroup implements ActiveMQComponent {
if (started) {
ActiveMQClientLogger.LOGGER.unexpectedNullDataReceived();
}
if (logger.isDebugEnabled()) {
logger.debug("Received broadcast data as null");
}
break;
}
if (logger.isDebugEnabled()) {
logger.debug("receiving " + data.length);
}
} catch (Exception e) {
if (!started) {
return;
@ -274,6 +303,10 @@ public final class DiscoveryGroup implements ActiveMQComponent {
checkUniqueID(originatingNodeID, uniqueID);
if (nodeID.equals(originatingNodeID)) {
if (logger.isDebugEnabled()) {
logger.debug("ignoring original NodeID" + originatingNodeID + " receivedID = " + nodeID);
}
if (checkExpiration()) {
callListeners();
}
@ -281,6 +314,10 @@ public final class DiscoveryGroup implements ActiveMQComponent {
continue;
}
if (logger.isDebugEnabled()) {
logger.debug("Received nodeID " + nodeID);
}
int size = buffer.readInt();
boolean changed = false;
@ -295,6 +332,15 @@ public final class DiscoveryGroup implements ActiveMQComponent {
entriesRead[i] = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
}
if (logger.isDebugEnabled()) {
logger.debug("Received " + entriesRead.length + " discovery entry elements");
for (DiscoveryEntry entryDisco : entriesRead) {
logger.debug("" + entryDisco);
}
}
synchronized (DiscoveryGroup.this) {
for (DiscoveryEntry entry : entriesRead) {
if (connectors.put(originatingNodeID, entry) == null) {
@ -303,6 +349,10 @@ public final class DiscoveryGroup implements ActiveMQComponent {
}
changed = changed || checkExpiration();
if (logger.isDebugEnabled()) {
logger.debug("changed = " + changed);
}
}
//only call the listeners if we have changed
//also make sure that we aren't stopping to avoid deadlock
@ -313,12 +363,18 @@ public final class DiscoveryGroup implements ActiveMQComponent {
logger.trace(connector);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Calling listeners");
}
callListeners();
}
synchronized (waitLock) {
received = true;
if (logger.isDebugEnabled()) {
logger.debug("Calling notifyAll");
}
waitLock.notifyAll();
}
} catch (Throwable e) {

View File

@ -16,6 +16,18 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.topology;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
public class NettyHAClientTopologyWithDiscoveryTest extends HAClientTopologyWithDiscoveryTest {
@Override
@ -23,4 +35,127 @@ public class NettyHAClientTopologyWithDiscoveryTest extends HAClientTopologyWith
return true;
}
/**
 * Stops the discovery group right after the locator is initialized and verifies that, with
 * initial connect attempts set to 10, a session factory and a session can still be created.
 */
@Test
public void testRecoveryBadUDPWithRetry() throws Exception {
   startServers(0);
   ServerLocatorImpl serverLocator = (ServerLocatorImpl) createHAServerLocator();
   serverLocator.setInitialConnectAttempts(10);
   serverLocator.initialize();
   // stop discovery behind the locator's back
   serverLocator.getDiscoveryGroup().stop();

   ClientSessionFactory factory = serverLocator.createSessionFactory();
   try {
      ClientSession session = factory.createSession();
      session.close();
   } finally {
      // the sibling tests close their factory; do the same here so the connection is not leaked
      factory.close();
   }
}
/**
 * Stops the discovery group right after the locator is initialized and, with initial connect
 * attempts set to 0, asserts that the first attempt to create a session factory fails while a
 * second attempt succeeds.
 */
@Test
public void testRecoveryBadUDPWithoutRetry() throws Exception {
startServers(0);
ServerLocatorImpl serverLocator = (ServerLocatorImpl) createHAServerLocator();
serverLocator.setInitialConnectAttempts(0);
serverLocator.initialize();
// stop discovery behind the locator's back
serverLocator.getDiscoveryGroup().stop();
boolean failure = false;
try {
// first attempt: an exception anywhere in this block is what the test expects
ClientSessionFactory factory = serverLocator.createSessionFactory();
ClientSession session = factory.createSession();
session.close();
factory.close();
} catch (Exception e) {
e.printStackTrace();
failure = true;
}
Assert.assertTrue(failure);
// second attempt must work.
// NOTE(review): presumably because the failed attempt restarted discovery inside the locator - confirm
ClientSessionFactory factory = serverLocator.createSessionFactory();
ClientSession session = factory.createSession();
session.close();
factory.close();
}
/**
 * Builds an HA locator over UDP discovery with a discovery initial wait timeout of 10 and
 * 3 initial connect attempts, without starting any server in this method, and expects
 * createSessionFactory() to throw.
 */
@Test
public void testNoServer() {
final ServerLocatorImpl serverLocator = (ServerLocatorImpl)ActiveMQClient.createServerLocatorWithHA(new DiscoveryGroupConfiguration().
setBroadcastEndpointFactory(new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress).
setGroupPort(groupPort)).setDiscoveryInitialWaitTimeout(10)).setInitialConnectAttempts(0);
addServerLocator(serverLocator);
// overrides the 0 passed in the builder chain above
serverLocator.setInitialConnectAttempts(3);
try {
serverLocator.createSessionFactory();
Assert.fail("Exception was expected");
} catch (Exception e) {
// expected: the test passes when createSessionFactory() throws.
// Assert.fail raises an AssertionError, which is not an Exception, so it is not swallowed here.
}
}
/**
 * Starts 100 threads that all wait on a shared barrier and then each create a session factory
 * and a session through the same locator; asserts that none of them threw an exception.
 */
@Test
public void testConnectWithMultiThread() throws Exception {
// counts the threads that hit an exception
final AtomicInteger errors = new AtomicInteger(0);
int NUMBER_OF_THREADS = 100;
// one party per thread, so every thread calls createSessionFactory() at about the same time
final CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_THREADS);
final ServerLocatorImpl serverLocator = (ServerLocatorImpl)ActiveMQClient.createServerLocatorWithHA(new DiscoveryGroupConfiguration().
setBroadcastEndpointFactory(new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress).
setGroupPort(groupPort)).setDiscoveryInitialWaitTimeout(1000)).setInitialConnectAttempts(0);
serverLocator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
addServerLocator(serverLocator);
startServers(0);
try {
serverLocator.setInitialConnectAttempts(0);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
barrier.await();
ClientSessionFactory factory = serverLocator.createSessionFactory();
ClientSession session = factory.createSession();
session.close();
factory.close();
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
Thread[] threads = new Thread[NUMBER_OF_THREADS];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(runnable);
threads[i].start();
}
// wait for every worker before checking the error count
for (Thread t : threads) {
t.join();
}
Assert.assertEquals(0, errors.get());
serverLocator.close();
serverLocator.getDiscoveryGroup().stop();
} finally {
stopServers(0);
}
}
}