From 5d99c99ab72fb500d66e07fe346dfd64ce7460ac Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 17 Apr 2008 18:09:37 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-1670 you should now able able to add transports before configuring the reset of the broker properties. Also slightly changed the TransportFactory interface so that you are not forced to supply a brokerId to bind a transport. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@649211 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 10 +- .../activemq/broker/SslBrokerService.java | 2 +- .../activemq/broker/TransportConnection.java | 1 - .../activemq/broker/TransportConnector.java | 130 +++++++++--------- .../broker/jmx/ManagedTransportConnector.java | 4 +- .../apache/activemq/proxy/ProxyConnector.java | 2 +- .../activemq/transport/TransportFactory.java | 19 ++- .../discovery/DiscoveryTransportFactory.java | 2 +- .../failover/FailoverTransportFactory.java | 2 +- .../fanout/FanoutTransportFactory.java | 2 +- .../transport/mock/MockTransportFactory.java | 2 +- .../transport/peer/PeerTransportFactory.java | 2 +- .../transport/tcp/SslTransportFactory.java | 2 +- .../transport/tcp/TcpTransportFactory.java | 2 +- .../transport/udp/UdpTransportFactory.java | 2 +- .../transport/vm/VMTransportFactory.java | 5 +- .../org/apache/activemq/AMQDeadlockTest3.java | 1 - .../apache/activemq/ClientTestSupport.java | 4 +- .../activemq/network/NetworkTestSupport.java | 35 +++-- .../transport/tcp/InactivityMonitorTest.java | 2 +- .../tcp/SslTransportFactoryTest.java | 2 +- .../tcp/WireformatNegociationTest.java | 2 +- .../udp/UdpTransportUsingServerTest.java | 2 +- .../usecases/AMQDeadlockTestW4Brokers.java | 1 - .../activemq/usecases/AMQFailoverIssue.java | 1 - .../usecases/AMQStackOverFlowTest.java | 1 - 26 files changed, 117 insertions(+), 123 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 2af616835e..138ec446b2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -203,7 +203,7 @@ public class BrokerService implements Service { * @throws Exception */ public TransportConnector addConnector(URI bindAddress) throws Exception { - return addConnector(createTransportConnector(getBroker(), bindAddress)); + return addConnector(createTransportConnector(bindAddress)); } /** @@ -213,7 +213,7 @@ public class BrokerService implements Service { * @throws Exception */ public TransportConnector addConnector(TransportServer transport) throws Exception { - return addConnector(new TransportConnector(getBroker(), transport)); + return addConnector(new TransportConnector(transport)); } /** @@ -1665,9 +1665,9 @@ public class BrokerService implements Service { } } - protected TransportConnector createTransportConnector(Broker broker, URI brokerURI) throws Exception { + protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { TransportServer transport = TransportFactory.bind(this, brokerURI); - return new TransportConnector(broker, transport); + return new TransportConnector(transport); } /** @@ -1825,8 +1825,6 @@ public class BrokerService implements Service { } protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception { - connector.setBroker(getBroker()); - connector.setBrokerName(getBrokerName()); connector.setTaskRunnerFactory(getTaskRunnerFactory()); MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); if (policy != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/SslBrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/SslBrokerService.java index 194641c499..c09995553d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/SslBrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/SslBrokerService.java @@ -92,7 +92,7 @@ public class SslBrokerService extends BrokerService { SslTransportFactory transportFactory = new SslTransportFactory(); transportFactory.setKeyAndTrustManagers(km, tm, random); - return transportFactory.doBind(getBrokerName(), brokerURI); + return transportFactory.doBind(brokerURI); } else { // Else, business as usual. return TransportFactory.bind(this, brokerURI); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 8f13463989..db68c94789 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -170,7 +170,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.statistics.setParent(connector.getStatistics()); } this.taskRunnerFactory = taskRunnerFactory; - connector.setBrokerName(broker.getBrokerName()); this.transport = transport; this.transport.setTransportListener(new DefaultTransportListener() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index 67138c5078..73558711f7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -52,7 +52,6 @@ public class TransportConnector implements Connector, BrokerServiceAware { protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); protected TransportStatusDetector statusDector; - private Broker broker; private BrokerService brokerService; private TransportServer server; private URI uri; @@ -66,13 +65,13 @@ public class TransportConnector implements Connector, BrokerServiceAware { private String name; private boolean disableAsyncDispatch; private boolean enableStatusMonitor = false; + private Broker broker; public TransportConnector() { } - public TransportConnector(Broker broker, TransportServer server) { + public TransportConnector(TransportServer server) { this(); - setBroker(broker); setServer(server); if (server != null && server.getConnectURI() != null) { URI uri = server.getConnectURI(); @@ -96,8 +95,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { * connector */ public ManagedTransportConnector asManagedConnector(MBeanServer mbeanServer, ObjectName connectorName) throws IOException, URISyntaxException { - ManagedTransportConnector rc = new ManagedTransportConnector(mbeanServer, connectorName, getBroker(), getServer()); - //rc.setBroker(getBroker()); + ManagedTransportConnector rc = new ManagedTransportConnector(mbeanServer, connectorName, getServer()); rc.setBrokerInfo(getBrokerInfo()); rc.setConnectUri(getConnectUri()); rc.setDisableAsyncDispatch(isDisableAsyncDispatch()); @@ -106,9 +104,9 @@ public class TransportConnector implements Connector, BrokerServiceAware { rc.setEnableStatusMonitor(isEnableStatusMonitor()); rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); rc.setName(getName()); - //rc.setServer(getServer()); rc.setTaskRunnerFactory(getTaskRunnerFactory()); rc.setUri(getUri()); + rc.setBrokerService(brokerService); return rc; } @@ -127,59 +125,8 @@ public class TransportConnector implements Connector, BrokerServiceAware { return server; } - public Broker getBroker() { - return broker; - } - - public void setBroker(Broker broker) { - this.broker = broker; - brokerInfo.setBrokerId(broker.getBrokerId()); - brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); - brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); - } - - public void setBrokerName(String brokerName) { - brokerInfo.setBrokerName(brokerName); - } - public void setServer(TransportServer server) { this.server = server; - this.brokerInfo.setBrokerURL(server.getConnectURI().toString()); - this.server.setAcceptListener(new TransportAcceptListener() { - public void onAccept(final Transport transport) { - try { - // Starting the connection could block due to - // wireformat negotiation, so start it in an async thread. - Thread startThread = new Thread("ActiveMQ Transport Initiator: " + transport.getRemoteAddress()) { - public void run() { - try { - Connection connection = createConnection(transport); - connection.start(); - } catch (Exception e) { - ServiceSupport.dispose(transport); - onAcceptError(e); - } - } - }; - startThread.setPriority(4); - startThread.start(); - } catch (Exception e) { - String remoteHost = transport.getRemoteAddress(); - ServiceSupport.dispose(transport); - onAcceptError(e, remoteHost); - } - } - - public void onAcceptError(Exception error) { - onAcceptError(error, null); - } - - private void onAcceptError(Exception error, String remoteHost) { - LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " + error.getMessage()); - LOG.debug("Reason: " + error.getMessage(), error); - } - }); - this.server.setBrokerInfo(brokerInfo); } public URI getUri() { @@ -232,7 +179,54 @@ public class TransportConnector implements Connector, BrokerServiceAware { } public void start() throws Exception { - getServer().start(); + + TransportServer server = getServer(); + + broker = brokerService.getBroker(); + brokerInfo.setBrokerName(broker.getBrokerName()); + brokerInfo.setBrokerId(broker.getBrokerId()); + brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); + brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); + brokerInfo.setBrokerURL(server.getConnectURI().toString()); + + server.setAcceptListener(new TransportAcceptListener() { + public void onAccept(final Transport transport) { + try { + // Starting the connection could block due to + // wireformat negotiation, so start it in an async thread. + Thread startThread = new Thread("ActiveMQ Transport Initiator: " + transport.getRemoteAddress()) { + public void run() { + try { + Connection connection = createConnection(transport); + connection.start(); + } catch (Exception e) { + ServiceSupport.dispose(transport); + onAcceptError(e); + } + } + }; + startThread.setPriority(4); + startThread.start(); + } catch (Exception e) { + String remoteHost = transport.getRemoteAddress(); + ServiceSupport.dispose(transport); + onAcceptError(e, remoteHost); + } + } + + public void onAcceptError(Exception error) { + onAcceptError(error, null); + } + + private void onAcceptError(Exception error, String remoteHost) { + LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " + error.getMessage()); + LOG.debug("Reason: " + error.getMessage(), error); + } + }); + + server.setBrokerInfo(brokerInfo); + server.start(); + DiscoveryAgent da = getDiscoveryAgent(); if (da != null) { da.registerService(getConnectUri().toString()); @@ -280,14 +274,10 @@ public class TransportConnector implements Connector, BrokerServiceAware { if (uri == null) { throw new IllegalArgumentException("You must specify either a server or uri property"); } - if (broker == null) { - throw new IllegalArgumentException("You must specify the broker property. Maybe this connector should be added to a broker?"); - } - if (brokerService != null) { - return TransportFactory.bind(brokerService, uri); - } else { - return TransportFactory.bind(broker.getBrokerId().getValue(), uri); + if (brokerService == null) { + throw new IllegalArgumentException("You must specify the brokerService property. Maybe this connector should be added to a broker?"); } + return TransportFactory.bind(brokerService, uri); } public DiscoveryAgent getDiscoveryAgent() throws IOException { @@ -381,7 +371,11 @@ public class TransportConnector implements Connector, BrokerServiceAware { this.enableStatusMonitor = enableStatusMonitor; } - public void setBrokerService(BrokerService brokerService) { - this.brokerService = brokerService; - } + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + public Broker getBroker() { + return broker; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java index 5d9a82a531..5073c19ef0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java @@ -41,8 +41,8 @@ public class ManagedTransportConnector extends TransportConnector { private final MBeanServer mbeanServer; private final ObjectName connectorName; - public ManagedTransportConnector(MBeanServer mbeanServer, ObjectName connectorName, Broker next, TransportServer server) { - super(next, server); + public ManagedTransportConnector(MBeanServer mbeanServer, ObjectName connectorName, TransportServer server) { + super(server); this.mbeanServer = mbeanServer; this.connectorName = connectorName; } diff --git a/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java b/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java index c30bf7f9ae..d0137e5cf7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java @@ -126,7 +126,7 @@ public class ProxyConnector implements Service { if (bind == null) { throw new IllegalArgumentException("You must specify either a server or the bind property"); } - return TransportFactory.bind((String)null, bind); + return TransportFactory.bind(bind); } private Transport createRemoteTransport() throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java index 53dbd11ee3..91202493e7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java @@ -41,7 +41,7 @@ public abstract class TransportFactory { private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/"); private static final ConcurrentHashMap TRANSPORT_FACTORYS = new ConcurrentHashMap(); - public abstract TransportServer doBind(String brokerId, URI location) throws IOException; + public abstract TransportServer doBind(URI location) throws IOException; public Transport doConnect(URI location, Executor ex) throws Exception { return doConnect(location); @@ -103,17 +103,24 @@ public abstract class TransportFactory { return tf.doCompositeConnect(location, ex); } - public static TransportServer bind(String brokerId, URI location) throws IOException { + public static TransportServer bind(URI location) throws IOException { TransportFactory tf = findTransportFactory(location); - return tf.doBind(brokerId, location); + return tf.doBind(location); + } + + /** + * @deprecated + */ + public static TransportServer bind(String brokerId, URI location) throws IOException { + return bind(location); } public static TransportServer bind(BrokerService brokerService, URI location) throws IOException { TransportFactory tf = findTransportFactory(location); - if (tf instanceof BrokerServiceAware) { - ((BrokerServiceAware)tf).setBrokerService(brokerService); + if (brokerService != null && tf instanceof BrokerServiceAware) { + ((BrokerServiceAware)tf).setBrokerService(brokerService); } - return tf.doBind(brokerService.getBrokerName(), location); + return tf.doBind(location); } public Transport doConnect(URI location) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java index 37978874fe..4a4ab4b239 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransportFactory.java @@ -43,7 +43,7 @@ public class DiscoveryTransportFactory extends FailoverTransportFactory { return transport; } - public TransportServer doBind(String brokerId, URI location) throws IOException { + public TransportServer doBind(URI location) throws IOException { throw new IOException("Invalid server URI: " + location); // try{ // CompositeData compositData=URISupport.parseComposite(location); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java index 9d6468ac58..27e5c376c7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java @@ -72,7 +72,7 @@ public class FailoverTransportFactory extends TransportFactory { return transport; } - public TransportServer doBind(String brokerId, URI location) throws IOException { + public TransportServer doBind(URI location) throws IOException { throw new IOException("Invalid server URI: " + location); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java index 40c738291e..2e0657b593 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransportFactory.java @@ -80,7 +80,7 @@ public class FanoutTransportFactory extends TransportFactory { return transport; } - public TransportServer doBind(String brokerId, URI location) throws IOException { + public TransportServer doBind(URI location) throws IOException { throw new IOException("Invalid server URI: " + location); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransportFactory.java index 34d5b14ee3..7d8eb4f56d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransportFactory.java @@ -53,7 +53,7 @@ public class MockTransportFactory extends TransportFactory { return transport; } - public TransportServer doBind(String brokerId, URI location) throws IOException { + public TransportServer doBind(URI location) throws IOException { throw new IOException("This protocol does not support being bound."); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java index e2190fac51..4bdcdb29a4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/peer/PeerTransportFactory.java @@ -104,7 +104,7 @@ public class PeerTransportFactory extends TransportFactory { } } - public TransportServer doBind(String brokerId, URI location) throws IOException { + public TransportServer doBind(URI location) throws IOException { throw new IOException("This protocol does not support being bound."); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java index 11f38dd39f..9e75c828ae 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java @@ -74,7 +74,7 @@ public class SslTransportFactory extends TcpTransportFactory { /** * Overriding to use SslTransportServer and allow for proper reflection. */ - public TransportServer doBind(String brokerId, final URI location) throws IOException { + public TransportServer doBind(final URI location) throws IOException { try { Map options = new HashMap(URISupport.parseParamters(location)); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index c4292c9f4d..12d5af4f25 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -47,7 +47,7 @@ import org.apache.commons.logging.LogFactory; public class TcpTransportFactory extends TransportFactory { private static final Log LOG = LogFactory.getLog(TcpTransportFactory.class); - public TransportServer doBind(String brokerId, final URI location) throws IOException { + public TransportServer doBind(final URI location) throws IOException { try { Map options = new HashMap(URISupport.parseParamters(location)); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java index b246fc8555..de51ddfb63 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java @@ -51,7 +51,7 @@ public class UdpTransportFactory extends TransportFactory { private static final Log log = LogFactory.getLog(TcpTransportFactory.class); - public TransportServer doBind(String brokerId, final URI location) throws IOException { + public TransportServer doBind(final URI location) throws IOException { try { Map options = new HashMap(URISupport.parseParamters(location)); if (options.containsKey("port")) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java index 1cefa908ea..b831a2084d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java @@ -126,7 +126,8 @@ public class VMTransportFactory extends TransportFactory { server = SERVERS.get(host); if (server == null) { server = (VMTransportServer)bind(location, true); - TransportConnector connector = new TransportConnector(broker.getBroker(), server); + TransportConnector connector = new TransportConnector(server); + connector.setBrokerService(broker); connector.setUri(location); connector.setTaskRunnerFactory(broker.getTaskRunnerFactory()); connector.start(); @@ -151,7 +152,7 @@ public class VMTransportFactory extends TransportFactory { return transport; } - public TransportServer doBind(String brokerId, URI location) throws IOException { + public TransportServer doBind(URI location) throws IOException { return bind(location, false); } diff --git a/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java b/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java index 219db51cc7..3c6b70689c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java +++ b/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java @@ -256,7 +256,6 @@ public class AMQDeadlockTest3 extends TestCase { final TransportConnector tConnector = new TransportConnector(); tConnector.setUri(new URI(uri1)); - tConnector.setBrokerName(brokerName); tConnector.setName(brokerName + ".transportConnector"); brokerService.addConnector(tConnector); diff --git a/activemq-core/src/test/java/org/apache/activemq/ClientTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/ClientTestSupport.java index 1cb27be280..00a69382dd 100755 --- a/activemq-core/src/test/java/org/apache/activemq/ClientTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/ClientTestSupport.java @@ -58,7 +58,7 @@ public class ClientTestSupport extends TestCase { try { broker = BrokerFactory.createBroker(new URI(this.brokerURL)); String brokerId = broker.getBrokerName(); - connector = new TransportConnector(broker.getBroker(), TransportFactory.bind(brokerId, new URI(this.brokerURL))) { + connector = new TransportConnector(TransportFactory.bind(new URI(this.brokerURL))) { // Hook into the connector so we can assert that the server // accepted a connection. protected org.apache.activemq.broker.Connection createConnection(org.apache.activemq.transport.Transport transport) throws IOException { @@ -66,7 +66,7 @@ public class ClientTestSupport extends TestCase { return super.createConnection(transport); } }; - connector.start(); + broker.addConnector(connector); broker.start(); } catch (IOException e) { diff --git a/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java index 28dd1ca8c2..b416b438ce 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java @@ -46,19 +46,24 @@ public class NetworkTestSupport extends BrokerTestSupport { protected TransportConnector remoteConnector; protected void setUp() throws Exception { - super.setUp(); - connector = createConnector(); - connector.start(); - remotePersistenceAdapter = createRemotePersistenceAdapter(true); - remotePersistenceAdapter.start(); + remotePersistenceAdapter = createRemotePersistenceAdapter(true); remoteBroker = createRemoteBroker(remotePersistenceAdapter); - remoteBroker.start(); - BrokerRegistry.getInstance().bind("remotehost", remoteBroker); remoteConnector = createRemoteConnector(); - remoteConnector.start(); + remoteBroker.addConnector( remoteConnector ); + BrokerRegistry.getInstance().bind("remotehost", remoteBroker); + remoteBroker.start(); } + + + protected BrokerService createBroker() throws Exception { + BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false&useJmx=false&")); + connector = createConnector(); + broker.addConnector(connector); + return broker; + } + /** * @return @@ -67,7 +72,7 @@ public class NetworkTestSupport extends BrokerTestSupport { * @throws URISyntaxException */ protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException { - return new TransportConnector(remoteBroker.getBroker(), TransportFactory.bind(broker.getBrokerName(), new URI(getRemoteURI()))); + return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI()))); } /** @@ -78,7 +83,7 @@ public class NetworkTestSupport extends BrokerTestSupport { * @throws URISyntaxException */ protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException { - return new TransportConnector(broker.getBroker(), TransportFactory.bind(broker.getBrokerName(), new URI(getLocalURI()))); + return new TransportConnector(TransportFactory.bind(new URI(getLocalURI()))); } protected String getRemoteURI() { @@ -96,11 +101,6 @@ public class NetworkTestSupport extends BrokerTestSupport { return remotePersistenceAdapter; } - protected BrokerService createBroker() throws Exception { - BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false&useJmx=false&")); - return broker; - } - protected BrokerService createRemoteBroker(PersistenceAdapter persistenceAdapter) throws Exception { BrokerService answer = new BrokerService(); answer.setBrokerName("remote"); @@ -148,11 +148,10 @@ public class NetworkTestSupport extends BrokerTestSupport { remotePersistenceAdapter.stop(); remotePersistenceAdapter = createRemotePersistenceAdapter(false); remotePersistenceAdapter.start(); + remoteBroker = createRemoteBroker(remotePersistenceAdapter); + remoteBroker.addConnector(getRemoteURI()); remoteBroker.start(); - String brokerId = remoteBroker.getBrokerName(); - remoteConnector = new TransportConnector(remoteBroker.getBroker(), TransportFactory.bind(brokerId, new URI(getRemoteURI()))); - remoteConnector.start(); BrokerRegistry.getInstance().bind("remotehost", remoteBroker); } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java index dac7a1d1d2..dd5f46618d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java @@ -95,7 +95,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra * @throws Exception */ private void startTransportServer() throws IOException, URISyntaxException, Exception { - server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000")); + server = TransportFactory.bind(new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000")); server.setAcceptListener(this); server.start(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportFactoryTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportFactoryTest.java index 2d47b64224..71c202789f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportFactoryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportFactoryTest.java @@ -53,7 +53,7 @@ public class SslTransportFactoryTest extends TestCase { + (needClientAuth ? "true" : "false"); try { - sslTransportServer = (SslTransportServer)factory.doBind("brokerId", new URI( + sslTransportServer = (SslTransportServer)factory.doBind(new URI( "ssl://localhost:61616?" + options)); } catch (Exception e) { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java index 5496b8a557..fe62147b3c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java @@ -90,7 +90,7 @@ public class WireformatNegociationTest extends CombinationTestSupport { * @throws Exception */ private void startServer(String uri) throws IOException, URISyntaxException, Exception { - server = TransportFactory.bind("localhost", new URI(uri)); + server = TransportFactory.bind(new URI(uri)); server.setAcceptListener(new TransportAcceptListener() { public void onAccept(Transport transport) { try { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java index 427d4af05e..1841191afe 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java @@ -57,7 +57,7 @@ public class UdpTransportUsingServerTest extends UdpTestSupport { } protected TransportServer createServer() throws Exception { - return TransportFactory.bind("byBroker", new URI(serverURI)); + return TransportFactory.bind(new URI(serverURI)); } protected Transport createConsumer() throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java b/activemq-core/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java index 9c73cae718..edba819345 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java @@ -190,7 +190,6 @@ public class AMQDeadlockTestW4Brokers extends TestCase { final TransportConnector tConnector = new TransportConnector(); tConnector.setUri(new URI(uri1)); - tConnector.setBrokerName(brokerName); tConnector.setName(brokerName + ".transportConnector"); brokerService.addConnector(tConnector); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java b/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java index e354d6a075..2d47ae340e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java @@ -120,7 +120,6 @@ public class AMQFailoverIssue extends TestCase { brokerService.setDestinationPolicy(policyMap); final TransportConnector tConnector = new TransportConnector(); tConnector.setUri(new URI(uri1)); - tConnector.setBrokerName(brokerName); tConnector.setName(brokerName + ".transportConnector"); brokerService.addConnector(tConnector); if (uri2 != null) { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java index 59241d7d4b..c9cdd30d9d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java @@ -134,7 +134,6 @@ public class AMQStackOverFlowTest extends TestCase { final TransportConnector tConnector = new TransportConnector(); tConnector.setUri(new URI(uri1)); - tConnector.setBrokerName(brokerName); tConnector.setName(brokerName + ".transportConnector"); brokerService.addConnector(tConnector);