From 00879cf6836ee20da9796d5475593889f370d661 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 25 Jun 2010 10:28:17 +0000 Subject: [PATCH] changes for https://issues.apache.org/activemq/browse/AMQ-2791 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@957881 13f79535-47bb-0310-9956-ffa450edef68 --- .../SimplePriorityMessageDispatchChannel.java | 4 +- .../activemq/broker/jmx/DestinationView.java | 30 ++-- .../broker/jmx/DestinationViewMBean.java | 13 +- .../broker/region/BaseDestination.java | 10 ++ .../activemq/broker/region/Destination.java | 4 +- .../broker/region/DestinationFilter.java | 7 +- .../broker/region/MessageReference.java | 4 +- .../broker/region/NullMessageReference.java | 6 +- .../broker/region/PrefetchSubscription.java | 2 +- .../apache/activemq/broker/region/Queue.java | 48 +++--- .../activemq/broker/region/TempQueue.java | 2 +- .../broker/region/TopicSubscription.java | 4 +- .../cursors/AbstractPendingMessageCursor.java | 23 +++ .../region/cursors/AbstractStoreCursor.java | 55 +++--- .../cursors/FilePendingMessageCursor.java | 5 +- .../region/cursors/OrderedPendingList.java | 131 ++++++++++++++ .../broker/region/cursors/PendingList.java | 31 ++++ .../broker/region/cursors/PendingNode.java | 46 +++++ .../cursors/PrioritizedPendingList.java | 130 ++++++++++++++ .../cursors/StoreDurableSubscriberCursor.java | 32 +++- .../region/cursors/StoreQueueCursor.java | 18 +- .../cursors/VMPendingMessageCursor.java | 58 ++++--- ...DurableSubscriberMessageStoragePolicy.java | 3 +- .../FilePendingQueueMessageStoragePolicy.java | 2 +- ...PendingSubscriberMessageStoragePolicy.java | 10 +- ...PendingSubscriberMessageStoragePolicy.java | 3 +- .../broker/region/policy/PolicyEntry.java | 13 +- ...DurableSubscriberMessageStoragePolicy.java | 3 +- .../VMPendingQueueMessageStoragePolicy.java | 2 +- ...PendingSubscriberMessageStoragePolicy.java | 6 +- .../broker/util/LoggingBrokerPlugin.java | 160 +++++++----------- .../org/apache/activemq/command/Message.java | 2 +- .../memory/list/SimpleMessageList.java | 15 +- .../activemq/plugin/DiscardingDLQBroker.java | 28 +-- 34 files changed, 637 insertions(+), 273 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java diff --git a/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java b/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java index 9a5cec8264..1245cab6ed 100644 --- a/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.activemq.command.MessageDispatch; public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel { - private static Integer MAX_PRIORITY = 10; + private static final Integer MAX_PRIORITY = 10; private final Object mutex = new Object(); private final LinkedList[] lists; private boolean closed; @@ -234,7 +234,7 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan } protected int getPriority(MessageDispatch message) { - int priority = Message.DEFAULT_PRIORITY; + int priority = javax.jms.Message.DEFAULT_PRIORITY; if (message.getMessage() != null) { Math.max(message.getMessage().getPriority(), 0); priority = Math.min(priority, 9); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index cd7d897ac8..9a912dbe30 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -16,19 +16,6 @@ */ package org.apache.activemq.broker.jmx; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.command.Message; -import org.apache.activemq.filter.BooleanExpression; -import org.apache.activemq.filter.MessageEvaluationContext; -import org.apache.activemq.selector.SelectorParser; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -48,6 +35,19 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.Message; +import org.apache.activemq.filter.BooleanExpression; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.selector.SelectorParser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class DestinationView implements DestinationViewMBean { private static final Log LOG = LogFactory.getLog(DestinationViewMBean.class); @@ -126,6 +126,10 @@ public class DestinationView implements DestinationViewMBean { public long getMinEnqueueTime() { return destination.getDestinationStatistics().getProcessTime().getMinTime(); } + + public boolean isPrioritizedMessages() { + return destination.isPrioritizedMessages(); + } public CompositeData[] browse() throws OpenDataException { try { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 5db81811b7..c9a42a8816 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -16,16 +16,15 @@ */ package org.apache.activemq.broker.jmx; +import java.io.IOException; import java.util.List; import java.util.Map; -import java.io.IOException; - import javax.jms.InvalidSelectorException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; -import javax.management.ObjectName; -import javax.management.MalformedObjectNameException; public interface DestinationViewMBean { @@ -313,6 +312,12 @@ public interface DestinationViewMBean { @MBeanInfo("Caching is enabled") public boolean isUseCache(); + /** + * @return true if prioritized messages are enabled for the destination + */ + @MBeanInfo("Prioritized messages is enabled") + public boolean isPrioritizedMessages(); + /** * @param value * enable/disable caching on the destination diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 063998c910..07501af3e1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -81,6 +81,7 @@ public abstract class BaseDestination implements Destination { protected int cursorMemoryHighWaterMark = 70; protected int storeUsageHighWaterMark = 100; private SlowConsumerStrategy slowConsumerStrategy; + private boolean prioritizedMessages; /** * @param broker @@ -580,5 +581,14 @@ public abstract class BaseDestination implements Destination { public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { this.slowConsumerStrategy = slowConsumerStrategy; } + + + public boolean isPrioritizedMessages() { + return this.prioritizedMessages; + } + + public void setPrioritizedMessages(boolean prioritizedMessages) { + this.prioritizedMessages = prioritizedMessages; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index 13953419fc..9ca595c6ae 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -18,14 +18,12 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.List; - import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatchNotification; @@ -215,4 +213,6 @@ public interface Destination extends Service, Task { * @throws Exception */ void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception; + + boolean isPrioritizedMessages(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index c9856dad5a..d15b97b87b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -19,7 +19,6 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.List; import java.util.Set; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -39,7 +38,7 @@ import org.apache.activemq.usage.Usage; */ public class DestinationFilter implements Destination { - private Destination next; + private final Destination next; public DestinationFilter(Destination next) { this.next = next; @@ -270,4 +269,8 @@ public class DestinationFilter implements Destination { public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); } + + public boolean isPrioritizedMessages() { + return next.isPrioritizedMessages(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java index 5cf4b39bc2..90288729f5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.broker.region; -import java.io.IOException; - import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; @@ -33,7 +31,7 @@ public interface MessageReference { MessageId getMessageId(); Message getMessageHardRef(); - Message getMessage() throws IOException; + Message getMessage(); boolean isPersistent(); Destination getRegionDestination(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java index 2cdcb32eb5..f754ed8a0e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.broker.region; -import java.io.IOException; - import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.Message; @@ -28,7 +26,7 @@ import org.apache.activemq.command.MessageId; */ final class NullMessageReference implements QueueMessageReference { - private ActiveMQMessage message = new ActiveMQMessage(); + private final ActiveMQMessage message = new ActiveMQMessage(); private volatile int references; public void drop() { @@ -75,7 +73,7 @@ final class NullMessageReference implements QueueMessageReference { return 0; } - public Message getMessage() throws IOException { + public Message getMessage() { return message; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 36cc849d6a..f63e99cdc8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -78,7 +78,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { - this(broker,usageManager,context, info, new VMPendingMessageCursor()); + this(broker,usageManager,context, info, new VMPendingMessageCursor(false)); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 677b0a18e7..4cc8a7a871 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -236,7 +236,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { public void initialize() throws Exception { if (this.messages == null) { if (destination.isTemporary() || broker == null || store == null) { - this.messages = new VMPendingMessageCursor(); + this.messages = new VMPendingMessageCursor(isPrioritizedMessages()); } else { this.messages = new StoreQueueCursor(broker, this); } @@ -951,38 +951,30 @@ public class Queue extends BaseDestination implements Task, UsageListener { public Message getMessage(String id) { MessageId msgId = new MessageId(id); - try { - synchronized (pagedInMessages) { - QueueMessageReference r = this.pagedInMessages.get(msgId); - if (r != null) { - return r.getMessage(); - } + synchronized (pagedInMessages) { + QueueMessageReference r = this.pagedInMessages.get(msgId); + if (r != null) { + return r.getMessage(); } - synchronized (messages) { - try { - messages.reset(); - while (messages.hasNext()) { - try { - MessageReference r = messages.next(); - r.decrementReferenceCount(); - messages.rollback(r.getMessageId()); - if (msgId.equals(r.getMessageId())) { - Message m = r.getMessage(); - if (m != null) { - return m; - } - break; - } - } catch (IOException e) { - LOG.error("got an exception retrieving message " + id); + } + synchronized (messages) { + try { + messages.reset(); + while (messages.hasNext()) { + MessageReference r = messages.next(); + r.decrementReferenceCount(); + messages.rollback(r.getMessageId()); + if (msgId.equals(r.getMessageId())) { + Message m = r.getMessage(); + if (m != null) { + return m; } + break; } - } finally { - messages.release(); } + } finally { + messages.release(); } - } catch (IOException e) { - LOG.error("got an exception retrieving message " + id); } return null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java index 6e34bb4b90..4c3b82b374 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java @@ -56,7 +56,7 @@ public class TempQueue extends Queue{ @Override public void initialize() throws Exception { - this.messages=new VMPendingMessageCursor(); + this.messages=new VMPendingMessageCursor(false); this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); this.systemUsage = brokerService.getSystemUsage(); memoryUsage.setParent(systemUsage.getMemoryUsage()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 8d1a76b91e..b1286883d9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -72,9 +72,9 @@ public class TopicSubscription extends AbstractSubscription { this.usageManager = usageManager; String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) { - this.matched = new VMPendingMessageCursor(); + this.matched = new VMPendingMessageCursor(false); } else { - this.matched = new FilePendingMessageCursor(broker,matchedName); + this.matched = new FilePendingMessageCursor(broker,matchedName,false); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 2adc05f3c5..0cc6752678 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -19,11 +19,14 @@ package org.apache.activemq.broker.region.cursors; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Set; import org.apache.activemq.ActiveMQMessageAudit; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.SystemUsage; @@ -44,6 +47,11 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs protected boolean useCache=true; private boolean started=false; protected MessageReference last = null; + protected final boolean prioritizedMessages; + + public AbstractPendingMessageCursor(boolean prioritizedMessages) { + this.prioritizedMessages=prioritizedMessages; + } public synchronized void start() throws Exception { @@ -304,4 +312,19 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs protected synchronized boolean isStarted() { return started; } + + public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) { + boolean result = false; + Set destinations = broker.getDestinations(sub.getActiveMQDestination()); + if (destinations != null) { + for (Destination dest:destinations) { + if (dest.isPrioritizedMessages()) { + result = true; + break; + } + } + } + return result; + + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index d1894d30c2..ec6fb0dd79 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -17,8 +17,6 @@ package org.apache.activemq.broker.region.cursors; import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map.Entry; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.Message; @@ -34,8 +32,8 @@ import org.apache.commons.logging.LogFactory; public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener { private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class); protected final Destination regionDestination; - private final LinkedHashMap batchList = new LinkedHashMap (); - private Iterator> iterator = null; + private final PendingList batchList; + private Iterator iterator = null; private boolean cacheEnabled=false; protected boolean batchResetNeeded = true; protected boolean storeHasMessages = false; @@ -43,10 +41,16 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i private MessageId lastCachedId; protected AbstractStoreCursor(Destination destination) { + super((destination != null ? destination.isPrioritizedMessages():false)); this.regionDestination=destination; + if (this.prioritizedMessages) { + this.batchList= new PrioritizedPendingList(); + }else { + this.batchList = new OrderedPendingList(); + } } - @Override + public final synchronized void start() throws Exception{ if (!isStarted()) { super.start(); @@ -60,7 +64,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } - @Override + public final synchronized void stop() throws Exception { resetBatch(); super.stop(); @@ -82,7 +86,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } message.incrementReferenceCount(); - batchList.put(message.getMessageId(), message); + batchList.addMessageLast(message); clearIterator(true); recovered = true; } else { @@ -99,7 +103,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return recovered; } - @Override + public final void reset() { if (batchList.isEmpty()) { try { @@ -113,7 +117,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i size(); } - @Override + public synchronized void release() { clearIterator(false); } @@ -129,7 +133,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i private synchronized void ensureIterator() { if(this.iterator==null) { - this.iterator=this.batchList.entrySet().iterator(); + this.iterator=this.batchList.iterator(); } } @@ -137,7 +141,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public final void finished() { } - @Override + public final synchronized boolean hasNext() { if (batchList.isEmpty()) { try { @@ -151,11 +155,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return this.iterator.hasNext(); } - @Override + public final synchronized MessageReference next() { MessageReference result = null; if (!this.batchList.isEmpty()&&this.iterator.hasNext()) { - result = this.iterator.next().getValue(); + result = this.iterator.next(); } last = result; if (result != null) { @@ -164,7 +168,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return result; } - @Override + public final synchronized void addMessageLast(MessageReference node) throws Exception { if (cacheEnabled && hasSpace()) { recoverMessage(node.getMessage(),true); @@ -189,13 +193,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i protected void setBatch(MessageId messageId) throws Exception { } - @Override + public final synchronized void addMessageFirst(MessageReference node) throws Exception { cacheEnabled=false; size++; } - @Override + public final synchronized void remove() { size--; if (iterator!=null) { @@ -212,21 +216,22 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } - @Override + public final synchronized void remove(MessageReference node) { size--; cacheEnabled=false; - batchList.remove(node.getMessageId()); + batchList.remove(node); } - @Override + public final synchronized void clear() { gc(); } - @Override + public final synchronized void gc() { - for (Message msg : batchList.values()) { + for (Iteratori = batchList.iterator();i.hasNext();) { + MessageReference msg = i.next(); rollback(msg.getMessageId()); msg.decrementReferenceCount(); } @@ -241,7 +246,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } - @Override + protected final synchronized void fillBatch() { if (batchResetNeeded) { resetBatch(); @@ -261,18 +266,18 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } - @Override + public final synchronized boolean isEmpty() { // negative means more messages added to store through queue.send since last reset return size == 0; } - @Override + public final synchronized boolean hasMessagesBufferedToDeliver() { return !batchList.isEmpty(); } - @Override + public final synchronized int size() { if (size < 0) { this.size = getStoreSize(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 691414c77e..4f8b479ba6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -63,9 +63,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple /** * @param broker * @param name + * @param prioritizedMessages * @param store */ - public FilePendingMessageCursor(Broker broker, String name) { + public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) { + super(prioritizedMessages); this.useCache = false; this.broker = broker; // the store can be null if the BrokerService has persistence @@ -190,6 +192,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple tryAddMessageLast(node, 0); } + @Override public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { if (!node.isExpired()) { try { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java new file mode 100644 index 0000000000..4a9c6e482b --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.MessageId; + +public class OrderedPendingList implements PendingList { + PendingNode root = null; + PendingNode tail = null; + final Map map = new HashMap(); + + public PendingNode addMessageFirst(MessageReference message) { + PendingNode node = new PendingNode(this, message); + if (root == null) { + root = node; + tail = node; + } else { + root.linkBefore(node); + } + this.map.put(message.getMessageId(), node); + return node; + } + + public PendingNode addMessageLast(MessageReference message) { + PendingNode node = new PendingNode(this, message); + if (root == null) { + root = node; + } else { + tail.linkAfter(node); + } + tail = node; + this.map.put(message.getMessageId(), node); + return node; + } + + public void clear() { + this.root = null; + this.tail = null; + this.map.clear(); + } + + public boolean isEmpty() { + return this.map.isEmpty(); + } + + public Iterator iterator() { + return new Iterator() { + private PendingNode current = null; + private PendingNode next = root; + + public boolean hasNext() { + return next != null; + } + + public MessageReference next() { + MessageReference result = null; + this.current = this.next; + result = this.current.getMessage(); + this.next = (PendingNode) this.next.getNext(); + return result; + } + + public void remove() { + if (this.current != null && this.current.getMessage() != null) { + map.remove(this.current.getMessage().getMessageId()); + } + removeNode(this.current); + } + }; + } + + public void remove(MessageReference message) { + if (message != null) { + PendingNode node = this.map.remove(message.getMessageId()); + removeNode(node); + } + } + + public int size() { + return this.map.size(); + } + + void removeNode(PendingNode node) { + if (node != null) { + map.remove(node.getMessage().getMessageId()); + if (root == node) { + root = (PendingNode) node.getNext(); + } + if (tail == node) { + tail = (PendingNode) node.getPrevious(); + } + node.unlink(); + } + } + + List getAsList() { + List result = new ArrayList(size()); + PendingNode node = root; + while (node != null) { + result.add(node); + node = (PendingNode) node.getNext(); + } + return result; + } + + @Override + public String toString() { + return "OrderedPendingList(" + System.identityHashCode(this) + ")"; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java new file mode 100644 index 0000000000..7b9844c1db --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.util.Iterator; +import org.apache.activemq.broker.region.MessageReference; + +public interface PendingList { + + public boolean isEmpty(); + public void clear(); + public PendingNode addMessageFirst(MessageReference message); + public PendingNode addMessageLast(MessageReference message); + public void remove(MessageReference message); + public int size(); + public Iterator iterator(); +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java new file mode 100644 index 0000000000..21cdcd0962 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.util.LinkedNode; + +public class PendingNode extends LinkedNode { + private final MessageReference message; + private final OrderedPendingList list; + public PendingNode(OrderedPendingList list,MessageReference message) { + this.list = list; + this.message = message; + } + + MessageReference getMessage() { + return this.message; + } + + OrderedPendingList getList() { + return this.list; + } + + @Override + public String toString() { + PendingNode n = (PendingNode) getNext(); + String str = "PendingNode("; + str += System.identityHashCode(this) + "),root="+isHeadNode()+",next="+(n != null ?System.identityHashCode(n):"NULL"); + return str; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java new file mode 100644 index 0000000000..075160cc9d --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.MessageId; + +public class PrioritizedPendingList implements PendingList { + static final Integer MAX_PRIORITY = 10; + private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY]; + final Map map = new HashMap(); + + public PrioritizedPendingList() { + for (int i = 0; i < MAX_PRIORITY; i++) { + this.lists[i] = new OrderedPendingList(); + } + } + public PendingNode addMessageFirst(MessageReference message) { + PendingNode node = getList(message).addMessageFirst(message); + this.map.put(message.getMessageId(), node); + return node; + } + + public PendingNode addMessageLast(MessageReference message) { + PendingNode node = getList(message).addMessageLast(message); + this.map.put(message.getMessageId(), node); + return node; + } + + public void clear() { + for (int i = 0; i < MAX_PRIORITY; i++) { + this.lists[i].clear(); + } + this.map.clear(); + } + + public boolean isEmpty() { + return this.map.isEmpty(); + } + + public Iterator iterator() { + return new PrioritizedPendingListIterator(); + } + + public void remove(MessageReference message) { + if (message != null) { + PendingNode node = this.map.remove(message.getMessageId()); + if (node != null) { + node.getList().removeNode(node); + } + } + } + + public int size() { + return this.map.size(); + } + + @Override + public String toString() { + return "PrioritizedPendingList(" + System.identityHashCode(this) + ")"; + } + + protected int getPriority(MessageReference message) { + int priority = javax.jms.Message.DEFAULT_PRIORITY; + if (message.getMessageId() != null) { + Math.max(message.getMessage().getPriority(), 0); + priority = Math.min(priority, 9); + } + return priority; + } + + protected OrderedPendingList getList(MessageReference msg) { + return lists[getPriority(msg)]; + } + + private class PrioritizedPendingListIterator implements Iterator { + private int index = 0; + private int currentIndex = 0; + List list = new ArrayList(size()); + + PrioritizedPendingListIterator() { + for (int i = MAX_PRIORITY - 1; i >= 0; i--) { + OrderedPendingList orderedPendingList = lists[i]; + if (!orderedPendingList.isEmpty()) { + list.addAll(orderedPendingList.getAsList()); + } + } + } + public boolean hasNext() { + return list.size() > index; + } + + public MessageReference next() { + PendingNode node = list.get(this.index); + this.currentIndex = this.index; + this.index++; + return node.getMessage(); + } + + public void remove() { + PendingNode node = list.get(this.currentIndex); + if (node != null) { + map.remove(node.getMessage().getMessageId()); + node.getList().removeNode(node); + } + + } + + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 410ab1d51a..bc306db208 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; - import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -58,13 +57,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { * @param subscription subscription for this cursor */ public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) { + super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription)); this.subscription=subscription; this.clientId = clientId; this.subscriberName = subscriberName; if (broker.getBrokerService().isPersistent()) { - this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName); + this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages); }else { - this.nonPersistent = new VMPendingMessageCursor(); + this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); } this.nonPersistent.setMaxBatchSize(maxBatchSize); @@ -72,6 +72,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { this.storePrefetches.add(this.nonPersistent); } + @Override public synchronized void start() throws Exception { if (!isStarted()) { super.start(); @@ -82,6 +83,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } } + @Override public synchronized void stop() throws Exception { if (isStarted()) { super.stop(); @@ -98,6 +100,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { * @param destination * @throws Exception */ + @Override public synchronized void add(ConnectionContext context, Destination destination) throws Exception { if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName); @@ -122,6 +125,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { * @param destination * @throws Exception */ + @Override public synchronized List remove(ConnectionContext context, Destination destination) throws Exception { PendingMessageCursor tsp = topics.remove(destination); if (tsp != null) { @@ -133,6 +137,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { /** * @return true if there are no pending messages */ + @Override public synchronized boolean isEmpty() { for (PendingMessageCursor tsp : storePrefetches) { if( !tsp.isEmpty() ) @@ -141,6 +146,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { return true; } + @Override public synchronized boolean isEmpty(Destination destination) { boolean result = true; TopicStorePrefetch tsp = topics.get(destination); @@ -157,10 +163,12 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor * @return true if recovery required */ + @Override public boolean isRecoveryRequired() { return false; } + @Override public synchronized void addMessageLast(MessageReference node) throws Exception { if (node != null) { Message msg = node.getMessage(); @@ -179,16 +187,19 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } } + @Override public synchronized void addRecoveredMessage(MessageReference node) throws Exception { nonPersistent.addMessageLast(node); } + @Override public synchronized void clear() { for (PendingMessageCursor tsp : storePrefetches) { tsp.clear(); } } + @Override public synchronized boolean hasNext() { boolean result = true; if (result) { @@ -203,35 +214,41 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { return result; } + @Override public synchronized MessageReference next() { MessageReference result = currentCursor != null ? currentCursor.next() : null; return result; } + @Override public synchronized void remove() { if (currentCursor != null) { currentCursor.remove(); } } + @Override public synchronized void remove(MessageReference node) { if (currentCursor != null) { currentCursor.remove(node); } } + @Override public synchronized void reset() { for (PendingMessageCursor storePrefetch : storePrefetches) { storePrefetch.reset(); } } + @Override public synchronized void release() { for (PendingMessageCursor storePrefetch : storePrefetches) { storePrefetch.release(); } } + @Override public synchronized int size() { int pendingCount=0; for (PendingMessageCursor tsp : storePrefetches) { @@ -240,6 +257,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { return pendingCount; } + @Override public void setMaxBatchSize(int maxBatchSize) { for (PendingMessageCursor storePrefetch : storePrefetches) { storePrefetch.setMaxBatchSize(maxBatchSize); @@ -247,12 +265,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { super.setMaxBatchSize(maxBatchSize); } + @Override public synchronized void gc() { for (PendingMessageCursor tsp : storePrefetches) { tsp.gc(); } } + @Override public void setSystemUsage(SystemUsage usageManager) { super.setSystemUsage(usageManager); for (PendingMessageCursor tsp : storePrefetches) { @@ -260,6 +280,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } } + @Override public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); for (PendingMessageCursor cursor : storePrefetches) { @@ -267,6 +288,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } } + @Override public void setMaxProducersToAudit(int maxProducersToAudit) { super.setMaxProducersToAudit(maxProducersToAudit); for (PendingMessageCursor cursor : storePrefetches) { @@ -274,6 +296,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } } + @Override public void setMaxAuditDepth(int maxAuditDepth) { super.setMaxAuditDepth(maxAuditDepth); for (PendingMessageCursor cursor : storePrefetches) { @@ -281,6 +304,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } } + @Override public void setEnableAudit(boolean enableAudit) { super.setEnableAudit(enableAudit); for (PendingMessageCursor cursor : storePrefetches) { @@ -288,6 +312,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } } + @Override public void setUseCache(boolean useCache) { super.setUseCache(useCache); for (PendingMessageCursor cursor : storePrefetches) { @@ -313,6 +338,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { return currentCursor; } + @Override public String toString() { return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")"; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index 967d06714c..b92e523e9b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -32,21 +32,21 @@ import org.apache.commons.logging.LogFactory; public class StoreQueueCursor extends AbstractPendingMessageCursor { private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class); - private Broker broker; + private final Broker broker; private int pendingCount; - private Queue queue; + private final Queue queue; private PendingMessageCursor nonPersistent; - private QueueStorePrefetch persistent; + private final QueueStorePrefetch persistent; private boolean started; private PendingMessageCursor currentCursor; /** * Construct - * + * @param broker * @param queue - * @param tmpStore */ public StoreQueueCursor(Broker broker,Queue queue) { + super((queue != null ? queue.isPrioritizedMessages():false)); this.broker=broker; this.queue = queue; this.persistent = new QueueStorePrefetch(queue); @@ -58,9 +58,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { super.start(); if (nonPersistent == null) { if (broker.getBrokerService().isPersistent()) { - nonPersistent = new FilePendingMessageCursor(broker,queue.getName()); + nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages); }else { - nonPersistent = new VMPendingMessageCursor(); + nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); } nonPersistent.setMaxBatchSize(getMaxBatchSize()); nonPersistent.setSystemUsage(systemUsage); @@ -101,7 +101,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { } } } - + public synchronized void addMessageFirst(MessageReference node) throws Exception { if (node != null) { Message msg = node.getMessage(); @@ -240,6 +240,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { } } + @Override public void setUseCache(boolean useCache) { super.setUseCache(useCache); if (persistent != null) { @@ -250,6 +251,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { } } + @Override public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); if (persistent != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java index fb17f1adde..c8d856d3c6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java @@ -32,13 +32,20 @@ import org.apache.activemq.broker.region.QueueMessageReference; * @version $Revision$ */ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { - private final LinkedList list = new LinkedList(); + private final PendingList list; private Iterator iter; - public VMPendingMessageCursor() { + + public VMPendingMessageCursor(boolean prioritizedMessages) { + super(prioritizedMessages); this.useCache = false; + if (this.prioritizedMessages) { + this.list= new PrioritizedPendingList(); + }else { + this.list = new OrderedPendingList(); + } } - @Override + public synchronized List remove(ConnectionContext context, Destination destination) throws Exception { List rc = new ArrayList(); @@ -56,7 +63,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * @return true if there are no pending messages */ - @Override + public synchronized boolean isEmpty() { if (list.isEmpty()) { return true; @@ -79,9 +86,9 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * reset the cursor */ - @Override + public synchronized void reset() { - iter = list.listIterator(); + iter = list.iterator(); last = null; } @@ -90,10 +97,10 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { * * @param node */ - @Override + public synchronized void addMessageLast(MessageReference node) { node.incrementReferenceCount(); - list.addLast(node); + list.addMessageLast(node); } /** @@ -102,16 +109,16 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { * @param position * @param node */ - @Override + public synchronized void addMessageFirst(MessageReference node) { node.incrementReferenceCount(); - list.addFirst(node); + list.addMessageFirst(node); } /** * @return true if there pending messages to dispatch */ - @Override + public synchronized boolean hasNext() { return iter.hasNext(); } @@ -119,7 +126,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * @return the next pending message */ - @Override + public synchronized MessageReference next() { last = iter.next(); if (last != null) { @@ -131,7 +138,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * remove the message at the cursor position */ - @Override + public synchronized void remove() { if (last != null) { last.decrementReferenceCount(); @@ -142,7 +149,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * @return the number of pending messages */ - @Override + public synchronized int size() { return list.size(); } @@ -150,7 +157,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { /** * clear all pending messages */ - @Override + public synchronized void clear() { for (Iterator i = list.iterator(); i.hasNext();) { MessageReference ref = i.next(); @@ -159,16 +166,10 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { list.clear(); } - @Override + public synchronized void remove(MessageReference node) { - for (Iterator i = list.iterator(); i.hasNext();) { - MessageReference ref = i.next(); - if (node.getMessageId().equals(ref.getMessageId())) { - ref.decrementReferenceCount(); - i.remove(); - break; - } - } + list.remove(node); + node.decrementReferenceCount(); } /** @@ -177,10 +178,11 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { * @param maxItems * @return a list of paged in messages */ - @Override + public LinkedList pageInList(int maxItems) { LinkedList result = new LinkedList(); - for (MessageReference ref: list) { + for (Iteratori = list.iterator();i.hasNext();) { + MessageReference ref = i.next(); ref.incrementReferenceCount(); result.add(ref); if (result.size() >= maxItems) { @@ -190,12 +192,12 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { return result; } - @Override + public boolean isTransient() { return true; } - @Override + public void destroy() throws Exception { super.destroy(); clear(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java index 235f948aa5..7c8c9cc584 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -42,6 +43,6 @@ public class FilePendingDurableSubscriberMessageStoragePolicy implements Pending * @return the Pending Message cursor */ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) { - return new FilePendingMessageCursor(broker,name); + return new FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub)); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java index f735435bfa..09ac67ef07 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java @@ -39,7 +39,7 @@ public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessage * org.apache.activemq.kaha.Store) */ public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) { - return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName()); + return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName(),queue.isPrioritizedMessages()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java index 91f9eecb61..52beec5fa7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java @@ -17,6 +17,8 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -31,14 +33,16 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor; public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy { /** - * @param broker + * @param broker * @param name * @param maxBatchSize * @return a Cursor * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, * org.apache.activemq.kaha.Store, int) */ - public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) { - return new FilePendingMessageCursor(broker,"PendingCursor:" + name); + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker, String name, int maxBatchSize, + Subscription subs) { + return new FilePendingMessageCursor(broker, "PendingCursor:" + name, AbstractPendingMessageCursor + .isPrioritizedMessageSubscriber(broker, subs)); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java index 537ec616e3..afdde76852 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; /** @@ -35,5 +36,5 @@ public interface PendingSubscriberMessageStoragePolicy { * @param maxBatchSize * @return the Pending Message cursor */ - PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize); + PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize,Subscription subs); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 5a396d7ba4..ae607ad3c3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -87,6 +87,7 @@ public class PolicyEntry extends DestinationMapEntry { private int cursorMemoryHighWaterMark = 70; private int storeUsageHighWaterMark = 100; private SlowConsumerStrategy slowConsumerStrategy; + private boolean prioritizedMessages; public void configure(Broker broker,Queue queue) { @@ -155,6 +156,7 @@ public class PolicyEntry extends DestinationMapEntry { scs.setScheduler(broker.getScheduler()); } destination.setSlowConsumerStrategy(scs); + destination.setPrioritizedMessages(isPrioritizedMessages()); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { @@ -184,7 +186,7 @@ public class PolicyEntry extends DestinationMapEntry { if (pendingSubscriberPolicy != null) { String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId(); int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize(); - subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize)); + subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription)); } if (enableAudit) { subscription.setEnableAudit(enableAudit); @@ -739,5 +741,14 @@ public class PolicyEntry extends DestinationMapEntry { public SlowConsumerStrategy getSlowConsumerStrategy() { return this.slowConsumerStrategy; } + + + public boolean isPrioritizedMessages() { + return this.prioritizedMessages; + } + + public void setPrioritizedMessages(boolean prioritizedMessages) { + this.prioritizedMessages = prioritizedMessages; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java index ebbe3f3ef6..4559257376 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; @@ -40,6 +41,6 @@ public class VMPendingDurableSubscriberMessageStoragePolicy implements PendingDu * @return the Pending Message cursor */ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) { - return new VMPendingMessageCursor(); + return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub)); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java index ddd41ac3c1..109c627b6c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java @@ -37,6 +37,6 @@ public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageSt * @return the cursor */ public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) { - return new VMPendingMessageCursor(); + return new VMPendingMessageCursor(queue.isPrioritizedMessages()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java index db0c733bf7..b4619c852c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java @@ -17,6 +17,8 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; @@ -38,7 +40,7 @@ public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscribe * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, * org.apache.activemq.kaha.Store, int) */ - public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) { - return new VMPendingMessageCursor(); + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize,Subscription subs) { + return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, subs)); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java index d0fe8e1dcc..841917f7d7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.broker.util; -import java.io.IOException; import java.util.Set; import javax.annotation.PostConstruct; import org.apache.activemq.broker.BrokerPluginSupport; @@ -65,7 +64,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { private boolean logInternalEvents = false; /** - * + * * @throws Exception * @org.apache.xbean.InitMethod */ @@ -77,7 +76,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { public boolean isLogAll() { return logAll; } - + /** * Log all Events that go through the Plugin */ @@ -152,15 +151,12 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void acknowledge(ConsumerBrokerExchange consumerExchange, - MessageAck ack) throws Exception { + public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { if (isLogAll() || isLogConsumerEvents()) { - LOG.info("Acknowledging message for client ID : " - + consumerExchange.getConnectionContext().getClientId() + LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId() + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : "")); if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) { - LOG.trace("Message count: " + ack.getMessageCount() - + ", First Message Id: " + ack.getFirstMessageId() + LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId() + ", Last Message Id: " + ack.getLastMessageId()); } } @@ -168,18 +164,15 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public Response messagePull(ConnectionContext context, MessagePull pull) - throws Exception { + public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { if (isLogAll() || isLogConsumerEvents()) { - LOG.info("Message Pull from : " + context.getClientId() + " on " - + pull.getDestination().getPhysicalName()); + LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName()); } return super.messagePull(context, pull); } @Override - public void addConnection(ConnectionContext context, ConnectionInfo info) - throws Exception { + public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { if (isLogAll() || isLogConnectionEvents()) { LOG.info("Adding Connection : " + context); } @@ -187,8 +180,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) - throws Exception { + public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { if (isLogAll() || isLogConsumerEvents()) { LOG.info("Adding Consumer : " + info); } @@ -196,8 +188,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void addProducer(ConnectionContext context, ProducerInfo info) - throws Exception { + public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { if (isLogAll() || isLogProducerEvents()) { LOG.info("Adding Producer :" + info); } @@ -205,8 +196,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void commitTransaction(ConnectionContext context, TransactionId xid, - boolean onePhase) throws Exception { + public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { if (isLogAll() || isLogTransactionEvents()) { LOG.info("Commiting transaction : " + xid.getTransactionKey()); } @@ -214,8 +204,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void removeSubscription(ConnectionContext context, - RemoveSubscriptionInfo info) throws Exception { + public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { if (isLogAll() || isLogConsumerEvents()) { LOG.info("Removing subscription : " + info); } @@ -223,8 +212,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public TransactionId[] getPreparedTransactions(ConnectionContext context) - throws Exception { + public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { TransactionId[] result = super.getPreparedTransactions(context); if ((isLogAll() || isLogTransactionEvents()) && result != null) { @@ -241,8 +229,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public int prepareTransaction(ConnectionContext context, TransactionId xid) - throws Exception { + public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { if (isLogAll() || isLogTransactionEvents()) { LOG.info("Preparing transaction : " + xid.getTransactionKey()); } @@ -250,8 +237,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void removeConnection(ConnectionContext context, - ConnectionInfo info, Throwable error) throws Exception { + public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { if (isLogAll() || isLogConnectionEvents()) { LOG.info("Removing Connection : " + info); } @@ -259,8 +245,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void removeConsumer(ConnectionContext context, ConsumerInfo info) - throws Exception { + public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { if (isLogAll() || isLogConsumerEvents()) { LOG.info("Removing Consumer : " + info); } @@ -268,8 +253,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void removeProducer(ConnectionContext context, ProducerInfo info) - throws Exception { + public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { if (isLogAll() || isLogProducerEvents()) { LOG.info("Removing Producer : " + info); } @@ -277,8 +261,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void rollbackTransaction(ConnectionContext context, TransactionId xid) - throws Exception { + public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { if (isLogAll() || isLogTransactionEvents()) { LOG.info("Rolling back Transaction : " + xid.getTransactionKey()); } @@ -286,8 +269,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void send(ProducerBrokerExchange producerExchange, - Message messageSend) throws Exception { + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { if (isLogAll() || isLogProducerEvents()) { LOG.info("Sending message : " + messageSend); } @@ -295,8 +277,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void beginTransaction(ConnectionContext context, TransactionId xid) - throws Exception { + public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { if (isLogAll() || isLogTransactionEvents()) { LOG.info("Beginning transaction : " + xid.getTransactionKey()); } @@ -304,11 +285,9 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void forgetTransaction(ConnectionContext context, - TransactionId transactionId) throws Exception { + public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { if (isLogAll() || isLogTransactionEvents()) { - LOG.info("Forgetting transaction : " - + transactionId.getTransactionKey()); + LOG.info("Forgetting transaction : " + transactionId.getTransactionKey()); } super.forgetTransaction(context, transactionId); } @@ -333,23 +312,20 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public org.apache.activemq.broker.region.Destination addDestination( - ConnectionContext context, ActiveMQDestination destination,boolean create) - throws Exception { + public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context, + ActiveMQDestination destination, boolean create) throws Exception { if (isLogAll() || isLogInternalEvents()) { - LOG.info("Adding destination : " - + destination.getDestinationTypeAsString() + ":" + LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":" + destination.getPhysicalName()); } - return super.addDestination(context, destination,create); + return super.addDestination(context, destination, create); } @Override - public void removeDestination(ConnectionContext context, - ActiveMQDestination destination, long timeout) throws Exception { + public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) + throws Exception { if (isLogAll() || isLogInternalEvents()) { - LOG.info("Removing destination : " - + destination.getDestinationTypeAsString() + ":" + LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":" + destination.getPhysicalName()); } super.removeDestination(context, destination, timeout); @@ -390,8 +366,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void addSession(ConnectionContext context, SessionInfo info) - throws Exception { + public void addSession(ConnectionContext context, SessionInfo info) throws Exception { if (isLogAll() || isLogConnectionEvents()) { LOG.info("Adding Session : " + info); } @@ -399,8 +374,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void removeSession(ConnectionContext context, SessionInfo info) - throws Exception { + public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { if (isLogAll() || isLogConnectionEvents()) { LOG.info("Removing Session : " + info); } @@ -458,12 +432,9 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void processDispatchNotification( - MessageDispatchNotification messageDispatchNotification) - throws Exception { + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) { - LOG.info("ProcessDispatchNotification :" - + messageDispatchNotification); + LOG.info("ProcessDispatchNotification :" + messageDispatchNotification); } super.processDispatchNotification(messageDispatchNotification); } @@ -487,8 +458,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void addDestinationInfo(ConnectionContext context, - DestinationInfo info) throws Exception { + public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Adding destination info : " + info); } @@ -496,8 +466,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void removeDestinationInfo(ConnectionContext context, - DestinationInfo info) throws Exception { + public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { if (isLogAll() || isLogInternalEvents()) { LOG.info("Removing destination info : " + info); } @@ -505,36 +474,31 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void messageExpired(ConnectionContext context, - MessageReference message) { + public void messageExpired(ConnectionContext context, MessageReference message) { if (isLogAll() || isLogInternalEvents()) { String msg = "Unable to display message."; - try { - msg = message.getMessage().toString(); - } catch (IOException ioe) { - } + + msg = message.getMessage().toString(); + LOG.info("Message has expired : " + msg); } super.messageExpired(context, message); } @Override - public void sendToDeadLetterQueue(ConnectionContext context, - MessageReference messageReference) { + public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference) { if (isLogAll() || isLogInternalEvents()) { String msg = "Unable to display message."; - try { - msg = messageReference.getMessage().toString(); - } catch (IOException ioe) { - } + + msg = messageReference.getMessage().toString(); + LOG.info("Sending to DLQ : " + msg); } super.sendToDeadLetterQueue(context, messageReference); } @Override - public void fastProducer(ConnectionContext context, - ProducerInfo producerInfo) { + public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { LOG.info("Fast Producer : " + producerInfo); } @@ -542,8 +506,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void isFull(ConnectionContext context, Destination destination, - Usage usage) { + public void isFull(ConnectionContext context, Destination destination, Usage usage) { if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) { LOG.info("Destination is full : " + destination.getName()); } @@ -551,50 +514,43 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport { } @Override - public void messageConsumed(ConnectionContext context, - MessageReference messageReference) { + public void messageConsumed(ConnectionContext context, MessageReference messageReference) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { String msg = "Unable to display message."; - try { - msg = messageReference.getMessage().toString(); - } catch (IOException ioe) { - } + + msg = messageReference.getMessage().toString(); + LOG.info("Message consumed : " + msg); } super.messageConsumed(context, messageReference); } @Override - public void messageDelivered(ConnectionContext context, - MessageReference messageReference) { + public void messageDelivered(ConnectionContext context, MessageReference messageReference) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { String msg = "Unable to display message."; - try { - msg = messageReference.getMessage().toString(); - } catch (IOException ioe) { - } + + msg = messageReference.getMessage().toString(); + LOG.info("Message delivered : " + msg); } super.messageDelivered(context, messageReference); } @Override - public void messageDiscarded(ConnectionContext context, Subscription sub, - MessageReference messageReference) { + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { if (isLogAll() || isLogInternalEvents()) { String msg = "Unable to display message."; - try { - msg = messageReference.getMessage().toString(); - } catch (IOException ioe) { - } + + msg = messageReference.getMessage().toString(); + LOG.info("Message discarded : " + msg); } super.messageDiscarded(context, sub, messageReference); } @Override - public void slowConsumer(ConnectionContext context, - Destination destination, Subscription subs) { + public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) { LOG.info("Detected slow consumer on " + destination.getName()); StringBuffer buf = new StringBuffer("Connection("); diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Message.java b/activemq-core/src/main/java/org/apache/activemq/command/Message.java index 042c567a53..19ac309ca3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Message.java @@ -564,7 +564,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess return this; } - public Message getMessage() throws IOException { + public Message getMessage() { return this; } diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java b/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java index 30d4339d21..751f281156 100644 --- a/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.memory.list; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -38,10 +37,10 @@ import org.apache.commons.logging.LogFactory; */ public class SimpleMessageList implements MessageList { private static final Log LOG = LogFactory.getLog(SimpleMessageList.class); - private LinkedList list = new LinkedList(); + private final LinkedList list = new LinkedList(); private int maximumSize = 100 * 64 * 1024; private int size; - private Object lock = new Object(); + private final Object lock = new Object(); public SimpleMessageList() { } @@ -73,13 +72,9 @@ public class SimpleMessageList implements MessageList { for (Iterator i = list.iterator(); i.hasNext();) { MessageReference ref = i.next(); Message msg; - try { - msg = ref.getMessage(); - if (filter.matches(msg.getDestination())) { - result.add(msg); - } - } catch (IOException e) { - LOG.error("Failed to get Message from MessageReference: " + ref, e); + msg = ref.getMessage(); + if (filter.matches(msg.getDestination())) { + result.add(msg); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java b/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java index b9ec3fd13d..f6ca62d202 100644 --- a/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java @@ -16,9 +16,7 @@ */ package org.apache.activemq.plugin; -import java.io.IOException; import java.util.regex.Pattern; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; @@ -48,25 +46,15 @@ public class DiscardingDLQBroker extends BrokerFilter { @Override public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef) { if (log.isTraceEnabled()) { - try { - log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null)); - } catch (IOException x) { - log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + msgRef != null ? msgRef : null, x); - } + log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null)); } boolean dropped = true; Message msg = null; ActiveMQDestination dest = null; String destName = null; - try { - msg = msgRef.getMessage(); - dest = msg.getDestination(); - destName = dest.getPhysicalName(); - }catch (IOException x) { - if (log.isDebugEnabled()) { - log.debug("Unable to retrieve message or destination for message going to Dead Letter Queue. message skipped.", x); - } - } + msg = msgRef.getMessage(); + dest = msg.getDestination(); + destName = dest.getPhysicalName(); if (dest == null || destName == null ) { //do nothing, no need to forward it @@ -105,12 +93,8 @@ public class DiscardingDLQBroker extends BrokerFilter { private void skipMessage(String prefix, MessageReference msgRef) { if (log.isDebugEnabled()) { - try { - String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null); - log.debug(lmsg); - }catch (IOException x) { - log.debug("Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef:null),x); - } + String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null); + log.debug(lmsg); } }