Change Queue dispatch model to reduce contention for lots of

consumers

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@628667 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-02-18 09:38:10 +00:00
parent edd35d4b13
commit d4382e483b
17 changed files with 471 additions and 287 deletions

View File

@ -180,6 +180,17 @@ public abstract class AbstractSubscription implements Subscription {
return info != null ? info.getDestination() : null;
}
public boolean isBrowser() {
return info != null && info.isBrowser();
}
public int getInFlightUsage() {
if (info.getPrefetchSize() > 0) {
return (getInFlightSize() * 100)/info.getPrefetchSize();
}
return Integer.MAX_VALUE;
}
protected void doAddRecoveredMessage(MessageReference message) throws Exception {
add(message);
}

View File

@ -28,7 +28,6 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
* @version $Revision: 1.12 $
@ -45,8 +44,6 @@ public interface Destination extends Service {
void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
boolean lock(MessageReference node, LockOwner lockOwner);
void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;
void gc();

View File

@ -85,10 +85,6 @@ public class DestinationFilter implements Destination {
return next.getMemoryUsage();
}
public boolean lock(MessageReference node, LockOwner lockOwner) {
return next.lock(node, lockOwner);
}
public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception {
next.removeSubscription(context, sub);
}

View File

@ -36,6 +36,7 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
protected CountStatisticImpl inflight;
protected TimeStatisticImpl processTime;
public DestinationStatistics() {

View File

@ -72,7 +72,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return active;
}
protected boolean isFull() {
public boolean isFull() {
return !active || super.isFull();
}

View File

@ -140,9 +140,6 @@ public class IndirectMessageReference implements QueueMessageReference {
}
public boolean lock(LockOwner subscription) {
if (!regionDestination.lock(this, subscription)) {
return false;
}
synchronized (this) {
if (dropped || (lockOwner != null && lockOwner != subscription)) {
return false;
@ -152,8 +149,10 @@ public class IndirectMessageReference implements QueueMessageReference {
}
}
public synchronized void unlock() {
public synchronized boolean unlock() {
boolean result = lockOwner != null;
lockOwner = null;
return result;
}
public synchronized LockOwner getLockOwner() {

View File

@ -44,7 +44,7 @@ final class NullMessageReference implements QueueMessageReference {
}
public boolean isDropped() {
throw new RuntimeException("not implemented");
return false;
}
public boolean lock(LockOwner subscription) {
@ -55,7 +55,8 @@ final class NullMessageReference implements QueueMessageReference {
throw new RuntimeException("not implemented");
}
public void unlock() {
public boolean unlock() {
return true;
}
public int decrementReferenceCount() {

View File

@ -361,12 +361,16 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
broker.sendToDeadLetterQueue(context, node);
}
public int getInFlightSize() {
return dispatched.size();
}
/**
* Used to determine if the broker can dispatch to the consumer.
*
* @return
*/
protected boolean isFull() {
public boolean isFull() {
return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize();
}
@ -605,4 +609,14 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
this.maxAuditDepth = maxAuditDepth;
}
public List<MessageReference> getInFlightMessages(){
List<MessageReference> result = new ArrayList<MessageReference>();
synchronized(pendingLock) {
result.addAll(dispatched);
result.addAll(pending.pageInList(1000));
}
return result;
}
}

View File

@ -22,6 +22,9 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
@ -55,6 +58,7 @@ import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.DeterministicTaskRunner;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -75,23 +79,23 @@ public class Queue extends BaseDestination implements Task {
private final List<Subscription> consumers = new ArrayList<Subscription>(50);
private PendingMessageCursor messages;
private final LinkedHashMap<MessageId,MessageReference> pagedInMessages = new LinkedHashMap<MessageId,MessageReference>();
private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private final Object exclusiveLockMutex = new Object();
private final Object sendLock = new Object();
private final ExecutorService executor;
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final ReentrantLock dispatchLock = new ReentrantLock();
private QueueDispatchSelector dispatchSelector;
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
wakeup();
}
};
public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
public Queue(Broker broker, final ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
super(broker, store, destination,systemUsage, parentStats);
@ -100,8 +104,31 @@ public class Queue extends BaseDestination implements Task {
} else {
this.messages = new StoreQueueCursor(broker,this);
}
this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName());
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "QueueThread:"+destination);
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
});
this.taskRunner = new DeterministicTaskRunner(this.executor,this);
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
this.dispatchSelector=new QueueDispatchSelector(destination);
}
/**
* @param queue
* @param string
* @param b
* @return
*/
private TaskRunner DedicatedTaskRunner(Queue queue, String string, boolean b) {
// TODO Auto-generated method stub
return null;
}
public void initialize() throws Exception {
@ -153,26 +180,7 @@ public class Queue extends BaseDestination implements Task {
}
}
/**
* Lock a node
*
* @param node
* @param lockOwner
* @return true if can be locked
* @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference,
* org.apache.activemq.broker.region.LockOwner)
*/
public boolean lock(MessageReference node, LockOwner lockOwner) {
synchronized (exclusiveLockMutex) {
if (exclusiveOwner == lockOwner) {
return true;
}
if (exclusiveOwner != null) {
return false;
}
}
return true;
}
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
dispatchLock.lock();
@ -185,54 +193,41 @@ public class Queue extends BaseDestination implements Task {
synchronized (consumers) {
consumers.add(sub);
if (sub.getConsumerInfo().isExclusive()) {
LockOwner owner = (LockOwner) sub;
if (exclusiveOwner == null) {
exclusiveOwner = owner;
} else {
// switch the owner if the priority is higher.
if (owner.getLockPriority() > exclusiveOwner
.getLockPriority()) {
exclusiveOwner = owner;
Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
if(exclusiveConsumer==null) {
exclusiveConsumer=sub;
}else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()){
exclusiveConsumer=sub;
}
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
}
}
}
// we hold the lock on the dispatchValue - so lets build the paged
// in
// list directly;
doPageIn(false);
// synchronize with dispatch method so that no new messages are sent
// while
// setting up a subscription. avoid out of order messages,
// duplicates
// etc.
doPageIn(false);
msgContext.setDestination(destination);
synchronized (pagedInMessages) {
// Add all the matching messages in the queue to the
// subscription.
for (Iterator<MessageReference> i = pagedInMessages.values()
.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i
.next();
if (node.isDropped()
|| (!sub.getConsumerInfo().isBrowser() && node
.getLockOwner() != null)) {
continue;
}
try {
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
msgContext.setMessageReference(node);
if (sub.matches(node, msgContext)) {
sub.add(node);
}
} catch (IOException e) {
log.warn("Could not load message: " + e, e);
}
}
}
} finally {
wakeup();
}finally {
dispatchLock.unlock();
}
}
@ -240,79 +235,60 @@ public class Queue extends BaseDestination implements Task {
public void removeSubscription(ConnectionContext context, Subscription sub)
throws Exception {
destinationStatistics.getConsumers().decrement();
dispatchLock.lock();
try {
// synchronize with dispatch method so that no new messages are sent
// while
// removing up a subscription.
synchronized (consumers) {
consumers.remove(sub);
if (sub.getConsumerInfo().isExclusive()) {
LockOwner owner = (LockOwner) sub;
// Did we loose the exclusive owner??
if (exclusiveOwner == owner) {
// Find the exclusive consumer with the higest Lock
// Priority.
exclusiveOwner = null;
for (Iterator<Subscription> iter = consumers.iterator(); iter
.hasNext();) {
Subscription s = iter.next();
LockOwner so = (LockOwner) s;
Subscription exclusiveConsumer = dispatchSelector
.getExclusiveConsumer();
if (exclusiveConsumer == sub) {
exclusiveConsumer = null;
for (Subscription s : consumers) {
if (s.getConsumerInfo().isExclusive()
&& (exclusiveOwner == null || so
.getLockPriority() > exclusiveOwner
.getLockPriority())) {
exclusiveOwner = so;
&& (exclusiveConsumer == null
|| s.getConsumerInfo().getPriority() > exclusiveConsumer
.getConsumerInfo().getPriority())) {
exclusiveConsumer = s;
}
}
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
}
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
MessageGroupSet ownedGroups = getMessageGroupOwners()
.removeConsumer(consumerId);
// redeliver inflight messages
sub.remove(context, this);
List<MessageReference> list = new ArrayList<MessageReference>();
for (Iterator<MessageReference> i = pagedInMessages.values()
.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i
.next();
if (!node.isDropped() && !node.isAcked()
&& node.getLockOwner() == sub) {
if (node.unlock()) {
node.incrementRedeliveryCounter();
list.add(node);
}
}
}
if (list != null && !consumers.isEmpty()) {
doDispatch(list);
}
}
if (consumers.isEmpty()) {
messages.gc();
}
}
sub.remove(context, this);
boolean wasExclusiveOwner = false;
if (exclusiveOwner == sub) {
exclusiveOwner = null;
wasExclusiveOwner = true;
}
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(
consumerId);
if (!sub.getConsumerInfo().isBrowser()) {
MessageEvaluationContext msgContext = new MessageEvaluationContext();
msgContext.setDestination(destination);
// lets copy the messages to dispatch to avoid deadlock
List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
synchronized (pagedInMessages) {
for (Iterator<MessageReference> i = pagedInMessages.values().iterator(); i
.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i
.next();
if (node.isDropped()) {
continue;
}
String groupID = node.getGroupID();
// Re-deliver all messages that the sub locked
if (node.getLockOwner() == sub
|| wasExclusiveOwner
|| (groupID != null && ownedGroups
.contains(groupID))) {
messagesToDispatch.add(node);
}
}
}
// now lets dispatch from the copy of the collection to
// avoid deadlocks
for (Iterator<QueueMessageReference> iter = messagesToDispatch
.iterator(); iter.hasNext();) {
QueueMessageReference node = iter.next();
node.incrementRedeliveryCounter();
node.unlock();
msgContext.setMessageReference(node);
dispatchPolicy.dispatch(node, msgContext, consumers);
}
wakeup();
}finally {
dispatchLock.unlock();
}
}
@ -523,6 +499,9 @@ public class Queue extends BaseDestination implements Task {
if (taskRunner != null) {
taskRunner.shutdown();
}
if (this.executor != null) {
this.executor.shutdownNow();
}
if (messages != null) {
messages.stop();
}
@ -677,11 +656,7 @@ public class Queue extends BaseDestination implements Task {
for (MessageReference ref : list) {
try {
QueueMessageReference r = (QueueMessageReference) ref;
// We should only delete messages that can be locked.
if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
removeMessage(c,(IndirectMessageReference) r);
}
} catch (IOException e) {
}
}
@ -791,8 +766,6 @@ public class Queue extends BaseDestination implements Task {
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
// We should only copy messages that can be locked.
if (lockMessage(r)) {
r.incrementReferenceCount();
try {
Message m = r.getMessage();
@ -805,7 +778,6 @@ public class Queue extends BaseDestination implements Task {
r.decrementReferenceCount();
}
}
}
count++;
}
} while (count < this.destinationStatistics.getMessages().getCount());
@ -853,7 +825,6 @@ public class Queue extends BaseDestination implements Task {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
// We should only move messages that can be locked.
if (lockMessage(r)) {
r.incrementReferenceCount();
try {
Message m = r.getMessage();
@ -867,7 +838,6 @@ public class Queue extends BaseDestination implements Task {
r.decrementReferenceCount();
}
}
}
count++;
}
} while (count < this.destinationStatistics.getMessages().getCount());
@ -895,7 +865,6 @@ public class Queue extends BaseDestination implements Task {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
//must return false to prevent spinning
return false;
}
@ -942,9 +911,6 @@ public class Queue extends BaseDestination implements Task {
wakeup();
}
protected boolean lockMessage(IndirectMessageReference r) {
return r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER);
}
protected ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext();
@ -972,7 +938,8 @@ public class Queue extends BaseDestination implements Task {
private List<MessageReference> doPageIn(boolean force) throws Exception {
List<MessageReference> result = null;
dispatchLock.lock();
try {
try{
final int toPageIn = getMaxPageSize() - pagedInMessages.size();
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
@ -1012,13 +979,45 @@ public class Queue extends BaseDestination implements Task {
private void doDispatch(List<MessageReference> list) throws Exception {
if (list != null && !list.isEmpty()) {
MessageEvaluationContext msgContext = new MessageEvaluationContext();
for (int i = 0; i < list.size(); i++) {
MessageReference node = list.get(i);
msgContext.setDestination(destination);
msgContext.setMessageReference(node);
dispatchPolicy.dispatch(node, msgContext, consumers);
if (list != null) {
synchronized (consumers) {
for (MessageReference node : list) {
Subscription target = null;
List<Subscription> targets = null;
for (Subscription s : consumers) {
if (dispatchSelector.canSelect(s, node)) {
if (!s.isFull()) {
s.add(node);
target = s;
break;
} else {
if (targets == null) {
targets = new ArrayList<Subscription>();
}
targets.add(s);
}
}
}
if (targets != null) {
// pick the least loaded to add the messag too
for (Subscription s : targets) {
if (target == null
|| target.getInFlightUsage() > s
.getInFlightUsage()) {
target = s;
}
}
if (target != null) {
target.add(node);
}
}
if (target != null && !dispatchSelector.isExclusiveConsumer(target)) {
consumers.remove(target);
consumers.add(target);
}
}
}
}
}
@ -1030,7 +1029,4 @@ public class Queue extends BaseDestination implements Task {
private void pageInMessages(boolean force) throws Exception {
doDispatch(doPageIn(force));
}
}

View File

@ -0,0 +1,115 @@
/**
*
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.List;
import javax.jms.JMSException;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.policy.SimpleDispatchSelector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Queue dispatch policy that determines if a message can be sent to a subscription
*
* @org.apache.xbean.XBean
* @version $Revision$
*/
public class QueueDispatchSelector extends SimpleDispatchSelector {
private static final Log LOG = LogFactory.getLog(QueueDispatchSelector.class);
private Subscription exclusiveConsumer;
/**
* @param destination
*/
public QueueDispatchSelector(ActiveMQDestination destination) {
super(destination);
}
public Subscription getExclusiveConsumer() {
return exclusiveConsumer;
}
public void setExclusiveConsumer(Subscription exclusiveConsumer) {
this.exclusiveConsumer = exclusiveConsumer;
}
public boolean isExclusiveConsumer(Subscription s) {
return s == this.exclusiveConsumer;
}
public boolean canSelect(Subscription subscription,
MessageReference m) throws Exception {
if (subscription.isBrowser() && super.canDispatch(subscription, m)) {
return true;
}
boolean result = super.canDispatch(subscription, m) ;
if (result) {
result = exclusiveConsumer == null
|| exclusiveConsumer == subscription;
if (result) {
QueueMessageReference node = (QueueMessageReference) m;
// Keep message groups together.
String groupId = node.getGroupID();
int sequence = node.getGroupSequence();
if (groupId != null) {
MessageGroupMap messageGroupOwners = ((Queue) node
.getRegionDestination()).getMessageGroupOwners();
// If we can own the first, then no-one else should own the
// rest.
if (sequence == 1) {
assignGroup(subscription, messageGroupOwners, node,groupId);
}else {
// Make sure that the previous owner is still valid, we may
// need to become the new owner.
ConsumerId groupOwner;
groupOwner = messageGroupOwners.get(groupId);
if (groupOwner == null) {
assignGroup(subscription, messageGroupOwners, node,groupId);
} else {
if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
// A group sequence < 1 is an end of group signal.
if (sequence < 0) {
messageGroupOwners.removeGroup(groupId);
}
} else {
result = false;
}
}
}
}
}
}
return result;
}
protected void assignGroup(Subscription subs,MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
Message message = n.getMessage();
if (message instanceof ActiveMQMessage) {
ActiveMQMessage activeMessage = (ActiveMQMessage)message;
try {
activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
} catch (JMSException e) {
LOG.warn("Failed to set boolean header: " + e, e);
}
}
}
}

View File

@ -36,7 +36,7 @@ public interface QueueMessageReference extends MessageReference {
boolean lock(LockOwner subscription);
void unlock();
boolean unlock();
LockOwner getLockOwner();
}

View File

@ -17,13 +17,14 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -67,54 +68,13 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
}
protected boolean canDispatch(MessageReference n) throws IOException {
boolean result = true;
QueueMessageReference node = (QueueMessageReference)n;
if (node.isAcked()) {
return false;
}
// Keep message groups together.
String groupId = node.getGroupID();
int sequence = node.getGroupSequence();
if (groupId != null) {
MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();
// If we can own the first, then no-one else should own the rest.
if (sequence == 1) {
if (node.lock(this)) {
assignGroupToMe(messageGroupOwners, n, groupId);
return true;
} else {
return false;
}
}
// Make sure that the previous owner is still valid, we may
// need to become the new owner.
ConsumerId groupOwner;
synchronized (node) {
groupOwner = messageGroupOwners.get(groupId);
if (groupOwner == null) {
if (node.lock(this)) {
assignGroupToMe(messageGroupOwners, n, groupId);
return true;
} else {
return false;
}
}
}
if (groupOwner.equals(info.getConsumerId())) {
// A group sequence < 1 is an end of group signal.
if (sequence < 0) {
messageGroupOwners.removeGroup(groupId);
}
return true;
}
return false;
} else {
return node.lock(this);
if (node.isAcked() || node.isDropped()) {
result = false;
}
result = result && (isBrowser() || node.lock(this));
return result;
}
/**

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.List;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
@ -38,6 +39,7 @@ public interface Subscription extends SubscriptionRecovery {
/**
* Used to add messages that match the subscription.
* @param node
* @throws Exception
* @throws InterruptedException
* @throws IOException
*/
@ -168,6 +170,11 @@ public interface Subscription extends SubscriptionRecovery {
*/
boolean isHighWaterMark();
/**
* @return true if there is no space to dispatch messages
*/
boolean isFull();
/**
* inform the MessageConsumer on the client to change it's prefetch
* @param newPrefetch
@ -185,6 +192,16 @@ public interface Subscription extends SubscriptionRecovery {
*/
int getPrefetchSize();
/**
* @return the number of messages awaiting acknowledgement
*/
int getInFlightSize();
/**
* @return the in flight messages as a percentage of the prefetch size
*/
int getInFlightUsage();
/**
* Informs the Broker if the subscription needs to intervention to recover it's state
* e.g. DurableTopicSubscriber may do
@ -193,4 +210,16 @@ public interface Subscription extends SubscriptionRecovery {
*/
boolean isRecoveryRequired();
/**
* @return true if a browser
*/
boolean isBrowser();
/**
* Get the list of in flight messages
* @return list
*/
List<MessageReference> getInFlightMessages();
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPo
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchSelector;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
@ -556,7 +557,6 @@ public class Topic extends BaseDestination implements Task{
destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
if (!subscriptionRecoveryPolicy.add(context, message)) {
return;
@ -567,7 +567,7 @@ public class Topic extends BaseDestination implements Task{
return;
}
}
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
msgContext.setDestination(destination);
msgContext.setMessageReference(message);
@ -575,7 +575,6 @@ public class Topic extends BaseDestination implements Task{
onMessageWithNoConsumers(context, message);
}
} finally {
msgContext.clear();
dispatchValve.decrement();
}
}

View File

@ -18,6 +18,8 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
@ -37,7 +39,6 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
@ -51,7 +52,6 @@ public class TopicSubscription extends AbstractSubscription {
protected PendingMessageCursor matched;
protected final SystemUsage usageManager;
protected AtomicLong dispatchedCounter = new AtomicLong();
protected AtomicLong prefetchExtension = new AtomicLong();
boolean singleDestination = true;
Destination destination;
@ -83,8 +83,7 @@ public class TopicSubscription extends AbstractSubscription {
public void add(MessageReference node) throws Exception {
enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
if (!isFull() && !isSlave()) {
optimizePrefetch();
if (!isFull() && matched.isEmpty() && !isSlave()) {
// if maximumPendingMessages is set we will only discard messages
// which
// have not been dispatched (i.e. we allow the prefetch buffer to be
@ -128,6 +127,7 @@ public class TopicSubscription extends AbstractSubscription {
}
}
}
dispatchMatched();
}
}
}
@ -177,10 +177,8 @@ public class TopicSubscription extends AbstractSubscription {
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
boolean wasFull = isFull();
if (ack.isStandardAck() || ack.isPoisonAck()) {
if (context.isInTransaction()) {
prefetchExtension.addAndGet(ack.getMessageCount());
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
@ -190,7 +188,7 @@ public class TopicSubscription extends AbstractSubscription {
}
}
dequeueCounter.addAndGet(ack.getMessageCount());
prefetchExtension.addAndGet(ack.getMessageCount());
dispatchMatched();
}
});
} else {
@ -198,19 +196,14 @@ public class TopicSubscription extends AbstractSubscription {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount());
prefetchExtension.addAndGet(ack.getMessageCount());
}
if (wasFull && !isFull()) {
dispatchMatched();
}
return;
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
prefetchExtension.addAndGet(ack.getMessageCount());
if (wasFull && !isFull()) {
dequeueCounter.addAndGet(ack.getMessageCount());
dispatchMatched();
}
return;
}
throw new JMSException("Invalid acknowledgment: " + ack);
@ -287,22 +280,27 @@ public class TopicSubscription extends AbstractSubscription {
// Implementation methods
// -------------------------------------------------------------------------
private boolean isFull() {
return getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize();
public boolean isFull() {
return getDispatchedQueueSize() >= info.getPrefetchSize();
}
public int getInFlightSize() {
return getDispatchedQueueSize();
}
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark() {
return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark() {
return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
}
/**
@ -354,31 +352,18 @@ public class TopicSubscription extends AbstractSubscription {
}
}
/**
* optimize message consumer prefetch if the consumer supports it
*/
public void optimizePrefetch() {
/*
* if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
* &&context.getConnection().isManageable()){
* if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
* isLowWaterMark()){
* info.setCurrentPrefetchSize(info.getPrefetchSize());
* updateConsumerPrefetch(info.getPrefetchSize()); }else
* if(info.getCurrentPrefetchSize()==info.getPrefetchSize() &&
* isHighWaterMark()){ // want to purge any outstanding acks held by the
* consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
*/
}
private void dispatchMatched() throws IOException {
synchronized (matchedListMutex) {
if (!matched.isEmpty() && !isFull()) {
try {
matched.reset();
while (matched.hasNext() && !isFull()) {
MessageReference message = (MessageReference)matched.next();
MessageReference message = (MessageReference) matched
.next();
matched.remove();
// Message may have been sitting in the matched list a while
// Message may have been sitting in the matched list a
// while
// waiting for the consumer to ak the message.
if (broker.isExpired(message)) {
message.decrementReferenceCount();
@ -393,6 +378,7 @@ public class TopicSubscription extends AbstractSubscription {
}
}
}
}
private void dispatch(final MessageReference node) throws IOException {
Message message = (Message)node;
@ -456,7 +442,15 @@ public class TopicSubscription extends AbstractSubscription {
}
public int getPrefetchSize() {
return (int)(info.getPrefetchSize() + prefetchExtension.get());
return (int)info.getPrefetchSize();
}
/**
* Get the list of inflight messages
* @return the list
*/
public synchronized List<MessageReference> getInFlightMessages(){
return matched.pageInList(1000);
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
/**
* Determines if a subscription can dispatch a message reference
*
*/
public interface DispatchSelector {
/**
* return true if a subscription can dispatch a message reference
* @param subscription
* @param node
* @return true if can dispatch
* @throws Exception
*/
boolean canDispatch(Subscription subscription, MessageReference node) throws Exception;
}

View File

@ -0,0 +1,34 @@
/**
*
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.MessageEvaluationContext;
/**
* Simple dispatch policy that determines if a message can be sent to a subscription
*
* @org.apache.xbean.XBean
* @version $Revision$
*/
public class SimpleDispatchSelector implements DispatchSelector {
private final ActiveMQDestination destination;
/**
* @param destination
*/
public SimpleDispatchSelector(ActiveMQDestination destination) {
this.destination = destination;
}
public boolean canDispatch(Subscription subscription, MessageReference node) throws Exception {
MessageEvaluationContext msgContext = new MessageEvaluationContext();
msgContext.setDestination(this.destination);
msgContext.setMessageReference(node);
return subscription.matches(node, msgContext);
}
}