This commit is contained in:
Clebert Suconic 2017-09-25 10:33:56 -04:00
commit d170639bfe
6 changed files with 53 additions and 2 deletions

View File

@ -1504,7 +1504,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
int count = 0; int count = 0;
for (TopologyMemberImpl pair : membersCopy) { for (TopologyMemberImpl pair : membersCopy) {
topologyArrayLocal[count++] = pair.getConnector(); Pair<TransportConfiguration, TransportConfiguration> transportConfigs = pair.getConnector();
topologyArrayLocal[count++] = new Pair<>(protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getA()),
protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
} }
this.topologyArray = topologyArrayLocal; this.topologyArray = topologyArrayLocal;

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.protocol.core.impl; package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
@ -49,4 +50,9 @@ public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManag
public ClientProtocolManager newProtocolManager() { public ClientProtocolManager newProtocolManager() {
return new ActiveMQClientProtocolManager(); return new ActiveMQClientProtocolManager();
} }
@Override
public TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc) {
return tc;
}
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.spi.core.remoting; package org.apache.activemq.artemis.spi.core.remoting;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
public interface ClientProtocolManagerFactory { public interface ClientProtocolManagerFactory {
@ -25,4 +26,13 @@ public interface ClientProtocolManagerFactory {
void setLocator(ServerLocator locator); void setLocator(ServerLocator locator);
ServerLocator getLocator(); ServerLocator getLocator();
/**
* Adapt the transport configuration passed in parameter and return an adapted one that is suitable to use with ClientProtocolManager
* created by this factory.
*
* @param tc the original TransportConfiguration
* @return the adapted TransportConfiguration
*/
TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc);
} }

View File

@ -17,8 +17,10 @@
package org.apache.activemq.artemis.core.protocol.hornetq.client; package org.apache.activemq.artemis.core.protocol.hornetq.client;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor; import org.apache.activemq.artemis.core.protocol.hornetq.HQPropertiesConversionInterceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory;
import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Component;
@ -40,6 +42,27 @@ public class HornetQClientProtocolManagerFactory implements ClientProtocolManage
locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false)); locator.addOutgoingInterceptor(new HQPropertiesConversionInterceptor(false));
} }
/**
* Adapt the transport configuration by replacing the factoryClassName corresponding to an HornetQ's NettyConnectorFactory
* by the Artemis-based implementation.
*/
@Override
public TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc) {
if (tc == null) {
return null;
}
String factoryClassName = tc.getFactoryClassName();
if (factoryClassName.equals("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory")) {
factoryClassName = NettyConnectorFactory.class.getName();
}
TransportConfiguration newConfig = new TransportConfiguration(factoryClassName,
tc.getParams(),
tc.getName(),
tc.getExtraParams());
return newConfig;
}
@Override @Override
public ClientProtocolManager newProtocolManager() { public ClientProtocolManager newProtocolManager() {
return new HornetQClientProtocolManager(); return new HornetQClientProtocolManager();

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.server.cluster; package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder; import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
@ -51,6 +52,11 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
private static final long serialVersionUID = 1; private static final long serialVersionUID = 1;
@Override
public TransportConfiguration adaptTransportConfiguration(TransportConfiguration tc) {
return tc;
}
@Override @Override
public ClientProtocolManager newProtocolManager() { public ClientProtocolManager newProtocolManager() {
return new ActiveMQReplicationProtocolManager(); return new ActiveMQReplicationProtocolManager();

View File

@ -65,7 +65,11 @@ public class XARecoveryConfig {
final ClientProtocolManagerFactory clientProtocolManager) { final ClientProtocolManagerFactory clientProtocolManager) {
TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length]; TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length];
for (int i = 0; i < transportConfiguration.length; i++) { for (int i = 0; i < transportConfiguration.length; i++) {
newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig(""); if (clientProtocolManager != null) {
newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig(""));
} else {
newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
}
} }
this.transportConfiguration = newTransportConfiguration; this.transportConfiguration = newTransportConfiguration;