https://issues.apache.org/jira/browse/AMQ-5266 - fix edge case with optimizedDispatch=true where a single message could be pending till the next page in event

This commit is contained in:
gtully 2014-09-11 16:59:50 +01:00
parent 26807cd452
commit 5861d86ad3
2 changed files with 20 additions and 15 deletions

View File

@ -781,12 +781,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
sendLock.unlock();
}
for (MessageContext messageContext : orderedUpdates) {
if (!messageContext.duplicate) {
messageSent(messageContext.context, messageContext.message);
}
if (messageContext.onCompletion != null) {
messageContext.onCompletion.run();
}
if (!messageContext.duplicate) {
messageSent(messageContext.context, messageContext.message);
}
}
orderedUpdates.clear();
}

View File

@ -87,22 +87,26 @@ public class AMQ5266Test {
@Parameterized.Parameter(5)
public boolean useDefaultStore = false;
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5}")
@Parameterized.Parameter(6)
public boolean optimizeDispatch = false;
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[][]{
// jdbc
{1000, 20, 5, 50*1024, true, false},
{100, 20, 5, 50*1024, false, false},
{1000, 5, 20, 50*1024, true, false},
{1000, 20, 20, 1024*1024, true, false},
{1000, 100, 100, 1024*1024, true, false},
{1, 1, 1, 50*1024, false, false, true},
{1000, 20, 5, 50*1024, true, false, false},
{100, 20, 5, 50*1024, false, false, false},
{1000, 5, 20, 50*1024, true, false, false},
{1000, 20, 20, 1024*1024, true, false, false},
// default store
{1000, 20, 5, 50*1024, true, true},
{100, 20, 5, 50*1024, false, true},
{1000, 5, 20, 50*1024, true, true},
{1000, 20, 20, 1024*1024, true, true},
{1000, 100, 100, 1024*1024, true, true}
{1, 1, 1, 50*1024, false, true, true},
{100, 5, 5, 50*1024, false, true, false},
{1000, 20, 5, 50*1024, true, true, false},
{100, 20, 5, 50*1024, false, true, false},
{1000, 5, 20, 50*1024, true, true, false},
{1000, 20, 20, 1024*1024, true, true, false},
});
}
@ -127,6 +131,7 @@ public class AMQ5266Test {
kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
}
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setUseJmx(false);
PolicyMap policyMap = new PolicyMap();
@ -136,7 +141,7 @@ public class AMQ5266Test {
defaultEntry.setEnableAudit(true);
defaultEntry.setUseCache(useCache);
defaultEntry.setMaxPageSize(1000);
defaultEntry.setOptimizedDispatch(false);
defaultEntry.setOptimizedDispatch(optimizeDispatch);
defaultEntry.setMemoryLimit(destMemoryLimit);
defaultEntry.setExpireMessagesPeriod(0);
policyMap.setDefaultEntry(defaultEntry);