From 0cac669840d22ed6b9b89b5bdecc91f0c9d53126 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 27 Feb 2020 17:59:39 -0500 Subject: [PATCH] ARTEMIS-2637 Making UDP client discovery resilient In case there is a hardware, firewall or any other issue making the UDP connection go deaf, we will now reopen the connection in an attempt to work around possible issues. This also improves locking around the DiscoveryGroup initial connection. --- .../core/client/ActiveMQClientLogger.java | 5 + .../core/client/impl/ServerLocatorImpl.java | 139 ++++++++++++++---- .../artemis/core/cluster/DiscoveryGroup.java | 60 +++++++- ...ettyHAClientTopologyWithDiscoveryTest.java | 135 +++++++++++++++++ 4 files changed, 311 insertions(+), 28 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 700b863827..9eebed87d0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -420,6 +420,11 @@ public interface ActiveMQClientLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void unableToCheckEpollAvailabilitynoClass(); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 212077, value = "Timed out waiting to receive initial broadcast from cluster. 
Retry {0} of {1}", + format = Message.Format.MESSAGE_FORMAT) + void broadcastTimeout(int retry, int maxretry); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT) void onMessageError(@Cause Throwable e); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index d8bccd2e3a..f06361af7d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; +import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.Interceptor; @@ -63,10 +64,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.remoting.Connector; import org.apache.activemq.artemis.uri.ServerLocatorParser; -import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; -import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; -import org.apache.activemq.artemis.utils.ClassloadingUtil; -import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.*; import org.apache.activemq.artemis.utils.actors.Actor; import 
org.apache.activemq.artemis.utils.actors.OrderedExecutor; import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores; @@ -122,6 +120,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private boolean compressLargeMessage; + /** This specifies serverLocator.connect was used, + * which means it's a cluster connection. + * We should not use retries */ + private volatile transient boolean disableDiscoveryRetries = false; + // if the system should shutdown the pool when shutting down private transient boolean shutdownPool; @@ -133,6 +136,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private transient ConnectionLoadBalancingPolicy loadBalancingPolicy; + private final Object discoveryGroupGuardian = new Object(); + // Settable attributes: private boolean cacheLargeMessagesClient; @@ -211,6 +216,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private boolean useTopologyForLoadBalancing; + /** For tests only */ + public DiscoveryGroup getDiscoveryGroup() { + return discoveryGroup; + } + private final Exception traceException = new Exception(); public static synchronized void clearThreadPools() { @@ -230,7 +240,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction() { @Override public ThreadFactory run() { - return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, ClientSessionFactoryImpl.class.getClassLoader()); + return new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, ServerLocatorImpl.class.getClassLoader()); } }); @@ -299,13 +309,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery instantiateLoadBalancingPolicy(); - if (discoveryGroupConfiguration != null) { - discoveryGroup = 
createDiscoveryGroup(nodeID, discoveryGroupConfiguration); + startDiscovery(); - discoveryGroup.registerListener(this); - - discoveryGroup.start(); - } } catch (Exception e) { state = null; throw ActiveMQClientMessageBundle.BUNDLE.failedToInitialiseSessionFactory(e); @@ -313,6 +318,20 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + private void startDiscovery() throws ActiveMQException { + if (discoveryGroupConfiguration != null) { + try { + discoveryGroup = createDiscoveryGroup(nodeID, discoveryGroupConfiguration); + + discoveryGroup.registerListener(this); + + discoveryGroup.start(); + } catch (Exception e) { + throw new ActiveMQInternalErrorException(e.getMessage(), e); + } + } + } + private static DiscoveryGroup createDiscoveryGroup(String nodeID, DiscoveryGroupConfiguration config) throws Exception { return new DiscoveryGroup(nodeID, config.getName(), config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null); @@ -633,6 +652,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } private ClientSessionFactoryInternal connect(final boolean skipWarnings) throws ActiveMQException { + // if we used connect, we should control UDP reconnections at a different path. + // and this belongs to a cluster connection, not client + disableDiscoveryRetries = true; ClientSessionFactoryInternal returnFactory = null; synchronized (this) { @@ -752,14 +774,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery flushTopology(); - if (this.getNumInitialConnectors() == 0 && discoveryGroup != null) { - // Wait for an initial broadcast to give us at least one node in the cluster - long timeout = clusterConnection ? 
0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout(); - boolean ok = discoveryGroup.waitForBroadcast(timeout); - - if (!ok) { - throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast(); - } + if (discoveryGroupConfiguration != null) { + executeDiscovery(); } ClientSessionFactoryInternal factory = null; @@ -826,6 +842,77 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return factory; } + private void executeDiscovery() throws ActiveMQException { + boolean discoveryOK = false; + boolean retryDiscovery = false; + int tryNumber = 0; + + do { + + discoveryOK = checkOnDiscovery(); + + retryDiscovery = (initialConnectAttempts > 0 && tryNumber++ < initialConnectAttempts) && !disableDiscoveryRetries; + + if (!discoveryOK) { + + if (retryDiscovery) { + ActiveMQClientLogger.LOGGER.broadcastTimeout(tryNumber, initialConnectAttempts); + } else { + throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast(); + } + } + } + while (!discoveryOK && retryDiscovery); + + if (!discoveryOK) { + // I don't think the code would ever get to this situation, since there's an exception thrown on the previous loop + // however I will keep this just in case + throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast(); + } + + } + + private boolean checkOnDiscovery() throws ActiveMQException { + + synchronized (discoveryGroupGuardian) { + + // notice: in case you have many threads waiting to get on checkOnDiscovery, only one will perform the actual discovery + // while subsequent calls will have numberOfInitialConnectors > 0 + if (this.getNumInitialConnectors() == 0 && discoveryGroupConfiguration != null) { + try { + + long timeout = clusterConnection ? 
0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout(); + if (!discoveryGroup.waitForBroadcast(timeout)) { + + if (logger.isDebugEnabled()) { + String threadDump = ThreadDumpUtil.threadDump("Discovery timeout, printing thread dump"); + logger.debug(threadDump); + } + + // if disableDiscoveryRetries = true, it means this is a Bridge or a Cluster Connection Bridge + // which has a different mechanism of retry + // and we should ignore UDP restarts here. + if (!disableDiscoveryRetries) { + if (discoveryGroup != null) { + discoveryGroup.stop(); + } + + logger.debug("Restarting discovery"); + + startDiscovery(); + } + + return false; + } + } catch (Exception e) { + throw new ActiveMQInternalErrorException(e.getMessage(), e); + } + } + } + + return true; + } + public void flushTopology() { if (updateArrayActor != null) { updateArrayActor.flush(10, TimeUnit.SECONDS); @@ -1682,8 +1769,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery while (!isClosed()) { retryNumber++; for (Connector conn : connectors) { - if (logger.isDebugEnabled()) { - logger.debug(this + "::Submitting connect towards " + conn); + if (logger.isTraceEnabled()) { + logger.trace(this + "::Submitting connect towards " + conn); } ClientSessionFactory csf = conn.tryConnect(); @@ -1717,8 +1804,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } }); - if (logger.isDebugEnabled()) { - logger.debug("Returning " + csf + + if (logger.isTraceEnabled()) { + logger.trace("Returning " + csf + " after " + retryNumber + " retries on StaticConnector " + @@ -1740,7 +1827,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } catch (RejectedExecutionException e) { if (isClosed() || skipWarnings) return null; - logger.debug("Rejected execution", e); + logger.trace("Rejected execution", e); throw e; } catch (Exception e) { if (isClosed() || skipWarnings) @@ -1809,7 +1896,7 @@ public final class ServerLocatorImpl 
implements ServerLocatorInternal, Discovery public ClientSessionFactory tryConnect() throws ActiveMQException { if (logger.isDebugEnabled()) { - logger.debug(this + "::Trying to connect to " + factory); + logger.trace(this + "::Trying to connect to " + factory); } try { ClientSessionFactoryInternal factoryToUse = factory; @@ -1824,7 +1911,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } return factoryToUse; } catch (ActiveMQException e) { - logger.debug(this + "::Exception on establish connector initial connection", e); + logger.trace(this + "::Exception on establish connector initial connection", e); return null; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java index b57042b66a..86b28d2a11 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java @@ -16,12 +16,15 @@ */ package org.apache.activemq.artemis.core.cluster; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadFactory; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; @@ -35,6 +38,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationService; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import 
org.apache.activemq.artemis.utils.collections.TypedProperties; import org.jboss.logging.Logger; @@ -102,13 +106,22 @@ public final class DiscoveryGroup implements ActiveMQComponent { return; } + if (logger.isDebugEnabled()) logger.debug("Starting Discovery Group for " + name); + endpoint.openClient(); started = true; - thread = new Thread(new DiscoveryRunnable(), "activemq-discovery-group-thread-" + name); + ThreadFactory tfactory = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ThreadFactory run() { + return new ActiveMQThreadFactory("DiscoveryGroup-" + System.identityHashCode(this), "activemq-discovery-group-thread-" + name, true, DiscoveryGroup.class.getClassLoader()); + } + }); - thread.setDaemon(true); + thread = tfactory.newThread(new DiscoveryRunnable()); + + if (logger.isDebugEnabled()) logger.debug("Starting daemon thread"); thread.start(); @@ -136,6 +149,10 @@ public final class DiscoveryGroup implements ActiveMQComponent { @Override public void stop() { + + if (logger.isDebugEnabled()) { + logger.debug("Stopping discovery. 
There's an exception just as a trace where it happened", new Exception("trace")); + } synchronized (this) { if (!started) { return; @@ -150,6 +167,9 @@ public final class DiscoveryGroup implements ActiveMQComponent { try { endpoint.close(false); + if (logger.isDebugEnabled()) { + logger.debug("endpoing closed"); + } } catch (Exception e1) { ActiveMQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1); } @@ -167,6 +187,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { } thread = null; + received = false; if (notificationService != null) { TypedProperties props = new TypedProperties(); @@ -255,8 +276,16 @@ public final class DiscoveryGroup implements ActiveMQComponent { if (started) { ActiveMQClientLogger.LOGGER.unexpectedNullDataReceived(); } + + if (logger.isDebugEnabled()) { + logger.debug("Received broadcast data as null"); + } break; } + + if (logger.isDebugEnabled()) { + logger.debug("receiving " + data.length); + } } catch (Exception e) { if (!started) { return; @@ -274,6 +303,10 @@ public final class DiscoveryGroup implements ActiveMQComponent { checkUniqueID(originatingNodeID, uniqueID); if (nodeID.equals(originatingNodeID)) { + + if (logger.isDebugEnabled()) { + logger.debug("ignoring original NodeID" + originatingNodeID + " receivedID = " + nodeID); + } if (checkExpiration()) { callListeners(); } @@ -281,6 +314,10 @@ public final class DiscoveryGroup implements ActiveMQComponent { continue; } + if (logger.isDebugEnabled()) { + logger.debug("Received nodeID " + nodeID); + } + int size = buffer.readInt(); boolean changed = false; @@ -295,6 +332,15 @@ public final class DiscoveryGroup implements ActiveMQComponent { entriesRead[i] = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis()); } + + if (logger.isDebugEnabled()) { + logger.debug("Received " + entriesRead.length + " discovery entry elements"); + for (DiscoveryEntry entryDisco : entriesRead) { + logger.debug("" + entryDisco); + } + } + + 
synchronized (DiscoveryGroup.this) { for (DiscoveryEntry entry : entriesRead) { if (connectors.put(originatingNodeID, entry) == null) { @@ -303,6 +349,10 @@ public final class DiscoveryGroup implements ActiveMQComponent { } changed = changed || checkExpiration(); + + if (logger.isDebugEnabled()) { + logger.debug("changed = " + changed); + } } //only call the listeners if we have changed //also make sure that we aren't stopping to avoid deadlock @@ -313,12 +363,18 @@ public final class DiscoveryGroup implements ActiveMQComponent { logger.trace(connector); } } + if (logger.isDebugEnabled()) { + logger.debug("Calling listeners"); + } callListeners(); } synchronized (waitLock) { received = true; + if (logger.isDebugEnabled()) { + logger.debug("Calling notifyAll"); + } waitLock.notifyAll(); } } catch (Throwable e) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java index 0066ae188a..7458ebf5e2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/topology/NettyHAClientTopologyWithDiscoveryTest.java @@ -16,6 +16,18 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.topology; +import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import 
org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; + public class NettyHAClientTopologyWithDiscoveryTest extends HAClientTopologyWithDiscoveryTest { @Override @@ -23,4 +35,127 @@ public class NettyHAClientTopologyWithDiscoveryTest extends HAClientTopologyWith return true; } + + + @Test + public void testRecoveryBadUDPWithRetry() throws Exception { + startServers(0); + ServerLocatorImpl serverLocator = (ServerLocatorImpl) createHAServerLocator(); + serverLocator.setInitialConnectAttempts(10); + serverLocator.initialize(); + serverLocator.getDiscoveryGroup().stop(); + + + ClientSessionFactory factory = serverLocator.createSessionFactory(); + ClientSession session = factory.createSession(); + session.close(); + } + + @Test + public void testRecoveryBadUDPWithoutRetry() throws Exception { + startServers(0); + ServerLocatorImpl serverLocator = (ServerLocatorImpl) createHAServerLocator(); + serverLocator.setInitialConnectAttempts(0); + serverLocator.initialize(); + serverLocator.getDiscoveryGroup().stop(); + + + boolean failure = false; + try { + ClientSessionFactory factory = serverLocator.createSessionFactory(); + ClientSession session = factory.createSession(); + session.close(); + factory.close(); + } catch (Exception e) { + e.printStackTrace(); + failure = true; + } + + Assert.assertTrue(failure); + + ClientSessionFactory factory = serverLocator.createSessionFactory(); + ClientSession session = factory.createSession(); + session.close(); + factory.close(); + + } + + @Test + public void testNoServer() { + final ServerLocatorImpl serverLocator = (ServerLocatorImpl)ActiveMQClient.createServerLocatorWithHA(new DiscoveryGroupConfiguration(). + setBroadcastEndpointFactory(new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress). 
+ setGroupPort(groupPort)).setDiscoveryInitialWaitTimeout(10)).setInitialConnectAttempts(0); + addServerLocator(serverLocator); + serverLocator.setInitialConnectAttempts(3); + + try { + serverLocator.createSessionFactory(); + Assert.fail("Exception was expected"); + } catch (Exception e) { + } + } + + + @Test + public void testConnectWithMultiThread() throws Exception { + final AtomicInteger errors = new AtomicInteger(0); + int NUMBER_OF_THREADS = 100; + final CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_THREADS); + final ServerLocatorImpl serverLocator = (ServerLocatorImpl)ActiveMQClient.createServerLocatorWithHA(new DiscoveryGroupConfiguration(). + setBroadcastEndpointFactory(new UDPBroadcastEndpointFactory().setGroupAddress(groupAddress). + setGroupPort(groupPort)).setDiscoveryInitialWaitTimeout(1000)).setInitialConnectAttempts(0); + serverLocator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true); + addServerLocator(serverLocator); + + startServers(0); + + try { + + serverLocator.setInitialConnectAttempts(0); + + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + barrier.await(); + + ClientSessionFactory factory = serverLocator.createSessionFactory(); + ClientSession session = factory.createSession(); + session.close(); + factory.close(); + + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + + + Thread[] threads = new Thread[NUMBER_OF_THREADS]; + + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(runnable); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + + Assert.assertEquals(0, errors.get()); + + serverLocator.close(); + + serverLocator.getDiscoveryGroup().stop(); + } finally { + stopServers(0); + } + } + + + + + }