tightened synchronization around dispatchQueue

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@637878 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-03-17 13:28:06 +00:00
parent 0ff0ee3766
commit 372f69aba9
1 changed files with 25 additions and 16 deletions

View File

@ -112,7 +112,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
// The broker and wireformat info that was exchanged. // The broker and wireformat info that was exchanged.
protected BrokerInfo brokerInfo; protected BrokerInfo brokerInfo;
protected final List<Command> dispatchQueue = Collections.synchronizedList(new LinkedList<Command>()); protected final List<Command> dispatchQueue = new LinkedList<Command>();
protected TaskRunner taskRunner; protected TaskRunner taskRunner;
protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>(); protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
@ -205,7 +205,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
* @return size of dispatch queue * @return size of dispatch queue
*/ */
public int getDispatchQueueSize() { public int getDispatchQueueSize() {
return dispatchQueue.size(); synchronized(dispatchQueue) {
return dispatchQueue.size();
}
} }
public void serviceTransportException(IOException e) { public void serviceTransportException(IOException e) {
@ -743,7 +745,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
if (taskRunner == null) { if (taskRunner == null) {
dispatchSync(message); dispatchSync(message);
} else { } else {
dispatchQueue.add(message); synchronized(dispatchQueue) {
dispatchQueue.add(message);
}
try { try {
taskRunner.wakeup(); taskRunner.wakeup();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -780,7 +784,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
sub.run(); sub.run();
} }
} }
getStatistics().getDequeues().increment(); //getStatistics().getDequeues().increment();
} }
} }
@ -800,11 +804,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
if (!dispatchStopped.get()) { if (!dispatchStopped.get()) {
Command command = null;
if (dispatchQueue.isEmpty()) { synchronized(dispatchQueue) {
return false; if (dispatchQueue.isEmpty()) {
return false;
}
command = dispatchQueue.remove(0);
} }
Command command = dispatchQueue.remove(0);
processDispatch(command); processDispatch(command);
return true; return true;
} }
@ -968,16 +974,19 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// Run the MessageDispatch callbacks so that message references get // Run the MessageDispatch callbacks so that message references get
// cleaned up. // cleaned up.
for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) { synchronized(dispatchQueue) {
Command command = iter.next(); for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
if (command.isMessageDispatch()) { Command command = iter.next();
MessageDispatch md = (MessageDispatch)command; if (command.isMessageDispatch()) {
Runnable sub = md.getTransmitCallback(); MessageDispatch md = (MessageDispatch)command;
broker.postProcessDispatch(md); Runnable sub = md.getTransmitCallback();
if (sub != null) { broker.postProcessDispatch(md);
sub.run(); if (sub != null) {
sub.run();
}
} }
} }
dispatchQueue.clear();
} }
// //
// Remove all logical connection associated with this connection // Remove all logical connection associated with this connection