resolve https://issues.apache.org/activemq/browse/AMQ-2283. In suppressing asyncColos I found the need to suppress brodcast of query parameters by the multicast discovery agent as server side and client side options are not compatible. Added support to the DiscoveryTransport to apply query parameters to each uri that it discovers.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@792127 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-07-08 13:00:44 +00:00
parent 2a6c87259c
commit bd1b10c207
5 changed files with 93 additions and 5 deletions

View File

@ -240,7 +240,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
DiscoveryAgent da = getDiscoveryAgent();
if (da != null) {
da.registerService(getConnectUri().toString());
da.registerService(getPublishableConnectString());
da.start();
}
if (enableStatusMonitor) {
@ -251,6 +251,20 @@ public class TransportConnector implements Connector, BrokerServiceAware {
LOG.info("Connector " + getName() + " Started");
}
private String getPublishableConnectString() throws Exception {
URI connectUri = getConnectUri();
String publishableConnectString = connectUri.toString();
// strip off server side query parameters which may not be compatible to clients
if (connectUri.getRawQuery() != null) {
publishableConnectString =
publishableConnectString.substring(0, publishableConnectString.indexOf(connectUri.getRawQuery()) -1);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + connectUri);
}
return publishableConnectString;
}
public void stop() throws Exception {
ServiceStopper ss = new ServiceStopper();
if (discoveryAgent != null) {

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.discovery;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.command.DiscoveryEvent;
@ -41,6 +42,8 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
private DiscoveryAgent discoveryAgent;
private final ConcurrentHashMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>();
private Map<String, String> parameters;
public DiscoveryTransport(CompositeTransport next) {
super(next);
this.next = next;
@ -71,13 +74,27 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
URI uri = new URI(url);
serviceURIs.put(event.getServiceName(), uri);
LOG.info("Adding new broker connection URL: " + uri);
next.add(new URI[] {uri});
next.add(new URI[] {applyParameters(uri)});
} catch (URISyntaxException e) {
LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
}
}
}
private URI applyParameters(URI uri) throws URISyntaxException {
if (parameters != null && !parameters.isEmpty()) {
StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer() ;
for ( Map.Entry<String, String> param: parameters.entrySet()) {
if (newQuery.length()!=0) {
newQuery.append(';');
}
newQuery.append(param.getKey()).append('=').append(param.getValue());
}
uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), newQuery.toString(), uri.getFragment());
}
return uri;
}
public void onServiceRemove(DiscoveryEvent event) {
URI uri = serviceURIs.get(event.getServiceName());
if (uri != null) {
@ -93,4 +110,8 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
this.discoveryAgent = discoveryAgent;
}
public void setParameters(Map<String, String> parameters) {
this.parameters = parameters;
}
}

View File

@ -39,7 +39,7 @@ public class DiscoveryTransportFactory extends FailoverTransportFactory {
DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositData.getComponents()[0]);
transport.setDiscoveryAgent(discoveryAgent);
IntrospectionSupport.setProperties(transport, parameters);
transport.setParameters(parameters);
return transport;
}

View File

@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -176,7 +177,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
private long lastAdvertizeTime;
private AtomicBoolean started = new AtomicBoolean(false);
private boolean reportAdvertizeFailed = true;
private Executor executor = null;
private ExecutorService executor = null;
/**
* Set the discovery listener
@ -315,6 +316,8 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
if (mcast != null) {
mcast.close();
}
runner.interrupt();
getExecutor().shutdownNow();
}
}
@ -488,7 +491,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
}
}
private Executor getExecutor() {
private ExecutorService getExecutor() {
if (executor == null) {
final String threadName = "Notifier-" + this.toString();
executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {

View File

@ -16,11 +16,16 @@
*/
package org.apache.activemq.transport.discovery;
import java.net.URI;
import java.util.Vector;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -28,6 +33,51 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
private static final Log LOG = LogFactory.getLog(DiscoveryTransportNoBrokerTest.class);
public void testNoExtraThreads() throws Exception {
BrokerService broker = new BrokerService();
TransportConnector tcp = broker.addConnector("tcp://localhost:0?transport.closeAsync=false");
String group = "GR-" + System.currentTimeMillis();
URI discoveryUri = new URI("multicast://default?group=" + group);
tcp.setDiscoveryUri(discoveryUri);
broker.start();
broker.waitUntilStarted();
Vector<String> existingNames = new Vector<String>();
Thread[] threads = getThreads();
for (Thread t : threads) {
existingNames.add(t.getName());
}
final int idleThreadCount = threads.length;
LOG.info("Broker started - thread Count:" + idleThreadCount);
final int noConnectionToCreate = 10;
for (int i=0; i<10;i++) {
ActiveMQConnectionFactory factory =
new ActiveMQConnectionFactory("discovery:(multicast://239.255.2.3:6155?group=" + group +")?closeAsync=false");
LOG.info("Connecting.");
Connection connection = factory.createConnection();
connection.setClientID("test");
connection.close();
}
Thread.sleep(2000);
threads = getThreads();
for (Thread t : threads) {
if (!existingNames.contains(t.getName())) {
LOG.info("Remaining thread:" + t);
}
}
assertTrue("no extra threads per connection", Thread.activeCount() - idleThreadCount < noConnectionToCreate);
}
private Thread[] getThreads() {
Thread[] threads = new Thread[Thread.activeCount()];
Thread.enumerate(threads);
return threads;
}
public void testMaxReconnectAttempts() throws JMSException {
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://doesNOTexist)");