Added synconization to the methods that setup connection state so that when an async error is detected, it properly does a full cleanup. Previously subscriptions were not properly cleaned up since they were being setup at the same time as they were being cleaned up.

Also start 1 async exception thread ever since an async exception leads to connection tear down.  Subsequent failures do not need additional async threads started.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@474769 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-11-14 12:44:16 +00:00
parent 0e9369f5dd
commit caa30ffdc0
1 changed files with 26 additions and 23 deletions

View File

@ -123,6 +123,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
private AtomicBoolean stopped = new AtomicBoolean(false); private AtomicBoolean stopped = new AtomicBoolean(false);
protected final AtomicBoolean disposed=new AtomicBoolean(false); protected final AtomicBoolean disposed=new AtomicBoolean(false);
private CountDownLatch stopLatch = new CountDownLatch(1); private CountDownLatch stopLatch = new CountDownLatch(1);
protected final AtomicBoolean asyncException = new AtomicBoolean(false);
static class ConnectionState extends org.apache.activemq.state.ConnectionState { static class ConnectionState extends org.apache.activemq.state.ConnectionState {
private final ConnectionContext context; private final ConnectionContext context;
@ -217,11 +218,13 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
* @param e * @param e
*/ */
public void serviceExceptionAsync(final IOException e) { public void serviceExceptionAsync(final IOException e) {
new Thread("Async Exception Handler") { if( asyncException.compareAndSet(false, true) ) {
public void run() { new Thread("Async Exception Handler") {
serviceException(e); public void run() {
} serviceException(e);
}.start(); }
}.start();
}
} }
/** /**
@ -349,7 +352,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processBeginTransaction(TransactionInfo info) throws Exception { synchronized public Response processBeginTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null; ConnectionContext context=null;
if( cs!=null ) { if( cs!=null ) {
@ -364,14 +367,14 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processEndTransaction(TransactionInfo info) throws Exception { synchronized public Response processEndTransaction(TransactionInfo info) throws Exception {
// No need to do anything. This packet is just sent by the client // No need to do anything. This packet is just sent by the client
// make sure he is synced with the server as commit command could // make sure he is synced with the server as commit command could
// come from a different connection. // come from a different connection.
return null; return null;
} }
public Response processPrepareTransaction(TransactionInfo info) throws Exception { synchronized public Response processPrepareTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null; ConnectionContext context=null;
if( cs!=null ) { if( cs!=null ) {
@ -395,7 +398,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
} }
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { synchronized public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null; ConnectionContext context=null;
if( cs!=null ) { if( cs!=null ) {
@ -409,7 +412,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { synchronized public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null; ConnectionContext context=null;
if( cs!=null ) { if( cs!=null ) {
@ -421,7 +424,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processRollbackTransaction(TransactionInfo info) throws Exception { synchronized public Response processRollbackTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null; ConnectionContext context=null;
if( cs!=null ) { if( cs!=null ) {
@ -433,7 +436,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processForgetTransaction(TransactionInfo info) throws Exception { synchronized public Response processForgetTransaction(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null; ConnectionContext context=null;
if( cs!=null ) { if( cs!=null ) {
@ -443,7 +446,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processRecoverTransactions(TransactionInfo info) throws Exception { synchronized public Response processRecoverTransactions(TransactionInfo info) throws Exception {
ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId()); ConnectionState cs = (ConnectionState) localConnectionStates.get(info.getConnectionId());
ConnectionContext context=null; ConnectionContext context=null;
if( cs!=null ) { if( cs!=null ) {
@ -498,7 +501,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processAddDestination(DestinationInfo info) throws Exception { synchronized public Response processAddDestination(DestinationInfo info) throws Exception {
ConnectionState cs = lookupConnectionState(info.getConnectionId()); ConnectionState cs = lookupConnectionState(info.getConnectionId());
broker.addDestinationInfo(cs.getContext(), info); broker.addDestinationInfo(cs.getContext(), info);
if( info.getDestination().isTemporary() ) { if( info.getDestination().isTemporary() ) {
@ -507,7 +510,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processRemoveDestination(DestinationInfo info) throws Exception { synchronized public Response processRemoveDestination(DestinationInfo info) throws Exception {
ConnectionState cs = lookupConnectionState(info.getConnectionId()); ConnectionState cs = lookupConnectionState(info.getConnectionId());
broker.removeDestinationInfo(cs.getContext(), info); broker.removeDestinationInfo(cs.getContext(), info);
if( info.getDestination().isTemporary() ) { if( info.getDestination().isTemporary() ) {
@ -517,7 +520,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
public Response processAddProducer(ProducerInfo info) throws Exception { synchronized public Response processAddProducer(ProducerInfo info) throws Exception {
SessionId sessionId = info.getProducerId().getParentId(); SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId(); ConnectionId connectionId = sessionId.getParentId();
@ -538,7 +541,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processRemoveProducer(ProducerId id) throws Exception { synchronized public Response processRemoveProducer(ProducerId id) throws Exception {
SessionId sessionId = id.getParentId(); SessionId sessionId = id.getParentId();
ConnectionId connectionId = sessionId.getParentId(); ConnectionId connectionId = sessionId.getParentId();
@ -554,7 +557,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processAddConsumer(ConsumerInfo info) throws Exception { synchronized public Response processAddConsumer(ConsumerInfo info) throws Exception {
SessionId sessionId = info.getConsumerId().getParentId(); SessionId sessionId = info.getConsumerId().getParentId();
ConnectionId connectionId = sessionId.getParentId(); ConnectionId connectionId = sessionId.getParentId();
@ -576,7 +579,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processRemoveConsumer(ConsumerId id) throws Exception { synchronized public Response processRemoveConsumer(ConsumerId id) throws Exception {
SessionId sessionId = id.getParentId(); SessionId sessionId = id.getParentId();
ConnectionId connectionId = sessionId.getParentId(); ConnectionId connectionId = sessionId.getParentId();
@ -593,7 +596,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processAddSession(SessionInfo info) throws Exception { synchronized public Response processAddSession(SessionInfo info) throws Exception {
ConnectionId connectionId = info.getSessionId().getParentId(); ConnectionId connectionId = info.getSessionId().getParentId();
ConnectionState cs = lookupConnectionState(connectionId); ConnectionState cs = lookupConnectionState(connectionId);
@ -609,7 +612,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processRemoveSession(SessionId id) throws Exception { synchronized public Response processRemoveSession(SessionId id) throws Exception {
ConnectionId connectionId = id.getParentId(); ConnectionId connectionId = id.getParentId();
@ -646,7 +649,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processAddConnection(ConnectionInfo info) throws Exception { synchronized public Response processAddConnection(ConnectionInfo info) throws Exception {
ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId()); ConnectionState state = (ConnectionState) brokerConnectionStates.get(info.getConnectionId());
@ -695,7 +698,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processRemoveConnection(ConnectionId id) { synchronized public Response processRemoveConnection(ConnectionId id) {
ConnectionState cs = lookupConnectionState(id); ConnectionState cs = lookupConnectionState(id);