ARTEMIS-1790 Improve Topology Member Finding

When finding out if a connector belong to a target node it compares
the whole parameter map which is not necessary. Also in understanding
the connector the best place is to delegate it to the corresponding
remoting connection who understands it. (e.g. INVMConnection knows
whether the connector belongs to a target node by checking it's
serverID only. The netty ones only need to match host and port, and
understanding that localhost and 127.0.0.1 are same thing).
This commit is contained in:
Howard Gao 2018-04-11 10:52:52 +08:00 committed by Clebert Suconic
parent 3384d6790e
commit 6818762da8
10 changed files with 194 additions and 9 deletions

View File

@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connector;
import org.jboss.logging.Logger;
@ -372,9 +373,9 @@ public final class Topology {
return topology.get(nodeID);
}
public synchronized TopologyMemberImpl getMember(final TransportConfiguration configuration) {
public synchronized TopologyMemberImpl getMember(final RemotingConnection rc) {
for (TopologyMemberImpl member : topology.values()) {
if (member.isMember(configuration)) {
if (member.isMember(rc)) {
return member;
}
}

View File

@ -105,12 +105,16 @@ public final class TopologyMemberImpl implements TopologyMember {
return connector;
}
/**
* We only need to check if the connection point to the same node,
* don't need to compare the whole params map.
* @param connection The connection to the target node
* @return true if the connection point to the same node
* as this member represents.
*/
@Override
public boolean isMember(RemotingConnection connection) {
TransportConfiguration connectorConfig = connection.getTransportConnection() != null ? connection.getTransportConnection().getConnectorConfig() : null;
return isMember(connectorConfig);
return connection.isSameTarget(getConnector().getA(), getConnector().getB());
}
@Override

View File

@ -511,6 +511,45 @@ public class NettyConnection implements Connection {
return true;
}
@Override
public boolean isSameTarget(TransportConfiguration... configs) {
boolean result = false;
for (TransportConfiguration cfg : configs) {
if (cfg == null) {
continue;
}
if (NettyConnectorFactory.class.getName().equals(cfg.getFactoryClassName())) {
if (configuration.get(TransportConstants.PORT_PROP_NAME).equals(cfg.getParams().get(TransportConstants.PORT_PROP_NAME))) {
//port same, check host
Object hostParam = configuration.get(TransportConstants.HOST_PROP_NAME);
if (hostParam != null) {
if (hostParam.equals(cfg.getParams().get(TransportConstants.HOST_PROP_NAME))) {
result = true;
break;
} else {
//check special 'localhost' case
if (isLocalhost((String) configuration.get(TransportConstants.HOST_PROP_NAME)) && isLocalhost((String) cfg.getParams().get(TransportConstants.HOST_PROP_NAME))) {
result = true;
break;
}
}
} else if (cfg.getParams().get(TransportConstants.HOST_PROP_NAME) == null) {
result = true;
break;
}
}
}
}
return result;
}
//here we consider 'localhost' is equivalent to '127.0.0.1'
//other values of 127.0.0.x is not and the user makes sure
//not to mix use of 'localhost' and '127.0.0.x'
private boolean isLocalhost(String hostname) {
return "127.0.0.1".equals(hostname) || "localhost".equals(hostname);
}
@Override
public final String toString() {
return super.toString() + "[ID=" + getID() + ", local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]";

View File

@ -21,6 +21,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
@ -244,4 +245,7 @@ public interface RemotingConnection extends BufferHandler {
*/
String getTransportLocalAddress();
default boolean isSameTarget(TransportConfiguration... configs) {
return getTransportConnection().isSameTarget(configs);
}
}

View File

@ -149,4 +149,8 @@ public interface Connection {
* @return
*/
boolean isUsingProtocolHandling();
//returns true if one of the configs points to the same
//node as this connection does.
boolean isSameTarget(TransportConfiguration... configs);
}

View File

@ -280,4 +280,21 @@ public class InVMConnection implements Connection {
return "InVMConnection [serverID=" + serverID + ", id=" + id + "]";
}
@Override
public boolean isSameTarget(TransportConfiguration... configs) {
boolean result = false;
for (TransportConfiguration cfg : configs) {
if (cfg == null) {
continue;
}
if (InVMConnectorFactory.class.getName().equals(cfg.getFactoryClassName())) {
//factory same, get id
if (serverID == (int) cfg.getParams().get(TransportConstants.SERVER_ID_PROP_NAME)) {
result = true;
break;
}
}
}
return result;
}
}

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ha.LiveOnlyPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.jboss.logging.Logger;
public class LiveOnlyActivation extends Activation {
@ -109,8 +110,8 @@ public class LiveOnlyActivation extends Activation {
connectToScaleDownTarget(liveOnlyPolicy.getScaleDownPolicy());
}
TransportConfiguration tc = scaleDownClientSessionFactory == null ? null : scaleDownClientSessionFactory.getConnectorConfiguration();
String nodeID = tc == null ? null : scaleDownClientSessionFactory.getServerLocator().getTopology().getMember(tc).getNodeId();
RemotingConnection rc = scaleDownClientSessionFactory == null ? null : scaleDownClientSessionFactory.getConnection();
String nodeID = rc == null ? null : scaleDownClientSessionFactory.getServerLocator().getTopology().getMember(rc).getNodeId();
if (remotingService != null) {
remotingService.freeze(nodeID, null);
}

View File

@ -307,7 +307,7 @@ public class ScaleDownHandler {
}
private String getTargetNodeId(ClientSessionFactory sessionFactory) {
return sessionFactory.getServerLocator().getTopology().getMember(sessionFactory.getConnectorConfiguration()).getNodeId();
return sessionFactory.getServerLocator().getTopology().getMember(sessionFactory.getConnection()).getNodeId();
}
public void scaleDownTransactions(ClientSessionFactory sessionFactory,

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.unit.core.remoting.impl.invm;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class InVMConnectionTest {
@Test
public void testIsTargetNode() throws Exception {
int serverID = 0;
InVMConnection conn = new InVMConnection(serverID, null, null, null);
Map<String, Object> config0 = new HashMap<>();
config0.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
TransportConfiguration tf0 = new TransportConfiguration(InVMConnectorFactory.class.getName(), config0, "tf0");
Map<String, Object> config1 = new HashMap<>();
config1.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
TransportConfiguration tf1 = new TransportConfiguration(InVMConnectorFactory.class.getName(), config1, "tf1");
Map<String, Object> config2 = new HashMap<>();
config2.put(TransportConstants.SERVER_ID_PROP_NAME, 2);
TransportConfiguration tf2 = new TransportConfiguration(InVMConnectorFactory.class.getName(), config2, "tf2");
assertTrue(conn.isSameTarget(tf0));
assertFalse(conn.isSameTarget(tf1));
assertFalse(conn.isSameTarget(tf2));
assertTrue(conn.isSameTarget(tf0, tf1));
assertTrue(conn.isSameTarget(tf2, tf0));
assertFalse(conn.isSameTarget(tf2, tf1));
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -27,7 +28,9 @@ import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
@ -84,6 +87,60 @@ public class NettyConnectionTest extends ActiveMQTestBase {
conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS);
}
@Test
public void testIsTargetNode() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("host", "localhost");
config.put("port", "1234");
Map<String, Object> config1 = new HashMap<>();
config1.put("host", "localhost");
config1.put("port", "1234");
TransportConfiguration tf1 = new TransportConfiguration(NettyConnectorFactory.class.getName(), config1, "tf1");
Map<String, Object> config2 = new HashMap<>();
config2.put("host", "127.0.0.1");
config2.put("port", "1234");
TransportConfiguration tf2 = new TransportConfiguration(NettyConnectorFactory.class.getName(), config2, "tf2");
Map<String, Object> config3 = new HashMap<>();
config3.put("host", "otherhost");
config3.put("port", "1234");
TransportConfiguration tf3 = new TransportConfiguration(NettyConnectorFactory.class.getName(), config3, "tf3");
Map<String, Object> config4 = new HashMap<>();
config4.put("host", "127.0.0.1");
config4.put("port", "9999");
TransportConfiguration tf4 = new TransportConfiguration(NettyConnectorFactory.class.getName(), config4, "tf4");
Map<String, Object> config5 = new HashMap<>();
config5.put("host", "127.0.0.2");
config5.put("port", "1234");
TransportConfiguration tf5 = new TransportConfiguration(NettyConnectorFactory.class.getName(), config5, "tf5");
Map<String, Object> config6 = new HashMap<>();
config6.put("host", "127.0.0.2");
config6.put("port", "1234");
TransportConfiguration tf6 = new TransportConfiguration("some.other.FactoryClass", config6, "tf6");
Channel channel = createChannel();
NettyConnection conn = new NettyConnection(config, channel, new MyListener(), false, false);
assertTrue(conn.isSameTarget(tf1));
assertTrue(conn.isSameTarget(tf2));
assertTrue(conn.isSameTarget(tf1, tf2));
assertFalse(conn.isSameTarget(tf3));
assertTrue(conn.isSameTarget(tf3, tf1));
assertTrue(conn.isSameTarget(tf3, tf2));
assertTrue(conn.isSameTarget(tf1, tf3));
assertFalse(conn.isSameTarget(tf4));
assertFalse(conn.isSameTarget(tf5));
assertFalse(conn.isSameTarget(tf4, tf5));
assertFalse(conn.isSameTarget(tf6));
assertTrue(conn.isSameTarget(tf1, tf6));
assertTrue(conn.isSameTarget(tf6, tf2));
}
private static EmbeddedChannel createChannel() {
return new EmbeddedChannel(new ChannelInboundHandlerAdapter());
}