git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@956918 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-06-22 15:20:49 +00:00
parent 9ae41192a9
commit c14dc3a06f
8 changed files with 538 additions and 125 deletions

View File

@ -191,6 +191,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected volatile CountDownLatch transportInterruptionProcessingComplete;
private long consumerFailoverRedeliveryWaitPeriod;
private final Scheduler scheduler;
private boolean messagePrioritySupported=true;
/**
* Construct an <code>ActiveMQConnection</code>
@ -1434,6 +1435,20 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.alwaysSyncSend = alwaysSyncSend;
}
/**
* @return the messagePrioritySupported
*/
public boolean isMessagePrioritySupported() {
return this.messagePrioritySupported;
}
/**
* @param messagePrioritySupported the messagePrioritySupported to set
*/
public void setMessagePrioritySupported(boolean messagePrioritySupported) {
this.messagePrioritySupported = messagePrioritySupported;
}
/**
* Cleans up this connection so that it's state is as if the connection was
* just created. This allows the Resource Adapter to clean up a connection

View File

@ -115,6 +115,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private long consumerFailoverRedeliveryWaitPeriod = 0;
private boolean checkForDuplicates = true;
private ClientInternalExceptionListener clientInternalExceptionListener;
private boolean messagePrioritySupported = true;
// /////////////////////////////////////////////
//
@ -318,6 +319,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
connection.setCheckForDuplicates(isCheckForDuplicates());
connection.setMessagePrioritySupported(isMessagePrioritySupported());
if (transportListener != null) {
connection.addTransportListener(transportListener);
}
@ -584,6 +586,20 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
this.sendAcksAsync = sendAcksAsync;
}
/**
* @return the messagePrioritySupported
*/
public boolean isMessagePrioritySupported() {
return this.messagePrioritySupported;
}
/**
* @param messagePrioritySupported the messagePrioritySupported to set
*/
public void setMessagePrioritySupported(boolean messagePrioritySupported) {
this.messagePrioritySupported = messagePrioritySupported;
}
/**
* Sets the transformer used to transform messages before they are sent on
@ -685,6 +701,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
}
public boolean isUseCompression() {

View File

@ -20,11 +20,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@ -47,7 +45,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
private final ActiveMQConnection connection;
private final ConsumerInfo info;
// These are the messages waiting to be delivered to the client
private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel();
private int deliveredCounter;
private MessageDispatch lastDelivered;
@ -113,6 +111,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
unconsumedMessages.start();
}
@Override
public void close() throws IOException {
if (!unconsumedMessages.isClosed()) {
try {
@ -172,6 +171,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
}
}
@Override
public int read() throws IOException {
fillBuffer();
if (eosReached || buffer.length == 0) {
@ -181,6 +181,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
return buffer[pos++] & 0xff;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
fillBuffer();
if (eosReached || buffer.length == 0) {
@ -241,6 +242,7 @@ public class ActiveMQInputStream extends InputStream implements ActiveMQDispatch
unconsumedMessages.enqueue(md);
}
@Override
public String toString() {
return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
}

View File

@ -114,7 +114,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
protected final ConsumerInfo info;
// These are the messages waiting to be delivered to the client
protected final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
protected final MessageDispatchChannel unconsumedMessages;
// The are the messages that were delivered to the consumer but that have
// not been acknowledged. It's kept in reverse order since we
@ -198,6 +198,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
throw new JMSException("Cannot have a prefetch size less than zero");
}
}
if (session.connection.isMessagePrioritySupported()) {
this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
}else {
this.unconsumedMessages = new FifoMessageDispatchChannel();
}
this.session = session;
this.scheduler = session.getScheduler();

View File

@ -18,9 +18,7 @@
package org.apache.activemq;
import java.util.List;
import javax.jms.JMSException;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.thread.Task;
@ -39,14 +37,19 @@ import org.apache.commons.logging.LogFactory;
public class ActiveMQSessionExecutor implements Task {
private static final Log LOG = LogFactory.getLog(ActiveMQSessionExecutor.class);
private ActiveMQSession session;
private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
private final ActiveMQSession session;
private final MessageDispatchChannel messageQueue;
private boolean dispatchedBySessionPool;
private volatile TaskRunner taskRunner;
private boolean startedOrWarnedThatNotStarted;
ActiveMQSessionExecutor(ActiveMQSession session) {
this.session = session;
if (this.session.connection.isMessagePrioritySupported()) {
this.messageQueue = new SimplePriorityMessageDispatchChannel();
}else {
this.messageQueue = new FifoMessageDispatchChannel();
}
}
void setDispatchedBySessionPool(boolean value) {

View File

@ -0,0 +1,198 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.command.MessageDispatch;
public class FifoMessageDispatchChannel implements MessageDispatchChannel {
private final Object mutex = new Object();
private final LinkedList<MessageDispatch> list;
private boolean closed;
private boolean running;
public FifoMessageDispatchChannel() {
this.list = new LinkedList<MessageDispatch>();
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch)
*/
public void enqueue(MessageDispatch message) {
synchronized (mutex) {
list.addLast(message);
mutex.notify();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch)
*/
public void enqueueFirst(MessageDispatch message) {
synchronized (mutex) {
list.addFirst(message);
mutex.notify();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
*/
public boolean isEmpty() {
synchronized (mutex) {
return list.isEmpty();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
*/
public MessageDispatch dequeue(long timeout) throws InterruptedException {
synchronized (mutex) {
// Wait until the consumer is ready to deliver messages.
while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
if (timeout == -1) {
mutex.wait();
} else {
mutex.wait(timeout);
break;
}
}
if (closed || !running || list.isEmpty()) {
return null;
}
return list.removeFirst();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
*/
public MessageDispatch dequeueNoWait() {
synchronized (mutex) {
if (closed || !running || list.isEmpty()) {
return null;
}
return list.removeFirst();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#peek()
*/
public MessageDispatch peek() {
synchronized (mutex) {
if (closed || !running || list.isEmpty()) {
return null;
}
return list.getFirst();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#start()
*/
public void start() {
synchronized (mutex) {
running = true;
mutex.notifyAll();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#stop()
*/
public void stop() {
synchronized (mutex) {
running = false;
mutex.notifyAll();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#close()
*/
public void close() {
synchronized (mutex) {
if (!closed) {
running = false;
closed = true;
}
mutex.notifyAll();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#clear()
*/
public void clear() {
synchronized (mutex) {
list.clear();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#isClosed()
*/
public boolean isClosed() {
return closed;
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#size()
*/
public int size() {
synchronized (mutex) {
return list.size();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#getMutex()
*/
public Object getMutex() {
return mutex;
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#isRunning()
*/
public boolean isRunning() {
return running;
}
/* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#removeAll()
*/
public List<MessageDispatch> removeAll() {
synchronized (mutex) {
ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list);
list.clear();
return rc;
}
}
@Override
public String toString() {
synchronized (mutex) {
return list.toString();
}
}
}

View File

@ -16,44 +16,17 @@
*/
package org.apache.activemq;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import javax.jms.JMSException;
import org.apache.activemq.command.MessageDispatch;
public class MessageDispatchChannel {
public interface MessageDispatchChannel {
private final Object mutex = new Object();
private final LinkedList<MessageDispatch> list;
private boolean closed;
private boolean running;
public abstract void enqueue(MessageDispatch message);
public MessageDispatchChannel() {
this.list = new LinkedList<MessageDispatch>();
}
public abstract void enqueueFirst(MessageDispatch message);
public void enqueue(MessageDispatch message) {
synchronized (mutex) {
list.addLast(message);
mutex.notify();
}
}
public void enqueueFirst(MessageDispatch message) {
synchronized (mutex) {
list.addFirst(message);
mutex.notify();
}
}
public boolean isEmpty() {
synchronized (mutex) {
return list.isEmpty();
}
}
public abstract boolean isEmpty();
/**
* Used to get an enqueued message. The amount of time this method blocks is
@ -67,101 +40,28 @@ public class MessageDispatchChannel {
* @return null if we timeout or if the consumer is closed.
* @throws InterruptedException
*/
public MessageDispatch dequeue(long timeout) throws InterruptedException {
synchronized (mutex) {
// Wait until the consumer is ready to deliver messages.
while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
if (timeout == -1) {
mutex.wait();
} else {
mutex.wait(timeout);
break;
}
}
if (closed || !running || list.isEmpty()) {
return null;
}
return list.removeFirst();
}
}
public abstract MessageDispatch dequeue(long timeout) throws InterruptedException;
public MessageDispatch dequeueNoWait() {
synchronized (mutex) {
if (closed || !running || list.isEmpty()) {
return null;
}
return list.removeFirst();
}
}
public abstract MessageDispatch dequeueNoWait();
public MessageDispatch peek() {
synchronized (mutex) {
if (closed || !running || list.isEmpty()) {
return null;
}
return list.getFirst();
}
}
public abstract MessageDispatch peek();
public void start() {
synchronized (mutex) {
running = true;
mutex.notifyAll();
}
}
public abstract void start();
public void stop() {
synchronized (mutex) {
running = false;
mutex.notifyAll();
}
}
public abstract void stop();
public void close() {
synchronized (mutex) {
if (!closed) {
running = false;
closed = true;
}
mutex.notifyAll();
}
}
public abstract void close();
public void clear() {
synchronized (mutex) {
list.clear();
}
}
public abstract void clear();
public boolean isClosed() {
return closed;
}
public abstract boolean isClosed();
public int size() {
synchronized (mutex) {
return list.size();
}
}
public abstract int size();
public Object getMutex() {
return mutex;
}
public abstract Object getMutex();
public boolean isRunning() {
return running;
}
public abstract boolean isRunning();
public List<MessageDispatch> removeAll() {
synchronized (mutex) {
ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list);
list.clear();
return rc;
}
}
public abstract List<MessageDispatch> removeAll();
public String toString() {
synchronized (mutex) {
return list.toString();
}
}
}

View File

@ -0,0 +1,273 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.command.MessageDispatch;
public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
private static Integer MAX_PRIORITY = 10;
private final Object mutex = new Object();
private final LinkedList<MessageDispatch>[] lists;
private boolean closed;
private boolean running;
private int size = 0;
public SimplePriorityMessageDispatchChannel() {
this.lists = new LinkedList[MAX_PRIORITY];
for (int i = 0; i < MAX_PRIORITY; i++) {
lists[i] = new LinkedList<MessageDispatch>();
}
}
/*
* (non-Javadoc)
* @see
* org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq
* .command.MessageDispatch)
*/
public void enqueue(MessageDispatch message) {
synchronized (mutex) {
getList(message).addLast(message);
this.size++;
mutex.notify();
}
}
/*
* (non-Javadoc)
* @see
* org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq
* .command.MessageDispatch)
*/
public void enqueueFirst(MessageDispatch message) {
synchronized (mutex) {
getList(message).addFirst(message);
this.size++;
mutex.notify();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
*/
public boolean isEmpty() {
// synchronized (mutex) {
return this.size == 0;
// }
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
*/
public MessageDispatch dequeue(long timeout) throws InterruptedException {
synchronized (mutex) {
// Wait until the consumer is ready to deliver messages.
while (timeout != 0 && !closed && (isEmpty() || !running)) {
if (timeout == -1) {
mutex.wait();
} else {
mutex.wait(timeout);
break;
}
}
if (closed || !running || isEmpty()) {
return null;
}
return removeFirst();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
*/
public MessageDispatch dequeueNoWait() {
synchronized (mutex) {
if (closed || !running || isEmpty()) {
return null;
}
return removeFirst();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#peek()
*/
public MessageDispatch peek() {
synchronized (mutex) {
if (closed || !running || isEmpty()) {
return null;
}
return getFirst();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#start()
*/
public void start() {
synchronized (mutex) {
running = true;
mutex.notifyAll();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#stop()
*/
public void stop() {
synchronized (mutex) {
running = false;
mutex.notifyAll();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#close()
*/
public void close() {
synchronized (mutex) {
if (!closed) {
running = false;
closed = true;
}
mutex.notifyAll();
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#clear()
*/
public void clear() {
synchronized (mutex) {
for (int i = 0; i < MAX_PRIORITY; i++) {
lists[i].clear();
}
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#isClosed()
*/
public boolean isClosed() {
return closed;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#size()
*/
public int size() {
synchronized (mutex) {
return this.size;
}
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#getMutex()
*/
public Object getMutex() {
return mutex;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#isRunning()
*/
public boolean isRunning() {
return running;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.MessageDispatchChannelI#removeAll()
*/
public List<MessageDispatch> removeAll() {
synchronized (mutex) {
ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(size());
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
List<MessageDispatch> list = lists[i];
result.addAll(list);
list.clear();
}
return result;
}
}
@Override
public String toString() {
String result = "";
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
result += i + ":{" + lists[i].toString() + "}";
}
return result;
}
protected int getPriority(MessageDispatch message) {
int priority = Message.DEFAULT_PRIORITY;
if (message.getMessage() != null) {
Math.max(message.getMessage().getPriority(), 0);
priority = Math.min(priority, 9);
}
return priority;
}
protected LinkedList<MessageDispatch> getList(MessageDispatch md) {
return lists[getPriority(md)];
}
private final MessageDispatch removeFirst() {
if (this.size > 0) {
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
LinkedList<MessageDispatch> list = lists[i];
if (!list.isEmpty()) {
this.size--;
return list.removeFirst();
}
}
}
return null;
}
private final MessageDispatch getFirst() {
if (this.size > 0) {
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
LinkedList<MessageDispatch> list = lists[i];
if (!list.isEmpty()) {
return list.getFirst();
}
}
}
return null;
}
}