[ARTMIS-1431] Adapt transport configuration in ClientProtocolManagerFactory

add the adaptTransportConfiguration() method to the
ClientProtocolManagerFactory so that transport configurations used by
the ClientProtocolManager have an opportunity to adapt their transport
configuration.

This allows the HornetQClientProtocolManagerFactory to adapt the
transport configuration received by remote HornetQ broker to replace the
HornetQ-based NettyConnectorFactory by the Artemis-based one.

JIRA: https://issues.apache.org/jira/browse/ARTEMIS-1431
This commit is contained in:
Jeff Mesnil 2017-09-22 16:35:36 +02:00
parent eacaffe16a
commit 492022618d
6 changed files with 49 additions and 2 deletions

View File

@ -1500,7 +1500,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
int count = 0;
for (TopologyMemberImpl pair : membersCopy) {
topologyArrayLocal[count++] = pair.getConnector();
Pair<TransportConfiguration, TransportConfiguration> transportConfigs = pair.getConnector();
topologyArrayLocal[count++] = new Pair<>(protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getA()),
protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
}
this.topologyArray = topologyArrayLocal;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
@ -49,4 +50,9 @@ public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManag
public ClientProtocolManager newProtocolManager() {
return new ActiveMQClientProtocolManager();
}
@Override
public TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc) {
return tc;
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.spi.core.remoting;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
public interface ClientProtocolManagerFactory {
@ -25,4 +26,13 @@ public interface ClientProtocolManagerFactory {
void setLocator(ServerLocator locator);
ServerLocator getLocator();
/**
* Adapt the transport configuration passed in parameter and return an adapted one that is suitable to use with ClientProtocolManager
* created by this factory.
*
* @param tc the original TransportConfiguration
* @return the adapted TransportConfiguration
*/
TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc);
}

View File

@ -17,8 +17,10 @@
package org.apache.activemq.artemis.core.protocol.hornetq.client;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
import org.osgi.service.component.annotations.Component;
@ -40,6 +42,27 @@ public class HornetQClientProtocolManagerFactory implements ClientProtocolManage
locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false));
}
/**
* Adapt the transport configuration by replacing the factoryClassName corresponding to an HornetQ's NettyConnectorFactory
* by the Artemis-based implementation.
*/
@Override
public TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc) {
if (tc == null) {
return null;
}
String factoryClassName = tc.getFactoryClassName();
if (factoryClassName.equals("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory")) {
factoryClassName = NettyConnectorFactory.class.getName();
}
TransportConfiguration newConfig = new TransportConfiguration(factoryClassName,
tc.getParams(),
tc.getName(),
tc.getExtraParams());
return newConfig;
}
@Override
public ClientProtocolManager newProtocolManager() {
return new HornetQClientProtocolManager();

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
@ -51,6 +52,11 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
private static final long serialVersionUID = 1;
@Override
public TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc) {
return tc;
}
@Override
public ClientProtocolManager newProtocolManager() {
return new ActiveMQReplicationProtocolManager();

View File

@ -65,7 +65,7 @@ public class XARecoveryConfig {
final ClientProtocolManagerFactory clientProtocolManager) {
TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length];
for (int i = 0; i < transportConfiguration.length; i++) {
newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig(""));
}
this.transportConfiguration = newTransportConfiguration;