Fix for https://issues.apache.org/activemq/browse/AMQ-1670 you should now able able to add transports before configuring the reset of

the broker properties.  Also slightly changed the TransportFactory interface so that you are not forced to supply a brokerId to bind
a transport.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@649211 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-04-17 18:09:37 +00:00
parent 6707ac60cc
commit 5d99c99ab7
26 changed files with 117 additions and 123 deletions

View File

@ -203,7 +203,7 @@ public class BrokerService implements Service {
* @throws Exception
*/
public TransportConnector addConnector(URI bindAddress) throws Exception {
return addConnector(createTransportConnector(getBroker(), bindAddress));
return addConnector(createTransportConnector(bindAddress));
}
/**
@ -213,7 +213,7 @@ public class BrokerService implements Service {
* @throws Exception
*/
public TransportConnector addConnector(TransportServer transport) throws Exception {
return addConnector(new TransportConnector(getBroker(), transport));
return addConnector(new TransportConnector(transport));
}
/**
@ -1665,9 +1665,9 @@ public class BrokerService implements Service {
}
}
protected TransportConnector createTransportConnector(Broker broker, URI brokerURI) throws Exception {
protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
TransportServer transport = TransportFactory.bind(this, brokerURI);
return new TransportConnector(broker, transport);
return new TransportConnector(transport);
}
/**
@ -1825,8 +1825,6 @@ public class BrokerService implements Service {
}
protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
connector.setBroker(getBroker());
connector.setBrokerName(getBrokerName());
connector.setTaskRunnerFactory(getTaskRunnerFactory());
MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
if (policy != null) {

View File

@ -92,7 +92,7 @@ public class SslBrokerService extends BrokerService {
SslTransportFactory transportFactory = new SslTransportFactory();
transportFactory.setKeyAndTrustManagers(km, tm, random);
return transportFactory.doBind(getBrokerName(), brokerURI);
return transportFactory.doBind(brokerURI);
} else {
// Else, business as usual.
return TransportFactory.bind(this, brokerURI);

View File

@ -170,7 +170,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.statistics.setParent(connector.getStatistics());
}
this.taskRunnerFactory = taskRunnerFactory;
connector.setBrokerName(broker.getBrokerName());
this.transport = transport;
this.transport.setTransportListener(new DefaultTransportListener() {

View File

@ -52,7 +52,6 @@ public class TransportConnector implements Connector, BrokerServiceAware {
protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
protected TransportStatusDetector statusDector;
private Broker broker;
private BrokerService brokerService;
private TransportServer server;
private URI uri;
@ -66,13 +65,13 @@ public class TransportConnector implements Connector, BrokerServiceAware {
private String name;
private boolean disableAsyncDispatch;
private boolean enableStatusMonitor = false;
private Broker broker;
public TransportConnector() {
}
public TransportConnector(Broker broker, TransportServer server) {
public TransportConnector(TransportServer server) {
this();
setBroker(broker);
setServer(server);
if (server != null && server.getConnectURI() != null) {
URI uri = server.getConnectURI();
@ -96,8 +95,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
* connector
*/
public ManagedTransportConnector asManagedConnector(MBeanServer mbeanServer, ObjectName connectorName) throws IOException, URISyntaxException {
ManagedTransportConnector rc = new ManagedTransportConnector(mbeanServer, connectorName, getBroker(), getServer());
//rc.setBroker(getBroker());
ManagedTransportConnector rc = new ManagedTransportConnector(mbeanServer, connectorName, getServer());
rc.setBrokerInfo(getBrokerInfo());
rc.setConnectUri(getConnectUri());
rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
@ -106,9 +104,9 @@ public class TransportConnector implements Connector, BrokerServiceAware {
rc.setEnableStatusMonitor(isEnableStatusMonitor());
rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
rc.setName(getName());
//rc.setServer(getServer());
rc.setTaskRunnerFactory(getTaskRunnerFactory());
rc.setUri(getUri());
rc.setBrokerService(brokerService);
return rc;
}
@ -127,59 +125,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
return server;
}
public Broker getBroker() {
return broker;
}
public void setBroker(Broker broker) {
this.broker = broker;
brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
}
public void setBrokerName(String brokerName) {
brokerInfo.setBrokerName(brokerName);
}
public void setServer(TransportServer server) {
this.server = server;
this.brokerInfo.setBrokerURL(server.getConnectURI().toString());
this.server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
// Starting the connection could block due to
// wireformat negotiation, so start it in an async thread.
Thread startThread = new Thread("ActiveMQ Transport Initiator: " + transport.getRemoteAddress()) {
public void run() {
try {
Connection connection = createConnection(transport);
connection.start();
} catch (Exception e) {
ServiceSupport.dispose(transport);
onAcceptError(e);
}
}
};
startThread.setPriority(4);
startThread.start();
} catch (Exception e) {
String remoteHost = transport.getRemoteAddress();
ServiceSupport.dispose(transport);
onAcceptError(e, remoteHost);
}
}
public void onAcceptError(Exception error) {
onAcceptError(error, null);
}
private void onAcceptError(Exception error, String remoteHost) {
LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " + error.getMessage());
LOG.debug("Reason: " + error.getMessage(), error);
}
});
this.server.setBrokerInfo(brokerInfo);
}
public URI getUri() {
@ -232,7 +179,54 @@ public class TransportConnector implements Connector, BrokerServiceAware {
}
public void start() throws Exception {
getServer().start();
TransportServer server = getServer();
broker = brokerService.getBroker();
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
brokerInfo.setBrokerURL(server.getConnectURI().toString());
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
// Starting the connection could block due to
// wireformat negotiation, so start it in an async thread.
Thread startThread = new Thread("ActiveMQ Transport Initiator: " + transport.getRemoteAddress()) {
public void run() {
try {
Connection connection = createConnection(transport);
connection.start();
} catch (Exception e) {
ServiceSupport.dispose(transport);
onAcceptError(e);
}
}
};
startThread.setPriority(4);
startThread.start();
} catch (Exception e) {
String remoteHost = transport.getRemoteAddress();
ServiceSupport.dispose(transport);
onAcceptError(e, remoteHost);
}
}
public void onAcceptError(Exception error) {
onAcceptError(error, null);
}
private void onAcceptError(Exception error, String remoteHost) {
LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " + error.getMessage());
LOG.debug("Reason: " + error.getMessage(), error);
}
});
server.setBrokerInfo(brokerInfo);
server.start();
DiscoveryAgent da = getDiscoveryAgent();
if (da != null) {
da.registerService(getConnectUri().toString());
@ -280,14 +274,10 @@ public class TransportConnector implements Connector, BrokerServiceAware {
if (uri == null) {
throw new IllegalArgumentException("You must specify either a server or uri property");
}
if (broker == null) {
throw new IllegalArgumentException("You must specify the broker property. Maybe this connector should be added to a broker?");
}
if (brokerService != null) {
return TransportFactory.bind(brokerService, uri);
} else {
return TransportFactory.bind(broker.getBrokerId().getValue(), uri);
if (brokerService == null) {
throw new IllegalArgumentException("You must specify the brokerService property. Maybe this connector should be added to a broker?");
}
return TransportFactory.bind(brokerService, uri);
}
public DiscoveryAgent getDiscoveryAgent() throws IOException {
@ -381,7 +371,11 @@ public class TransportConnector implements Connector, BrokerServiceAware {
this.enableStatusMonitor = enableStatusMonitor;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
public Broker getBroker() {
return broker;
}
}

View File

@ -41,8 +41,8 @@ public class ManagedTransportConnector extends TransportConnector {
private final MBeanServer mbeanServer;
private final ObjectName connectorName;
public ManagedTransportConnector(MBeanServer mbeanServer, ObjectName connectorName, Broker next, TransportServer server) {
super(next, server);
public ManagedTransportConnector(MBeanServer mbeanServer, ObjectName connectorName, TransportServer server) {
super(server);
this.mbeanServer = mbeanServer;
this.connectorName = connectorName;
}

View File

@ -126,7 +126,7 @@ public class ProxyConnector implements Service {
if (bind == null) {
throw new IllegalArgumentException("You must specify either a server or the bind property");
}
return TransportFactory.bind((String)null, bind);
return TransportFactory.bind(bind);
}
private Transport createRemoteTransport() throws Exception {

View File

@ -41,7 +41,7 @@ public abstract class TransportFactory {
private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
public abstract TransportServer doBind(String brokerId, URI location) throws IOException;
public abstract TransportServer doBind(URI location) throws IOException;
public Transport doConnect(URI location, Executor ex) throws Exception {
return doConnect(location);
@ -103,17 +103,24 @@ public abstract class TransportFactory {
return tf.doCompositeConnect(location, ex);
}
public static TransportServer bind(String brokerId, URI location) throws IOException {
public static TransportServer bind(URI location) throws IOException {
TransportFactory tf = findTransportFactory(location);
return tf.doBind(brokerId, location);
return tf.doBind(location);
}
/**
* @deprecated
*/
public static TransportServer bind(String brokerId, URI location) throws IOException {
return bind(location);
}
public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
TransportFactory tf = findTransportFactory(location);
if (tf instanceof BrokerServiceAware) {
((BrokerServiceAware)tf).setBrokerService(brokerService);
if (brokerService != null && tf instanceof BrokerServiceAware) {
((BrokerServiceAware)tf).setBrokerService(brokerService);
}
return tf.doBind(brokerService.getBrokerName(), location);
return tf.doBind(location);
}
public Transport doConnect(URI location) throws Exception {

View File

@ -43,7 +43,7 @@ public class DiscoveryTransportFactory extends FailoverTransportFactory {
return transport;
}
public TransportServer doBind(String brokerId, URI location) throws IOException {
public TransportServer doBind(URI location) throws IOException {
throw new IOException("Invalid server URI: " + location);
// try{
// CompositeData compositData=URISupport.parseComposite(location);

View File

@ -72,7 +72,7 @@ public class FailoverTransportFactory extends TransportFactory {
return transport;
}
public TransportServer doBind(String brokerId, URI location) throws IOException {
public TransportServer doBind(URI location) throws IOException {
throw new IOException("Invalid server URI: " + location);
}

View File

@ -80,7 +80,7 @@ public class FanoutTransportFactory extends TransportFactory {
return transport;
}
public TransportServer doBind(String brokerId, URI location) throws IOException {
public TransportServer doBind(URI location) throws IOException {
throw new IOException("Invalid server URI: " + location);
}

View File

@ -53,7 +53,7 @@ public class MockTransportFactory extends TransportFactory {
return transport;
}
public TransportServer doBind(String brokerId, URI location) throws IOException {
public TransportServer doBind(URI location) throws IOException {
throw new IOException("This protocol does not support being bound.");
}

View File

@ -104,7 +104,7 @@ public class PeerTransportFactory extends TransportFactory {
}
}
public TransportServer doBind(String brokerId, URI location) throws IOException {
public TransportServer doBind(URI location) throws IOException {
throw new IOException("This protocol does not support being bound.");
}

View File

@ -74,7 +74,7 @@ public class SslTransportFactory extends TcpTransportFactory {
/**
* Overriding to use SslTransportServer and allow for proper reflection.
*/
public TransportServer doBind(String brokerId, final URI location) throws IOException {
public TransportServer doBind(final URI location) throws IOException {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));

View File

@ -47,7 +47,7 @@ import org.apache.commons.logging.LogFactory;
public class TcpTransportFactory extends TransportFactory {
private static final Log LOG = LogFactory.getLog(TcpTransportFactory.class);
public TransportServer doBind(String brokerId, final URI location) throws IOException {
public TransportServer doBind(final URI location) throws IOException {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));

View File

@ -51,7 +51,7 @@ public class UdpTransportFactory extends TransportFactory {
private static final Log log = LogFactory.getLog(TcpTransportFactory.class);
public TransportServer doBind(String brokerId, final URI location) throws IOException {
public TransportServer doBind(final URI location) throws IOException {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
if (options.containsKey("port")) {

View File

@ -126,7 +126,8 @@ public class VMTransportFactory extends TransportFactory {
server = SERVERS.get(host);
if (server == null) {
server = (VMTransportServer)bind(location, true);
TransportConnector connector = new TransportConnector(broker.getBroker(), server);
TransportConnector connector = new TransportConnector(server);
connector.setBrokerService(broker);
connector.setUri(location);
connector.setTaskRunnerFactory(broker.getTaskRunnerFactory());
connector.start();
@ -151,7 +152,7 @@ public class VMTransportFactory extends TransportFactory {
return transport;
}
public TransportServer doBind(String brokerId, URI location) throws IOException {
public TransportServer doBind(URI location) throws IOException {
return bind(location, false);
}

View File

@ -256,7 +256,6 @@ public class AMQDeadlockTest3 extends TestCase {
final TransportConnector tConnector = new TransportConnector();
tConnector.setUri(new URI(uri1));
tConnector.setBrokerName(brokerName);
tConnector.setName(brokerName + ".transportConnector");
brokerService.addConnector(tConnector);

View File

@ -58,7 +58,7 @@ public class ClientTestSupport extends TestCase {
try {
broker = BrokerFactory.createBroker(new URI(this.brokerURL));
String brokerId = broker.getBrokerName();
connector = new TransportConnector(broker.getBroker(), TransportFactory.bind(brokerId, new URI(this.brokerURL))) {
connector = new TransportConnector(TransportFactory.bind(new URI(this.brokerURL))) {
// Hook into the connector so we can assert that the server
// accepted a connection.
protected org.apache.activemq.broker.Connection createConnection(org.apache.activemq.transport.Transport transport) throws IOException {
@ -66,7 +66,7 @@ public class ClientTestSupport extends TestCase {
return super.createConnection(transport);
}
};
connector.start();
broker.addConnector(connector);
broker.start();
} catch (IOException e) {

View File

@ -46,19 +46,24 @@ public class NetworkTestSupport extends BrokerTestSupport {
protected TransportConnector remoteConnector;
protected void setUp() throws Exception {
super.setUp();
connector = createConnector();
connector.start();
remotePersistenceAdapter = createRemotePersistenceAdapter(true);
remotePersistenceAdapter.start();
remotePersistenceAdapter = createRemotePersistenceAdapter(true);
remoteBroker = createRemoteBroker(remotePersistenceAdapter);
remoteBroker.start();
BrokerRegistry.getInstance().bind("remotehost", remoteBroker);
remoteConnector = createRemoteConnector();
remoteConnector.start();
remoteBroker.addConnector( remoteConnector );
BrokerRegistry.getInstance().bind("remotehost", remoteBroker);
remoteBroker.start();
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false&useJmx=false&"));
connector = createConnector();
broker.addConnector(connector);
return broker;
}
/**
* @return
@ -67,7 +72,7 @@ public class NetworkTestSupport extends BrokerTestSupport {
* @throws URISyntaxException
*/
protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException {
return new TransportConnector(remoteBroker.getBroker(), TransportFactory.bind(broker.getBrokerName(), new URI(getRemoteURI())));
return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI())));
}
/**
@ -78,7 +83,7 @@ public class NetworkTestSupport extends BrokerTestSupport {
* @throws URISyntaxException
*/
protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
return new TransportConnector(broker.getBroker(), TransportFactory.bind(broker.getBrokerName(), new URI(getLocalURI())));
return new TransportConnector(TransportFactory.bind(new URI(getLocalURI())));
}
protected String getRemoteURI() {
@ -96,11 +101,6 @@ public class NetworkTestSupport extends BrokerTestSupport {
return remotePersistenceAdapter;
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false&useJmx=false&"));
return broker;
}
protected BrokerService createRemoteBroker(PersistenceAdapter persistenceAdapter) throws Exception {
BrokerService answer = new BrokerService();
answer.setBrokerName("remote");
@ -148,11 +148,10 @@ public class NetworkTestSupport extends BrokerTestSupport {
remotePersistenceAdapter.stop();
remotePersistenceAdapter = createRemotePersistenceAdapter(false);
remotePersistenceAdapter.start();
remoteBroker = createRemoteBroker(remotePersistenceAdapter);
remoteBroker.addConnector(getRemoteURI());
remoteBroker.start();
String brokerId = remoteBroker.getBrokerName();
remoteConnector = new TransportConnector(remoteBroker.getBroker(), TransportFactory.bind(brokerId, new URI(getRemoteURI())));
remoteConnector.start();
BrokerRegistry.getInstance().bind("remotehost", remoteBroker);
}

View File

@ -95,7 +95,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
* @throws Exception
*/
private void startTransportServer() throws IOException, URISyntaxException, Exception {
server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
server = TransportFactory.bind(new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
server.setAcceptListener(this);
server.start();
}

View File

@ -53,7 +53,7 @@ public class SslTransportFactoryTest extends TestCase {
+ (needClientAuth ? "true" : "false");
try {
sslTransportServer = (SslTransportServer)factory.doBind("brokerId", new URI(
sslTransportServer = (SslTransportServer)factory.doBind(new URI(
"ssl://localhost:61616?"
+ options));
} catch (Exception e) {

View File

@ -90,7 +90,7 @@ public class WireformatNegociationTest extends CombinationTestSupport {
* @throws Exception
*/
private void startServer(String uri) throws IOException, URISyntaxException, Exception {
server = TransportFactory.bind("localhost", new URI(uri));
server = TransportFactory.bind(new URI(uri));
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(Transport transport) {
try {

View File

@ -57,7 +57,7 @@ public class UdpTransportUsingServerTest extends UdpTestSupport {
}
protected TransportServer createServer() throws Exception {
return TransportFactory.bind("byBroker", new URI(serverURI));
return TransportFactory.bind(new URI(serverURI));
}
protected Transport createConsumer() throws Exception {

View File

@ -190,7 +190,6 @@ public class AMQDeadlockTestW4Brokers extends TestCase {
final TransportConnector tConnector = new TransportConnector();
tConnector.setUri(new URI(uri1));
tConnector.setBrokerName(brokerName);
tConnector.setName(brokerName + ".transportConnector");
brokerService.addConnector(tConnector);

View File

@ -120,7 +120,6 @@ public class AMQFailoverIssue extends TestCase {
brokerService.setDestinationPolicy(policyMap);
final TransportConnector tConnector = new TransportConnector();
tConnector.setUri(new URI(uri1));
tConnector.setBrokerName(brokerName);
tConnector.setName(brokerName + ".transportConnector");
brokerService.addConnector(tConnector);
if (uri2 != null) {

View File

@ -134,7 +134,6 @@ public class AMQStackOverFlowTest extends TestCase {
final TransportConnector tConnector = new TransportConnector();
tConnector.setUri(new URI(uri1));
tConnector.setBrokerName(brokerName);
tConnector.setName(brokerName + ".transportConnector");
brokerService.addConnector(tConnector);