ARTEMIS-4113 Fix NPE for backup brokers with connection routers

The nodeID on backup brokers is available only after they become live.
This commit is contained in:
Domenico Francesco Bruscino 2022-12-10 08:46:37 +01:00 committed by clebertsuconic
parent a2bfdc7d0d
commit ac7b0e85fb
7 changed files with 66 additions and 13 deletions

View File

@ -128,6 +128,10 @@ public class ConnectionRouter implements ActiveMQComponent {
@Override @Override
public void start() throws Exception { public void start() throws Exception {
if (localTarget != null) {
localTarget.getTarget().connect();
}
if (cache != null) { if (cache != null) {
cache.start(); cache.start();
} }
@ -150,6 +154,10 @@ public class ConnectionRouter implements ActiveMQComponent {
if (cache != null) { if (cache != null) {
cache.stop(); cache.stop();
} }
if (localTarget != null) {
localTarget.getTarget().disconnect();
}
} }
public TargetResult getTarget(Connection connection, String clientID, String username) { public TargetResult getTarget(Connection connection, String clientID, String username) {

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.routing;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration; import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.config.routing.CacheConfiguration; import org.apache.activemq.artemis.core.config.routing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration; import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
@ -142,8 +141,7 @@ public final class ConnectionRouterManager implements ActiveMQComponent {
DiscoveryGroupConfiguration discoveryGroupConfiguration = server.getConfiguration(). DiscoveryGroupConfiguration discoveryGroupConfiguration = server.getConfiguration().
getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName()); getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
DiscoveryService discoveryService = new DiscoveryGroupService(new DiscoveryGroup(server.getNodeID().toString(), config.getDiscoveryGroupName(), DiscoveryService discoveryService = new DiscoveryGroupService(localTarget, discoveryGroupConfiguration);
discoveryGroupConfiguration.getRefreshTimeout(), discoveryGroupConfiguration.getBroadcastEndpointFactory(), null));
pool = new DiscoveryPool(targetFactory, scheduledExecutor, config.getCheckPeriod(), discoveryService); pool = new DiscoveryPool(targetFactory, scheduledExecutor, config.getCheckPeriod(), discoveryService);
} else if (config.getStaticConnectors() != null) { } else if (config.getStaticConnectors() != null) {

View File

@ -16,9 +16,11 @@
*/ */
package org.apache.activemq.artemis.core.server.routing.pools; package org.apache.activemq.artemis.core.server.routing.pools;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.core.cluster.DiscoveryEntry; import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup; import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.cluster.DiscoveryListener; import org.apache.activemq.artemis.core.cluster.DiscoveryListener;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -26,16 +28,20 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class DiscoveryGroupService extends DiscoveryService implements DiscoveryListener { public class DiscoveryGroupService extends DiscoveryService implements DiscoveryListener {
private final DiscoveryGroup discoveryGroup; private final Target localTarget;
private final DiscoveryGroupConfiguration config;
private DiscoveryGroup discoveryGroup;
private final Map<String, Entry> entries = new ConcurrentHashMap<>(); private final Map<String, Entry> entries = new ConcurrentHashMap<>();
public DiscoveryGroupService(DiscoveryGroup discoveryGroup) { public DiscoveryGroupService(Target localTarget, DiscoveryGroupConfiguration config) {
this.discoveryGroup = discoveryGroup; this.localTarget = localTarget;
this.config = config;
} }
@Override @Override
public void start() throws Exception { public void start() throws Exception {
discoveryGroup = new DiscoveryGroup(localTarget.getNodeID(), config.getName(), config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null);
discoveryGroup.registerListener(this); discoveryGroup.registerListener(this);
discoveryGroup.start(); discoveryGroup.start();

View File

@ -25,7 +25,7 @@ public class LocalTarget extends AbstractTarget {
private final ManagementService managementService; private final ManagementService managementService;
public LocalTarget(TransportConfiguration connector, ActiveMQServer server) { public LocalTarget(TransportConfiguration connector, ActiveMQServer server) {
super(connector, server.getNodeID().toString()); super(connector, null);
this.server = server; this.server = server;
this.managementService = server.getManagementService(); this.managementService = server.getManagementService();
@ -43,7 +43,9 @@ public class LocalTarget extends AbstractTarget {
@Override @Override
public void connect() throws Exception { public void connect() throws Exception {
if (getNodeID() == null) {
setNodeID(server.getNodeID().toString());
}
} }
@Override @Override

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.routing;
import java.util.Collections; import java.util.Collections;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration; import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration; import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.routing.PoolConfiguration; import org.apache.activemq.artemis.core.config.routing.PoolConfiguration;
@ -46,7 +45,6 @@ public class ConnectionRouterManagerTest {
public void setUp() throws Exception { public void setUp() throws Exception {
mockServer = mock(ActiveMQServer.class); mockServer = mock(ActiveMQServer.class);
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
underTest = new ConnectionRouterManager(null, mockServer, null); underTest = new ConnectionRouterManager(null, mockServer, null);
underTest.start(); underTest.start();

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.routing;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.routing.policies.AbstractPolicy; import org.apache.activemq.artemis.core.server.routing.policies.AbstractPolicy;
import org.apache.activemq.artemis.core.server.routing.policies.Policy; import org.apache.activemq.artemis.core.server.routing.policies.Policy;
@ -29,7 +28,6 @@ import org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
@ -41,7 +39,6 @@ public class ConnectionRouterTest {
@Before @Before
public void setUp() { public void setUp() {
ActiveMQServer mockServer = mock(ActiveMQServer.class); ActiveMQServer mockServer = mock(ActiveMQServer.class);
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
localTarget = new LocalTarget(null, mockServer); localTarget = new LocalTarget(null, mockServer);
} }

View File

@ -16,9 +16,13 @@
*/ */
package org.apache.activemq.artemis.tests.integration.routing; package org.apache.activemq.artemis.tests.integration.routing;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.routing.policies.FirstElementPolicy; import org.apache.activemq.artemis.core.server.routing.policies.FirstElementPolicy;
import org.apache.activemq.artemis.core.server.routing.policies.Policy; import org.apache.activemq.artemis.core.server.routing.policies.Policy;
import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactory; import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactory;
@ -47,6 +51,7 @@ import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class KeyTypeTest extends RoutingTestBase { public class KeyTypeTest extends RoutingTestBase {
@ -121,6 +126,45 @@ public class KeyTypeTest extends RoutingTestBase {
Assert.assertEquals("test", keys.get(0)); Assert.assertEquals("test", keys.get(0));
} }
@Override
protected boolean isForceUniqueStorageManagerIds() {
return false;
}
@Test
public void testClientIDKeyOnBackup() throws Exception {
setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
setupDiscoveryClusterConnection("cluster0", 0, "dg1", "queues", MessageLoadBalancingType.OFF, 1, true);
setupRouterServerWithCluster(0, KeyType.CLIENT_ID, FirstElementPolicy.NAME, null, true, null, 1, "cluster0");
setupBackupServer(1, 0, false, HAType.SharedNothingReplication, true);
UDPBroadcastEndpointFactory endpoint = new UDPBroadcastEndpointFactory().setGroupAddress(GROUP_ADDRESS).setGroupPort(GROUP_PORT);
List<String> connectorInfos = getServer(1).getConfiguration().getConnectorConfigurations().keySet().stream().collect(Collectors.toList());
BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration().setName("bg1").setBroadcastPeriod(1000).setConnectorInfos(connectorInfos).setEndpointFactory(endpoint);
DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName("dg1").setRefreshTimeout(5000).setDiscoveryInitialWaitTimeout(5000).setBroadcastEndpointFactory(endpoint);
getServer(1).getConfiguration().addBroadcastGroupConfiguration(bcConfig).addDiscoveryGroupConfiguration(dcConfig.getName(), dcConfig);
setupDiscoveryClusterConnection("cluster0", 1, "dg1", "queues", MessageLoadBalancingType.OFF, 1, true);
setupRouterServerWithCluster(1, KeyType.CLIENT_ID, MOCK_POLICY_NAME, null, true, null, 1, "cluster0");
startServers(0, 1);
waitForTopology(getServer(0), 1, 1);
getServer(0).fail(true);
waitForFailoverTopology(1);
ConnectionFactory connectionFactory = createFactory(protocol, false, TransportConstants.DEFAULT_HOST,
TransportConstants.DEFAULT_PORT + 1, "test", null, null);
keys.clear();
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
}
Assert.assertEquals(1, keys.size());
Assert.assertEquals("test", keys.get(0));
}
@Test @Test
public void testSNIHostKey() throws Exception { public void testSNIHostKey() throws Exception {
String localHostname = "localhost.localdomain"; String localHostname = "localhost.localdomain";