This closes #456

This commit is contained in:
Clebert Suconic 2016-04-12 16:15:55 -04:00
commit 03642fc1d8
1 changed files with 50 additions and 44 deletions

View File

@ -146,40 +146,18 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
this.flushExecutor = flushExecutor; this.flushExecutor = flushExecutor;
ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName()); ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName());
// this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory); this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory);
if (config.isResolveProtocols()) { if (config.isResolveProtocols()) {
resolveProtocols(server, this.getClass().getClassLoader()); resolveProtocols(this.getClass().getClassLoader());
if (this.getClass().getClassLoader() != Thread.currentThread().getContextClassLoader()) { if (this.getClass().getClassLoader() != Thread.currentThread().getContextClassLoader()) {
resolveProtocols(server, Thread.currentThread().getContextClassLoader()); resolveProtocols(Thread.currentThread().getContextClassLoader());
} }
} }
if (protocolManagerFactories != null) { if (protocolManagerFactories != null) {
for (ProtocolManagerFactory protocolManagerFactory : protocolManagerFactories) { loadProtocolManagerFactories(protocolManagerFactories);
String[] protocols = protocolManagerFactory.getProtocols();
for (String protocolName : protocols) {
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocolName, protocolManagerFactory.getModuleName());
// protocolMap.put(protocol, protocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors));
protocolMap.put(protocolName, protocolManagerFactory);
}
}
}
}
private void resolveProtocols(ActiveMQServer server, ClassLoader loader) {
ServiceLoader<ProtocolManagerFactory> serviceLoader = ServiceLoader.load(ProtocolManagerFactory.class, loader);
if (serviceLoader != null) {
for (ProtocolManagerFactory next : serviceLoader) {
String[] protocols = next.getProtocols();
for (String protocol : protocols) {
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, next.getModuleName());
// protocolMap.put(protocol, next.createProtocolManager(server, next.filterInterceptors(incomingInterceptors), next.filterInterceptors(outgoingInterceptors)));
protocolMap.put(protocol, next);
}
}
} }
} }
@ -277,25 +255,6 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
started = true; started = true;
} }
private void locateProtocols(String protocolList,
Object transportConfig,
Map<String, ProtocolManagerFactory> protocolMap) {
String[] protocolsSplit = protocolList.split(",");
if (protocolsSplit != null) {
for (String protocolItem : protocolsSplit) {
ProtocolManagerFactory protocolManagerFactory = protocolMap.get(protocolItem);
if (protocolManagerFactory == null) {
ActiveMQServerLogger.LOGGER.noProtocolManagerFound(protocolItem, transportConfig.toString());
}
else {
protocolMap.put(protocolItem, protocolManagerFactory);
}
}
}
}
@Override @Override
public synchronized void startAcceptors() throws Exception { public synchronized void startAcceptors() throws Exception {
if (isStarted()) { if (isStarted()) {
@ -742,4 +701,51 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
} }
} }
/**
* Locates protocols from the internal default map and moves them into the input protocol map.
*
* @param protocolList
* @param transportConfig
* @param protocolMap
*/
private void locateProtocols(String protocolList,
Object transportConfig,
Map<String, ProtocolManagerFactory> protocolMap) {
String[] protocolsSplit = protocolList.split(",");
for (String protocolItem : protocolsSplit) {
ProtocolManagerFactory protocolManagerFactory = this.protocolMap.get(protocolItem);
if (protocolManagerFactory == null) {
ActiveMQServerLogger.LOGGER.noProtocolManagerFound(protocolItem, transportConfig.toString());
}
else {
protocolMap.put(protocolItem, protocolManagerFactory);
}
}
}
/**
* Finds protocol support from a given classloader.
* @param loader
*/
private void resolveProtocols(ClassLoader loader) {
ServiceLoader<ProtocolManagerFactory> serviceLoader = ServiceLoader.load(ProtocolManagerFactory.class, loader);
loadProtocolManagerFactories(serviceLoader);
}
/**
* Loads the protocols found into a map.
* @param protocolManagerFactoryCollection
*/
private void loadProtocolManagerFactories(Iterable<ProtocolManagerFactory> protocolManagerFactoryCollection) {
for (ProtocolManagerFactory next : protocolManagerFactoryCollection) {
String[] protocols = next.getProtocols();
for (String protocol : protocols) {
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, next.getModuleName());
protocolMap.put(protocol, next);
}
}
}
} }