From c14dc3a06fb4b25db3e69f232ea565a4ff161824 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 22 Jun 2010 15:20:49 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-2790 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@956918 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 15 + .../activemq/ActiveMQConnectionFactory.java | 17 ++ .../apache/activemq/ActiveMQInputStream.java | 8 +- .../activemq/ActiveMQMessageConsumer.java | 7 +- .../activemq/ActiveMQSessionExecutor.java | 11 +- .../activemq/FifoMessageDispatchChannel.java | 198 +++++++++++++ .../activemq/MessageDispatchChannel.java | 134 ++------- .../SimplePriorityMessageDispatchChannel.java | 273 ++++++++++++++++++ 8 files changed, 538 insertions(+), 125 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 9326ab59c5..b60ca4a013 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -191,6 +191,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon protected volatile CountDownLatch transportInterruptionProcessingComplete; private long consumerFailoverRedeliveryWaitPeriod; private final Scheduler scheduler; + private boolean messagePrioritySupported=true; /** * Construct an ActiveMQConnection @@ -1433,6 +1434,20 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void setAlwaysSyncSend(boolean alwaysSyncSend) { this.alwaysSyncSend = alwaysSyncSend; } + + /** + * @return the messagePrioritySupported + */ + public boolean isMessagePrioritySupported() { + return this.messagePrioritySupported; + } + + /** + * @param messagePrioritySupported the messagePrioritySupported to set + */ + public void setMessagePrioritySupported(boolean messagePrioritySupported) { + this.messagePrioritySupported = messagePrioritySupported; + } /** * Cleans up this connection so that it's state is as if the connection was diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index ebf22a387a..70e93b6bb0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -115,6 +115,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private long consumerFailoverRedeliveryWaitPeriod = 0; private boolean checkForDuplicates = true; private ClientInternalExceptionListener clientInternalExceptionListener; + private boolean messagePrioritySupported = true; // ///////////////////////////////////////////// // @@ -318,6 +319,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); connection.setCheckForDuplicates(isCheckForDuplicates()); + connection.setMessagePrioritySupported(isMessagePrioritySupported()); if (transportListener != null) { connection.addTransportListener(transportListener); } @@ -583,6 +585,20 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne public void setSendAcksAsync(boolean sendAcksAsync) { this.sendAcksAsync = sendAcksAsync; } + + /** + * @return the messagePrioritySupported + */ + public boolean isMessagePrioritySupported() { + return this.messagePrioritySupported; + } + + /** + * @param messagePrioritySupported the messagePrioritySupported to set + */ + public void setMessagePrioritySupported(boolean messagePrioritySupported) { + this.messagePrioritySupported = messagePrioritySupported; + } /** @@ -685,6 +701,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne props.setProperty("auditDepth", Integer.toString(getAuditDepth())); props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber())); props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates())); + props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported())); } public boolean isUseCompression() { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java index fb82b1f104..ed2659d804 100644 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java @@ -20,11 +20,9 @@ import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Map; - import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; - import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -47,7 +45,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch private final ActiveMQConnection connection; private final ConsumerInfo info; // These are the messages waiting to be delivered to the client - private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel(); + private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel(); private int deliveredCounter; private MessageDispatch lastDelivered; @@ -113,6 +111,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch unconsumedMessages.start(); } + @Override public void close() throws IOException { if (!unconsumedMessages.isClosed()) { try { @@ -172,6 +171,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch } } + @Override public int read() throws IOException { fillBuffer(); if (eosReached || buffer.length == 0) { @@ -181,6 +181,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch return buffer[pos++] & 0xff; } + @Override public int read(byte[] b, int off, int len) throws IOException { fillBuffer(); if (eosReached || buffer.length == 0) { @@ -241,6 +242,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch unconsumedMessages.enqueue(md); } + @Override public String toString() { return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }"; } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index d601043556..9845e7895d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -114,7 +114,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC protected final ConsumerInfo info; // These are the messages waiting to be delivered to the client - protected final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel(); + protected final MessageDispatchChannel unconsumedMessages; // The are the messages that were delivered to the consumer but that have // not been acknowledged. It's kept in reverse order since we @@ -198,6 +198,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC throw new JMSException("Cannot have a prefetch size less than zero"); } } + if (session.connection.isMessagePrioritySupported()) { + this.unconsumedMessages = new SimplePriorityMessageDispatchChannel(); + }else { + this.unconsumedMessages = new FifoMessageDispatchChannel(); + } this.session = session; this.scheduler = session.getScheduler(); diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java index bfe0c635cc..0c129d21ba 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java @@ -18,9 +18,7 @@ package org.apache.activemq; import java.util.List; - import javax.jms.JMSException; - import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.thread.Task; @@ -39,14 +37,19 @@ import org.apache.commons.logging.LogFactory; public class ActiveMQSessionExecutor implements Task { private static final Log LOG = LogFactory.getLog(ActiveMQSessionExecutor.class); - private ActiveMQSession session; - private MessageDispatchChannel messageQueue = new MessageDispatchChannel(); + private final ActiveMQSession session; + private final MessageDispatchChannel messageQueue; private boolean dispatchedBySessionPool; private volatile TaskRunner taskRunner; private boolean startedOrWarnedThatNotStarted; ActiveMQSessionExecutor(ActiveMQSession session) { this.session = session; + if (this.session.connection.isMessagePrioritySupported()) { + this.messageQueue = new SimplePriorityMessageDispatchChannel(); + }else { + this.messageQueue = new FifoMessageDispatchChannel(); + } } void setDispatchedBySessionPool(boolean value) { diff --git a/activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java b/activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java new file mode 100644 index 0000000000..40fc5ab3ff --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import org.apache.activemq.command.MessageDispatch; + +public class FifoMessageDispatchChannel implements MessageDispatchChannel { + + private final Object mutex = new Object(); + private final LinkedList list; + private boolean closed; + private boolean running; + + public FifoMessageDispatchChannel() { + this.list = new LinkedList(); + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch) + */ + public void enqueue(MessageDispatch message) { + synchronized (mutex) { + list.addLast(message); + mutex.notify(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch) + */ + public void enqueueFirst(MessageDispatch message) { + synchronized (mutex) { + list.addFirst(message); + mutex.notify(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#isEmpty() + */ + public boolean isEmpty() { + synchronized (mutex) { + return list.isEmpty(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long) + */ + public MessageDispatch dequeue(long timeout) throws InterruptedException { + synchronized (mutex) { + // Wait until the consumer is ready to deliver messages. + while (timeout != 0 && !closed && (list.isEmpty() || !running)) { + if (timeout == -1) { + mutex.wait(); + } else { + mutex.wait(timeout); + break; + } + } + if (closed || !running || list.isEmpty()) { + return null; + } + return list.removeFirst(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait() + */ + public MessageDispatch dequeueNoWait() { + synchronized (mutex) { + if (closed || !running || list.isEmpty()) { + return null; + } + return list.removeFirst(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#peek() + */ + public MessageDispatch peek() { + synchronized (mutex) { + if (closed || !running || list.isEmpty()) { + return null; + } + return list.getFirst(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#start() + */ + public void start() { + synchronized (mutex) { + running = true; + mutex.notifyAll(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#stop() + */ + public void stop() { + synchronized (mutex) { + running = false; + mutex.notifyAll(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#close() + */ + public void close() { + synchronized (mutex) { + if (!closed) { + running = false; + closed = true; + } + mutex.notifyAll(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#clear() + */ + public void clear() { + synchronized (mutex) { + list.clear(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#isClosed() + */ + public boolean isClosed() { + return closed; + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#size() + */ + public int size() { + synchronized (mutex) { + return list.size(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#getMutex() + */ + public Object getMutex() { + return mutex; + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#isRunning() + */ + public boolean isRunning() { + return running; + } + + /* (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#removeAll() + */ + public List removeAll() { + synchronized (mutex) { + ArrayList rc = new ArrayList(list); + list.clear(); + return rc; + } + } + + @Override + public String toString() { + synchronized (mutex) { + return list.toString(); + } + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java b/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java index 37e516c7b9..d0354f3272 100755 --- a/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java @@ -16,44 +16,17 @@ */ package org.apache.activemq; -import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; - import javax.jms.JMSException; - import org.apache.activemq.command.MessageDispatch; -public class MessageDispatchChannel { +public interface MessageDispatchChannel { - private final Object mutex = new Object(); - private final LinkedList list; - private boolean closed; - private boolean running; + public abstract void enqueue(MessageDispatch message); - public MessageDispatchChannel() { - this.list = new LinkedList(); - } + public abstract void enqueueFirst(MessageDispatch message); - public void enqueue(MessageDispatch message) { - synchronized (mutex) { - list.addLast(message); - mutex.notify(); - } - } - - public void enqueueFirst(MessageDispatch message) { - synchronized (mutex) { - list.addFirst(message); - mutex.notify(); - } - } - - public boolean isEmpty() { - synchronized (mutex) { - return list.isEmpty(); - } - } + public abstract boolean isEmpty(); /** * Used to get an enqueued message. The amount of time this method blocks is @@ -67,101 +40,28 @@ public class MessageDispatchChannel { * @return null if we timeout or if the consumer is closed. * @throws InterruptedException */ - public MessageDispatch dequeue(long timeout) throws InterruptedException { - synchronized (mutex) { - // Wait until the consumer is ready to deliver messages. - while (timeout != 0 && !closed && (list.isEmpty() || !running)) { - if (timeout == -1) { - mutex.wait(); - } else { - mutex.wait(timeout); - break; - } - } - if (closed || !running || list.isEmpty()) { - return null; - } - return list.removeFirst(); - } - } + public abstract MessageDispatch dequeue(long timeout) throws InterruptedException; - public MessageDispatch dequeueNoWait() { - synchronized (mutex) { - if (closed || !running || list.isEmpty()) { - return null; - } - return list.removeFirst(); - } - } + public abstract MessageDispatch dequeueNoWait(); - public MessageDispatch peek() { - synchronized (mutex) { - if (closed || !running || list.isEmpty()) { - return null; - } - return list.getFirst(); - } - } + public abstract MessageDispatch peek(); - public void start() { - synchronized (mutex) { - running = true; - mutex.notifyAll(); - } - } + public abstract void start(); - public void stop() { - synchronized (mutex) { - running = false; - mutex.notifyAll(); - } - } + public abstract void stop(); - public void close() { - synchronized (mutex) { - if (!closed) { - running = false; - closed = true; - } - mutex.notifyAll(); - } - } + public abstract void close(); - public void clear() { - synchronized (mutex) { - list.clear(); - } - } + public abstract void clear(); - public boolean isClosed() { - return closed; - } + public abstract boolean isClosed(); - public int size() { - synchronized (mutex) { - return list.size(); - } - } + public abstract int size(); - public Object getMutex() { - return mutex; - } + public abstract Object getMutex(); - public boolean isRunning() { - return running; - } + public abstract boolean isRunning(); - public List removeAll() { - synchronized (mutex) { - ArrayList rc = new ArrayList(list); - list.clear(); - return rc; - } - } + public abstract List removeAll(); - public String toString() { - synchronized (mutex) { - return list.toString(); - } - } -} +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java b/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java new file mode 100644 index 0000000000..9a5cec8264 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import org.apache.activemq.command.MessageDispatch; + +public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel { + private static Integer MAX_PRIORITY = 10; + private final Object mutex = new Object(); + private final LinkedList[] lists; + private boolean closed; + private boolean running; + private int size = 0; + + public SimplePriorityMessageDispatchChannel() { + this.lists = new LinkedList[MAX_PRIORITY]; + for (int i = 0; i < MAX_PRIORITY; i++) { + lists[i] = new LinkedList(); + } + } + + /* + * (non-Javadoc) + * @see + * org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq + * .command.MessageDispatch) + */ + public void enqueue(MessageDispatch message) { + synchronized (mutex) { + getList(message).addLast(message); + + this.size++; + mutex.notify(); + } + } + + /* + * (non-Javadoc) + * @see + * org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq + * .command.MessageDispatch) + */ + public void enqueueFirst(MessageDispatch message) { + synchronized (mutex) { + getList(message).addFirst(message); + this.size++; + mutex.notify(); + } + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#isEmpty() + */ + public boolean isEmpty() { + // synchronized (mutex) { + return this.size == 0; + // } + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long) + */ + public MessageDispatch dequeue(long timeout) throws InterruptedException { + synchronized (mutex) { + // Wait until the consumer is ready to deliver messages. + while (timeout != 0 && !closed && (isEmpty() || !running)) { + if (timeout == -1) { + mutex.wait(); + } else { + mutex.wait(timeout); + break; + } + } + if (closed || !running || isEmpty()) { + return null; + } + return removeFirst(); + } + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait() + */ + public MessageDispatch dequeueNoWait() { + synchronized (mutex) { + if (closed || !running || isEmpty()) { + return null; + } + return removeFirst(); + } + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#peek() + */ + public MessageDispatch peek() { + synchronized (mutex) { + if (closed || !running || isEmpty()) { + return null; + } + return getFirst(); + } + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#start() + */ + public void start() { + synchronized (mutex) { + running = true; + mutex.notifyAll(); + } + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#stop() + */ + public void stop() { + synchronized (mutex) { + running = false; + mutex.notifyAll(); + } + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#close() + */ + public void close() { + synchronized (mutex) { + if (!closed) { + running = false; + closed = true; + } + mutex.notifyAll(); + } + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#clear() + */ + public void clear() { + synchronized (mutex) { + for (int i = 0; i < MAX_PRIORITY; i++) { + lists[i].clear(); + } + } + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#isClosed() + */ + public boolean isClosed() { + return closed; + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#size() + */ + public int size() { + synchronized (mutex) { + return this.size; + } + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#getMutex() + */ + public Object getMutex() { + return mutex; + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#isRunning() + */ + public boolean isRunning() { + return running; + } + + /* + * (non-Javadoc) + * @see org.apache.activemq.MessageDispatchChannelI#removeAll() + */ + public List removeAll() { + + synchronized (mutex) { + ArrayList result = new ArrayList(size()); + for (int i = MAX_PRIORITY - 1; i >= 0; i--) { + List list = lists[i]; + result.addAll(list); + list.clear(); + } + return result; + } + } + + @Override + public String toString() { + + String result = ""; + for (int i = MAX_PRIORITY - 1; i >= 0; i--) { + result += i + ":{" + lists[i].toString() + "}"; + } + return result; + + } + + protected int getPriority(MessageDispatch message) { + int priority = Message.DEFAULT_PRIORITY; + if (message.getMessage() != null) { + Math.max(message.getMessage().getPriority(), 0); + priority = Math.min(priority, 9); + } + return priority; + } + + protected LinkedList getList(MessageDispatch md) { + return lists[getPriority(md)]; + } + + private final MessageDispatch removeFirst() { + if (this.size > 0) { + for (int i = MAX_PRIORITY - 1; i >= 0; i--) { + LinkedList list = lists[i]; + if (!list.isEmpty()) { + this.size--; + return list.removeFirst(); + } + } + } + return null; + } + + private final MessageDispatch getFirst() { + if (this.size > 0) { + for (int i = MAX_PRIORITY - 1; i >= 0; i--) { + LinkedList list = lists[i]; + if (!list.isEmpty()) { + return list.getFirst(); + } + } + } + return null; + } +}