diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 2cd06d886d..a4a48133ba 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -179,6 +179,17 @@ public abstract class AbstractSubscription implements Subscription { public ActiveMQDestination getActiveMQDestination() { return info != null ? info.getDestination() : null; } + + public boolean isBrowser() { + return info != null && info.isBrowser(); + } + + public int getInFlightUsage() { + if (info.getPrefetchSize() > 0) { + return (getInFlightSize() * 100)/info.getPrefetchSize(); + } + return Integer.MAX_VALUE; + } protected void doAddRecoveredMessage(MessageReference message) throws Exception { add(message); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index 34d1bf020a..e55a5d238e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -28,7 +28,6 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.store.MessageStore; import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.SystemUsage; /** * @version $Revision: 1.12 $ @@ -45,8 +44,6 @@ public interface Destination extends Service { void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception; - boolean lock(MessageReference node, LockOwner lockOwner); - void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException; void gc(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 1fd765899a..c099b0d85b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -85,10 +85,6 @@ public class DestinationFilter implements Destination { return next.getMemoryUsage(); } - public boolean lock(MessageReference node, LockOwner lockOwner) { - return next.lock(node, lockOwner); - } - public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception { next.removeSubscription(context, sub); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index e0391ab663..d764e464c0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -36,6 +36,7 @@ public class DestinationStatistics extends StatsImpl { protected CountStatisticImpl messages; protected PollCountStatisticImpl messagesCached; protected CountStatisticImpl dispatched; + protected CountStatisticImpl inflight; protected TimeStatisticImpl processTime; public DestinationStatistics() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 0c6109bc65..ffc1d13d21 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -72,7 +72,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us return active; } - protected boolean isFull() { + public boolean isFull() { return !active || super.isFull(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 83a31e3f0a..2ab4bfd15d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -140,9 +140,6 @@ public class IndirectMessageReference implements QueueMessageReference { } public boolean lock(LockOwner subscription) { - if (!regionDestination.lock(this, subscription)) { - return false; - } synchronized (this) { if (dropped || (lockOwner != null && lockOwner != subscription)) { return false; @@ -152,8 +149,10 @@ public class IndirectMessageReference implements QueueMessageReference { } } - public synchronized void unlock() { + public synchronized boolean unlock() { + boolean result = lockOwner != null; lockOwner = null; + return result; } public synchronized LockOwner getLockOwner() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java index 7a27b47dfd..f824d3797c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java @@ -44,7 +44,7 @@ final class NullMessageReference implements QueueMessageReference { } public boolean isDropped() { - throw new RuntimeException("not implemented"); + return false; } public boolean lock(LockOwner subscription) { @@ -55,7 +55,8 @@ final class NullMessageReference implements QueueMessageReference { throw new RuntimeException("not implemented"); } - public void unlock() { + public boolean unlock() { + return true; } public int decrementReferenceCount() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 00048f1f33..07d6ad5dda 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -360,13 +360,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription { protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception { broker.sendToDeadLetterQueue(context, node); } - + + public int getInFlightSize() { + return dispatched.size(); + } + /** * Used to determine if the broker can dispatch to the consumer. * * @return */ - protected boolean isFull() { + public boolean isFull() { return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize(); } @@ -604,5 +608,15 @@ public abstract class PrefetchSubscription extends AbstractSubscription { public void setMaxAuditDepth(int maxAuditDepth) { this.maxAuditDepth = maxAuditDepth; } + + + public List getInFlightMessages(){ + List result = new ArrayList(); + synchronized(pendingLock) { + result.addAll(dispatched); + result.addAll(pending.pageInList(1000)); + } + return result; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 525faaa85f..d1264a6764 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -22,6 +22,9 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.locks.ReentrantLock; import javax.jms.InvalidSelectorException; @@ -55,6 +58,7 @@ import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.thread.DeterministicTaskRunner; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; @@ -75,23 +79,23 @@ public class Queue extends BaseDestination implements Task { private final List consumers = new ArrayList(50); private PendingMessageCursor messages; private final LinkedHashMap pagedInMessages = new LinkedHashMap(); - private LockOwner exclusiveOwner; private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); - private final Object exclusiveLockMutex = new Object(); private final Object sendLock = new Object(); + private final ExecutorService executor; private final TaskRunner taskRunner; private final LinkedList messagesWaitingForSpace = new LinkedList(); private final ReentrantLock dispatchLock = new ReentrantLock(); + private QueueDispatchSelector dispatchSelector; private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { public void run() { wakeup(); } }; - - public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats, + + public Queue(Broker broker, final ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { super(broker, store, destination,systemUsage, parentStats); @@ -100,8 +104,31 @@ public class Queue extends BaseDestination implements Task { } else { this.messages = new StoreQueueCursor(broker,this); } - this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName()); + + this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "QueueThread:"+destination); + thread.setDaemon(true); + thread.setPriority(Thread.NORM_PRIORITY); + return thread; + } + }); + + this.taskRunner = new DeterministicTaskRunner(this.executor,this); this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); + this.dispatchSelector=new QueueDispatchSelector(destination); + + } + + /** + * @param queue + * @param string + * @param b + * @return + */ + private TaskRunner DedicatedTaskRunner(Queue queue, String string, boolean b) { + // TODO Auto-generated method stub + return null; } public void initialize() throws Exception { @@ -153,26 +180,7 @@ public class Queue extends BaseDestination implements Task { } } - /** - * Lock a node - * - * @param node - * @param lockOwner - * @return true if can be locked - * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference, - * org.apache.activemq.broker.region.LockOwner) - */ - public boolean lock(MessageReference node, LockOwner lockOwner) { - synchronized (exclusiveLockMutex) { - if (exclusiveOwner == lockOwner) { - return true; - } - if (exclusiveOwner != null) { - return false; - } - } - return true; - } + public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { dispatchLock.lock(); @@ -185,54 +193,41 @@ public class Queue extends BaseDestination implements Task { synchronized (consumers) { consumers.add(sub); if (sub.getConsumerInfo().isExclusive()) { - LockOwner owner = (LockOwner) sub; - if (exclusiveOwner == null) { - exclusiveOwner = owner; - } else { - // switch the owner if the priority is higher. - if (owner.getLockPriority() > exclusiveOwner - .getLockPriority()) { - exclusiveOwner = owner; - } + Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); + if(exclusiveConsumer==null) { + exclusiveConsumer=sub; + }else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()){ + exclusiveConsumer=sub; } + dispatchSelector.setExclusiveConsumer(exclusiveConsumer); } } - - // we hold the lock on the dispatchValue - so lets build the paged - // in - // list directly; - doPageIn(false); - // synchronize with dispatch method so that no new messages are sent // while // setting up a subscription. avoid out of order messages, // duplicates // etc. - + doPageIn(false); msgContext.setDestination(destination); synchronized (pagedInMessages) { // Add all the matching messages in the queue to the // subscription. + for (Iterator i = pagedInMessages.values() .iterator(); i.hasNext();) { QueueMessageReference node = (QueueMessageReference) i .next(); - if (node.isDropped() - || (!sub.getConsumerInfo().isBrowser() && node - .getLockOwner() != null)) { - continue; - } - try { + if (!node.isDropped() && !node.isAcked() && (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) { msgContext.setMessageReference(node); if (sub.matches(node, msgContext)) { sub.add(node); } - } catch (IOException e) { - log.warn("Could not load message: " + e, e); } } + } - } finally { + wakeup(); + }finally { dispatchLock.unlock(); } } @@ -240,79 +235,60 @@ public class Queue extends BaseDestination implements Task { public void removeSubscription(ConnectionContext context, Subscription sub) throws Exception { destinationStatistics.getConsumers().decrement(); - // synchronize with dispatch method so that no new messages are sent - // while - // removing up a subscription. - synchronized (consumers) { - consumers.remove(sub); - if (sub.getConsumerInfo().isExclusive()) { - LockOwner owner = (LockOwner) sub; - // Did we loose the exclusive owner?? - if (exclusiveOwner == owner) { - // Find the exclusive consumer with the higest Lock - // Priority. - exclusiveOwner = null; - for (Iterator iter = consumers.iterator(); iter - .hasNext();) { - Subscription s = iter.next(); - LockOwner so = (LockOwner) s; - if (s.getConsumerInfo().isExclusive() - && (exclusiveOwner == null || so - .getLockPriority() > exclusiveOwner - .getLockPriority())) { - exclusiveOwner = so; + dispatchLock.lock(); + try { + // synchronize with dispatch method so that no new messages are sent + // while + // removing up a subscription. + synchronized (consumers) { + consumers.remove(sub); + if (sub.getConsumerInfo().isExclusive()) { + Subscription exclusiveConsumer = dispatchSelector + .getExclusiveConsumer(); + if (exclusiveConsumer == sub) { + exclusiveConsumer = null; + for (Subscription s : consumers) { + if (s.getConsumerInfo().isExclusive() + && (exclusiveConsumer == null + || s.getConsumerInfo().getPriority() > exclusiveConsumer + .getConsumerInfo().getPriority())) { + exclusiveConsumer = s; + + } + } + dispatchSelector.setExclusiveConsumer(exclusiveConsumer); + } + } + ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); + MessageGroupSet ownedGroups = getMessageGroupOwners() + .removeConsumer(consumerId); + // redeliver inflight messages + sub.remove(context, this); + + List list = new ArrayList(); + for (Iterator i = pagedInMessages.values() + .iterator(); i.hasNext();) { + QueueMessageReference node = (QueueMessageReference) i + .next(); + if (!node.isDropped() && !node.isAcked() + && node.getLockOwner() == sub) { + if (node.unlock()) { + node.incrementRedeliveryCounter(); + list.add(node); } } } + if (list != null && !consumers.isEmpty()) { + doDispatch(list); + } } + if (consumers.isEmpty()) { messages.gc(); } - } - sub.remove(context, this); - boolean wasExclusiveOwner = false; - if (exclusiveOwner == sub) { - exclusiveOwner = null; - wasExclusiveOwner = true; - } - ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); - MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer( - consumerId); - if (!sub.getConsumerInfo().isBrowser()) { - MessageEvaluationContext msgContext = new MessageEvaluationContext(); - - msgContext.setDestination(destination); - // lets copy the messages to dispatch to avoid deadlock - List messagesToDispatch = new ArrayList(); - synchronized (pagedInMessages) { - for (Iterator i = pagedInMessages.values().iterator(); i - .hasNext();) { - QueueMessageReference node = (QueueMessageReference) i - .next(); - if (node.isDropped()) { - continue; - } - String groupID = node.getGroupID(); - // Re-deliver all messages that the sub locked - if (node.getLockOwner() == sub - || wasExclusiveOwner - || (groupID != null && ownedGroups - .contains(groupID))) { - messagesToDispatch.add(node); - } - } - } - // now lets dispatch from the copy of the collection to - // avoid deadlocks - for (Iterator iter = messagesToDispatch - .iterator(); iter.hasNext();) { - QueueMessageReference node = iter.next(); - node.incrementRedeliveryCounter(); - node.unlock(); - msgContext.setMessageReference(node); - dispatchPolicy.dispatch(node, msgContext, consumers); - } - + wakeup(); + }finally { + dispatchLock.unlock(); } } @@ -523,6 +499,9 @@ public class Queue extends BaseDestination implements Task { if (taskRunner != null) { taskRunner.shutdown(); } + if (this.executor != null) { + this.executor.shutdownNow(); + } if (messages != null) { messages.stop(); } @@ -677,11 +656,7 @@ public class Queue extends BaseDestination implements Task { for (MessageReference ref : list) { try { QueueMessageReference r = (QueueMessageReference) ref; - - // We should only delete messages that can be locked. - if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) { removeMessage(c,(IndirectMessageReference) r); - } } catch (IOException e) { } } @@ -791,19 +766,16 @@ public class Queue extends BaseDestination implements Task { for (MessageReference ref : list) { IndirectMessageReference r = (IndirectMessageReference) ref; if (filter.evaluate(context, r)) { - // We should only copy messages that can be locked. - if (lockMessage(r)) { - r.incrementReferenceCount(); - try { - Message m = r.getMessage(); - BrokerSupport.resend(context, m, dest); - if (++movedCounter >= maximumMessages - && maximumMessages > 0) { - return movedCounter; - } - } finally { - r.decrementReferenceCount(); + r.incrementReferenceCount(); + try { + Message m = r.getMessage(); + BrokerSupport.resend(context, m, dest); + if (++movedCounter >= maximumMessages + && maximumMessages > 0) { + return movedCounter; } + } finally { + r.decrementReferenceCount(); } } count++; @@ -853,19 +825,17 @@ public class Queue extends BaseDestination implements Task { IndirectMessageReference r = (IndirectMessageReference) ref; if (filter.evaluate(context, r)) { // We should only move messages that can be locked. - if (lockMessage(r)) { - r.incrementReferenceCount(); - try { - Message m = r.getMessage(); - BrokerSupport.resend(context, m, dest); - removeMessage(context, r); - if (++movedCounter >= maximumMessages - && maximumMessages > 0) { - return movedCounter; - } - } finally { - r.decrementReferenceCount(); + r.incrementReferenceCount(); + try { + Message m = r.getMessage(); + BrokerSupport.resend(context, m, dest); + removeMessage(context, r); + if (++movedCounter >= maximumMessages + && maximumMessages > 0) { + return movedCounter; } + } finally { + r.decrementReferenceCount(); } } count++; @@ -885,7 +855,7 @@ public class Queue extends BaseDestination implements Task { } if (result) { try { - pageInMessages(false); + pageInMessages(false); } catch (Throwable e) { log.error("Failed to page in more queue messages ", e); @@ -895,7 +865,6 @@ public class Queue extends BaseDestination implements Task { Runnable op = messagesWaitingForSpace.removeFirst(); op.run(); } - //must return false to prevent spinning return false; } @@ -942,10 +911,7 @@ public class Queue extends BaseDestination implements Task { wakeup(); } - protected boolean lockMessage(IndirectMessageReference r) { - return r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER); - } - + protected ConnectionContext createConnectionContext() { ConnectionContext answer = new ConnectionContext(); answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); @@ -972,7 +938,8 @@ public class Queue extends BaseDestination implements Task { private List doPageIn(boolean force) throws Exception { List result = null; dispatchLock.lock(); - try { + try{ + final int toPageIn = getMaxPageSize() - pagedInMessages.size(); if ((force || !consumers.isEmpty()) && toPageIn > 0) { messages.setMaxBatchSize(toPageIn); @@ -1009,16 +976,48 @@ public class Queue extends BaseDestination implements Task { } return result; } - + private void doDispatch(List list) throws Exception { - - if (list != null && !list.isEmpty()) { - MessageEvaluationContext msgContext = new MessageEvaluationContext(); - for (int i = 0; i < list.size(); i++) { - MessageReference node = list.get(i); - msgContext.setDestination(destination); - msgContext.setMessageReference(node); - dispatchPolicy.dispatch(node, msgContext, consumers); + + if (list != null) { + synchronized (consumers) { + for (MessageReference node : list) { + Subscription target = null; + List targets = null; + for (Subscription s : consumers) { + if (dispatchSelector.canSelect(s, node)) { + if (!s.isFull()) { + s.add(node); + target = s; + break; + } else { + if (targets == null) { + targets = new ArrayList(); + } + targets.add(s); + } + } + } + if (targets != null) { + // pick the least loaded to add the messag too + + for (Subscription s : targets) { + if (target == null + || target.getInFlightUsage() > s + .getInFlightUsage()) { + target = s; + } + } + if (target != null) { + target.add(node); + } + } + if (target != null && !dispatchSelector.isExclusiveConsumer(target)) { + consumers.remove(target); + consumers.add(target); + } + + } } } } @@ -1030,7 +1029,4 @@ public class Queue extends BaseDestination implements Task { private void pageInMessages(boolean force) throws Exception { doDispatch(doPageIn(force)); } - - - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java new file mode 100644 index 0000000000..018eb52819 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java @@ -0,0 +1,115 @@ +/** + * + */ +package org.apache.activemq.broker.region; + +import java.io.IOException; +import java.util.List; + +import javax.jms.JMSException; + +import org.apache.activemq.broker.region.group.MessageGroupMap; +import org.apache.activemq.broker.region.policy.SimpleDispatchSelector; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.Message; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Queue dispatch policy that determines if a message can be sent to a subscription + * + * @org.apache.xbean.XBean + * @version $Revision$ + */ +public class QueueDispatchSelector extends SimpleDispatchSelector { + private static final Log LOG = LogFactory.getLog(QueueDispatchSelector.class); + private Subscription exclusiveConsumer; + + + /** + * @param destination + */ + public QueueDispatchSelector(ActiveMQDestination destination) { + super(destination); + } + + public Subscription getExclusiveConsumer() { + return exclusiveConsumer; + } + public void setExclusiveConsumer(Subscription exclusiveConsumer) { + this.exclusiveConsumer = exclusiveConsumer; + } + + public boolean isExclusiveConsumer(Subscription s) { + return s == this.exclusiveConsumer; + } + + + public boolean canSelect(Subscription subscription, + MessageReference m) throws Exception { + if (subscription.isBrowser() && super.canDispatch(subscription, m)) { + return true; + } + + boolean result = super.canDispatch(subscription, m) ; + if (result) { + result = exclusiveConsumer == null + || exclusiveConsumer == subscription; + if (result) { + QueueMessageReference node = (QueueMessageReference) m; + // Keep message groups together. + String groupId = node.getGroupID(); + int sequence = node.getGroupSequence(); + if (groupId != null) { + MessageGroupMap messageGroupOwners = ((Queue) node + .getRegionDestination()).getMessageGroupOwners(); + + // If we can own the first, then no-one else should own the + // rest. + if (sequence == 1) { + assignGroup(subscription, messageGroupOwners, node,groupId); + }else { + + // Make sure that the previous owner is still valid, we may + // need to become the new owner. + ConsumerId groupOwner; + + groupOwner = messageGroupOwners.get(groupId); + if (groupOwner == null) { + assignGroup(subscription, messageGroupOwners, node,groupId); + } else { + if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) { + // A group sequence < 1 is an end of group signal. + if (sequence < 0) { + messageGroupOwners.removeGroup(groupId); + } + } else { + result = false; + } + } + } + } + } + } + return result; + } + + protected void assignGroup(Subscription subs,MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { + messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); + Message message = n.getMessage(); + if (message instanceof ActiveMQMessage) { + ActiveMQMessage activeMessage = (ActiveMQMessage)message; + try { + activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false); + } catch (JMSException e) { + LOG.warn("Failed to set boolean header: " + e, e); + } + } + } + + + + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java index 997bfb2545..c07092fc15 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java @@ -36,7 +36,7 @@ public interface QueueMessageReference extends MessageReference { boolean lock(LockOwner subscription); - void unlock(); + boolean unlock(); LockOwner getLockOwner(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index c2f51d8b27..ecb97b4d0f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -17,13 +17,14 @@ package org.apache.activemq.broker.region; import java.io.IOException; + import javax.jms.InvalidSelectorException; import javax.jms.JMSException; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -67,54 +68,13 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner } protected boolean canDispatch(MessageReference n) throws IOException { + boolean result = true; QueueMessageReference node = (QueueMessageReference)n; - if (node.isAcked()) { - return false; - } - // Keep message groups together. - String groupId = node.getGroupID(); - int sequence = node.getGroupSequence(); - if (groupId != null) { - MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners(); - - // If we can own the first, then no-one else should own the rest. - if (sequence == 1) { - if (node.lock(this)) { - assignGroupToMe(messageGroupOwners, n, groupId); - return true; - } else { - return false; - } - } - - // Make sure that the previous owner is still valid, we may - // need to become the new owner. - ConsumerId groupOwner; - synchronized (node) { - groupOwner = messageGroupOwners.get(groupId); - if (groupOwner == null) { - if (node.lock(this)) { - assignGroupToMe(messageGroupOwners, n, groupId); - return true; - } else { - return false; - } - } - } - - if (groupOwner.equals(info.getConsumerId())) { - // A group sequence < 1 is an end of group signal. - if (sequence < 0) { - messageGroupOwners.removeGroup(groupId); - } - return true; - } - - return false; - - } else { - return node.lock(this); + if (node.isAcked() || node.isDropped()) { + result = false; } + result = result && (isBrowser() || node.lock(this)); + return result; } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java index 0d3c1f075b..2eec6e2b2d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.List; import javax.jms.InvalidSelectorException; import javax.management.ObjectName; @@ -38,6 +39,7 @@ public interface Subscription extends SubscriptionRecovery { /** * Used to add messages that match the subscription. * @param node + * @throws Exception * @throws InterruptedException * @throws IOException */ @@ -168,6 +170,11 @@ public interface Subscription extends SubscriptionRecovery { */ boolean isHighWaterMark(); + /** + * @return true if there is no space to dispatch messages + */ + boolean isFull(); + /** * inform the MessageConsumer on the client to change it's prefetch * @param newPrefetch @@ -185,6 +192,16 @@ public interface Subscription extends SubscriptionRecovery { */ int getPrefetchSize(); + /** + * @return the number of messages awaiting acknowledgement + */ + int getInFlightSize(); + + /** + * @return the in flight messages as a percentage of the prefetch size + */ + int getInFlightUsage(); + /** * Informs the Broker if the subscription needs to intervention to recover it's state * e.g. DurableTopicSubscriber may do @@ -192,5 +209,17 @@ public interface Subscription extends SubscriptionRecovery { * @return true if recovery required */ boolean isRecoveryRequired(); + + + /** + * @return true if a browser + */ + boolean isBrowser(); + + /** + * Get the list of in flight messages + * @return list + */ + List getInFlightMessages(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 4d8c882fb0..bb1214a514 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -33,6 +33,7 @@ import org.apache.activemq.broker.region.policy.FixedSizedSubscriptionRecoveryPo import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; +import org.apache.activemq.broker.region.policy.SimpleDispatchSelector; import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; @@ -555,8 +556,7 @@ public class Topic extends BaseDestination implements Task{ protected void dispatch(final ConnectionContext context, Message message) throws Exception { destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); - dispatchValve.increment(); - MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); + dispatchValve.increment(); try { if (!subscriptionRecoveryPolicy.add(context, message)) { return; @@ -567,7 +567,7 @@ public class Topic extends BaseDestination implements Task{ return; } } - + MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); msgContext.setDestination(destination); msgContext.setMessageReference(message); @@ -575,7 +575,6 @@ public class Topic extends BaseDestination implements Task{ onMessageWithNoConsumers(context, message); } } finally { - msgContext.clear(); dispatchValve.decrement(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index b80815f1ff..0f8b993439 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -18,6 +18,8 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.jms.JMSException; @@ -37,7 +39,6 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.Response; -import org.apache.activemq.kaha.Store; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.usage.SystemUsage; import org.apache.commons.logging.Log; @@ -51,8 +52,7 @@ public class TopicSubscription extends AbstractSubscription { protected PendingMessageCursor matched; protected final SystemUsage usageManager; protected AtomicLong dispatchedCounter = new AtomicLong(); - protected AtomicLong prefetchExtension = new AtomicLong(); - + boolean singleDestination = true; Destination destination; @@ -83,8 +83,7 @@ public class TopicSubscription extends AbstractSubscription { public void add(MessageReference node) throws Exception { enqueueCounter.incrementAndGet(); node.incrementReferenceCount(); - if (!isFull() && !isSlave()) { - optimizePrefetch(); + if (!isFull() && matched.isEmpty() && !isSlave()) { // if maximumPendingMessages is set we will only discard messages // which // have not been dispatched (i.e. we allow the prefetch buffer to be @@ -128,6 +127,7 @@ public class TopicSubscription extends AbstractSubscription { } } } + dispatchMatched(); } } } @@ -177,20 +177,18 @@ public class TopicSubscription extends AbstractSubscription { public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { // Handle the standard acknowledgment case. - boolean wasFull = isFull(); if (ack.isStandardAck() || ack.isPoisonAck()) { if (context.isInTransaction()) { - prefetchExtension.addAndGet(ack.getMessageCount()); context.getTransaction().addSynchronization(new Synchronization() { public void afterCommit() throws Exception { - synchronized (TopicSubscription.this) { + synchronized (TopicSubscription.this) { if (singleDestination && destination != null) { destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); } } dequeueCounter.addAndGet(ack.getMessageCount()); - prefetchExtension.addAndGet(ack.getMessageCount()); + dispatchMatched(); } }); } else { @@ -198,19 +196,14 @@ public class TopicSubscription extends AbstractSubscription { destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); } dequeueCounter.addAndGet(ack.getMessageCount()); - prefetchExtension.addAndGet(ack.getMessageCount()); - } - if (wasFull && !isFull()) { - dispatchMatched(); } + dispatchMatched(); return; } else if (ack.isDeliveredAck()) { // Message was delivered but not acknowledged: update pre-fetch // counters. - prefetchExtension.addAndGet(ack.getMessageCount()); - if (wasFull && !isFull()) { - dispatchMatched(); - } + dequeueCounter.addAndGet(ack.getMessageCount()); + dispatchMatched(); return; } throw new JMSException("Invalid acknowledgment: " + ack); @@ -287,22 +280,27 @@ public class TopicSubscription extends AbstractSubscription { // Implementation methods // ------------------------------------------------------------------------- - private boolean isFull() { - return getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize(); + public boolean isFull() { + return getDispatchedQueueSize() >= info.getPrefetchSize(); } - + + public int getInFlightSize() { + return getDispatchedQueueSize(); + } + + /** * @return true when 60% or more room is left for dispatching messages */ public boolean isLowWaterMark() { - return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4); + return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4); } /** * @return true when 10% or less room is left for dispatching messages */ public boolean isHighWaterMark() { - return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9); + return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9); } /** @@ -354,42 +352,30 @@ public class TopicSubscription extends AbstractSubscription { } } - /** - * optimize message consumer prefetch if the consumer supports it - */ - public void optimizePrefetch() { - /* - * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null - * &&context.getConnection().isManageable()){ - * if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && - * isLowWaterMark()){ - * info.setCurrentPrefetchSize(info.getPrefetchSize()); - * updateConsumerPrefetch(info.getPrefetchSize()); }else - * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && - * isHighWaterMark()){ // want to purge any outstanding acks held by the - * consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } } - */ - } - - private void dispatchMatched() throws IOException { + private void dispatchMatched() throws IOException { synchronized (matchedListMutex) { - try { - matched.reset(); - while (matched.hasNext() && !isFull()) { - MessageReference message = (MessageReference)matched.next(); - matched.remove(); - // Message may have been sitting in the matched list a while - // waiting for the consumer to ak the message. - if (broker.isExpired(message)) { - message.decrementReferenceCount(); - broker.messageExpired(getContext(), message); - dequeueCounter.incrementAndGet(); - continue; // just drop it. + if (!matched.isEmpty() && !isFull()) { + try { + matched.reset(); + + while (matched.hasNext() && !isFull()) { + MessageReference message = (MessageReference) matched + .next(); + matched.remove(); + // Message may have been sitting in the matched list a + // while + // waiting for the consumer to ak the message. + if (broker.isExpired(message)) { + message.decrementReferenceCount(); + broker.messageExpired(getContext(), message); + dequeueCounter.incrementAndGet(); + continue; // just drop it. + } + dispatch(message); } - dispatch(message); + } finally { + matched.release(); } - } finally { - matched.release(); } } } @@ -456,7 +442,15 @@ public class TopicSubscription extends AbstractSubscription { } public int getPrefetchSize() { - return (int)(info.getPrefetchSize() + prefetchExtension.get()); + return (int)info.getPrefetchSize(); + } + + /** + * Get the list of inflight messages + * @return the list + */ + public synchronized List getInFlightMessages(){ + return matched.pageInList(1000); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java new file mode 100644 index 0000000000..22799781ff --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; + +/** + * Determines if a subscription can dispatch a message reference + * + */ +public interface DispatchSelector { + + + /** + * return true if a subscription can dispatch a message reference + * @param subscription + * @param node + * @return true if can dispatch + * @throws Exception + */ + + boolean canDispatch(Subscription subscription, MessageReference node) throws Exception; +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java new file mode 100644 index 0000000000..1d949ba34f --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java @@ -0,0 +1,34 @@ +/** + * + */ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.filter.MessageEvaluationContext; + +/** + * Simple dispatch policy that determines if a message can be sent to a subscription + * + * @org.apache.xbean.XBean + * @version $Revision$ + */ +public class SimpleDispatchSelector implements DispatchSelector { + + private final ActiveMQDestination destination; + + /** + * @param destination + */ + public SimpleDispatchSelector(ActiveMQDestination destination) { + this.destination = destination; + } + + public boolean canDispatch(Subscription subscription, MessageReference node) throws Exception { + MessageEvaluationContext msgContext = new MessageEvaluationContext(); + msgContext.setDestination(this.destination); + msgContext.setMessageReference(node); + return subscription.matches(node, msgContext); + } +}