If sessionAsyncDispatch==false we do not need to create a session thread.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@418285 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-06-30 13:48:30 +00:00
parent 2f7da7fa9b
commit 686db796b9
2 changed files with 18 additions and 16 deletions

View File

@ -72,7 +72,6 @@ import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback; import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
@ -198,7 +197,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
protected boolean closed; protected boolean closed;
protected boolean asyncDispatch; protected boolean asyncDispatch;
protected boolean sessionAsyncDispatch; protected boolean sessionAsyncDispatch;
protected TaskRunner taskRunner;
/** /**
* Construct the Session * Construct the Session

View File

@ -61,12 +61,17 @@ public class ActiveMQSessionExecutor implements Task {
} }
private void wakeup() { private void wakeup() {
if( taskRunner!=null && !dispatchedBySessionPool && hasUncomsumedMessages() ) { if( !dispatchedBySessionPool && hasUncomsumedMessages() ) {
if( taskRunner!=null ) {
try { try {
taskRunner.wakeup(); taskRunner.wakeup();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} else {
while( iterate() )
;
}
} }
} }
@ -79,12 +84,6 @@ public class ActiveMQSessionExecutor implements Task {
return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty(); return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
} }
/**
* implementation of Runnable
*/
public void run() {
}
void dispatch(MessageDispatch message){ void dispatch(MessageDispatch message){
// TODO - we should use a Map for this indexed by consumerId // TODO - we should use a Map for this indexed by consumerId
@ -101,7 +100,9 @@ public class ActiveMQSessionExecutor implements Task {
synchronized void start() { synchronized void start() {
if( !messageQueue.isRunning() ) { if( !messageQueue.isRunning() ) {
messageQueue.start(); messageQueue.start();
if( session.isSessionAsyncDispatch() || dispatchedBySessionPool ) {
taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId()); taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId());
}
wakeup(); wakeup();
} }
} }
@ -110,7 +111,10 @@ public class ActiveMQSessionExecutor implements Task {
try { try {
if( messageQueue.isRunning() ) { if( messageQueue.isRunning() ) {
messageQueue.stop(); messageQueue.stop();
if( taskRunner!=null ) {
taskRunner.shutdown(); taskRunner.shutdown();
taskRunner=null;
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw JMSExceptionSupport.create(e); throw JMSExceptionSupport.create(e);
@ -147,7 +151,7 @@ public class ActiveMQSessionExecutor implements Task {
return false; return false;
} else { } else {
dispatch(message); dispatch(message);
return messageQueue.isRunning(); return !messageQueue.isEmpty();
} }
} }