[ARTMIS-1431] Adapt transport configuration in ClientProtocolManagerFactory
add the adaptTransportConfiguration() method to the ClientProtocolManagerFactory so that transport configurations used by the ClientProtocolManager have an opportunity to adapt their transport configuration. This allows the HornetQClientProtocolManagerFactory to adapt the transport configuration received by remote HornetQ broker to replace the HornetQ-based NettyConnectorFactory by the Artemis-based one. JIRA: https://issues.apache.org/jira/browse/ARTEMIS-1431
This commit is contained in:
parent
bb8c11b1e3
commit
0010b0a090
|
@ -1504,7 +1504,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
|
||||
int count = 0;
|
||||
for (TopologyMemberImpl pair : membersCopy) {
|
||||
topologyArrayLocal[count++] = pair.getConnector();
|
||||
Pair<TransportConfiguration, TransportConfiguration> transportConfigs = pair.getConnector();
|
||||
topologyArrayLocal[count++] = new Pair<>(protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getA()),
|
||||
protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
|
||||
}
|
||||
|
||||
this.topologyArray = topologyArrayLocal;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
|
||||
|
@ -49,4 +50,9 @@ public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManag
|
|||
public ClientProtocolManager newProtocolManager() {
|
||||
return new ActiveMQClientProtocolManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc) {
|
||||
return tc;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.spi.core.remoting;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
|
||||
public interface ClientProtocolManagerFactory {
|
||||
|
@ -25,4 +26,13 @@ public interface ClientProtocolManagerFactory {
|
|||
void setLocator(ServerLocator locator);
|
||||
|
||||
ServerLocator getLocator();
|
||||
|
||||
/**
|
||||
* Adapt the transport configuration passed in parameter and return an adapted one that is suitable to use with ClientProtocolManager
|
||||
* created by this factory.
|
||||
*
|
||||
* @param tc the original TransportConfiguration
|
||||
* @return the adapted TransportConfiguration
|
||||
*/
|
||||
TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc);
|
||||
}
|
||||
|
|
|
@ -17,8 +17,10 @@
|
|||
|
||||
package org.apache.activemq.artemis.core.protocol.hornetq.client;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
|
||||
import org.osgi.service.component.annotations.Component;
|
||||
|
@ -40,6 +42,27 @@ public class HornetQClientProtocolManagerFactory implements ClientProtocolManage
|
|||
locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false));
|
||||
}
|
||||
|
||||
/**
|
||||
* Adapt the transport configuration by replacing the factoryClassName corresponding to an HornetQ's NettyConnectorFactory
|
||||
* by the Artemis-based implementation.
|
||||
*/
|
||||
@Override
|
||||
public TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc) {
|
||||
if (tc == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String factoryClassName = tc.getFactoryClassName();
|
||||
if (factoryClassName.equals("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory")) {
|
||||
factoryClassName = NettyConnectorFactory.class.getName();
|
||||
}
|
||||
TransportConfiguration newConfig = new TransportConfiguration(factoryClassName,
|
||||
tc.getParams(),
|
||||
tc.getName(),
|
||||
tc.getExtraParams());
|
||||
return newConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientProtocolManager newProtocolManager() {
|
||||
return new HornetQClientProtocolManager();
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.cluster;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
|
||||
|
@ -51,6 +52,11 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
|
|||
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
@Override
|
||||
public TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc) {
|
||||
return tc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientProtocolManager newProtocolManager() {
|
||||
return new ActiveMQReplicationProtocolManager();
|
||||
|
|
|
@ -65,7 +65,11 @@ public class XARecoveryConfig {
|
|||
final ClientProtocolManagerFactory clientProtocolManager) {
|
||||
TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length];
|
||||
for (int i = 0; i < transportConfiguration.length; i++) {
|
||||
newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
|
||||
if (clientProtocolManager != null) {
|
||||
newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig(""));
|
||||
} else {
|
||||
newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
|
||||
}
|
||||
}
|
||||
|
||||
this.transportConfiguration = newTransportConfiguration;
|
||||
|
|
Loading…
Reference in New Issue