mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3193 - rework of fix to always populate pagedInPending
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1075846 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
645037e539
commit
462efdb9de
|
@ -1437,23 +1437,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
} catch (Throwable e) {
|
||||
LOG.error("Failed to page in more queue messages ", e);
|
||||
}
|
||||
} else {
|
||||
// if there are already paged messages
|
||||
// dispatch them
|
||||
if (pagedInMessages.size() != 0) {
|
||||
pagedInMessagesLock.writeLock().lock();
|
||||
ArrayList paged = new ArrayList();
|
||||
try {
|
||||
paged.addAll(pagedInMessages.values());
|
||||
} finally {
|
||||
pagedInMessagesLock.writeLock().unlock();
|
||||
}
|
||||
try {
|
||||
doDispatch(paged);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to dispatch already paged messages ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pendingBrowserDispatch != null) {
|
||||
|
@ -1671,7 +1654,25 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
return broker.getBrokerService().isSlave();
|
||||
}
|
||||
|
||||
private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
|
||||
private void doPageIn(boolean force) throws Exception {
|
||||
List<QueueMessageReference> newlyPaged = doPageInForDispatch(force);
|
||||
pagedInPendingDispatchLock.writeLock().lock();
|
||||
try {
|
||||
if (pagedInPendingDispatch.isEmpty()) {
|
||||
pagedInPendingDispatch.addAll(newlyPaged);
|
||||
} else {
|
||||
for (QueueMessageReference qmr : newlyPaged) {
|
||||
if (!pagedInPendingDispatch.contains(qmr)) {
|
||||
pagedInPendingDispatch.add(qmr);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
pagedInPendingDispatchLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private List<QueueMessageReference> doPageInForDispatch(boolean force) throws Exception {
|
||||
List<QueueMessageReference> result = null;
|
||||
List<QueueMessageReference> resultList = null;
|
||||
|
||||
|
@ -1922,7 +1923,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
}
|
||||
|
||||
protected void pageInMessages(boolean force) throws Exception {
|
||||
doDispatch(doPageIn(force));
|
||||
doDispatch(doPageInForDispatch(force));
|
||||
}
|
||||
|
||||
private void addToConsumerList(Subscription sub) {
|
||||
|
|
Loading…
Reference in New Issue