From 372f69aba9c7f9cf7ea25dbc19865c48c632583c Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 17 Mar 2008 13:28:06 +0000 Subject: [PATCH] tightened synchronization around dispatchQueue git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@637878 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransportConnection.java | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index e832acbef3..9f7af7736c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -112,7 +112,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { protected final Map brokerConnectionStates; // The broker and wireformat info that was exchanged. protected BrokerInfo brokerInfo; - protected final List dispatchQueue = Collections.synchronizedList(new LinkedList()); + protected final List dispatchQueue = new LinkedList(); protected TaskRunner taskRunner; protected final AtomicReference transportException = new AtomicReference(); protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); @@ -205,7 +205,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { * @return size of dispatch queue */ public int getDispatchQueueSize() { - return dispatchQueue.size(); + synchronized(dispatchQueue) { + return dispatchQueue.size(); + } } public void serviceTransportException(IOException e) { @@ -743,7 +745,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { if (taskRunner == null) { dispatchSync(message); } else { - dispatchQueue.add(message); + synchronized(dispatchQueue) { + dispatchQueue.add(message); + } try { taskRunner.wakeup(); } catch (InterruptedException e) { @@ -780,7 +784,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { sub.run(); } } - getStatistics().getDequeues().increment(); + //getStatistics().getDequeues().increment(); } } @@ -800,11 +804,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } if (!dispatchStopped.get()) { - - if (dispatchQueue.isEmpty()) { - return false; + Command command = null; + synchronized(dispatchQueue) { + if (dispatchQueue.isEmpty()) { + return false; + } + command = dispatchQueue.remove(0); } - Command command = dispatchQueue.remove(0); processDispatch(command); return true; } @@ -968,16 +974,19 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // Run the MessageDispatch callbacks so that message references get // cleaned up. - for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) { - Command command = iter.next(); - if (command.isMessageDispatch()) { - MessageDispatch md = (MessageDispatch)command; - Runnable sub = md.getTransmitCallback(); - broker.postProcessDispatch(md); - if (sub != null) { - sub.run(); + synchronized(dispatchQueue) { + for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) { + Command command = iter.next(); + if (command.isMessageDispatch()) { + MessageDispatch md = (MessageDispatch)command; + Runnable sub = md.getTransmitCallback(); + broker.postProcessDispatch(md); + if (sub != null) { + sub.run(); + } } } + dispatchQueue.clear(); } // // Remove all logical connection associated with this connection