AMQ-4181 - revert mod to testQueueBrowserWith2ConsumersInterleaved which cause intermittent ci failure - browse is a snapshot at time of creation. tidy up some gaps in pagein logic sync

This commit is contained in:
gtully 2016-05-23 13:29:25 +01:00
parent b1d8e66eaf
commit b4e35fe8a3
2 changed files with 23 additions and 23 deletions

View File

@ -410,14 +410,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
browser.incrementQueueRef(); browser.incrementQueueRef();
} }
void done() {
try {
browser.decrementQueueRef();
} catch (Exception e) {
LOG.warn("decrement ref on browser: " + browser, e);
}
}
public QueueBrowserSubscription getBrowser() { public QueueBrowserSubscription getBrowser() {
return browser; return browser;
} }
@ -1602,12 +1594,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInPendingDispatchLock.readLock().unlock(); pagedInPendingDispatchLock.readLock().unlock();
} }
// Perhaps we should page always into the pagedInPendingDispatch boolean hasBrowsers = !browserDispatches.isEmpty();
// list if
// !messages.isEmpty(), and then if
// !pagedInPendingDispatch.isEmpty()
// then we do a dispatch.
boolean hasBrowsers = browserDispatches.size() > 0;
if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) { if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) {
try { try {
@ -1618,12 +1605,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} }
if (hasBrowsers) { if (hasBrowsers) {
PendingList alreadyDispatchedMessages = isPrioritizedMessages() ? PendingList messagesInMemory = isPrioritizedMessages() ?
new PrioritizedPendingList() : new OrderedPendingList(); new PrioritizedPendingList() : new OrderedPendingList();
pagedInMessagesLock.readLock().lock(); pagedInMessagesLock.readLock().lock();
try{ try {
alreadyDispatchedMessages.addAll(pagedInMessages); messagesInMemory.addAll(pagedInMessages);
}finally { } finally {
pagedInMessagesLock.readLock().unlock(); pagedInMessagesLock.readLock().unlock();
} }
@ -1636,9 +1623,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
QueueBrowserSubscription browser = browserDispatch.getBrowser(); QueueBrowserSubscription browser = browserDispatch.getBrowser();
LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size()); LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size());
boolean added = false; boolean added = false;
for (MessageReference node : alreadyDispatchedMessages) { for (MessageReference node : messagesInMemory) {
if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) { if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
msgContext.setMessageReference(node); msgContext.setMessageReference(node);
if (browser.matches(node, msgContext)) { if (browser.matches(node, msgContext)) {
@ -1902,7 +1889,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
List<QueueMessageReference> result = null; List<QueueMessageReference> result = null;
PendingList resultList = null; PendingList resultList = null;
int toPageIn = Math.min(maxPageSize, messages.size()); int toPageIn = maxPageSize;
messagesLock.readLock().lock();
try {
toPageIn = Math.min(toPageIn, messages.size());
} finally {
messagesLock.readLock().unlock();
}
int pagedInPendingSize = 0; int pagedInPendingSize = 0;
pagedInPendingDispatchLock.readLock().lock(); pagedInPendingDispatchLock.readLock().lock();
try { try {
@ -1913,7 +1906,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (isLazyDispatch() && !force) { if (isLazyDispatch() && !force) {
// Only page in the minimum number of messages which can be // Only page in the minimum number of messages which can be
// dispatched immediately. // dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); toPageIn = Math.min(toPageIn, getConsumerMessageCountBeforeFull());
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -265,7 +265,9 @@ public class BrokerTest extends BrokerTestSupport {
messages.add(m1); messages.add(m1);
} }
for (int i = 0; i < 4; i++) { // a browse is a snapshot - only guarantee to see messages produced before
// the browser
for (int i = 0; i < 1; i++) {
Message m1 = messages.get(i); Message m1 = messages.get(i);
Message m2 = receiveMessage(connection2); Message m2 = receiveMessage(connection2);
assertNotNull("m2 is null for index: " + i, m2); assertNotNull("m2 is null for index: " + i, m2);
@ -275,6 +277,11 @@ public class BrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection1); assertNoMessagesLeft(connection1);
assertNoMessagesLeft(connection2); assertNoMessagesLeft(connection2);
connection1.request(closeConnectionInfo(connectionInfo1));
connection1.stop();
connection2.request(closeConnectionInfo(connectionInfo2));
connection2.stop();
} }