If you have a multi-threaded client which share 1 connection to the broker it is possible you may have seen errors like:

2007-12-05 14:14:52,606 [VMTransport] ERROR - org.apache.activemq.broker.TransportConnection.Service - Async error occurred: java.lang.IllegalStateException: Cannot lookup a connection that had not been registered: ID:bubba-38184-1196882086290-2:1
java.lang.IllegalStateException: Cannot lookup a connection that had not been registered: ID:bubba-38184-1196882086290-2:1
at org.apache.activemq.broker.SingleTransportConnectionStateRegister.lookupConnectionState(SingleTransportConnectionStateRegister.java:113)
at org.apache.activemq.broker.TransportConnection.lookupConnectionState(TransportConnection.java:1313)
at org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:538)
at org.apache.activemq.command.RemoveInfo.visit(RemoveInfo.java:64)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202)
at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:118)
at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:42)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:619)


root cause is that the client side has multiple threads calling close() on consumer/session and the connection objects. If the timing of those concurrent calls was just right, it is possible for the client to send the connection close message followed by a session or consumer close message which is invalid and would result in IllegalStateException reported by the broker.

- Simplified the ActiveMQConnection and TransportConnection shutdown so that this does does not happen.


git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@602529 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-12-08 18:56:11 +00:00
parent 8c541377bd
commit 2984963912
2 changed files with 93 additions and 95 deletions

View File

@ -580,8 +580,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// If we announced ourselfs to the broker.. Try to let // If we announced ourselfs to the broker.. Try to let
// the broker // the broker
// know that the connection is being shutdown. // know that the connection is being shutdown.
syncSendPacket(info.createRemoveCommand(), closeTimeout); doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
asyncSendPacket(new ShutdownInfo()); doAsyncSendPacket(new ShutdownInfo());
} }
ServiceSupport.dispose(this.transport); ServiceSupport.dispose(this.transport);
@ -1144,18 +1144,21 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @throws JMSException * @throws JMSException
*/ */
public void asyncSendPacket(Command command) throws JMSException { public void asyncSendPacket(Command command) throws JMSException {
if (isClosed()) { if (isClosed() || closing.get()) {
throw new ConnectionClosedException(); throw new ConnectionClosedException();
} else { } else {
doAsyncSendPacket(command);
try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
} }
} }
private void doAsyncSendPacket(Command command) throws JMSException {
try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
/** /**
* Send a packet through a Connection - for internal use only * Send a packet through a Connection - for internal use only
* *
@ -1193,27 +1196,31 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @throws JMSException * @throws JMSException
*/ */
public Response syncSendPacket(Command command, int timeout) throws JMSException { public Response syncSendPacket(Command command, int timeout) throws JMSException {
if (isClosed()) { if (isClosed() || closing.get()) {
throw new ConnectionClosedException(); throw new ConnectionClosedException();
} else { } else {
return doSyncSendPacket(command, timeout);
try {
Response response = (Response)this.transport.request(command, timeout);
if (response != null && response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException();
} else {
throw JMSExceptionSupport.create(er.getException());
}
}
return response;
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
} }
} }
private Response doSyncSendPacket(Command command, int timeout)
throws JMSException {
try {
Response response = (Response)this.transport.request(command, timeout);
if (response != null && response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException();
} else {
throw JMSExceptionSupport.create(er.getException());
}
}
return response;
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
/** /**
* @return statistics for this Connection * @return statistics for this Connection
*/ */

View File

@ -139,7 +139,6 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
private long timeStamp; private long timeStamp;
private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicBoolean transportDisposed = new AtomicBoolean(); private final AtomicBoolean transportDisposed = new AtomicBoolean();
private final AtomicBoolean disposed = new AtomicBoolean(false);
private CountDownLatch stopLatch = new CountDownLatch(1); private CountDownLatch stopLatch = new CountDownLatch(1);
private final AtomicBoolean asyncException = new AtomicBoolean(false); private final AtomicBoolean asyncException = new AtomicBoolean(false);
private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>(); private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
@ -198,7 +197,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
public void serviceTransportException(IOException e) { public void serviceTransportException(IOException e) {
if (!disposed.get()) { if (!stopped.get()) {
transportException.set(e); transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) { if (TRANSPORTLOG.isDebugEnabled()) {
TRANSPORTLOG.debug("Transport failed: " + e, e); TRANSPORTLOG.debug("Transport failed: " + e, e);
@ -240,7 +239,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
// Handle the case where the broker is stopped // Handle the case where the broker is stopped
// But the client is still connected. // But the client is still connected.
if (!disposed.get()) { if (!stopped.get()) {
if (SERVICELOG.isDebugEnabled()) { if (SERVICELOG.isDebugEnabled()) {
SERVICELOG SERVICELOG
.debug("Broker has been stopped. Notifying client and closing his connection."); .debug("Broker has been stopped. Notifying client and closing his connection.");
@ -260,7 +259,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
// notification gets to him. // notification gets to him.
ServiceSupport.dispose(this); ServiceSupport.dispose(this);
} }
} else if (!disposed.get() && !inServiceException) { } else if (!stopped.get() && !inServiceException) {
inServiceException = true; inServiceException = true;
try { try {
SERVICELOG.error("Async error occurred: " + e, e); SERVICELOG.error("Async error occurred: " + e, e);
@ -669,11 +668,12 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
public Response processRemoveConnection(ConnectionId id) { public Response processRemoveConnection(ConnectionId id) throws InterruptedException {
TransportConnectionState cs = lookupConnectionState(id); TransportConnectionState cs = lookupConnectionState(id);
// Don't allow things to be added to the connection state while we are // Don't allow things to be added to the connection state while we are
// shutting down. // shutting down.
cs.shutdown(); cs.shutdown();
// Cascade the connection stop to the sessions. // Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
SessionId sessionId = (SessionId)iter.next(); SessionId sessionId = (SessionId)iter.next();
@ -731,7 +731,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
public void dispatchAsync(Command message) { public void dispatchAsync(Command message) {
if (!disposed.get()) { if (!stopped.get()) {
getStatistics().getEnqueues().increment(); getStatistics().getEnqueues().increment();
if (taskRunner == null) { if (taskRunner == null) {
dispatchSync(message); dispatchSync(message);
@ -759,7 +759,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch() final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch()
? command : null); ? command : null);
try { try {
if (!disposed.get()) { if (!stopped.get()) {
if (messageDispatch != null) { if (messageDispatch != null) {
broker.preProcessDispatch(messageDispatch); broker.preProcessDispatch(messageDispatch);
} }
@ -779,7 +779,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
public boolean iterate() { public boolean iterate() {
try { try {
if (disposed.get()) { if (stopped.get()) {
if (dispatchStopped.compareAndSet(false, true)) { if (dispatchStopped.compareAndSet(false, true)) {
if (transportException.get() == null) { if (transportException.get() == null) {
try { try {
@ -901,64 +901,67 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} catch (Exception ignore) { } catch (Exception ignore) {
LOG.trace("Exception caught stopping", ignore); LOG.trace("Exception caught stopping", ignore);
} }
if (disposed.compareAndSet(false, true)) {
// Let all the connection contexts know we are shutting down // Let all the connection contexts know we are shutting down
// so that in progress operations can notice and unblock. // so that in progress operations can notice and unblock.
List<TransportConnectionState> connectionStates = listConnectionStates(); List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true);
}
if (taskRunner != null) {
taskRunner.wakeup();
// Give it a change to stop gracefully.
dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
}
try {
transport.stop();
LOG.debug("Stopped connection: " + transport.getRemoteAddress());
} catch (Exception e) {
LOG.debug("Could not stop transport: " + e, e);
}
if (taskRunner != null) {
taskRunner.shutdown();
}
active = false;
// Run the MessageDispatch callbacks so that message references get
// cleaned up.
for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
Command command = iter.next();
if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch)command;
Runnable sub = md.getTransmitCallback();
broker.postProcessDispatch(md);
if (sub != null) {
sub.run();
}
}
}
//
// Remove all logical connection associated with this connection
// from the broker.
if (!broker.isStopped()) {
connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) { for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true); cs.getContext().getStopping().set(true);
} try {
LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
if (taskRunner != null) { processRemoveConnection(cs.getInfo().getConnectionId());
taskRunner.wakeup(); } catch (Throwable ignore) {
// Give it a change to stop gracefully. ignore.printStackTrace();
dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
disposeTransport();
taskRunner.shutdown();
} else {
disposeTransport();
}
if (taskRunner != null) {
taskRunner.shutdown();
}
// Run the MessageDispatch callbacks so that message references get
// cleaned up.
for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
Command command = iter.next();
if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch)command;
Runnable sub = md.getTransmitCallback();
broker.postProcessDispatch(md);
if (sub != null) {
sub.run();
}
} }
} }
//
// Remove all logical connection associated with this connection
// from the broker.
if (!broker.isStopped()) { if (brokerInfo != null) {
connectionStates = listConnectionStates(); broker.removeBroker(this, brokerInfo);
for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true);
try {
LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
processRemoveConnection(cs.getInfo().getConnectionId());
} catch (Throwable ignore) {
ignore.printStackTrace();
}
}
if (brokerInfo != null) {
broker.removeBroker(this, brokerInfo);
}
} }
LOG.debug("Connection Stopped: " + getRemoteAddress());
} }
LOG.debug("Connection Stopped: " + getRemoteAddress());
} }
/** /**
@ -1233,18 +1236,6 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
} }
protected void disposeTransport() {
if (transportDisposed.compareAndSet(false, true)) {
try {
transport.stop();
active = false;
LOG.debug("Stopped connection: " + transport.getRemoteAddress());
} catch (Exception e) {
LOG.debug("Could not stop transport: " + e, e);
}
}
}
public int getProtocolVersion() { public int getProtocolVersion() {
return protocolVersion.get(); return protocolVersion.get();
} }