apply parameters from discoveryURI to subsequent network connections so that options like inactivityTimeout can be configured on all discovered uris

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@805361 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-08-18 11:13:08 +00:00
parent 068f85bce9
commit 39e63c0051
6 changed files with 177 additions and 20 deletions

View File

@ -246,8 +246,12 @@
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock</artifactId>
<version>${jmock-version}</version>
<artifactId>jmock-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock-legacy</artifactId>
<scope>test</scope>
</dependency>
<dependency>

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.SslContext;
@ -29,8 +30,10 @@ import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -46,7 +49,8 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
private DiscoveryAgent discoveryAgent;
private ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
private Map<String, String> parameters;
public DiscoveryNetworkConnector() {
}
@ -56,6 +60,14 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
public void setUri(URI discoveryURI) throws IOException {
setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
try {
parameters = URISupport.parseParamters(discoveryURI);
// allow discovery agent to grab it's parameters
IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters);
} catch (URISyntaxException e) {
LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e);
}
}
public void onServiceAdd(DiscoveryEvent event) {
@ -83,6 +95,11 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return;
}
URI connectUri = uri;
try {
connectUri = URISupport.applyParameters(connectUri, parameters);
} catch (URISyntaxException e) {
LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
}
LOG.info("Establishing network connection from " + localURIName + " to " + connectUri);
Transport remoteTransport;
@ -93,7 +110,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
try {
remoteTransport = TransportFactory.connect(connectUri);
} catch (Exception e) {
LOG.warn("Could not connect to remote URI: " + localURIName + ": " + e.getMessage());
LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage());
LOG.debug("Connection failure exception: " + e, e);
return;
}

View File

@ -25,6 +25,7 @@ import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -74,27 +75,13 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
URI uri = new URI(url);
serviceURIs.put(event.getServiceName(), uri);
LOG.info("Adding new broker connection URL: " + uri);
next.add(new URI[] {applyParameters(uri)});
next.add(new URI[] {URISupport.applyParameters(uri, parameters)});
} catch (URISyntaxException e) {
LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
}
}
}
private URI applyParameters(URI uri) throws URISyntaxException {
if (parameters != null && !parameters.isEmpty()) {
StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer() ;
for ( Map.Entry<String, String> param: parameters.entrySet()) {
if (newQuery.length()!=0) {
newQuery.append(';');
}
newQuery.append(param.getKey()).append('=').append(param.getValue());
}
uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), newQuery.toString(), uri.getFragment());
}
return uri;
}
public void onServiceRemove(DiscoveryEvent event) {
URI uri = serviceURIs.get(event.getServiceName());
if (uri != null) {

View File

@ -27,7 +27,6 @@ import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;

View File

@ -127,6 +127,20 @@ public class URISupport {
return uri.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(uri.getQuery(), "?"));
}
public static URI applyParameters(URI uri, Map<String, String> queryParameters) throws URISyntaxException {
if (queryParameters != null && !queryParameters.isEmpty()) {
StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer() ;
for ( Map.Entry<String, String> param: queryParameters.entrySet()) {
if (newQuery.length()!=0) {
newQuery.append('&');
}
newQuery.append(param.getKey()).append('=').append(param.getValue());
}
uri = createURIWithQuery(uri, newQuery.toString());
}
return uri;
}
@SuppressWarnings("unchecked")
private static Map<String, String> emptyMap() {
return Collections.EMPTY_MAP;

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.discovery;
import static org.junit.Assert.*;
import java.net.URI;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.integration.junit4.JMock;
import org.jmock.integration.junit4.JUnit4Mockery;
import org.jmock.lib.legacy.ClassImposteriser;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(JMock.class)
public class DiscoveryNetworkReconnectTest {
private static final Log LOG = LogFactory.getLog(DiscoveryNetworkReconnectTest.class);
BrokerService brokerA, brokerB;
Mockery context;
ManagementContext managementContext;
final String groupName = "GroupID-" + "DiscoveryNetworkReconnectTest";
final String discoveryAddress = "multicast://default?group=" + groupName + "&initialReconnectDelay=600";
private DiscoveryAgent agent;
@Before
public void setUp() throws Exception {
context = new JUnit4Mockery() {{
setImposteriser(ClassImposteriser.INSTANCE);
}};
brokerA = new BrokerService();
brokerA.setBrokerName("BrokerA");
configure(brokerA);
brokerA.addConnector("tcp://localhost:0");
brokerA.start();
}
private void configure(BrokerService broker) {
broker.setPersistent(false);
broker.setUseJmx(true);
}
@Test
public void testReconnect() throws Exception {
final SocketProxy proxy = new SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
// control multicast publish advertise agent to inject proxy
agent = MulticastDiscoveryAgentFactory.createDiscoveryAgent(new URI(discoveryAddress));
agent.registerService(proxy.getUrl().toString());
agent.start();
managementContext = context.mock(ManagementContext.class);
context.checking(new Expectations(){{
allowing (managementContext).getJmxDomainName(); will (returnValue("Test"));
allowing (managementContext).start();
allowing (managementContext).stop();
allowing (managementContext).unregisterMBean(with(any(ObjectName.class)));
// expected MBeans
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=Broker"))));
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost"))));
allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
// due to reconnect we get two registrations
atLeast(2).of (managementContext).registerMBean(with(any(Object.class)), with(equal(
new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"
+ proxy.getUrl().getPort()))));
}});
brokerB = new BrokerService();
brokerB.setManagementContext(managementContext);
brokerB.setBrokerName("BrokerNC");
configure(brokerB);
brokerB.addNetworkConnector(discoveryAddress + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000&trace=true");
brokerB.start();
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return proxy.connections.size() == 1;
}
});
// force an inactivity timeout timeout
proxy.pause();
// wait for the inactivity timeout
Thread.sleep(2000);
// let a reconnect succeed
proxy.goOn();
assertTrue("got a reconnect", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return proxy.connections.size() == 1;
}
}));
brokerB.stop();
// let mockery validate minimal duplicate mbean registrations
}
}