diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 2487b77f1a..8c49ee8f4e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -72,7 +72,6 @@ import org.apache.activemq.management.JMSSessionStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; import org.apache.activemq.thread.Scheduler; -import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.util.Callback; import org.apache.activemq.util.LongSequenceGenerator; @@ -198,7 +197,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta protected boolean closed; protected boolean asyncDispatch; protected boolean sessionAsyncDispatch; - protected TaskRunner taskRunner; /** * Construct the Session diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java index efc0285b8b..5411a42e92 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java @@ -61,11 +61,16 @@ public class ActiveMQSessionExecutor implements Task { } private void wakeup() { - if( taskRunner!=null && !dispatchedBySessionPool && hasUncomsumedMessages() ) { - try { - taskRunner.wakeup(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if( !dispatchedBySessionPool && hasUncomsumedMessages() ) { + if( taskRunner!=null ) { + try { + taskRunner.wakeup(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + while( iterate() ) + ; } } } @@ -79,12 +84,6 @@ public class ActiveMQSessionExecutor implements Task { return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty(); } - /** - * implementation of Runnable - */ - public void run() { - } - void dispatch(MessageDispatch message){ // TODO - we should use a Map for this indexed by consumerId @@ -101,7 +100,9 @@ public class ActiveMQSessionExecutor implements Task { synchronized void start() { if( !messageQueue.isRunning() ) { messageQueue.start(); - taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId()); + if( session.isSessionAsyncDispatch() || dispatchedBySessionPool ) { + taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId()); + } wakeup(); } } @@ -110,7 +111,10 @@ public class ActiveMQSessionExecutor implements Task { try { if( messageQueue.isRunning() ) { messageQueue.stop(); - taskRunner.shutdown(); + if( taskRunner!=null ) { + taskRunner.shutdown(); + taskRunner=null; + } } } catch (InterruptedException e) { throw JMSExceptionSupport.create(e); @@ -147,7 +151,7 @@ public class ActiveMQSessionExecutor implements Task { return false; } else { dispatch(message); - return messageQueue.isRunning(); + return !messageQueue.isEmpty(); } }