Concurrent read / write to the list is not protected, instead a read lock around both access points is used.

Switch to using a ConcurrentLinkQueue instead and remove the read lock around that code.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1142245 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-07-02 16:37:14 +00:00
parent 7be63f21b1
commit f24b4f7569
1 changed files with 7 additions and 27 deletions

View File

@ -25,11 +25,11 @@ import java.util.Comparator;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
@ -58,11 +58,9 @@ import org.apache.activemq.command.*;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
@ -349,7 +347,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>(); ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -403,13 +401,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (sub instanceof QueueBrowserSubscription) { if (sub instanceof QueueBrowserSubscription) {
// tee up for dispatch in next iterate // tee up for dispatch in next iterate
QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub; QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
pagedInMessagesLock.readLock().lock();
try{
BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription); BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
browserDispatches.addLast(browserDispatch); browserDispatches.add(browserDispatch);
}finally {
pagedInMessagesLock.readLock().unlock();
}
} }
if (!(this.optimizedDispatch || isSlave())) { if (!(this.optimizedDispatch || isSlave())) {
@ -1357,19 +1350,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return movedCounter; return movedCounter;
} }
BrowserDispatch getNextBrowserDispatch() {
pagedInMessagesLock.readLock().lock();
try{
if (browserDispatches.isEmpty()) {
return null;
}
return browserDispatches.removeFirst();
}finally {
pagedInMessagesLock.readLock().unlock();
}
}
/** /**
* @return true if we would like to iterate again * @return true if we would like to iterate again
* @see org.apache.activemq.thread.Task#iterate() * @see org.apache.activemq.thread.Task#iterate()
@ -1425,7 +1405,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
} }
BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch(); BrowserDispatch pendingBrowserDispatch = browserDispatches.poll();
messagesLock.readLock().lock(); messagesLock.readLock().lock();
try{ try{
@ -1486,7 +1466,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e); LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
} }
} while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null); } while ((pendingBrowserDispatch = browserDispatches.poll()) != null);
} }
if (pendingWakeups.get() > 0) { if (pendingWakeups.get() > 0) {
@ -2074,7 +2054,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return sub; return sub;
} }
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
if (oldPercentUsage > newPercentUsage) { if (oldPercentUsage > newPercentUsage) {
asyncWakeup(); asyncWakeup();
} }