ARTEMIS-4436 Artemis is logging warnings during clean shutdown of server

in cluster.

When we know that a node leaves a clustercleanly we shouldn't log WARN
messages about it.

Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
This commit is contained in:
Emmanuel Hugonnet 2023-09-25 11:21:57 +02:00 committed by clebertsuconic
parent fe5afc1d69
commit 2269ad417e
13 changed files with 141 additions and 19 deletions

View File

@ -350,4 +350,7 @@ public interface ActiveMQClientLogger {
@LogMessage(id = 214035, value = "Couldn't finish the client globalFlowControlThreadPool in less than 10 seconds, interrupting it now", level = LogMessage.Level.WARN)
void unableToProcessGlobalFlowControlThreadPoolIn10Sec();
@LogMessage(id = 214036, value = "Connection closure to {} has been detected: {} [code={}]", level = LogMessage.Level.INFO)
void connectionClosureDetected(String remoteAddress, String message, ActiveMQExceptionType type);
}

View File

@ -483,15 +483,15 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
}
private void interruptConnectAndCloseAllSessions(boolean close) {
//release all threads waiting for topology
latchFinalTopology.countDown();
clientProtocolManager.stop();
synchronized (createSessionLock) {
closeCleanSessions(close);
closed = true;
}
//release all threads waiting for topology
latchFinalTopology.countDown();
}
/**
@ -544,7 +544,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
return latchFinalTopology.await(timeout, unit) && topologyReady;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (!isClosed()) {
ActiveMQClientLogger.LOGGER.unableToReceiveClusterTopology(e);
}
return false;
}
}
@ -1547,7 +1549,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
serverLocator, nodeID, reason, new Exception("trace"));
}
serverLocator.notifyNodeDown(System.currentTimeMillis(), nodeID);
serverLocator.notifyNodeDown(System.currentTimeMillis(), nodeID, true);
if (reason.isRedirect()) {
if (serverLocator.isHA()) {
@ -1596,8 +1598,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
}
@Override
public void notifyNodeDown(long eventTime, String nodeID) {
serverLocator.notifyNodeDown(eventTime, nodeID);
public void notifyNodeDown(long eventTime, String nodeID, boolean disconnect) {
serverLocator.notifyNodeDown(eventTime, nodeID, disconnect);
}
}
}

View File

@ -1500,7 +1500,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
* Look for callers of this method!
*/
@Override
public void notifyNodeDown(final long eventTime, final String nodeID) {
public void notifyNodeDown(final long eventTime, final String nodeID, boolean disconnect) {
if (!ha) {
// there's no topology here
@ -1511,7 +1511,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
logger.trace("nodeDown {} nodeID={} as being down", this, nodeID, new Exception("trace"));
}
topology.removeMember(eventTime, nodeID);
topology.removeMember(eventTime, nodeID, disconnect);
if (clusterConnection) {
updateArraysAndPairs(eventTime);

View File

@ -71,7 +71,10 @@ public interface ServerLocatorInternal extends ServerLocator {
* @param uniqueEventID 0 means get the previous ID +1
* @param nodeID
*/
void notifyNodeDown(long uniqueEventID, String nodeID);
default void notifyNodeDown(long uniqueEventID, String nodeID) {
notifyNodeDown(uniqueEventID, nodeID, false);
}
void notifyNodeDown(long uniqueEventID, String nodeID, boolean disconnect);
ServerLocatorInternal setClusterConnection(boolean clusterConnection);

View File

@ -302,11 +302,11 @@ public final class Topology {
return listenersCopy;
}
boolean removeMember(final long uniqueEventID, final String nodeId) {
boolean removeMember(final long uniqueEventID, final String nodeId, final boolean disconnect) {
TopologyMemberImpl member;
if (manager != null && !manager.removeMember(uniqueEventID, nodeId)) {
if (manager != null && !manager.removeMember(uniqueEventID, nodeId, disconnect)) {
logger.debug("TopologyManager rejected the update towards {}", nodeId);
return false;
}

View File

@ -19,5 +19,5 @@ package org.apache.activemq.artemis.core.client.impl;
public interface TopologyManager {
boolean updateMember(long uniqueEventID, String nodeId, TopologyMemberImpl memberInput);
boolean removeMember(long uniqueEventID, String nodeId);
boolean removeMember(long uniqueEventID, String nodeId, boolean disconnect);
}

View File

@ -532,7 +532,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
logger.debug("Notifying {} going down", topMessage.getNodeID());
if (topologyResponseHandler != null) {
topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID());
topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID(), topMessage.isExit());
}
} else {
Pair<TransportConfiguration, TransportConfiguration> transportConfig = topMessage.getPair();

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {
@ -198,8 +199,10 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
destroyed = true;
}
if (!(me instanceof ActiveMQRemoteDisconnectException) && !(me instanceof ActiveMQRoutingException)) {
if (!(me instanceof ActiveMQRemoteDisconnectException) && !(me instanceof ActiveMQRoutingException) && !(me instanceof ActiveMQDisconnectedException)) {
ActiveMQClientLogger.LOGGER.connectionFailureDetected(transportConnection.getRemoteAddress(), me.getMessage(), me.getType());
} else if (me instanceof ActiveMQDisconnectedException) {
ActiveMQClientLogger.LOGGER.connectionClosureDetected(transportConnection.getRemoteAddress(), me.getMessage(), me.getType());
}
try {

View File

@ -34,5 +34,5 @@ public interface TopologyResponseHandler {
boolean isLast);
// This is sent when any node on the cluster topology is going down
void notifyNodeDown(long eventTime, String nodeID);
void notifyNodeDown(long eventTime, String nodeID, boolean disconnect);
}

View File

@ -1602,4 +1602,10 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224133, value = "{} orphaned page transactions have been removed", level = LogMessage.Level.INFO)
void cleaningOrphanedTXCleanup(long numberOfPageTx);
@LogMessage(id = 224134, value = "Connection closed with failedOver={}", level = LogMessage.Level.INFO)
void bridgeConnectionClosed(Boolean failedOver);
@LogMessage(id = 224135, value = "nodeID {} is closing. Topology update ignored", level = LogMessage.Level.INFO)
void nodeLeavingCluster(String nodeID);
}

View File

@ -75,6 +75,7 @@ import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
/**
* A Core BridgeImpl
@ -661,8 +662,12 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
@Override
public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
if (server.isStarted()) {
if (me instanceof ActiveMQDisconnectedException) {
ActiveMQServerLogger.LOGGER.bridgeConnectionClosed(failedOver);
} else {
ActiveMQServerLogger.LOGGER.bridgeConnectionFailed(failedOver);
}
}
synchronized (connectionGuard) {
keepConnecting = true;

View File

@ -554,9 +554,13 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
* @return
*/
@Override
public boolean removeMember(final long uniqueEventID, final String nodeId) {
public boolean removeMember(final long uniqueEventID, final String nodeId, final boolean disconnect) {
if (nodeId.equals(nodeManager.getNodeId().toString())) {
if (!disconnect) {
ActiveMQServerLogger.LOGGER.possibleSplitBrain(nodeId);
} else {
ActiveMQServerLogger.LOGGER.nodeLeavingCluster(nodeId);
}
return false;
}
return true;

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.warnings;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClusterCleanNodeShutdownTest extends ClusterTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Test
public void testNoWarningErrorsDuringRestartingNodesInCluster() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
startServers(0, 1);
Wait.assertTrue(() -> {
getServer(0).getClusterManager().getClusterController().awaitConnectionToReplicationCluster();
return true;
}, 2000L);
Wait.assertTrue(() -> {
getServer(1).getClusterManager().getClusterController().awaitConnectionToReplicationCluster();
return true;
}, 2000L);
logger.debug("server 0 = {}", getServer(0).getNodeID());
logger.debug("server 1 = {}", getServer(1).getNodeID());
setupSessionFactory(0, isNetty(), 15);
setupSessionFactory(1, isNetty());
// now create the 2 queues and make sure they are durable
createQueue(0, "queues.testaddress", "queue10", null, true);
createQueue(1, "queues.testaddress", "queue10", null, true);
addConsumer(0, 0, "queue10", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
sendInRange(1, "queues.testaddress", 0, 10, true, null);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(true)) {
logger.debug("*****************************************************************************");
stopServers(0);
// Waiting some time after stopped
Wait.assertTrue(() -> !getServer(0).isStarted() && !getServer(0).isActive(), 2000L);
logger.debug("*****************************************************************************");
Assert.assertFalse("Connection failure detected for an expected DISCONNECT event", loggerHandler.findText("AMQ212037", " [code=DISCONNECTED]"));
Assert.assertFalse("WARN found", loggerHandler.hasLevel(AssertionLoggerHandler.LogLevel.WARN));
}
startServers(0);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
sendInRange(1, "queues.testaddress", 10, 20, false, null);
verifyReceiveAllInRange(0, 20, 0);
logger.debug("*****************************************************************************");
}
public boolean isNetty() {
return true;
}
}