mirror of https://github.com/apache/activemq.git
tightened up synchronization around dispatching
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@476100 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9dcd00823b
commit
3b28c7c3ec
|
@ -90,6 +90,7 @@ public class Queue implements Destination, Task {
|
|||
private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
|
||||
private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
|
||||
private final Object exclusiveLockMutex = new Object();
|
||||
private final Object doDispatchMutex = new Object();
|
||||
private TaskRunner taskRunner;
|
||||
|
||||
public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
|
||||
|
@ -513,7 +514,7 @@ public class Queue implements Destination, Task {
|
|||
return rc;
|
||||
}
|
||||
|
||||
MessageStore getMessageStore() {
|
||||
public MessageStore getMessageStore() {
|
||||
return store;
|
||||
}
|
||||
|
||||
|
@ -591,7 +592,7 @@ public class Queue implements Destination, Task {
|
|||
|
||||
public void purge() throws Exception {
|
||||
|
||||
doDispatch(doPageIn());
|
||||
pageInMessages();
|
||||
|
||||
synchronized (pagedInMessages) {
|
||||
ConnectionContext c = createConnectionContext();
|
||||
|
@ -652,7 +653,7 @@ public class Queue implements Destination, Task {
|
|||
* @return the number of messages removed
|
||||
*/
|
||||
public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
|
||||
doDispatch(doPageIn());
|
||||
pageInMessages();
|
||||
int counter = 0;
|
||||
synchronized (pagedInMessages) {
|
||||
ConnectionContext c = createConnectionContext();
|
||||
|
@ -701,7 +702,7 @@ public class Queue implements Destination, Task {
|
|||
* @return the number of messages copied
|
||||
*/
|
||||
public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
|
||||
doDispatch(doPageIn());
|
||||
pageInMessages();
|
||||
int counter = 0;
|
||||
synchronized (pagedInMessages) {
|
||||
for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
|
||||
|
@ -751,7 +752,7 @@ public class Queue implements Destination, Task {
|
|||
* Moves the messages matching the given filter up to the maximum number of matched messages
|
||||
*/
|
||||
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
|
||||
doDispatch(doPageIn());
|
||||
pageInMessages();
|
||||
int counter = 0;
|
||||
synchronized (pagedInMessages) {
|
||||
for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
|
||||
|
@ -784,7 +785,7 @@ public class Queue implements Destination, Task {
|
|||
*/
|
||||
public boolean iterate(){
|
||||
try{
|
||||
doDispatch(doPageIn(false));
|
||||
pageInMessages(false);
|
||||
}catch(Exception e){
|
||||
log.error("Failed to page in more queue messages ",e);
|
||||
}
|
||||
|
@ -844,7 +845,7 @@ public class Queue implements Destination, Task {
|
|||
}
|
||||
destinationStatistics.getEnqueues().increment();
|
||||
destinationStatistics.getMessages().increment();
|
||||
doDispatch(doPageIn(false));
|
||||
pageInMessages(false);
|
||||
}
|
||||
|
||||
private List doPageIn() throws Exception{
|
||||
|
@ -895,6 +896,15 @@ public class Queue implements Destination, Task {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void pageInMessages() throws Exception{
|
||||
pageInMessages(true);
|
||||
}
|
||||
private void pageInMessages(boolean force) throws Exception{
|
||||
synchronized(doDispatchMutex) {
|
||||
doDispatch(doPageIn(force));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue