An async error could cause a deadlock when using the VM transport since all it's operations are sync. The error handling is now done in an async thread to avoid the deadlock.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@394729 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-04-17 17:04:59 +00:00
parent 178f34bd9b
commit dcf7dea53f
1 changed files with 46 additions and 13 deletions

View File

@ -88,12 +88,19 @@ import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingDeque;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener { public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener {
public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000); public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
private final Executor asyncConnectionThread;
private static final Log log = LogFactory.getLog(ActiveMQConnection.class); private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
private static final IdGenerator connectionIdGenerator = new IdGenerator(); private static final IdGenerator connectionIdGenerator = new IdGenerator();
@ -165,6 +172,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*/ */
protected ActiveMQConnection(Transport transport, JMSStatsImpl factoryStats) protected ActiveMQConnection(Transport transport, JMSStatsImpl factoryStats)
throws Exception { throws Exception {
// Configure a single threaded executor who's core thread can timeout if idle
asyncConnectionThread = new ThreadPoolExecutor(1,1,5,TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r, "Connection task");
}});
asyncConnectionThread.allowCoreThreadTimeOut(true);
this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId())); this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId()));
this.info.setManageable(true); this.info.setManageable(true);
this.connectionSessionId = new SessionId(info.getConnectionId(), -1); this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
@ -1388,7 +1403,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/** /**
* @param command - the command to consume * @param command - the command to consume
*/ */
public void onCommand(Command command) { public void onCommand(final Command command) {
if (!closed.get() && command != null) { if (!closed.get() && command != null) {
if (command.isMessageDispatch()) { if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command; MessageDispatch md = (MessageDispatch) command;
@ -1416,7 +1431,13 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
onControlCommand((ControlCommand) command); onControlCommand((ControlCommand) command);
} }
else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
asyncConnectionThread.execute(new Runnable(){
public void run() {
onAsyncException(((ConnectionError)command).getException()); onAsyncException(((ConnectionError)command).getException());
}
});
new Thread("Async error worker") {
}.start();
}else if (command instanceof ConnectionControl){ }else if (command instanceof ConnectionControl){
onConnectionControl((ConnectionControl) command); onConnectionControl((ConnectionControl) command);
}else if (command instanceof ConsumerControl){ }else if (command instanceof ConsumerControl){
@ -1437,19 +1458,29 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void onAsyncException(Throwable error) { public void onAsyncException(Throwable error) {
if (!closed.get() && !closing.get()) { if (!closed.get() && !closing.get()) {
if (this.exceptionListener != null) { if (this.exceptionListener != null) {
if (!(error instanceof JMSException)) if (!(error instanceof JMSException))
error = JMSExceptionSupport.create(error); error = JMSExceptionSupport.create(error);
this.exceptionListener.onException((JMSException) error); final JMSException e = (JMSException) error;
asyncConnectionThread.execute(new Runnable(){
public void run() {
ActiveMQConnection.this.exceptionListener.onException(e);
}
});
} else { } else {
log.warn("Async exception with no exception listener: " + error, error); log.warn("Async exception with no exception listener: " + error, error);
} }
} }
} }
public void onException(IOException error) { public void onException(final IOException error) {
onAsyncException(error); onAsyncException(error);
asyncConnectionThread.execute(new Runnable(){
public void run() {
transportFailed(error); transportFailed(error);
ServiceSupport.dispose(this.transport); ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown(); brokerInfoReceived.countDown();
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
@ -1457,6 +1488,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
listener.onException(error); listener.onException(error);
} }
} }
});
}
public void transportInterupted() { public void transportInterupted() {
for (Iterator i = this.sessions.iterator(); i.hasNext();) { for (Iterator i = this.sessions.iterator(); i.hasNext();) {