mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-2981 - Connecting to broker using discovery protocol fails
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1076192 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6ebedcc1ed
commit
330c9a896d
|
@ -37,7 +37,7 @@ public class DiscoveryTransportFactory extends FailoverTransportFactory {
|
||||||
public Transport createTransport(CompositeData compositeData) throws IOException {
|
public Transport createTransport(CompositeData compositeData) throws IOException {
|
||||||
Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
|
Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
|
||||||
FailoverTransport failoverTransport = createTransport(parameters);
|
FailoverTransport failoverTransport = createTransport(parameters);
|
||||||
return createTransport(failoverTransport, compositeData);
|
return createTransport(failoverTransport, compositeData, parameters);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,10 +48,9 @@ public class DiscoveryTransportFactory extends FailoverTransportFactory {
|
||||||
* @return a transport that reports discovered brokers to a specific composite transport.
|
* @return a transport that reports discovered brokers to a specific composite transport.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static DiscoveryTransport createTransport(CompositeTransport compositeTransport, CompositeData compositeData) throws IOException {
|
public static DiscoveryTransport createTransport(CompositeTransport compositeTransport, CompositeData compositeData, Map<String, String> parameters) throws IOException {
|
||||||
DiscoveryTransport transport = new DiscoveryTransport(compositeTransport);
|
DiscoveryTransport transport = new DiscoveryTransport(compositeTransport);
|
||||||
|
|
||||||
Map<String, String> parameters = new HashMap<String, String>(compositeData.getParameters());
|
|
||||||
IntrospectionSupport.setProperties(transport, parameters);
|
IntrospectionSupport.setProperties(transport, parameters);
|
||||||
transport.setParameters(parameters);
|
transport.setParameters(parameters);
|
||||||
|
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class FanoutTransportFactory extends TransportFactory {
|
||||||
CompositeData compositeData = URISupport.parseComposite(location);
|
CompositeData compositeData = URISupport.parseComposite(location);
|
||||||
Map<String, String> parameters = compositeData.getParameters();
|
Map<String, String> parameters = compositeData.getParameters();
|
||||||
FanoutTransport fanoutTransport = createTransport(parameters);
|
FanoutTransport fanoutTransport = createTransport(parameters);
|
||||||
DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(fanoutTransport, compositeData);
|
DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(fanoutTransport, compositeData, parameters);
|
||||||
return discoveryTransport;
|
return discoveryTransport;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -125,7 +125,7 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
|
||||||
CompositeData compositeData = URISupport.parseComposite(uri);
|
CompositeData compositeData = URISupport.parseComposite(uri);
|
||||||
|
|
||||||
StubCompositeTransport compositeTransport = new StubCompositeTransport();
|
StubCompositeTransport compositeTransport = new StubCompositeTransport();
|
||||||
DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(compositeTransport, compositeData);
|
DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(compositeTransport, compositeData, compositeData.getParameters());
|
||||||
|
|
||||||
discoveryTransport.onServiceAdd(new DiscoveryEvent("tcp://localhost:61616"));
|
discoveryTransport.onServiceAdd(new DiscoveryEvent("tcp://localhost:61616"));
|
||||||
assertEquals("expected added URI after discovery event", compositeTransport.getTransportURIs().length, 1);
|
assertEquals("expected added URI after discovery event", compositeTransport.getTransportURIs().length, 1);
|
||||||
|
@ -141,7 +141,7 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
|
||||||
CompositeData compositeData = URISupport.parseComposite(uri);
|
CompositeData compositeData = URISupport.parseComposite(uri);
|
||||||
|
|
||||||
StubCompositeTransport compositeTransport = new StubCompositeTransport();
|
StubCompositeTransport compositeTransport = new StubCompositeTransport();
|
||||||
DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(compositeTransport, compositeData);
|
DiscoveryTransport discoveryTransport = DiscoveryTransportFactory.createTransport(compositeTransport, compositeData, compositeData.getParameters());
|
||||||
|
|
||||||
final String serviceName = "tcp://localhost:61616";
|
final String serviceName = "tcp://localhost:61616";
|
||||||
discoveryTransport.onServiceAdd(new DiscoveryEvent(serviceName));
|
discoveryTransport.onServiceAdd(new DiscoveryEvent(serviceName));
|
||||||
|
|
|
@ -40,7 +40,7 @@ public class DiscoveryUriTest extends EmbeddedBrokerTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testConnect() throws Exception {
|
public void testConnect() throws Exception {
|
||||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://default?group=test&reconnectDelay=1000&maxReconnectAttempts=30&useExponentialBackOff=false)");
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://default?group=test)?reconnectDelay=1000&maxReconnectAttempts=30&useExponentialBackOff=false");
|
||||||
Connection conn = factory.createConnection();
|
Connection conn = factory.createConnection();
|
||||||
conn.start();
|
conn.start();
|
||||||
|
|
||||||
|
@ -51,4 +51,15 @@ public class DiscoveryUriTest extends EmbeddedBrokerTestSupport {
|
||||||
Message msg = consumer.receive(1000);
|
Message msg = consumer.receive(1000);
|
||||||
assertNotNull(msg);
|
assertNotNull(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testFailedConnect() throws Exception {
|
||||||
|
try {
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("discovery:(multicast://default?group=test1)?reconnectDelay=1000&maxReconnectAttempts=3&useExponentialBackOff=false");
|
||||||
|
Connection conn = factory.createConnection();
|
||||||
|
conn.start();
|
||||||
|
} catch (Exception e) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
fail("Expected connection failure");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue