From ac7b0e85fbebaa713072c30d4e159d8f3a402ba3 Mon Sep 17 00:00:00 2001 From: Domenico Francesco Bruscino Date: Sat, 10 Dec 2022 08:46:37 +0100 Subject: [PATCH] ARTEMIS-4113 Fix NPE for backup brokers with connection routers The nodeID on backup brokers is available only after they become live. --- .../core/server/routing/ConnectionRouter.java | 8 ++++ .../routing/ConnectionRouterManager.java | 4 +- .../routing/pools/DiscoveryGroupService.java | 12 +++-- .../server/routing/targets/LocalTarget.java | 6 ++- .../routing/ConnectionRouterManagerTest.java | 2 - .../server/routing/ConnectionRouterTest.java | 3 -- .../integration/routing/KeyTypeTest.java | 44 +++++++++++++++++++ 7 files changed, 66 insertions(+), 13 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java index c89e17d646..c0e4335a42 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java @@ -128,6 +128,10 @@ public class ConnectionRouter implements ActiveMQComponent { @Override public void start() throws Exception { + if (localTarget != null) { + localTarget.getTarget().connect(); + } + if (cache != null) { cache.start(); } @@ -150,6 +154,10 @@ public class ConnectionRouter implements ActiveMQComponent { if (cache != null) { cache.stop(); } + + if (localTarget != null) { + localTarget.getTarget().disconnect(); + } } public TargetResult getTarget(Connection connection, String clientID, String username) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java index e3ac3aba61..1e7c3de5a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.routing; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.core.cluster.DiscoveryGroup; import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration; import org.apache.activemq.artemis.core.config.routing.CacheConfiguration; import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration; @@ -142,8 +141,7 @@ public final class ConnectionRouterManager implements ActiveMQComponent { DiscoveryGroupConfiguration discoveryGroupConfiguration = server.getConfiguration(). getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName()); - DiscoveryService discoveryService = new DiscoveryGroupService(new DiscoveryGroup(server.getNodeID().toString(), config.getDiscoveryGroupName(), - discoveryGroupConfiguration.getRefreshTimeout(), discoveryGroupConfiguration.getBroadcastEndpointFactory(), null)); + DiscoveryService discoveryService = new DiscoveryGroupService(localTarget, discoveryGroupConfiguration); pool = new DiscoveryPool(targetFactory, scheduledExecutor, config.getCheckPeriod(), discoveryService); } else if (config.getStaticConnectors() != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java index b2e7951adb..1e384c9027 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java @@ -16,9 +16,11 @@ */ package org.apache.activemq.artemis.core.server.routing.pools; +import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.core.cluster.DiscoveryEntry; import org.apache.activemq.artemis.core.cluster.DiscoveryGroup; import org.apache.activemq.artemis.core.cluster.DiscoveryListener; +import org.apache.activemq.artemis.core.server.routing.targets.Target; import java.util.HashMap; import java.util.List; @@ -26,16 +28,20 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class DiscoveryGroupService extends DiscoveryService implements DiscoveryListener { - private final DiscoveryGroup discoveryGroup; + private final Target localTarget; + private final DiscoveryGroupConfiguration config; + private DiscoveryGroup discoveryGroup; private final Map entries = new ConcurrentHashMap<>(); - public DiscoveryGroupService(DiscoveryGroup discoveryGroup) { - this.discoveryGroup = discoveryGroup; + public DiscoveryGroupService(Target localTarget, DiscoveryGroupConfiguration config) { + this.localTarget = localTarget; + this.config = config; } @Override public void start() throws Exception { + discoveryGroup = new DiscoveryGroup(localTarget.getNodeID(), config.getName(), config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null); discoveryGroup.registerListener(this); discoveryGroup.start(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java index 9232f9adf0..afa13cc95c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/targets/LocalTarget.java @@ -25,7 +25,7 @@ public class LocalTarget extends AbstractTarget { private final ManagementService managementService; public LocalTarget(TransportConfiguration connector, ActiveMQServer server) { - super(connector, server.getNodeID().toString()); + super(connector, null); this.server = server; this.managementService = server.getManagementService(); @@ -43,7 +43,9 @@ public class LocalTarget extends AbstractTarget { @Override public void connect() throws Exception { - + if (getNodeID() == null) { + setNodeID(server.getNodeID().toString()); + } } @Override diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java index 44bcd05ec8..427eab32ff 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.routing; import java.util.Collections; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration; import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration; import org.apache.activemq.artemis.core.config.routing.PoolConfiguration; @@ -46,7 +45,6 @@ public class ConnectionRouterManagerTest { public void setUp() throws Exception { mockServer = mock(ActiveMQServer.class); - Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID")); underTest = new ConnectionRouterManager(null, mockServer, null); underTest.start(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java index 6e7aa732c1..17f84e80fc 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.routing; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.routing.policies.AbstractPolicy; import org.apache.activemq.artemis.core.server.routing.policies.Policy; @@ -29,7 +28,6 @@ import org.apache.activemq.artemis.core.server.routing.targets.TargetResult; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -41,7 +39,6 @@ public class ConnectionRouterTest { @Before public void setUp() { ActiveMQServer mockServer = mock(ActiveMQServer.class); - Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID")); localTarget = new LocalTarget(null, mockServer); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java index 511e102ef5..d69c1ab6b7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/KeyTypeTest.java @@ -16,9 +16,13 @@ */ package org.apache.activemq.artemis.tests.integration.routing; +import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; +import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.routing.policies.FirstElementPolicy; import org.apache.activemq.artemis.core.server.routing.policies.Policy; import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactory; @@ -47,6 +51,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; @RunWith(Parameterized.class) public class KeyTypeTest extends RoutingTestBase { @@ -121,6 +126,45 @@ public class KeyTypeTest extends RoutingTestBase { Assert.assertEquals("test", keys.get(0)); } + @Override + protected boolean isForceUniqueStorageManagerIds() { + return false; + } + + @Test + public void testClientIDKeyOnBackup() throws Exception { + setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false); + setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", MessageLoadBalancingType.OFF, 1, true); + setupRouterServerWithCluster(0, KeyType.CLIENT_ID, FirstElementPolicy.NAME, null, true, null, 1, "cluster0"); + setupBackupServer(1, 0, false, HAType.SharedNothingReplication, true); + UDPBroadcastEndpointFactory endpoint = new UDPBroadcastEndpointFactory().setGroupAddress(GROUP_ADDRESS).setGroupPort(GROUP_PORT); + List connectorInfos = getServer(1).getConfiguration().getConnectorConfigurations().keySet().stream().collect(Collectors.toList()); + BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration().setName("bg1").setBroadcastPeriod(1000).setConnectorInfos(connectorInfos).setEndpointFactory(endpoint); + DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName("dg1").setRefreshTimeout(5000).setDiscoveryInitialWaitTimeout(5000).setBroadcastEndpointFactory(endpoint); + getServer(1).getConfiguration().addBroadcastGroupConfiguration(bcConfig).addDiscoveryGroupConfiguration(dcConfig.getName(), dcConfig); + setupDiscoveryClusterConnection("cluster0", 1, "dg1", "queues", MessageLoadBalancingType.OFF, 1, true); + setupRouterServerWithCluster(1, KeyType.CLIENT_ID, MOCK_POLICY_NAME, null, true, null, 1, "cluster0"); + startServers(0, 1); + + waitForTopology(getServer(0), 1, 1); + + getServer(0).fail(true); + + waitForFailoverTopology(1); + + ConnectionFactory connectionFactory = createFactory(protocol, false, TransportConstants.DEFAULT_HOST, + TransportConstants.DEFAULT_PORT + 1, "test", null, null); + + keys.clear(); + + try (Connection connection = connectionFactory.createConnection()) { + connection.start(); + } + + Assert.assertEquals(1, keys.size()); + Assert.assertEquals("test", keys.get(0)); + } + @Test public void testSNIHostKey() throws Exception { String localHostname = "localhost.localdomain";