mirror of https://github.com/apache/activemq.git
resolve: https://issues.apache.org/activemq/browse/AMQ-2849 - patch applied with thanks
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@979277 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0004917cd8
commit
9ecc679b38
|
@ -74,9 +74,10 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
|
||||||
if (url != null) {
|
if (url != null) {
|
||||||
try {
|
try {
|
||||||
URI uri = new URI(url);
|
URI uri = new URI(url);
|
||||||
serviceURIs.put(event.getServiceName(), uri);
|
|
||||||
LOG.info("Adding new broker connection URL: " + uri);
|
LOG.info("Adding new broker connection URL: " + uri);
|
||||||
next.add(false,new URI[] {URISupport.applyParameters(uri, parameters)});
|
uri = URISupport.applyParameters(uri, parameters);
|
||||||
|
serviceURIs.put(event.getServiceName(), uri);
|
||||||
|
next.add(false,new URI[] {uri});
|
||||||
} catch (URISyntaxException e) {
|
} catch (URISyntaxException e) {
|
||||||
LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
|
LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,8 +21,10 @@ import java.net.URI;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.activemq.transport.CompositeTransport;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.TransportServer;
|
import org.apache.activemq.transport.TransportServer;
|
||||||
|
import org.apache.activemq.transport.failover.FailoverTransport;
|
||||||
import org.apache.activemq.transport.failover.FailoverTransportFactory;
|
import org.apache.activemq.transport.failover.FailoverTransportFactory;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
import org.apache.activemq.util.URISupport.CompositeData;
|
import org.apache.activemq.util.URISupport.CompositeData;
|
||||||
|
@ -32,14 +34,30 @@ import org.apache.activemq.util.URISupport.CompositeData;
|
||||||
*/
|
*/
|
||||||
public class DiscoveryTransportFactory extends FailoverTransportFactory {
|
public class DiscoveryTransportFactory extends FailoverTransportFactory {
|
||||||
|
|
||||||
public Transport createTransport(CompositeData compositData) throws IOException {
|
public Transport createTransport(CompositeData compositeData) throws IOException {
|
||||||
Map<String, String> parameters = new HashMap<String, String>(compositData.getParameters());
|
Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
|
||||||
DiscoveryTransport transport = new DiscoveryTransport(createTransport(parameters));
|
FailoverTransport failoverTransport = createTransport(parameters);
|
||||||
|
return createTransport(failoverTransport, compositeData);
|
||||||
DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositData.getComponents()[0]);
|
}
|
||||||
transport.setDiscoveryAgent(discoveryAgent);
|
|
||||||
|
/**
|
||||||
|
* Creates a transport that reports discovered brokers to a specific composite transport.
|
||||||
|
*
|
||||||
|
* @param compositeTransport transport to report discovered brokers to
|
||||||
|
* @param compositeData used to apply parameters to this transport
|
||||||
|
* @return a transport that reports discovered brokers to a specific composite transport.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static DiscoveryTransport createTransport(CompositeTransport compositeTransport, CompositeData compositeData) throws IOException {
|
||||||
|
DiscoveryTransport transport = new DiscoveryTransport(compositeTransport);
|
||||||
|
|
||||||
|
Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
|
||||||
IntrospectionSupport.setProperties(transport, parameters);
|
IntrospectionSupport.setProperties(transport, parameters);
|
||||||
transport.setParameters(parameters);
|
transport.setParameters(parameters);
|
||||||
|
|
||||||
|
URI discoveryAgentURI = compositeData.getComponents()[0];
|
||||||
|
DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryAgentURI);
|
||||||
|
transport.setDiscoveryAgent(discoveryAgent);
|
||||||
return transport;
|
return transport;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,9 +27,8 @@ import org.apache.activemq.transport.ResponseCorrelator;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.TransportFactory;
|
import org.apache.activemq.transport.TransportFactory;
|
||||||
import org.apache.activemq.transport.TransportServer;
|
import org.apache.activemq.transport.TransportServer;
|
||||||
import org.apache.activemq.transport.discovery.DiscoveryAgent;
|
|
||||||
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
|
|
||||||
import org.apache.activemq.transport.discovery.DiscoveryTransport;
|
import org.apache.activemq.transport.discovery.DiscoveryTransport;
|
||||||
|
import org.apache.activemq.transport.discovery.DiscoveryTransportFactory;
|
||||||
import org.apache.activemq.util.IntrospectionSupport;
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
import org.apache.activemq.util.URISupport;
|
import org.apache.activemq.util.URISupport;
|
||||||
import org.apache.activemq.util.URISupport.CompositeData;
|
import org.apache.activemq.util.URISupport.CompositeData;
|
||||||
|
@ -62,19 +61,14 @@ public class FanoutTransportFactory extends TransportFactory {
|
||||||
* @throws URISyntaxException
|
* @throws URISyntaxException
|
||||||
*/
|
*/
|
||||||
public Transport createTransport(URI location) throws IOException, URISyntaxException {
|
public Transport createTransport(URI location) throws IOException, URISyntaxException {
|
||||||
|
|
||||||
CompositeData compositeData = URISupport.parseComposite(location);
|
CompositeData compositeData = URISupport.parseComposite(location);
|
||||||
Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
|
Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
|
||||||
DiscoveryTransport transport = new DiscoveryTransport(createTransport(parameters));
|
FanoutTransport fanoutTransport = createTransport(parameters);
|
||||||
|
DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(fanoutTransport, compositeData);
|
||||||
DiscoveryAgent discoveryAgent = DiscoveryAgentFactory.createDiscoveryAgent(compositeData.getComponents()[0]);
|
return discoveryTransport;
|
||||||
transport.setDiscoveryAgent(discoveryAgent);
|
|
||||||
|
|
||||||
return transport;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public FanoutTransport createTransport(Map parameters) throws IOException {
|
public FanoutTransport createTransport(Map<String,String> parameters) throws IOException {
|
||||||
FanoutTransport transport = new FanoutTransport();
|
FanoutTransport transport = new FanoutTransport();
|
||||||
IntrospectionSupport.setProperties(transport, parameters);
|
IntrospectionSupport.setProperties(transport, parameters);
|
||||||
return transport;
|
return transport;
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class StubCompositeTransport extends StubTransport implements CompositeTransport
|
||||||
|
{
|
||||||
|
private List<URI> transportURIs = new ArrayList<URI>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see org.apache.activemq.transport.CompositeTransport#add(java.net.URI[])
|
||||||
|
*/
|
||||||
|
public void add(boolean rebalance, URI[] uris)
|
||||||
|
{
|
||||||
|
transportURIs.addAll(Arrays.asList(uris));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see org.apache.activemq.transport.CompositeTransport#remove(java.net.URI[])
|
||||||
|
*/
|
||||||
|
public void remove(boolean rebalance, URI[] uris)
|
||||||
|
{
|
||||||
|
transportURIs.removeAll(Arrays.asList(uris));
|
||||||
|
}
|
||||||
|
|
||||||
|
public URI[] getTransportURIs()
|
||||||
|
{
|
||||||
|
return transportURIs.toArray(new URI[0]);
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.transport.discovery;
|
package org.apache.activemq.transport.discovery;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Vector;
|
import java.util.Vector;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
@ -26,6 +27,10 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.CombinationTestSupport;
|
import org.apache.activemq.CombinationTestSupport;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.command.DiscoveryEvent;
|
||||||
|
import org.apache.activemq.transport.StubCompositeTransport;
|
||||||
|
import org.apache.activemq.util.URISupport;
|
||||||
|
import org.apache.activemq.util.URISupport.CompositeData;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
@ -112,4 +117,37 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
|
||||||
assertTrue("took at least initialReconnectDelay time: " + duration + " e:" + expected, duration >= initialReconnectDelay);
|
assertTrue("took at least initialReconnectDelay time: " + duration + " e:" + expected, duration >= initialReconnectDelay);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSetDiscoveredBrokerProperties() throws Exception {
|
||||||
|
final String extraParameterName = "connectionTimeout";
|
||||||
|
final String extraParameterValue = "3000";
|
||||||
|
final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&" + extraParameterName + "=" + extraParameterValue);
|
||||||
|
CompositeData compositeData = URISupport.parseComposite(uri);
|
||||||
|
|
||||||
|
StubCompositeTransport compositeTransport = new StubCompositeTransport();
|
||||||
|
DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(compositeTransport, compositeData);
|
||||||
|
|
||||||
|
discoveryTransport.onServiceAdd(new DiscoveryEvent("tcp://localhost:61616"));
|
||||||
|
assertEquals("expected added URI after discovery event", compositeTransport.getTransportURIs().length, 1);
|
||||||
|
|
||||||
|
URI discoveredServiceURI = compositeTransport.getTransportURIs()[0];
|
||||||
|
Map<String, String> parameters = URISupport.parseParamters(discoveredServiceURI);
|
||||||
|
assertTrue("unable to add parameter to discovered service", parameters.containsKey(extraParameterName));
|
||||||
|
assertEquals("incorrect value for parameter added to discovered service", parameters.get(extraParameterName), extraParameterValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAddRemoveDiscoveredBroker() throws Exception {
|
||||||
|
final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&connectionTimeout=3000");
|
||||||
|
CompositeData compositeData = URISupport.parseComposite(uri);
|
||||||
|
|
||||||
|
StubCompositeTransport compositeTransport = new StubCompositeTransport();
|
||||||
|
DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(compositeTransport, compositeData);
|
||||||
|
|
||||||
|
final String serviceName = "tcp://localhost:61616";
|
||||||
|
discoveryTransport.onServiceAdd(new DiscoveryEvent(serviceName));
|
||||||
|
assertEquals("expected added URI after discovery event", 1, compositeTransport.getTransportURIs().length);
|
||||||
|
|
||||||
|
discoveryTransport.onServiceRemove(new DiscoveryEvent(serviceName));
|
||||||
|
assertEquals("expected URI removed after discovery event", 0, compositeTransport.getTransportURIs().length);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue