mirror of https://github.com/apache/activemq.git
The Transport Connection now notifies it's Connector of lifecycle events so that the Connector does not have to wrap the broker in yet another filter to notice the connection's lifecycle events.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@371636 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1c64baadea
commit
d2d1c6d63d
|
@ -77,7 +77,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
|||
|
||||
protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
|
||||
protected final TaskRunner taskRunner;
|
||||
protected final Connector connector;
|
||||
protected final TransportConnector connector;
|
||||
protected BrokerInfo brokerInfo;
|
||||
private ConnectionStatistics statistics = new ConnectionStatistics();
|
||||
private boolean inServiceException=false;
|
||||
|
@ -107,7 +107,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
|||
* @param broker
|
||||
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport else commands are sent async.
|
||||
*/
|
||||
public AbstractConnection(Connector connector, Broker broker, TaskRunnerFactory taskRunnerFactory) {
|
||||
public AbstractConnection(TransportConnector connector, Broker broker, TaskRunnerFactory taskRunnerFactory) {
|
||||
|
||||
this.connector = connector;
|
||||
this.broker = broker;
|
||||
|
|
|
@ -73,9 +73,11 @@ public class TransportConnection extends AbstractConnection {
|
|||
transport.start();
|
||||
active = true;
|
||||
super.start();
|
||||
connector.onStarted(this);
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
connector.onStopped(this);
|
||||
try {
|
||||
if (masterBroker != null){
|
||||
masterBroker.stop();
|
||||
|
|
|
@ -27,7 +27,6 @@ import javax.management.ObjectName;
|
|||
import org.apache.activemq.broker.jmx.ManagedTransportConnector;
|
||||
import org.apache.activemq.broker.region.ConnectorStatistics;
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportAcceptListener;
|
||||
|
@ -51,7 +50,6 @@ public class TransportConnector implements Connector {
|
|||
private static final Log log = LogFactory.getLog(TransportConnector.class);
|
||||
|
||||
private Broker broker;
|
||||
private BrokerFilter brokerFilter;
|
||||
private TransportServer server;
|
||||
private URI uri;
|
||||
private BrokerInfo brokerInfo = new BrokerInfo();
|
||||
|
@ -195,8 +193,8 @@ public class TransportConnector implements Connector {
|
|||
}
|
||||
this.statusDector.stop();
|
||||
for (Iterator iter = connections.iterator(); iter.hasNext();) {
|
||||
ConnectionContext context = (ConnectionContext) iter.next();
|
||||
ss.stop(context.getConnection());
|
||||
TransportConnection c = (TransportConnection) iter.next();
|
||||
ss.stop(c);
|
||||
}
|
||||
ss.throwFirstException();
|
||||
}
|
||||
|
@ -204,28 +202,7 @@ public class TransportConnector implements Connector {
|
|||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
protected Connection createConnection(Transport transport) throws IOException {
|
||||
return new TransportConnection(this, transport, getBrokerFilter(), taskRunnerFactory);
|
||||
}
|
||||
|
||||
protected BrokerFilter getBrokerFilter() {
|
||||
if (brokerFilter == null) {
|
||||
if (broker == null) {
|
||||
throw new IllegalArgumentException("You must specify the broker property. Maybe this connector should be added to a broker?");
|
||||
}
|
||||
this.brokerFilter = new BrokerFilter(broker) {
|
||||
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Throwable {
|
||||
connections.add(context);
|
||||
super.addConnection(context, info);
|
||||
}
|
||||
|
||||
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Throwable {
|
||||
connections.remove(context);
|
||||
super.removeConnection(context, info, error);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
return brokerFilter;
|
||||
return new TransportConnection(this, transport, broker, taskRunnerFactory);
|
||||
}
|
||||
|
||||
protected TransportServer createTransportServer() throws IOException, URISyntaxException {
|
||||
|
@ -278,4 +255,12 @@ public class TransportConnector implements Connector {
|
|||
this.connectUri = transportUri;
|
||||
}
|
||||
|
||||
public void onStarted(TransportConnection connection) {
|
||||
connections.add(connection);
|
||||
}
|
||||
|
||||
public void onStopped(TransportConnection connection) {
|
||||
connections.remove(connection);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -75,14 +75,10 @@ public class TransportStatusDetector implements Service,Runnable{
|
|||
}
|
||||
protected void doSweep(){
|
||||
for(Iterator i=connector.getConnections().iterator();i.hasNext();){
|
||||
ConnectionContext cc=(ConnectionContext) i.next();
|
||||
Connection connection=cc.getConnection();
|
||||
if(connection instanceof TransportConnection){
|
||||
TransportConnection tc=(TransportConnection) connection;
|
||||
if(tc.isMarkedCandidate()){
|
||||
tc.doMark();
|
||||
collectionCandidates.add(tc);
|
||||
}
|
||||
TransportConnection connection=(TransportConnection) i.next();
|
||||
if(connection.isMarkedCandidate()){
|
||||
connection.doMark();
|
||||
collectionCandidates.add(connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ public class ManagedTransportConnector extends TransportConnector {
|
|||
connectionId = "" + (nextConnectionId++);
|
||||
}
|
||||
|
||||
return new ManagedTransportConnection(this, transport, getBrokerFilter(), getTaskRunnerFactory(), mbeanServer, connectorName, connectionId);
|
||||
return new ManagedTransportConnection(this, transport, getBroker(), getTaskRunnerFactory(), mbeanServer, connectorName, connectionId);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue