From d2d1c6d63d67115448d9b929c2d2e2e3dcd7f0f1 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 23 Jan 2006 19:40:42 +0000 Subject: [PATCH] The Transport Connection now notifies it's Connector of lifecycle events so that the Connector does not have to wrap the broker in yet another filter to notice the connection's lifecycle events. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@371636 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/AbstractConnection.java | 4 +- .../activemq/broker/TransportConnection.java | 2 + .../activemq/broker/TransportConnector.java | 37 ++++++------------- .../broker/TransportStatusDetector.java | 12 ++---- .../broker/jmx/ManagedTransportConnector.java | 2 +- 5 files changed, 20 insertions(+), 37 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java index a4d3d30ab0..610233dba5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java @@ -77,7 +77,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C protected final List dispatchQueue = Collections.synchronizedList(new LinkedList()); protected final TaskRunner taskRunner; - protected final Connector connector; + protected final TransportConnector connector; protected BrokerInfo brokerInfo; private ConnectionStatistics statistics = new ConnectionStatistics(); private boolean inServiceException=false; @@ -107,7 +107,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C * @param broker * @param taskRunnerFactory - can be null if you want direct dispatch to the transport else commands are sent async. */ - public AbstractConnection(Connector connector, Broker broker, TaskRunnerFactory taskRunnerFactory) { + public AbstractConnection(TransportConnector connector, Broker broker, TaskRunnerFactory taskRunnerFactory) { this.connector = connector; this.broker = broker; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 1895e97a24..3a16f71a8b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -73,9 +73,11 @@ public class TransportConnection extends AbstractConnection { transport.start(); active = true; super.start(); + connector.onStarted(this); } public void stop() throws Exception { + connector.onStopped(this); try { if (masterBroker != null){ masterBroker.stop(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index d851d1307a..48f332b639 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -27,7 +27,6 @@ import javax.management.ObjectName; import org.apache.activemq.broker.jmx.ManagedTransportConnector; import org.apache.activemq.broker.region.ConnectorStatistics; import org.apache.activemq.command.BrokerInfo; -import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; @@ -51,7 +50,6 @@ public class TransportConnector implements Connector { private static final Log log = LogFactory.getLog(TransportConnector.class); private Broker broker; - private BrokerFilter brokerFilter; private TransportServer server; private URI uri; private BrokerInfo brokerInfo = new BrokerInfo(); @@ -195,8 +193,8 @@ public class TransportConnector implements Connector { } this.statusDector.stop(); for (Iterator iter = connections.iterator(); iter.hasNext();) { - ConnectionContext context = (ConnectionContext) iter.next(); - ss.stop(context.getConnection()); + TransportConnection c = (TransportConnection) iter.next(); + ss.stop(c); } ss.throwFirstException(); } @@ -204,28 +202,7 @@ public class TransportConnector implements Connector { // Implementation methods // ------------------------------------------------------------------------- protected Connection createConnection(Transport transport) throws IOException { - return new TransportConnection(this, transport, getBrokerFilter(), taskRunnerFactory); - } - - protected BrokerFilter getBrokerFilter() { - if (brokerFilter == null) { - if (broker == null) { - throw new IllegalArgumentException("You must specify the broker property. Maybe this connector should be added to a broker?"); - } - this.brokerFilter = new BrokerFilter(broker) { - public void addConnection(ConnectionContext context, ConnectionInfo info) throws Throwable { - connections.add(context); - super.addConnection(context, info); - } - - public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Throwable { - connections.remove(context); - super.removeConnection(context, info, error); - } - }; - - } - return brokerFilter; + return new TransportConnection(this, transport, broker, taskRunnerFactory); } protected TransportServer createTransportServer() throws IOException, URISyntaxException { @@ -278,4 +255,12 @@ public class TransportConnector implements Connector { this.connectUri = transportUri; } + public void onStarted(TransportConnection connection) { + connections.add(connection); + } + + public void onStopped(TransportConnection connection) { + connections.remove(connection); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java index c40b8cee73..598acb799a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java @@ -75,14 +75,10 @@ public class TransportStatusDetector implements Service,Runnable{ } protected void doSweep(){ for(Iterator i=connector.getConnections().iterator();i.hasNext();){ - ConnectionContext cc=(ConnectionContext) i.next(); - Connection connection=cc.getConnection(); - if(connection instanceof TransportConnection){ - TransportConnection tc=(TransportConnection) connection; - if(tc.isMarkedCandidate()){ - tc.doMark(); - collectionCandidates.add(tc); - } + TransportConnection connection=(TransportConnection) i.next(); + if(connection.isMarkedCandidate()){ + connection.doMark(); + collectionCandidates.add(connection); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java index cb8c9ebaa1..066215e008 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnector.java @@ -57,7 +57,7 @@ public class ManagedTransportConnector extends TransportConnector { connectionId = "" + (nextConnectionId++); } - return new ManagedTransportConnection(this, transport, getBrokerFilter(), getTaskRunnerFactory(), mbeanServer, connectorName, connectionId); + return new ManagedTransportConnection(this, transport, getBroker(), getTaskRunnerFactory(), mbeanServer, connectorName, connectionId); } }