ARTEMIS-4639 NPEs when TopologyMember's primary is null

This commit is contained in:
Justin Bertram 2024-02-07 09:20:16 -06:00 committed by Robbie Gemmell
parent 311e9382a2
commit 20840cfdf1
4 changed files with 182 additions and 10 deletions

View File

@ -232,6 +232,13 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
@Override
public void nodeUP(TopologyMember member, boolean last) {
if (member.getPrimary() == null) {
if (logger.isTraceEnabled()) {
logger.trace("{} ignoring nodeUP call due to null primary; topologyMember={}, last={}", this, member, last);
}
return;
}
if (topologyMap.put(member.getNodeId(), member) == null) {
updateClientClusterInfo();
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.concurrent.Executors;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.junit.Test;
import org.mockito.Mockito;
public class OpenWireProtocolManagerTest {
@Test
public void testNullPrimaryOnNodeUp() throws Exception {
ArtemisExecutor executor = ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())));
try {
ClusterManager clusterManager = Mockito.mock(ClusterManager.class);
ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
StorageManager storageManager = new NullStorageManager();
Mockito.when(server.getStorageManager()).thenReturn(storageManager);
Mockito.when(server.newOperationContext()).thenReturn(storageManager.newContext(executor));
Mockito.when(server.getClusterManager()).thenReturn(clusterManager);
Mockito.when(clusterManager.getDefaultConnection(Mockito.any())).thenReturn(null);
SecurityStore securityStore = Mockito.mock(SecurityStore.class);
Mockito.when(server.getSecurityStore()).thenReturn(securityStore);
Mockito.when(securityStore.authenticate(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(null);
ServerSession serverSession = Mockito.mock(ServerSession.class);
Mockito.when(serverSession.getName()).thenReturn("session");
Mockito.doReturn(serverSession).when(server).createSession(Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any(), Mockito.anyBoolean());
OpenWireProtocolManager openWireProtocolManager = new OpenWireProtocolManager(null, server, null, null);
openWireProtocolManager.setSecurityDomain("securityDomain");
openWireProtocolManager.setSupportAdvisory(false);
Connection connection = Mockito.mock(Connection.class);
Mockito.doReturn(new ChannelBufferWrapper(Unpooled.buffer(1024))).when(connection).createTransportBuffer(Mockito.anyInt());
OpenWireConnection openWireConnection = new OpenWireConnection(connection, server, openWireProtocolManager, openWireProtocolManager.wireFormat(), executor);
ConnectionInfo connectionInfo = new ConnectionInfo(new ConnectionId("1:1"));
connectionInfo.setClientId(RandomUtil.randomString());
openWireProtocolManager.addConnection(openWireConnection, connectionInfo);
TopologyMember topologyMember = new TopologyMemberImpl(RandomUtil.randomString(), null, null, null, null);
openWireProtocolManager.nodeUP(topologyMember, false);
} finally {
executor.shutdown();
}
}
}

View File

@ -761,16 +761,12 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
return;
}
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowDirectConnectionsOnly && !allowableConnections.contains(topologyMember.getPrimary().newTransportConfig(TRANSPORT_CONFIG_NAME))) {
return;
}
// FIXME required to prevent cluster connections w/o discovery group
// and empty static connectors to create bridges... ulgy!
if (serverLocator == null) {
return;
}
/*we don't create bridges to backups*/
if (topologyMember.getPrimary() == null) {
if (logger.isTraceEnabled()) {
@ -779,6 +775,11 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
return;
}
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowDirectConnectionsOnly && !allowableConnections.contains(topologyMember.getPrimary().newTransportConfig(TRANSPORT_CONFIG_NAME))) {
return;
}
synchronized (recordsGuard) {
try {
MessageFlowRecord record = records.get(nodeID);

View File

@ -17,25 +17,33 @@
package org.apache.activemq.artemis.core.server.cluster.impl;
import java.util.concurrent.Executors;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.tests.util.ServerTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.Executors;
public class ClusterConnectionImplMockTest extends ServerTestBase {
/**
* Verification for the fix https://issues.apache.org/jira/browse/ARTEMIS-1946
*/
/**
* Verification for the fix https://issues.apache.org/jira/browse/ARTEMIS-1946
*/
@Test
public void testRemvalOfLocalParameters() throws Exception {
TransportConfiguration tc = new TransportConfiguration();
@ -85,6 +93,25 @@ public class ClusterConnectionImplMockTest extends ServerTestBase {
}
@Test
public void testNullPrimaryOnNodeUp() throws Exception {
TransportConfiguration tc = new TransportConfiguration();
tc.setFactoryClassName("mock");
tc.getParams().put(TransportConstants.LOCAL_ADDRESS_PROP_NAME, "localAddress");
tc.getParams().put(TransportConstants.LOCAL_PORT_PROP_NAME, "localPort");
ArtemisExecutor executor = ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())));
try {
ClusterConnectionImpl cci = new ClusterConnectionImpl(null, new TransportConfiguration[]{tc}, null, null, null, 0, 0L, 0L, 0L, 0, 0L, 0, 0, 0L, 0L, false, null, 0, 0, () -> executor, new MockServer(), null, null, null, 0, new FakeNodeManager(UUIDGenerator.getInstance().generateStringUUID()), null, null, true, 0, 0);
TopologyMember topologyMember = new TopologyMemberImpl(RandomUtil.randomString(), null, null, null, null);
cci.nodeUP(topologyMember, false);
} finally {
executor.shutdownNow();
}
}
static final class MockServer extends ActiveMQServerImpl {
@Override
@ -117,4 +144,61 @@ public class ClusterConnectionImplMockTest extends ServerTestBase {
};
}
}
protected final class FakeNodeManager extends NodeManager {
public FakeNodeManager(String nodeID) {
super(false);
this.setNodeID(nodeID);
}
@Override
public void awaitPrimaryNode() {
}
@Override
public void awaitActiveStatus() {
}
@Override
public void startBackup() {
}
@Override
public ActivateCallback startPrimaryNode() {
return new CleaningActivateCallback() {
};
}
@Override
public void pausePrimaryServer() {
}
@Override
public void crashPrimaryServer() {
}
@Override
public void releaseBackup() {
}
@Override
public SimpleString readNodeId() {
return null;
}
@Override
public boolean isAwaitingFailback() {
return false;
}
@Override
public boolean isBackupActive() {
return false;
}
@Override
public void interrupt() {
}
}
}