mirror of https://github.com/apache/activemq.git
AMQ-2028 fix thread safety problem in ActiveMQSessionExecutor
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@725020 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9e6f62b7c2
commit
577eae65fd
|
@ -17,9 +17,7 @@
|
|||
|
||||
package org.apache.activemq;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
|
@ -44,9 +42,8 @@ public class ActiveMQSessionExecutor implements Task {
|
|||
private ActiveMQSession session;
|
||||
private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
|
||||
private boolean dispatchedBySessionPool;
|
||||
private TaskRunner taskRunner;
|
||||
private volatile TaskRunner taskRunner;
|
||||
private boolean startedOrWarnedThatNotStarted;
|
||||
private AtomicBoolean taskRunnerCreated = new AtomicBoolean();
|
||||
|
||||
ActiveMQSessionExecutor(ActiveMQSession session) {
|
||||
this.session = session;
|
||||
|
@ -90,10 +87,14 @@ public class ActiveMQSessionExecutor implements Task {
|
|||
if (!dispatchedBySessionPool) {
|
||||
if (session.isSessionAsyncDispatch()) {
|
||||
try {
|
||||
if (taskRunnerCreated.compareAndSet(false, true)) {
|
||||
if (taskRunner == null) {
|
||||
taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
|
||||
"ActiveMQ Session: " + session.getSessionId());
|
||||
TaskRunner taskRunner = this.taskRunner;
|
||||
if (taskRunner == null) {
|
||||
synchronized (this) {
|
||||
if (this.taskRunner == null) {
|
||||
this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
|
||||
"ActiveMQ Session: " + session.getSessionId());
|
||||
}
|
||||
taskRunner = this.taskRunner;
|
||||
}
|
||||
}
|
||||
taskRunner.wakeup();
|
||||
|
@ -120,8 +121,7 @@ public class ActiveMQSessionExecutor implements Task {
|
|||
|
||||
// TODO - we should use a Map for this indexed by consumerId
|
||||
|
||||
for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
|
||||
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
|
||||
for (ActiveMQMessageConsumer consumer : this.session.consumers) {
|
||||
ConsumerId consumerId = message.getConsumerId();
|
||||
if (consumerId.equals(consumer.getConsumerId())) {
|
||||
consumer.dispatch(message);
|
||||
|
@ -143,10 +143,10 @@ public class ActiveMQSessionExecutor implements Task {
|
|||
try {
|
||||
if (messageQueue.isRunning()) {
|
||||
messageQueue.stop();
|
||||
TaskRunner taskRunner = this.taskRunner;
|
||||
if (taskRunner != null) {
|
||||
this.taskRunner = null;
|
||||
taskRunner.shutdown();
|
||||
taskRunner = null;
|
||||
taskRunnerCreated.set(false);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -168,7 +168,7 @@ public class ActiveMQSessionExecutor implements Task {
|
|||
}
|
||||
|
||||
MessageDispatch dequeueNoWait() {
|
||||
return (MessageDispatch)messageQueue.dequeueNoWait();
|
||||
return messageQueue.dequeueNoWait();
|
||||
}
|
||||
|
||||
protected void clearMessagesInProgress() {
|
||||
|
@ -182,8 +182,7 @@ public class ActiveMQSessionExecutor implements Task {
|
|||
public boolean iterate() {
|
||||
|
||||
// Deliver any messages queued on the consumer to their listeners.
|
||||
for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
|
||||
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
|
||||
for (ActiveMQMessageConsumer consumer : this.session.consumers) {
|
||||
if (consumer.iterate()) {
|
||||
return true;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue