mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-1112 - resolve issues with optimizedDispatch, no need for message expiry to respect optimizeDispatch - iterate can always use the task runner in the expiry case, similar to usage change
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@812214 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c1051c8ee1
commit
8d9f47aa48
|
@ -1266,7 +1266,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
} catch (IOException e) {
|
||||
LOG.error("Failed to remove expired Message from the store ",e);
|
||||
}
|
||||
wakeup();
|
||||
asyncWakeup();
|
||||
}
|
||||
|
||||
protected ConnectionContext createConnectionContext() {
|
||||
|
@ -1297,15 +1297,18 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
public void wakeup() {
|
||||
if (optimizedDispatch || isSlave()) {
|
||||
iterate();
|
||||
}else {
|
||||
try {
|
||||
taskRunner.wakeup();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Task Runner failed to wakeup ", e);
|
||||
}
|
||||
} else {
|
||||
asyncWakeup();
|
||||
}
|
||||
}
|
||||
|
||||
public void asyncWakeup() {
|
||||
try {
|
||||
this.taskRunner.wakeup();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Async task tunner failed to wakeup ", e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isSlave() {
|
||||
return broker.getBrokerService().isSlave();
|
||||
|
@ -1371,6 +1374,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
}
|
||||
|
||||
private void doDispatch(List<QueueMessageReference> list) throws Exception {
|
||||
boolean doWakeUp = false;
|
||||
synchronized(dispatchMutex) {
|
||||
|
||||
synchronized (pagedInPendingDispatch) {
|
||||
|
@ -1391,11 +1395,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
pagedInPendingDispatch.add(qmr);
|
||||
}
|
||||
}
|
||||
wakeup();
|
||||
doWakeUp = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (doWakeUp) {
|
||||
wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1603,11 +1610,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
|
||||
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
|
||||
if (oldPercentUsage > newPercentUsage) {
|
||||
try {
|
||||
this.taskRunner.wakeup();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn(getName() + " failed to wakeup task runner on usageChange: " + e);
|
||||
}
|
||||
asyncWakeup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,7 +76,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry defaultEntry = new PolicyEntry();
|
||||
// TODO Optimize dispatch makes this test hang
|
||||
//defaultEntry.setOptimizedDispatch(true);
|
||||
defaultEntry.setOptimizedDispatch(true);
|
||||
defaultEntry.setExpireMessagesPeriod(100);
|
||||
defaultEntry.setMaxExpirePageSize(800);
|
||||
|
||||
|
|
Loading…
Reference in New Issue