git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1487874 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-30 14:28:29 +00:00
parent 0c08d62fa3
commit c90322fff0
1 changed files with 37 additions and 14 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.command.MessageDispatch;
public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
@ -29,6 +30,7 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
private boolean running;
private int size = 0;
@SuppressWarnings("unchecked")
public SimplePriorityMessageDispatchChannel() {
this.lists = new LinkedList[MAX_PRIORITY];
for (int i = 0; i < MAX_PRIORITY; i++) {
@ -38,14 +40,13 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
/*
* (non-Javadoc)
* @see
* org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq
* .command.MessageDispatch)
*
* @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch)
*/
@Override
public void enqueue(MessageDispatch message) {
synchronized (mutex) {
getList(message).addLast(message);
this.size++;
mutex.notify();
}
@ -53,10 +54,10 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
/*
* (non-Javadoc)
* @see
* org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq
* .command.MessageDispatch)
*
* @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch)
*/
@Override
public void enqueueFirst(MessageDispatch message) {
synchronized (mutex) {
getList(message).addFirst(message);
@ -67,18 +68,20 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
*/
@Override
public boolean isEmpty() {
// synchronized (mutex) {
return this.size == 0;
// }
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
*/
@Override
public MessageDispatch dequeue(long timeout) throws InterruptedException {
synchronized (mutex) {
// Wait until the consumer is ready to deliver messages.
@ -99,8 +102,10 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
*/
@Override
public MessageDispatch dequeueNoWait() {
synchronized (mutex) {
if (closed || !running || isEmpty()) {
@ -112,8 +117,10 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#peek()
*/
@Override
public MessageDispatch peek() {
synchronized (mutex) {
if (closed || !running || isEmpty()) {
@ -125,8 +132,10 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#start()
*/
@Override
public void start() {
synchronized (mutex) {
running = true;
@ -136,8 +145,10 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#stop()
*/
@Override
public void stop() {
synchronized (mutex) {
running = false;
@ -147,8 +158,10 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#close()
*/
@Override
public void close() {
synchronized (mutex) {
if (!closed) {
@ -161,28 +174,35 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#clear()
*/
@Override
public void clear() {
synchronized (mutex) {
for (int i = 0; i < MAX_PRIORITY; i++) {
lists[i].clear();
}
this.size = 0;
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#isClosed()
*/
@Override
public boolean isClosed() {
return closed;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#size()
*/
@Override
public int size() {
synchronized (mutex) {
return this.size;
@ -191,26 +211,31 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#getMutex()
*/
@Override
public Object getMutex() {
return mutex;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#isRunning()
*/
@Override
public boolean isRunning() {
return running;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.MessageDispatchChannelI#removeAll()
*/
@Override
public List<MessageDispatch> removeAll() {
synchronized (mutex) {
ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(size());
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
@ -225,20 +250,18 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
@Override
public String toString() {
String result = "";
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
result += i + ":{" + lists[i].toString() + "}";
}
return result;
}
protected int getPriority(MessageDispatch message) {
int priority = javax.jms.Message.DEFAULT_PRIORITY;
if (message.getMessage() != null) {
priority = Math.max(message.getMessage().getPriority(), 0);
priority = Math.min(priority, 9);
priority = Math.max(message.getMessage().getPriority(), 0);
priority = Math.min(priority, 9);
}
return priority;
}