diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index ae105e2726..662645eb49 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -24,7 +24,6 @@ import java.net.URISyntaxException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; - import javax.jms.Connection; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -42,7 +41,6 @@ import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.XAConnection; - import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTempDestination; @@ -71,8 +69,8 @@ import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.JMSExceptionSupport; @@ -80,14 +78,13 @@ import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; -public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, StreamConnection { +public class ActiveMQConnection extends DefaultTransportListener implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection { public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java index 657ea96d76..c88ead1654 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java @@ -442,6 +442,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C } public Response processRemoveSession(SessionId id) throws Throwable { + ConnectionId connectionId = id.getParentId(); ConnectionState cs = lookupConnectionState(connectionId); @@ -491,22 +492,37 @@ public abstract class AbstractConnection implements Service, Connection, Task, C return null; } - public Response processRemoveConnection(ConnectionId id) throws Throwable { + public Response processRemoveConnection(ConnectionId id) { ConnectionState cs = lookupConnectionState(id); // Cascade the connection stop to the sessions. for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { - processRemoveSession((SessionId) iter.next()); + + SessionId sessionId = (SessionId) iter.next(); + try{ + processRemoveSession(sessionId); + }catch(Throwable e){ + serviceLog.warn("Failed to remove session " + sessionId,e); + } } // Cascade the connection stop to temp destinations. for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) { - broker.removeDestination(cs.getContext(), (ActiveMQDestination) iter.next(), 0); + ActiveMQDestination dest = (ActiveMQDestination) iter.next(); + try{ + broker.removeDestination(cs.getContext(), dest, 0); + }catch(Throwable e){ + serviceLog.warn("Failed to remove tmp destination " + dest,e); + } iter.remove(); } - broker.removeConnection(cs.getContext(), cs.getInfo(), null); + try{ + broker.removeConnection(cs.getContext(), cs.getInfo(), null); + }catch(Throwable e){ + serviceLog.warn("Failed to remove connection " + cs.getInfo(),e); + } connectionStates.remove(id); return null; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 3a16f71a8b..fa7c728329 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -24,6 +24,7 @@ import org.apache.activemq.command.Command; import org.apache.activemq.command.Response; import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; import org.apache.commons.logging.Log; @@ -55,7 +56,7 @@ public class TransportConnection extends AbstractConnection { super(connector, broker, taskRunnerFactory); connector.setBrokerName(broker.getBrokerName()); this.transport = transport; - this.transport.setTransportListener(new TransportListener() { + this.transport.setTransportListener(new DefaultTransportListener() { public void onCommand(Command command) { Response response = service(command); if( response!=null ) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java index 96286cd6c7..9ea84fd7ec 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java @@ -16,8 +16,8 @@ */ package org.apache.activemq.broker.ft; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; - +import java.io.IOException; +import java.net.URI; import org.apache.activemq.Service; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; @@ -31,17 +31,15 @@ import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; -import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - -import java.io.IOException; -import java.net.URI; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; /** * Used by a Slave Broker to Connect to the Master @@ -83,7 +81,7 @@ public class MasterConnector implements Service{ remoteBroker=TransportFactory.connect(remoteURI); log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established."); - localBroker.setTransportListener(new TransportListener(){ + localBroker.setTransportListener(new DefaultTransportListener(){ public void onCommand(Command command){ } public void onException(IOException error){ @@ -93,7 +91,7 @@ public class MasterConnector implements Service{ } }); - remoteBroker.setTransportListener(new TransportListener(){ + remoteBroker.setTransportListener(new DefaultTransportListener(){ public void onCommand(Command command){ if( started.get() ) { serviceRemoteCommand(command); diff --git a/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnection.java b/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnection.java index 8934d7a07e..c4cdadf687 100644 --- a/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnection.java @@ -17,17 +17,16 @@ package org.apache.activemq.proxy; import java.io.IOException; - import org.apache.activemq.Service; import org.apache.activemq.command.Command; import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; class ProxyConnection implements Service { @@ -59,7 +58,7 @@ class ProxyConnection implements Service { return; } - this.localTransport.setTransportListener(new TransportListener() { + this.localTransport.setTransportListener(new DefaultTransportListener() { public void onCommand(Command command) { boolean shutdown=false; if( command.getClass() == ShutdownInfo.class ) { @@ -81,7 +80,7 @@ class ProxyConnection implements Service { } }); - this.remoteTransport.setTransportListener(new TransportListener() { + this.remoteTransport.setTransportListener(new DefaultTransportListener() { public void onCommand(Command command) { try { localTransport.oneway(command); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java b/activemq-core/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java new file mode 100755 index 0000000000..58b4fadfd7 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java @@ -0,0 +1,58 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import java.io.IOException; + +import org.apache.activemq.command.Command; + +/** + * An asynchronous listener of commands + * + * @version $Revision$ + */ +public class DefaultTransportListener implements TransportListener { + + /** + * called to process a command + * @param command + */ + public void onCommand(Command command){ + } + /** + * An unrecoverable exception has occured on the transport + * @param error + */ + public void onException(IOException error){ + } + + /** + * The transport has suffered an interuption from which it hopes to recover + * + */ + public void transportInterupted(){ + } + + + /** + * The transport has resumed after an interuption + * + */ + public void transportResumed(){ + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java index 93326e1dba..7b035c54d8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java @@ -25,7 +25,7 @@ import org.apache.activemq.command.Response; /** * @version $Revision: 1.5 $ */ -public class TransportFilter implements Transport, TransportListener { +public class TransportFilter extends DefaultTransportListener implements Transport { final protected Transport next; protected TransportListener commandListener; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportListener.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportListener.java index 6115545b51..fc74dbd566 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportListener.java @@ -27,7 +27,28 @@ import org.apache.activemq.command.Command; */ public interface TransportListener { + /** + * called to process a command + * @param command + */ public void onCommand(Command command); + /** + * An unrecoverable exception has occured on the transport + * @param error + */ public void onException(IOException error); + /** + * The transport has suffered an interuption from which it hopes to recover + * + */ + public void transportInterupted(); + + + /** + * The transport has resumed after an interuption + * + */ + public void transportResumed(); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index b7a34024b2..7a976a6242 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -31,6 +31,7 @@ import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.transport.CompositeTransport; +import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; @@ -77,7 +78,7 @@ public class FailoverTransport implements CompositeTransport { private long reconnectDelay = initialReconnectDelay; private Exception connectionFailure; - private final TransportListener myTransportListener = new TransportListener() { + private final TransportListener myTransportListener = new DefaultTransportListener() { public void onCommand(Command command) { if (command == null) { return; @@ -152,6 +153,9 @@ public class FailoverTransport implements CompositeTransport { connectedTransport = t; reconnectMutex.notifyAll(); connectFailures = 0; + if (transportListener != null){ + transportListener.transportResumed(); + } return false; } catch (Exception e) { @@ -191,6 +195,9 @@ public class FailoverTransport implements CompositeTransport { } private void handleTransportFailure(IOException e) throws InterruptedException { + if (transportListener != null){ + transportListener.transportInterupted(); + } synchronized (reconnectMutex) { log.debug("Transport failed, starting up reconnect task", e); if (connectedTransport != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index 3e93e507f3..2b5c237cc8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -31,6 +31,7 @@ import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.transport.CompositeTransport; +import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; @@ -91,7 +92,7 @@ public class FanoutTransport implements CompositeTransport { } } - class FanoutTransportHandler implements TransportListener { + class FanoutTransportHandler extends DefaultTransportListener { private final URI uri; private Transport transport; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java index a77a27593b..118ee29b7a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java @@ -20,6 +20,7 @@ import java.io.IOException; import org.apache.activemq.command.Command; import org.apache.activemq.command.Response; +import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; @@ -29,7 +30,7 @@ import org.apache.activemq.transport.TransportListener; /** * @version $Revision: 1.5 $ */ -public class MockTransport implements Transport, TransportListener { +public class MockTransport extends DefaultTransportListener implements Transport { protected Transport next; protected TransportListener commandListener; diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java b/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java index bc61fe320c..a895790a7d 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java @@ -17,22 +17,17 @@ package org.apache.activemq.broker; import java.io.IOException; - import org.apache.activemq.Service; -import org.apache.activemq.broker.AbstractConnection; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.Connection; import org.apache.activemq.command.Command; import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.Message; import org.apache.activemq.command.Response; import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.ServiceSupport; import org.axiondb.engine.commands.ShutdownCommand; - import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; @@ -70,7 +65,7 @@ public class StubConnection implements Service { public StubConnection(Transport transport) throws Exception { this.transport = transport; - transport.setTransportListener(new TransportListener() { + transport.setTransportListener(new DefaultTransportListener() { public void onCommand(Command command) { try { if (command.getClass() == ShutdownCommand.class) {