diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index ec0c8a9f4f..cd7d897ac8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -82,11 +82,11 @@ public class DestinationView implements DestinationViewMBean { public long getDispatchCount() { return destination.getDestinationStatistics().getDispatched().getCount(); } - + public long getInFlightCount() { return destination.getDestinationStatistics().getInflight().getCount(); } - + public long getExpiredCount() { return destination.getDestinationStatistics().getExpired().getCount(); } @@ -220,7 +220,7 @@ public class DestinationView implements DestinationViewMBean { OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); Message[] messages = destination.browse(); CompositeType ct = factory.getCompositeType(); - TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"}); + TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" }); TabularDataSupport rc = new TabularDataSupport(tt); MessageEvaluationContext ctx = new MessageEvaluationContext(); @@ -248,16 +248,16 @@ public class DestinationView implements DestinationViewMBean { public String sendTextMessage(String body) throws Exception { return sendTextMessage(Collections.EMPTY_MAP, body); } - + public String sendTextMessage(Map headers, String body) throws Exception { - return sendTextMessage(headers,body,null,null); + return sendTextMessage(headers, body, null, null); } public String sendTextMessage(String body, String user, String password) throws Exception { - return sendTextMessage(Collections.EMPTY_MAP,body,user,password); + return sendTextMessage(Collections.EMPTY_MAP, body, user, password); } - public String sendTextMessage(Map headers, String body,String userName,String password) throws Exception { + public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception { String brokerUrl = "vm://" + broker.getBrokerName(); ActiveMQDestination dest = destination.getActiveMQDestination(); @@ -266,14 +266,14 @@ public class DestinationView implements DestinationViewMBean { Connection connection = null; try { - connection = cf.createConnection(userName,password); + connection = cf.createConnection(userName, password); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(dest); - ActiveMQTextMessage msg = (ActiveMQTextMessage)session.createTextMessage(body); + ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body); for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) { - Map.Entry entry = (Map.Entry)iter.next(); - msg.setObjectProperty((String)entry.getKey(), entry.getValue()); + Map.Entry entry = (Map.Entry) iter.next(); + msg.setObjectProperty((String) entry.getKey(), entry.getValue()); } producer.setDeliveryMode(msg.getJMSDeliveryMode()); @@ -292,30 +292,28 @@ public class DestinationView implements DestinationViewMBean { public int getMaxAuditDepth() { return destination.getMaxAuditDepth(); - } + } - public int getMaxProducersToAudit() { - return destination.getMaxProducersToAudit(); - } + public int getMaxProducersToAudit() { + return destination.getMaxProducersToAudit(); + } - public boolean isEnableAudit() { - return destination.isEnableAudit(); - } + public boolean isEnableAudit() { + return destination.isEnableAudit(); + } - - public void setEnableAudit(boolean enableAudit) { - destination.setEnableAudit(enableAudit); - } + public void setEnableAudit(boolean enableAudit) { + destination.setEnableAudit(enableAudit); + } - public void setMaxAuditDepth(int maxAuditDepth) { - destination.setMaxAuditDepth(maxAuditDepth); - } - - public void setMaxProducersToAudit(int maxProducersToAudit) { - destination.setMaxProducersToAudit(maxProducersToAudit); - } + public void setMaxAuditDepth(int maxAuditDepth) { + destination.setMaxAuditDepth(maxAuditDepth); + } + + public void setMaxProducersToAudit(int maxProducersToAudit) { + destination.setMaxProducersToAudit(maxProducersToAudit); + } - public float getMemoryUsagePortion() { return destination.getMemoryUsage().getUsagePortion(); } @@ -325,31 +323,52 @@ public class DestinationView implements DestinationViewMBean { } public boolean isProducerFlowControl() { - return destination.isProducerFlowControl(); + return destination.isProducerFlowControl(); } - + public void setMemoryUsagePortion(float value) { destination.getMemoryUsage().setUsagePortion(value); } public void setProducerFlowControl(boolean producerFlowControl) { - destination.setProducerFlowControl(producerFlowControl); + destination.setProducerFlowControl(producerFlowControl); + } + + /** + * Set's the interval at which warnings about producers being blocked by + * resource usage will be triggered. Values of 0 or less will disable + * warnings + * + * @param blockedProducerWarningInterval the interval at which warning about + * blocked producers will be triggered. + */ + public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { + destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval); + } + + /** + * + * @return the interval at which warning about blocked producers will be + * triggered. + */ + public long getBlockedProducerWarningInterval() { + return destination.getBlockedProducerWarningInterval(); } public int getMaxPageSize() { return destination.getMaxPageSize(); } - + public void setMaxPageSize(int pageSize) { destination.setMaxPageSize(pageSize); } - + public boolean isUseCache() { return destination.isUseCache(); } public void setUseCache(boolean value) { - destination.setUseCache(value); + destination.setUseCache(value); } public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 08ad70d849..5db81811b7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -248,11 +248,30 @@ public interface DestinationViewMBean { */ @MBeanInfo("Producers are flow controlled") boolean isProducerFlowControl(); + /** * @param producerFlowControl the producerFlowControl to set */ public void setProducerFlowControl(@MBeanInfo("producerFlowControl") boolean producerFlowControl); + /** + * Set's the interval at which warnings about producers being blocked by + * resource usage will be triggered. Values of 0 or less will disable + * warnings + * + * @param blockedProducerWarningInterval the interval at which warning about + * blocked producers will be triggered. + */ + public void setBlockedProducerWarningInterval(@MBeanInfo("blockedProducerWarningInterval") long blockedProducerWarningInterval); + + /** + * + * @return the interval at which warning about blocked producers will be + * triggered. + */ + @MBeanInfo("Blocked Producer Warning Interval") + public long getBlockedProducerWarningInterval(); + /** * @return the maxProducersToAudit */ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 132c349d58..d104fff849 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -40,18 +40,21 @@ import org.apache.activemq.usage.Usage; */ public abstract class BaseDestination implements Destination { /** - * The maximum number of messages to page in to the destination from persistent storage + * The maximum number of messages to page in to the destination from + * persistent storage */ public static final int MAX_PAGE_SIZE = 200; public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; - public static final long EXPIRE_MESSAGE_PERIOD = 30*1000; + public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; protected final ActiveMQDestination destination; protected final Broker broker; protected final MessageStore store; protected SystemUsage systemUsage; protected MemoryUsage memoryUsage; private boolean producerFlowControl = true; - protected boolean warnOnProducerFlowControl = true; + protected boolean warnOnProducerFlowControl = true; + protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; + private int maxProducersToAudit = 1024; private int maxAuditDepth = 2048; private boolean enableAudit = true; @@ -82,8 +85,7 @@ public abstract class BaseDestination implements Destination { * @param parentStats * @throws Exception */ - public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, - DestinationStatistics parentStats) throws Exception { + public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { this.brokerService = brokerService; this.broker = brokerService.getBroker(); this.store = store; @@ -118,13 +120,33 @@ public abstract class BaseDestination implements Destination { } /** - * @param producerFlowControl - * the producerFlowControl to set + * @param producerFlowControl the producerFlowControl to set */ public void setProducerFlowControl(boolean producerFlowControl) { this.producerFlowControl = producerFlowControl; } + /** + * Set's the interval at which warnings about producers being blocked by + * resource usage will be triggered. Values of 0 or less will disable + * warnings + * + * @param blockedProducerWarningInterval the interval at which warning about + * blocked producers will be triggered. + */ + public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { + this.blockedProducerWarningInterval = blockedProducerWarningInterval; + } + + /** + * + * @return the interval at which warning about blocked producers will be + * triggered. + */ + public long getBlockedProducerWarningInterval() { + return blockedProducerWarningInterval; + } + /** * @return the maxProducersToAudit */ @@ -133,8 +155,7 @@ public abstract class BaseDestination implements Destination { } /** - * @param maxProducersToAudit - * the maxProducersToAudit to set + * @param maxProducersToAudit the maxProducersToAudit to set */ public void setMaxProducersToAudit(int maxProducersToAudit) { this.maxProducersToAudit = maxProducersToAudit; @@ -148,8 +169,7 @@ public abstract class BaseDestination implements Destination { } /** - * @param maxAuditDepth - * the maxAuditDepth to set + * @param maxAuditDepth the maxAuditDepth to set */ public void setMaxAuditDepth(int maxAuditDepth) { this.maxAuditDepth = maxAuditDepth; @@ -163,8 +183,7 @@ public abstract class BaseDestination implements Destination { } /** - * @param enableAudit - * the enableAudit to set + * @param enableAudit the enableAudit to set */ public void setEnableAudit(boolean enableAudit) { this.enableAudit = enableAudit; @@ -199,8 +218,7 @@ public abstract class BaseDestination implements Destination { } public final boolean isActive() { - return destinationStatistics.getConsumers().getCount() != 0 - || destinationStatistics.getProducers().getCount() != 0; + return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0; } public int getMaxPageSize() { @@ -218,13 +236,13 @@ public abstract class BaseDestination implements Destination { public void setMaxBrowsePageSize(int maxPageSize) { this.maxBrowsePageSize = maxPageSize; } - + public int getMaxExpirePageSize() { return this.maxExpirePageSize; } public void setMaxExpirePageSize(int maxPageSize) { - this.maxExpirePageSize = maxPageSize; + this.maxExpirePageSize = maxPageSize; } public void setExpireMessagesPeriod(long expireMessagesPeriod) { @@ -234,7 +252,7 @@ public abstract class BaseDestination implements Destination { public long getExpireMessagesPeriod() { return expireMessagesPeriod; } - + public boolean isUseCache() { return useCache; } @@ -271,8 +289,7 @@ public abstract class BaseDestination implements Destination { } /** - * @param advisoryForSlowConsumers - * the advisoryForSlowConsumers to set + * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set */ public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { this.advisoryForSlowConsumers = advisoryForSlowConsumers; @@ -286,8 +303,8 @@ public abstract class BaseDestination implements Destination { } /** - * @param advisoryForDiscardingMessages - * the advisoryForDiscardingMessages to set + * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to + * set */ public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; @@ -301,8 +318,7 @@ public abstract class BaseDestination implements Destination { } /** - * @param advisoryWhenFull - * the advisoryWhenFull to set + * @param advisoryWhenFull the advisoryWhenFull to set */ public void setAdvisoryWhenFull(boolean advisoryWhenFull) { this.advisoryWhenFull = advisoryWhenFull; @@ -316,8 +332,7 @@ public abstract class BaseDestination implements Destination { } /** - * @param advisoryForDelivery - * the advisoryForDelivery to set + * @param advisoryForDelivery the advisoryForDelivery to set */ public void setAdvisoryForDelivery(boolean advisoryForDelivery) { this.advisoryForDelivery = advisoryForDelivery; @@ -331,8 +346,7 @@ public abstract class BaseDestination implements Destination { } /** - * @param advisoryForConsumed - * the advisoryForConsumed to set + * @param advisoryForConsumed the advisoryForConsumed to set */ public void setAdvisoryForConsumed(boolean advisoryForConsumed) { this.advisoryForConsumed = advisoryForConsumed; @@ -346,13 +360,12 @@ public abstract class BaseDestination implements Destination { } /** - * @param advisdoryForFastProducers - * the advisdoryForFastProducers to set + * @param advisdoryForFastProducers the advisdoryForFastProducers to set */ public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) { this.advisdoryForFastProducers = advisdoryForFastProducers; } - + public boolean isSendAdvisoryIfNoConsumers() { return sendAdvisoryIfNoConsumers; } @@ -376,14 +389,14 @@ public abstract class BaseDestination implements Destination { public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { this.deadLetterStrategy = deadLetterStrategy; } - - public int getCursorMemoryHighWaterMark() { - return this.cursorMemoryHighWaterMark; - } - public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { - this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; - } + public int getCursorMemoryHighWaterMark() { + return this.cursorMemoryHighWaterMark; + } + + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { + this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; + } /** * called when message is consumed @@ -410,8 +423,8 @@ public abstract class BaseDestination implements Destination { } /** - * Called when a message is discarded - e.g. running low on memory This will happen only if the policy is enabled - - * e.g. non durable topics + * Called when a message is discarded - e.g. running low on memory This will + * happen only if the policy is enabled - e.g. non durable topics * * @param context * @param messageReference @@ -460,19 +473,19 @@ public abstract class BaseDestination implements Destination { public void dispose(ConnectionContext context) throws IOException { if (this.store != null) { - this.store.removeAllMessages(context); + this.store.removeAllMessages(context); this.store.dispose(context); } this.destinationStatistics.setParent(null); this.memoryUsage.stop(); } - + /** * Provides a hook to allow messages with no consumer to be processed in * some way - such as to send to a dead letter queue or something.. */ protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { - if (!msg.isPersistent()) { + if (!msg.isPersistent()) { if (isSendAdvisoryIfNoConsumers()) { // allow messages with no consumers to be dispatched to a dead // letter queue @@ -489,12 +502,12 @@ public abstract class BaseDestination implements Destination { if (message.getOriginalTransactionId() != null) { message.setOriginalTransactionId(message.getTransactionId()); } - + ActiveMQTopic advisoryTopic; if (destination.isQueue()) { - advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); + advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); } else { - advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); + advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); } message.setDestination(advisoryTopic); message.setTransactionId(null); @@ -517,8 +530,9 @@ public abstract class BaseDestination implements Destination { } } } - - public void processDispatchNotification( - MessageDispatchNotification messageDispatchNotification) throws Exception { + + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index 8a59c0487b..18cf83c140 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -41,10 +41,12 @@ import org.apache.activemq.usage.Usage; public interface Destination extends Service, Task { public static final DeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new SharedDeadLetterStrategy(); + public static final long DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL = 30000; + void addSubscription(ConnectionContext context, Subscription sub) throws Exception; void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception; - + void addProducer(ConnectionContext context, ProducerInfo info) throws Exception; void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception; @@ -70,122 +72,146 @@ public interface Destination extends Service, Task { String getName(); MessageStore getMessageStore(); - + boolean isProducerFlowControl(); - + void setProducerFlowControl(boolean value); - + + /** + * Set's the interval at which warnings about producers being blocked by + * resource usage will be triggered. Values of 0 or less will disable + * warnings + * + * @param blockedProducerWarningInterval the interval at which warning about + * blocked producers will be triggered. + */ + public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval); + + /** + * + * @return the interval at which warning about blocked producers will be + * triggered. + */ + public long getBlockedProducerWarningInterval(); + int getMaxProducersToAudit(); - + void setMaxProducersToAudit(int maxProducersToAudit); - + int getMaxAuditDepth(); - + void setMaxAuditDepth(int maxAuditDepth); - + boolean isEnableAudit(); - + void setEnableAudit(boolean enableAudit); - - boolean isActive(); - + + boolean isActive(); + int getMaxPageSize(); - + public void setMaxPageSize(int maxPageSize); - + public int getMaxBrowsePageSize(); public void setMaxBrowsePageSize(int maxPageSize); - + public boolean isUseCache(); - + public void setUseCache(boolean useCache); - + public int getMinimumMessageSize(); public void setMinimumMessageSize(int minimumMessageSize); - + public int getCursorMemoryHighWaterMark(); - public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); - + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); + /** - * optionally called by a Subscriber - to inform the Destination its - * ready for more messages + * optionally called by a Subscriber - to inform the Destination its ready + * for more messages */ public void wakeup(); - + /** * @return true if lazyDispatch is enabled */ public boolean isLazyDispatch(); - - + /** * set the lazy dispatch - default is false + * * @param value */ public void setLazyDispatch(boolean value); - /** * Inform the Destination a message has expired + * * @param context - * @param subs + * @param subs * @param node */ - void messageExpired(ConnectionContext context, Subscription subs,MessageReference node); + void messageExpired(ConnectionContext context, Subscription subs, MessageReference node); /** * called when message is consumed + * * @param context * @param messageReference */ - void messageConsumed(ConnectionContext context, MessageReference messageReference); - + void messageConsumed(ConnectionContext context, MessageReference messageReference); + /** * Called when message is delivered to the broker + * * @param context * @param messageReference */ - void messageDelivered(ConnectionContext context, MessageReference messageReference); - + void messageDelivered(ConnectionContext context, MessageReference messageReference); + /** - * Called when a message is discarded - e.g. running low on memory - * This will happen only if the policy is enabled - e.g. non durable topics + * Called when a message is discarded - e.g. running low on memory This will + * happen only if the policy is enabled - e.g. non durable topics + * * @param context * @param messageReference */ - void messageDiscarded(ConnectionContext context, MessageReference messageReference); - + void messageDiscarded(ConnectionContext context, MessageReference messageReference); + /** * Called when there is a slow consumer + * * @param context * @param subs */ - void slowConsumer(ConnectionContext context, Subscription subs); - + void slowConsumer(ConnectionContext context, Subscription subs); + /** * Called to notify a producer is too fast + * * @param context * @param producerInfo */ - void fastProducer(ConnectionContext context,ProducerInfo producerInfo); - + void fastProducer(ConnectionContext context, ProducerInfo producerInfo); + /** * Called when a Usage reaches a limit + * * @param context * @param usage */ - void isFull(ConnectionContext context,Usage usage); + void isFull(ConnectionContext context, Usage usage); List getConsumers(); /** - * called on Queues in slave mode to allow dispatch to follow subscription choice of master + * called on Queues in slave mode to allow dispatch to follow subscription + * choice of master + * * @param messageDispatchNotification * @throws Exception */ - void processDispatchNotification( - MessageDispatchNotification messageDispatchNotification) throws Exception; + void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 3d93292693..446356172f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -45,8 +45,7 @@ public class DestinationFilter implements Destination { this.next = next; } - public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) - throws IOException { + public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { next.acknowledge(context, sub, ack, node); } @@ -108,13 +107,13 @@ public class DestinationFilter implements Destination { /** * Sends a message to the given destination which may be a wildcard + * * @param context broker context * @param message message to send * @param destination possibly wildcard destination to send the message to * @throws Exception on error */ - protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) - throws Exception { + protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception { Broker broker = context.getConnectionContext().getBroker(); Set destinations = broker.getDestinations(destination); @@ -130,24 +129,30 @@ public class DestinationFilter implements Destination { public boolean isProducerFlowControl() { return next.isProducerFlowControl(); } - - public void setProducerFlowControl(boolean value){ + + public void setProducerFlowControl(boolean value) { next.setProducerFlowControl(value); } - public void addProducer(ConnectionContext context, ProducerInfo info) - throws Exception { - next.addProducer(context, info); - + public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { + next.setBlockedProducerWarningInterval(blockedProducerWarningInterval); + } + + public long getBlockedProducerWarningInterval() { + return next.getBlockedProducerWarningInterval(); } - public void removeProducer(ConnectionContext context, ProducerInfo info) - throws Exception { - next.removeProducer(context, info); + public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { + next.addProducer(context, info); + + } + + public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { + next.removeProducer(context, info); } public int getMaxAuditDepth() { - return next.getMaxAuditDepth(); + return next.getMaxAuditDepth(); } public int getMaxProducersToAudit() { @@ -157,20 +162,19 @@ public class DestinationFilter implements Destination { public boolean isEnableAudit() { return next.isEnableAudit(); } - + public void setEnableAudit(boolean enableAudit) { next.setEnableAudit(enableAudit); } public void setMaxAuditDepth(int maxAuditDepth) { - next.setMaxAuditDepth(maxAuditDepth); + next.setMaxAuditDepth(maxAuditDepth); } - public void setMaxProducersToAudit(int maxProducersToAudit) { - next.setMaxProducersToAudit(maxProducersToAudit); + next.setMaxProducersToAudit(maxProducersToAudit); } - + public boolean isActive() { return next.isActive(); } @@ -189,88 +193,81 @@ public class DestinationFilter implements Destination { public void setUseCache(boolean useCache) { next.setUseCache(useCache); - } - + } + public int getMinimumMessageSize() { return next.getMinimumMessageSize(); } public void setMinimumMessageSize(int minimumMessageSize) { next.setMinimumMessageSize(minimumMessageSize); - } - + } + public void wakeup() { next.wakeup(); } public boolean isLazyDispatch() { - return next.isLazyDispatch(); + return next.isLazyDispatch(); } public void setLazyDispatch(boolean value) { - next.setLazyDispatch(value); + next.setLazyDispatch(value); } public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) { - next.messageExpired(context, prefetchSubscription, node); + next.messageExpired(context, prefetchSubscription, node); } - public boolean iterate() { - return next.iterate(); - } - - public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) { - next.fastProducer(context, producerInfo); + public boolean iterate() { + return next.iterate(); + } + + public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { + next.fastProducer(context, producerInfo); } - public void isFull(ConnectionContext context, Usage usage) { - next.isFull(context, usage); + next.isFull(context, usage); } - - public void messageConsumed(ConnectionContext context,MessageReference messageReference) { + public void messageConsumed(ConnectionContext context, MessageReference messageReference) { next.messageConsumed(context, messageReference); } - - public void messageDelivered(ConnectionContext context,MessageReference messageReference) { + public void messageDelivered(ConnectionContext context, MessageReference messageReference) { next.messageDelivered(context, messageReference); } - - public void messageDiscarded(ConnectionContext context,MessageReference messageReference) { + public void messageDiscarded(ConnectionContext context, MessageReference messageReference) { next.messageDiscarded(context, messageReference); } - public void slowConsumer(ConnectionContext context, Subscription subs) { - next.slowConsumer(context, subs); + next.slowConsumer(context, subs); } - - public void messageExpired(ConnectionContext context, Subscription subs,MessageReference node) { - next.messageExpired(context,subs, node); + public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) { + next.messageExpired(context, subs, node); } public int getMaxBrowsePageSize() { - return next.getMaxBrowsePageSize(); + return next.getMaxBrowsePageSize(); } public void setMaxBrowsePageSize(int maxPageSize) { next.setMaxBrowsePageSize(maxPageSize); } - public void processDispatchNotification( - MessageDispatchNotification messageDispatchNotification) throws Exception { - next.processDispatchNotification(messageDispatchNotification); + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { + next.processDispatchNotification(messageDispatchNotification); } - public int getCursorMemoryHighWaterMark() { - return next.getCursorMemoryHighWaterMark(); - } + public int getCursorMemoryHighWaterMark() { + return next.getCursorMemoryHighWaterMark(); + } - public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { - next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); - } + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { + next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 13a7969316..31877108d0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -75,7 +75,6 @@ import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - /** * The Queue is a List of MessageEntry objects that are dispatched to matching * subscriptions. @@ -85,10 +84,10 @@ import org.apache.commons.logging.LogFactory; public class Queue extends BaseDestination implements Task, UsageListener { protected static final Log LOG = LogFactory.getLog(Queue.class); protected final TaskRunnerFactory taskFactory; - protected TaskRunner taskRunner; + protected TaskRunner taskRunner; protected final List consumers = new ArrayList(50); protected PendingMessageCursor messages; - private final LinkedHashMap pagedInMessages = new LinkedHashMap(); + private final LinkedHashMap pagedInMessages = new LinkedHashMap(); // Messages that are paged in but have not yet been targeted at a subscription private List pagedInPendingDispatch = new ArrayList(100); private MessageGroupMap messageGroupOwners; @@ -98,16 +97,16 @@ public class Queue extends BaseDestination implements Task, UsageListener { private ExecutorService executor; protected final LinkedList messagesWaitingForSpace = new LinkedList(); private final Object dispatchMutex = new Object(); - private boolean useConsumerPriority=true; - private boolean strictOrderDispatch=false; + private boolean useConsumerPriority = true; + private boolean strictOrderDispatch = false; private QueueDispatchSelector dispatchSelector; - private boolean optimizedDispatch=false; + private boolean optimizedDispatch = false; private boolean firstConsumer = false; private int timeBeforeDispatchStarts = 0; private int consumersBeforeDispatchStarts = 0; private CountDownLatch consumersBeforeStartsLatch; private AtomicLong pendingWakeups = new AtomicLong(); - + private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { public void run() { asyncWakeup(); @@ -115,25 +114,24 @@ public class Queue extends BaseDestination implements Task, UsageListener { }; private final Runnable expireMessagesTask = new Runnable() { public void run() { - expireMessages(); + expireMessages(); } }; private final Object iteratingMutex = new Object() {}; private static final Scheduler scheduler = Scheduler.getInstance(); - - private static final ComparatororderedCompare = new Comparator() { + + private static final Comparator orderedCompare = new Comparator() { public int compare(Subscription s1, Subscription s2) { //We want the list sorted in descending order return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority(); - } + } }; - - public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,DestinationStatistics parentStats, - TaskRunnerFactory taskFactory) throws Exception { + + public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { super(brokerService, store, destination, parentStats); - this.taskFactory=taskFactory; - this.dispatchSelector=new QueueDispatchSelector(destination); + this.taskFactory = taskFactory; + this.dispatchSelector = new QueueDispatchSelector(destination); } public List getConsumers() { @@ -145,13 +143,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { // make the queue easily visible in the debugger from its task runner threads final class QueueThread extends Thread { final Queue queue; - public QueueThread(Runnable runnable, String name, - Queue queue) { + + public QueueThread(Runnable runnable, String name, Queue queue) { super(runnable, name); this.queue = queue; } } - + public void initialize() throws Exception { if (this.messages == null) { if (destination.isTemporary() || broker == null || store == null) { @@ -168,10 +166,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { this.systemUsage = brokerService.getSystemUsage(); memoryUsage.setParent(systemUsage.getMemoryUsage()); } - - this.taskRunner = - taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName()); - + + this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName()); + super.initialize(); if (store != null) { // Restore the persistent messages. @@ -217,12 +214,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { public boolean hasSpace() { return true; } - + public boolean isDuplicate(MessageId id) { return false; } }); - }else { + } else { int messageCount = store.getMessageCount(); destinationStatistics.getMessages().setCount(messageCount); } @@ -230,22 +227,20 @@ public class Queue extends BaseDestination implements Task, UsageListener { } /* - * Holder for subscription and pagedInMessages as a browser - * needs access to existing messages in the queue that have - * already been dispatched + * Holder for subscription and pagedInMessages as a browser needs access to + * existing messages in the queue that have already been dispatched */ class BrowserDispatch { ArrayList messages; QueueBrowserSubscription browser; - - public BrowserDispatch(QueueBrowserSubscription browserSubscription, - Collection values) { - - messages = new ArrayList(values); + + public BrowserDispatch(QueueBrowserSubscription browserSubscription, Collection values) { + + messages = new ArrayList(values); browser = browserSubscription; browser.incrementQueueRef(); } - + void done() { try { browser.decrementQueueRef(); @@ -258,57 +253,57 @@ public class Queue extends BaseDestination implements Task, UsageListener { return browser; } } - + LinkedList browserDispatches = new LinkedList(); public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { // synchronize with dispatch method so that no new messages are sent // while setting up a subscription. avoid out of order messages, // duplicates, etc. - synchronized(dispatchMutex) { - + synchronized (dispatchMutex) { + sub.add(context, this); destinationStatistics.getConsumers().increment(); // needs to be synchronized - so no contention with dispatching synchronized (consumers) { - - // set a flag if this is a first consumer - if (consumers.size() == 0) { - firstConsumer = true; - if (consumersBeforeDispatchStarts != 0) { - consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1); - } - } else { - if (consumersBeforeStartsLatch != null) { - consumersBeforeStartsLatch.countDown(); - } - } - + + // set a flag if this is a first consumer + if (consumers.size() == 0) { + firstConsumer = true; + if (consumersBeforeDispatchStarts != 0) { + consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1); + } + } else { + if (consumersBeforeStartsLatch != null) { + consumersBeforeStartsLatch.countDown(); + } + } + addToConsumerList(sub); if (sub.getConsumerInfo().isExclusive()) { Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); - if(exclusiveConsumer==null) { - exclusiveConsumer=sub; - }else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()){ - exclusiveConsumer=sub; + if (exclusiveConsumer == null) { + exclusiveConsumer = sub; + } else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) { + exclusiveConsumer = sub; } dispatchSelector.setExclusiveConsumer(exclusiveConsumer); } } - - if (sub instanceof QueueBrowserSubscription ) { + + if (sub instanceof QueueBrowserSubscription) { QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub; - + // do again in iterate to ensure new messages are dispatched pageInMessages(false); - - synchronized (pagedInMessages) { - if (!pagedInMessages.isEmpty()) { - BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values()); - browserDispatches.addLast(browserDispatch); - } - } + + synchronized (pagedInMessages) { + if (!pagedInMessages.isEmpty()) { + BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values()); + browserDispatches.addLast(browserDispatch); + } + } } if (!(this.optimizedDispatch || isSlave())) { wakeup(); @@ -321,30 +316,23 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) - throws Exception { + public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception { destinationStatistics.getConsumers().decrement(); // synchronize with dispatch method so that no new messages are sent // while removing up a subscription. - synchronized(dispatchMutex) { + synchronized (dispatchMutex) { if (LOG.isDebugEnabled()) { - LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId - + ", dequeues: " + getDestinationStatistics().getDequeues().getCount() - + ", dispatched: " + getDestinationStatistics().getDispatched().getCount() - + ", inflight: " + getDestinationStatistics().getInflight().getCount()); + LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " + getDestinationStatistics().getDequeues().getCount() + ", dispatched: " + + getDestinationStatistics().getDispatched().getCount() + ", inflight: " + getDestinationStatistics().getInflight().getCount()); } synchronized (consumers) { removeFromConsumerList(sub); if (sub.getConsumerInfo().isExclusive()) { - Subscription exclusiveConsumer = dispatchSelector - .getExclusiveConsumer(); + Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); if (exclusiveConsumer == sub) { exclusiveConsumer = null; for (Subscription s : consumers) { - if (s.getConsumerInfo().isExclusive() - && (exclusiveConsumer == null - || s.getConsumerInfo().getPriority() > exclusiveConsumer - .getConsumerInfo().getPriority())) { + if (s.getConsumerInfo().isExclusive() && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority())) { exclusiveConsumer = s; } @@ -354,12 +342,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { } ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); getMessageGroupOwners().removeConsumer(consumerId); - + // redeliver inflight messages List list = new ArrayList(); for (MessageReference ref : sub.remove(context, this)) { - QueueMessageReference qmr = (QueueMessageReference)ref; - if( qmr.getLockOwner()==sub ) { + QueueMessageReference qmr = (QueueMessageReference) ref; + if (qmr.getLockOwner() == sub) { qmr.unlock(); // only increment redelivery if it was delivered or we have no delivery information if (lastDeiveredSequenceId == 0 || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) { @@ -368,7 +356,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } list.add(qmr); } - + if (!list.isEmpty()) { doDispatch(list); } @@ -400,32 +388,33 @@ public class Queue extends BaseDestination implements Task, UsageListener { } return; } - if(memoryUsage.isFull()) { + if (memoryUsage.isFull()) { isFull(context, memoryUsage); fastProducer(context, producerInfo); if (isProducerFlowControl() && context.isProducerFlowControl()) { - if(warnOnProducerFlowControl) { + if (warnOnProducerFlowControl) { warnOnProducerFlowControl = false; - LOG.info("Usage Manager memory limit reached on " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + - " See http://activemq.apache.org/producer-flow-control.html for more info"); + LOG.info("Usage Manager Memory Limit reached on " + getActiveMQDestination().getQualifiedName() + + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - + if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." + - " See http://activemq.apache.org/producer-flow-control.html for more info"); + throw new javax.jms.ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - + // We can avoid blocking due to low usage if the producer is sending // a sync message or // if it is using a producer window if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { // copy the exchange state since the context will be modified while we are waiting // for space. - final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); + final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); synchronized (messagesWaitingForSpace) { messagesWaitingForSpace.add(new Runnable() { public void run() { - + try { // While waiting for space to free up... the // message may have expired. @@ -436,7 +425,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } else { doMessageSend(producerExchangeCopy, message); } - + if (sendProducerAck) { ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); context.getConnection().dispatchAsync(ack); @@ -445,7 +434,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { response.setCorrelationId(message.getCommandId()); context.getConnection().dispatchAsync(response); } - + } catch (Exception e) { if (!sendProducerAck && !context.isInRecoveryMode()) { ExceptionResponse response = new ExceptionResponse(e); @@ -455,7 +444,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } }); - + // If the user manager is not full, then the task will not // get called.. if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { @@ -465,18 +454,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { context.setDontSendReponse(true); return; } - + } else { - - // Producer flow control cannot be used, so we have do the flow - // control at the broker - // by blocking this thread until there is space available. - while (!memoryUsage.waitForSpace(1000)) { - if (context.getStopping().get()) { - throw new IOException("Connection closed, send aborted."); - } + + if (memoryUsage.isFull()) { + waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer (" + message.getProducerId() + ") stopped to prevent flooding " + + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - + // The usage manager could have delayed us by the time // we unblock the message could have expired.. if (message.isExpired()) { @@ -501,19 +486,15 @@ public class Queue extends BaseDestination implements Task, UsageListener { synchronized (sendLock) { if (store != null && message.isPersistent()) { if (systemUsage.getStoreUsage().isFull()) { - final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + - " See http://activemq.apache.org/producer-flow-control.html for more info"; - LOG.info(logMessage); + + String logMessage = "Usage Manager Store is Full. Producer (" + message.getProducerId() + ") stopped to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"; + if (systemUsage.isSendFailIfNoSpace()) { throw new javax.jms.ResourceAllocationException(logMessage); } - } - while (!systemUsage.getStoreUsage().waitForSpace(1000)) { - if (context.getStopping().get()) { - throw new IOException( - "Connection closed, send aborted."); - } - LOG.debug(this + ", waiting for store space... msg: " + message); + + waitForSpace(context, systemUsage.getStoreUsage(), logMessage); } message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); store.addMessage(context, message); @@ -552,12 +533,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { sendMessage(context, message); } } - + private void expireMessages() { if (LOG.isDebugEnabled()) { LOG.debug("Expiring messages .."); } - + // just track the insertion count List browsedMessages = new AbstractList() { int size = 0; @@ -581,9 +562,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { asyncWakeup(); } - public void gc(){ + public void gc() { } - + public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { messageConsumed(context, node); if (store != null && node.isPersistent()) { @@ -616,8 +597,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { synchronized (messages) { size = messages.size(); } - return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size - + ", in flight groups=" + messageGroupOwners; + return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups=" + + messageGroupOwners; } public void start() throws Exception { @@ -632,25 +613,25 @@ public class Queue extends BaseDestination implements Task, UsageListener { doPageIn(false); } - public void stop() throws Exception{ + public void stop() throws Exception { if (taskRunner != null) { taskRunner.shutdown(); } if (this.executor != null) { this.executor.shutdownNow(); } - + scheduler.cancel(expireMessagesTask); - + if (messages != null) { messages.stop(); } - + systemUsage.getMemoryUsage().removeUsageListener(this); if (memoryUsage != null) { memoryUsage.stop(); } - if (store!=null) { + if (store != null) { store.stop(); } } @@ -661,7 +642,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { return destination; } - public MessageGroupMap getMessageGroupOwners() { if (messageGroupOwners == null) { messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap(); @@ -692,7 +672,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { public void setMessages(PendingMessageCursor messages) { this.messages = messages; } - + public boolean isUseConsumerPriority() { return useConsumerPriority; } @@ -708,7 +688,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { public void setStrictOrderDispatch(boolean strictOrderDispatch) { this.strictOrderDispatch = strictOrderDispatch; } - public boolean isOptimizedDispatch() { return optimizedDispatch; @@ -717,21 +696,22 @@ public class Queue extends BaseDestination implements Task, UsageListener { public void setOptimizedDispatch(boolean optimizedDispatch) { this.optimizedDispatch = optimizedDispatch; } - public int getTimeBeforeDispatchStarts() { - return timeBeforeDispatchStarts; - } - public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { - this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; - } + public int getTimeBeforeDispatchStarts() { + return timeBeforeDispatchStarts; + } - public int getConsumersBeforeDispatchStarts() { - return consumersBeforeDispatchStarts; - } + public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { + this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; + } - public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { - this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; - } + public int getConsumersBeforeDispatchStarts() { + return consumersBeforeDispatchStarts; + } + + public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { + this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; + } // Implementation methods // ------------------------------------------------------------------------- @@ -740,19 +720,18 @@ public class Queue extends BaseDestination implements Task, UsageListener { return result; } - public Message[] browse() { + public Message[] browse() { List l = new ArrayList(); doBrowse(l, getMaxBrowsePageSize()); return l.toArray(new Message[l.size()]); } - - + public void doBrowse(List l, int max) { final ConnectionContext connectionContext = createConnectionContext(); try { pageInMessages(false); List toExpire = new ArrayList(); - synchronized(dispatchMutex) { + synchronized (dispatchMutex) { synchronized (pagedInPendingDispatch) { addAll(pagedInPendingDispatch, l, max, toExpire); for (MessageReference ref : toExpire) { @@ -775,17 +754,16 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } } - + if (l.size() < getMaxBrowsePageSize()) { synchronized (messages) { try { messages.reset(); while (messages.hasNext() && l.size() < max) { - MessageReference node = messages.next(); + MessageReference node = messages.next(); if (node.isExpired()) { if (broker.isExpired(node)) { - messageExpired(connectionContext, - createMessageReference(node.getMessage())); + messageExpired(connectionContext, createMessageReference(node.getMessage())); } messages.remove(); } else { @@ -800,16 +778,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } } - } + } } catch (Exception e) { LOG.error("Problem retrieving message for browse", e); - } + } } - private void addAll(Collection refs, - List l, int maxBrowsePageSize, List toExpire) throws Exception { - for (Iterator i = refs.iterator(); i.hasNext() - && l.size() < getMaxBrowsePageSize();) { + private void addAll(Collection refs, List l, int maxBrowsePageSize, List toExpire) throws Exception { + for (Iterator i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) { QueueMessageReference ref = i.next(); if (ref.isExpired()) { toExpire.add(ref); @@ -843,8 +819,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { break; } } catch (IOException e) { - LOG.error("got an exception retrieving message " - + id); + LOG.error("got an exception retrieving message " + id); } } } finally { @@ -857,10 +832,10 @@ public class Queue extends BaseDestination implements Task, UsageListener { return null; } - public void purge() throws Exception { + public void purge() throws Exception { ConnectionContext c = createConnectionContext(); List list = null; - do { + do { doPageIn(true); synchronized (pagedInMessages) { list = new ArrayList(pagedInMessages.values()); @@ -869,11 +844,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { for (MessageReference ref : list) { try { QueueMessageReference r = (QueueMessageReference) ref; - removeMessage(c,(IndirectMessageReference) r); + removeMessage(c, (IndirectMessageReference) r); } catch (IOException e) { } } - + } while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0); gc(); this.destinationStatistics.getMessages().setCount(0); @@ -921,15 +896,14 @@ public class Queue extends BaseDestination implements Task, UsageListener { synchronized (pagedInMessages) { set.addAll(pagedInMessages.values()); } - List list = new ArrayList(set); + List list = new ArrayList(set); for (MessageReference ref : list) { IndirectMessageReference r = (IndirectMessageReference) ref; if (filter.evaluate(context, r)) { removeMessage(context, r); set.remove(r); - if (++movedCounter >= maximumMessages - && maximumMessages > 0) { + if (++movedCounter >= maximumMessages && maximumMessages > 0) { return movedCounter; } } @@ -975,24 +949,23 @@ public class Queue extends BaseDestination implements Task, UsageListener { int count = 0; Set set = new CopyOnWriteArraySet(); do { - int oldMaxSize=getMaxPageSize(); + int oldMaxSize = getMaxPageSize(); setMaxPageSize((int) this.destinationStatistics.getMessages().getCount()); doPageIn(true); setMaxPageSize(oldMaxSize); synchronized (pagedInMessages) { set.addAll(pagedInMessages.values()); } - List list = new ArrayList(set); + List list = new ArrayList(set); for (MessageReference ref : list) { IndirectMessageReference r = (IndirectMessageReference) ref; if (filter.evaluate(context, r)) { - - r.incrementReferenceCount(); + + r.incrementReferenceCount(); try { Message m = r.getMessage(); BrokerSupport.resend(context, m, dest); - if (++movedCounter >= maximumMessages - && maximumMessages > 0) { + if (++movedCounter >= maximumMessages && maximumMessages > 0) { return movedCounter; } } finally { @@ -1004,15 +977,16 @@ public class Queue extends BaseDestination implements Task, UsageListener { } while (count < this.destinationStatistics.getMessages().getCount()); return movedCounter; } - + /** * Move a message + * * @param context connection context * @param m message * @param dest ActiveMQDestination * @throws Exception */ - public boolean moveMessageTo(ConnectionContext context,Message m,ActiveMQDestination dest) throws Exception { + public boolean moveMessageTo(ConnectionContext context, Message m, ActiveMQDestination dest) throws Exception { QueueMessageReference r = createMessageReference(m); BrokerSupport.resend(context, m, dest); removeMessage(context, r); @@ -1035,7 +1009,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { * @return the number of messages removed */ public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception { - return moveMatchingMessagesTo(context, selector, dest,Integer.MAX_VALUE); + return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE); } /** @@ -1050,9 +1024,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { * Moves the messages matching the given filter up to the maximum number of * matched messages */ - public int moveMatchingMessagesTo(ConnectionContext context, - MessageReferenceFilter filter, ActiveMQDestination dest, - int maximumMessages) throws Exception { + public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { int movedCounter = 0; Set set = new CopyOnWriteArraySet(); do { @@ -1067,20 +1039,18 @@ public class Queue extends BaseDestination implements Task, UsageListener { // We should only move messages that can be locked. moveMessageTo(context, ref.getMessage(), dest); set.remove(r); - if (++movedCounter >= maximumMessages - && maximumMessages > 0) { + if (++movedCounter >= maximumMessages && maximumMessages > 0) { return movedCounter; } } } - } while (set.size() < this.destinationStatistics.getMessages().getCount() - && set.size() < maximumMessages); + } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages); return movedCounter; } - + BrowserDispatch getNextBrowserDispatch() { synchronized (pagedInMessages) { - if( browserDispatches.isEmpty() ) { + if (browserDispatches.isEmpty()) { return null; } return browserDispatches.removeFirst(); @@ -1093,93 +1063,93 @@ public class Queue extends BaseDestination implements Task, UsageListener { * @see org.apache.activemq.thread.Task#iterate() */ public boolean iterate() { - boolean pageInMoreMessages = false; - synchronized(iteratingMutex) { - + boolean pageInMoreMessages = false; + synchronized (iteratingMutex) { + // do early to allow dispatch of these waiting messages - synchronized(messagesWaitingForSpace) { + synchronized (messagesWaitingForSpace) { while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) { Runnable op = messagesWaitingForSpace.removeFirst(); op.run(); } } - + BrowserDispatch rd; - while ((rd = getNextBrowserDispatch()) != null) { - pageInMoreMessages = true; - - try { - MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); - msgContext.setDestination(destination); - - QueueBrowserSubscription browser = rd.getBrowser(); - for (QueueMessageReference node : rd.messages) { - if (!node.isAcked()) { - msgContext.setMessageReference(node); - if (browser.matches(node, msgContext)) { - browser.add(node); - } - } - } - + while ((rd = getNextBrowserDispatch()) != null) { + pageInMoreMessages = true; + + try { + MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); + msgContext.setDestination(destination); + + QueueBrowserSubscription browser = rd.getBrowser(); + for (QueueMessageReference node : rd.messages) { + if (!node.isAcked()) { + msgContext.setMessageReference(node); + if (browser.matches(node, msgContext)) { + browser.add(node); + } + } + } + rd.done(); - } catch (Exception e) { - LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e); - } - } - - if (firstConsumer) { - firstConsumer = false; - try { - if (consumersBeforeDispatchStarts > 0) { - int timeout = 1000; // wait one second by default if consumer count isn't reached - if (timeBeforeDispatchStarts > 0) { - timeout = timeBeforeDispatchStarts; - } - if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) { - if (LOG.isDebugEnabled()) { - LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch."); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(timeout + " ms elapsed and " + consumers.size() + " consumers subscribed. Starting dispatch."); - } - } - } - if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) { - iteratingMutex.wait(timeBeforeDispatchStarts); - if (LOG.isDebugEnabled()) { - LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch."); - } - } - } catch (Exception e) { - LOG.error(e); - } - } - - synchronized (messages) { - pageInMoreMessages |= !messages.isEmpty(); - } - - // Kinda ugly.. but I think dispatchLock is the only mutex protecting the - // pagedInPendingDispatch variable. - synchronized(dispatchMutex) { - pageInMoreMessages |= !pagedInPendingDispatch.isEmpty(); - } - - // Perhaps we should page always into the pagedInPendingDispatch list if - // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty() - // then we do a dispatch. - if (pageInMoreMessages) { - try { - pageInMessages(false); - - } catch (Throwable e) { - LOG.error("Failed to page in more queue messages ", e); + } catch (Exception e) { + LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e); } - } - return pendingWakeups.decrementAndGet() > 0; + } + + if (firstConsumer) { + firstConsumer = false; + try { + if (consumersBeforeDispatchStarts > 0) { + int timeout = 1000; // wait one second by default if consumer count isn't reached + if (timeBeforeDispatchStarts > 0) { + timeout = timeBeforeDispatchStarts; + } + if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) { + if (LOG.isDebugEnabled()) { + LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch."); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(timeout + " ms elapsed and " + consumers.size() + " consumers subscribed. Starting dispatch."); + } + } + } + if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) { + iteratingMutex.wait(timeBeforeDispatchStarts); + if (LOG.isDebugEnabled()) { + LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch."); + } + } + } catch (Exception e) { + LOG.error(e); + } + } + + synchronized (messages) { + pageInMoreMessages |= !messages.isEmpty(); + } + + // Kinda ugly.. but I think dispatchLock is the only mutex protecting the + // pagedInPendingDispatch variable. + synchronized (dispatchMutex) { + pageInMoreMessages |= !pagedInPendingDispatch.isEmpty(); + } + + // Perhaps we should page always into the pagedInPendingDispatch list if + // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty() + // then we do a dispatch. + if (pageInMoreMessages) { + try { + pageInMessages(false); + + } catch (Throwable e) { + LOG.error("Failed to page in more queue messages ", e); + } + } + return pendingWakeups.decrementAndGet() > 0; } } @@ -1188,8 +1158,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { public boolean evaluate(ConnectionContext context, MessageReference r) { return messageId.equals(r.getMessageId().toString()); } + public String toString() { - return "MessageIdFilter: "+messageId; + return "MessageIdFilter: " + messageId; } }; } @@ -1213,22 +1184,22 @@ public class Queue extends BaseDestination implements Task, UsageListener { protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException { removeMessage(c, null, r); - synchronized(dispatchMutex) { + synchronized (dispatchMutex) { synchronized (pagedInPendingDispatch) { pagedInPendingDispatch.remove(r); } } } - - protected void removeMessage(ConnectionContext c, Subscription subs,QueueMessageReference r) throws IOException { + + protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException { MessageAck ack = new MessageAck(); ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setDestination(destination); ack.setMessageID(r.getMessageId()); removeMessage(c, subs, r, ack); } - - protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException { + + protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, MessageAck ack) throws IOException { reference.setAcked(true); // This sends the ack the the journal.. if (!ack.isInTransaction()) { @@ -1240,13 +1211,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { acknowledge(context, sub, ack, reference); } finally { context.getTransaction().addSynchronization(new Synchronization() { - + public void afterCommit() throws Exception { getDestinationStatistics().getDequeues().increment(); dropMessage(reference); wakeup(); } - + public void afterRollback() throws Exception { reference.setAcked(false); } @@ -1255,38 +1226,38 @@ public class Queue extends BaseDestination implements Task, UsageListener { } if (ack.isPoisonAck()) { // message gone to DLQ, is ok to allow redelivery - synchronized(messages) { + synchronized (messages) { messages.rollback(reference.getMessageId()); } } } - + private void dropMessage(QueueMessageReference reference) { reference.drop(); destinationStatistics.getMessages().decrement(); - synchronized(pagedInMessages) { + synchronized (pagedInMessages) { pagedInMessages.remove(reference.getMessageId()); } } - - public void messageExpired(ConnectionContext context,MessageReference reference) { - messageExpired(context,null,reference); + + public void messageExpired(ConnectionContext context, MessageReference reference) { + messageExpired(context, null, reference); } - - public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) { + + public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { if (LOG.isDebugEnabled()) { LOG.debug("message expired: " + reference); } broker.messageExpired(context, reference); destinationStatistics.getExpired().increment(); try { - removeMessage(context,subs,(QueueMessageReference)reference); + removeMessage(context, subs, (QueueMessageReference) reference); } catch (IOException e) { - LOG.error("Failed to remove expired Message from the store ",e); + LOG.error("Failed to remove expired Message from the store ", e); } } - + protected ConnectionContext createConnectionContext() { ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); answer.setBroker(this.broker); @@ -1296,17 +1267,18 @@ public class Queue extends BaseDestination implements Task, UsageListener { final void sendMessage(final ConnectionContext context, Message msg) throws Exception { if (!msg.isPersistent() && messages.getSystemUsage() != null) { - if (systemUsage.getTempUsage().isFull()) { - final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + - " See http://activemq.apache.org/producer-flow-control.html for more info"; - LOG.info(logMessage); + if (systemUsage.getTempUsage().isFull()) { + final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"; if (systemUsage.isSendFailIfNoSpace()) { throw new javax.jms.ResourceAllocationException(logMessage); } + + waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage); } - messages.getSystemUsage().getTempUsage().waitForSpace(); + } - synchronized(messages) { + synchronized (messages) { messages.addMessageLast(msg); } destinationStatistics.getEnqueues().increment(); @@ -1319,7 +1291,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } wakeup(); } - + public void wakeup() { if (optimizedDispatch || isSlave()) { iterate(); @@ -1332,12 +1304,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { private void asyncWakeup() { try { pendingWakeups.incrementAndGet(); - this.taskRunner.wakeup(); + this.taskRunner.wakeup(); } catch (InterruptedException e) { LOG.warn("Async task tunner failed to wakeup ", e); } } - + private boolean isSlave() { return broker.getBrokerService().isSlave(); } @@ -1345,14 +1317,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { private List doPageIn(boolean force) throws Exception { List result = null; List resultList = null; - synchronized(dispatchMutex) { + synchronized (dispatchMutex) { int toPageIn = Math.min(getMaxPageSize(), messages.size()); if (LOG.isDebugEnabled()) { - LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " - + destinationStatistics.getInflight().getCount() - + ", pagedInMessages.size " + pagedInMessages.size()); + LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size " + + pagedInMessages.size()); } - + if (isLazyDispatch() && !force) { // Only page in the minimum number of messages which can be dispatched immediately. toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); @@ -1376,7 +1347,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } else { result.add(ref); count++; - } + } } } finally { messages.release(); @@ -1385,7 +1356,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // Only add new messages, not already pagedIn to avoid multiple dispatch attempts synchronized (pagedInMessages) { resultList = new ArrayList(result.size()); - for(QueueMessageReference ref : result) { + for (QueueMessageReference ref : result) { if (!pagedInMessages.containsKey(ref.getMessageId())) { pagedInMessages.put(ref.getMessageId(), ref); resultList.add(ref); @@ -1402,8 +1373,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { private void doDispatch(List list) throws Exception { boolean doWakeUp = false; - synchronized(dispatchMutex) { - + synchronized (dispatchMutex) { + synchronized (pagedInPendingDispatch) { if (!pagedInPendingDispatch.isEmpty()) { // Try to first dispatch anything that had not been @@ -1425,20 +1396,20 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } } - } + } if (doWakeUp) { // avoid lock order contention asyncWakeup(); } } - + /** * @return list of messages that could get dispatched to consumers if they * were not full. */ private List doActualDispatch(List list) throws Exception { List consumers; - + synchronized (this.consumers) { if (this.consumers.isEmpty() || isSlave()) { // slave dispatch happens in processDispatchNotification @@ -1449,15 +1420,15 @@ public class Queue extends BaseDestination implements Task, UsageListener { List rc = new ArrayList(list.size()); Set fullConsumers = new HashSet(this.consumers.size()); - + for (MessageReference node : list) { Subscription target = null; - int interestCount=0; + int interestCount = 0; for (Subscription s : consumers) { - if (s instanceof QueueBrowserSubscription) { - interestCount++; - continue; - } + if (s instanceof QueueBrowserSubscription) { + interestCount++; + continue; + } if (dispatchSelector.canSelect(s, node)) { if (!fullConsumers.contains(s)) { if (!s.isFull()) { @@ -1472,23 +1443,22 @@ public class Queue extends BaseDestination implements Task, UsageListener { } interestCount++; } else { - // makes sure it gets dispatched again - if (!node.isDropped() && !((QueueMessageReference)node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) { - interestCount++; - } + // makes sure it gets dispatched again + if (!node.isDropped() && !((QueueMessageReference) node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) { + interestCount++; + } } } - - if ((target == null && interestCount>0) || consumers.size() == 0) { + + if ((target == null && interestCount > 0) || consumers.size() == 0) { // This means all subs were full or that there are no consumers... - rc.add((QueueMessageReference)node); + rc.add((QueueMessageReference) node); } // If it got dispatched, rotate the consumer list to get round robin distribution. - if (target != null && !strictOrderDispatch && consumers.size() > 1 && - !dispatchSelector.isExclusiveConsumer(target)) { + if (target != null && !strictOrderDispatch && consumers.size() > 1 && !dispatchSelector.isExclusiveConsumer(target)) { synchronized (this.consumers) { - if( removeFromConsumerList(target) ) { + if (removeFromConsumerList(target)) { addToConsumerList(target); consumers = new ArrayList(this.consumers); } @@ -1496,15 +1466,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - return rc; } - protected void pageInMessages(boolean force) throws Exception { - doDispatch(doPageIn(force)); + doDispatch(doPageIn(force)); } - + private void addToConsumerList(Subscription sub) { if (useConsumerPriority) { consumers.add(sub); @@ -1513,42 +1481,43 @@ public class Queue extends BaseDestination implements Task, UsageListener { consumers.add(sub); } } - + private boolean removeFromConsumerList(Subscription sub) { return consumers.remove(sub); } - + private int getConsumerMessageCountBeforeFull() throws Exception { int total = 0; boolean zeroPrefetch = false; synchronized (consumers) { for (Subscription s : consumers) { - zeroPrefetch |= s.getPrefetchSize() == 0; - int countBeforeFull = s.countBeforeFull(); + zeroPrefetch |= s.getPrefetchSize() == 0; + int countBeforeFull = s.countBeforeFull(); total += countBeforeFull; } } - if (total==0 && zeroPrefetch){ - total=1; + if (total == 0 && zeroPrefetch) { + total = 1; } return total; } - /* - * In slave mode, dispatch is ignored till we get this notification as the dispatch - * process is non deterministic between master and slave. - * On a notification, the actual dispatch to the subscription (as chosen by the master) - * is completed. - * (non-Javadoc) - * @see org.apache.activemq.broker.region.BaseDestination#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification) + /* + * In slave mode, dispatch is ignored till we get this notification as the + * dispatch process is non deterministic between master and slave. On a + * notification, the actual dispatch to the subscription (as chosen by the + * master) is completed. (non-Javadoc) + * + * @see + * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification + * (org.apache.activemq.command.MessageDispatchNotification) */ - public void processDispatchNotification( - MessageDispatchNotification messageDispatchNotification) throws Exception { + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { // do dispatch Subscription sub = getMatchingSubscription(messageDispatchNotification); if (sub != null) { MessageReference message = getMatchingMessage(messageDispatchNotification); - sub.add(message); + sub.add(message); sub.processMessageDispatchNotification(messageDispatchNotification); } } @@ -1556,25 +1525,25 @@ public class Queue extends BaseDestination implements Task, UsageListener { private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) throws Exception { QueueMessageReference message = null; MessageId messageId = messageDispatchNotification.getMessageId(); - - synchronized(dispatchMutex) { + + synchronized (dispatchMutex) { synchronized (pagedInPendingDispatch) { - for(QueueMessageReference ref : pagedInPendingDispatch) { - if (messageId.equals(ref.getMessageId())) { - message = ref; - pagedInPendingDispatch.remove(ref); - break; - } - } + for (QueueMessageReference ref : pagedInPendingDispatch) { + if (messageId.equals(ref.getMessageId())) { + message = ref; + pagedInPendingDispatch.remove(ref); + break; + } + } } - + if (message == null) { synchronized (pagedInMessages) { message = pagedInMessages.get(messageId); } } - - if (message == null) { + + if (message == null) { synchronized (messages) { try { messages.setMaxBatchSize(getMaxPageSize()); @@ -1593,28 +1562,25 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } } - + if (message == null) { Message msg = loadMessage(messageId); if (msg != null) { message = this.createMessageReference(msg); } - } - - } + } + + } if (message == null) { - throw new JMSException( - "Slave broker out of sync with master - Message: " - + messageDispatchNotification.getMessageId() - + " on " + messageDispatchNotification.getDestination() - + " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: " - + messageDispatchNotification.getConsumerId()); + throw new JMSException("Slave broker out of sync with master - Message: " + messageDispatchNotification.getMessageId() + " on " + messageDispatchNotification.getDestination() + + " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: " + messageDispatchNotification.getConsumerId()); } return message; } /** * Find a consumer that matches the id in the message dispatch notification + * * @param messageDispatchNotification * @return sub or null if the subscription has been removed before dispatch * @throws JMSException @@ -1637,4 +1603,20 @@ public class Queue extends BaseDestination implements Task, UsageListener { asyncWakeup(); } } + + private final void waitForSpace(ConnectionContext context, Usage usage, String warning) throws IOException, InterruptedException { + long start = System.currentTimeMillis(); + long nextWarn = start + blockedProducerWarningInterval; + while (!usage.waitForSpace(1000)) { + if (context.getStopping().get()) { + throw new IOException("Connection closed, send aborted."); + } + + long now = System.currentTimeMillis(); + if (now >= nextWarn) { + LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); + nextWarn = now + blockedProducerWarningInterval; + } + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index d24bffc5c3..52e669bb4d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -41,6 +41,7 @@ import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.Valve; import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.usage.Usage; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -59,11 +60,11 @@ import java.util.concurrent.CopyOnWriteArraySet; * * @version $Revision: 1.21 $ */ -public class Topic extends BaseDestination implements Task{ +public class Topic extends BaseDestination implements Task { protected static final Log LOG = LogFactory.getLog(Topic.class); private final TopicMessageStore topicStore; protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); - protected final Valve dispatchValve = new Valve(true); + protected final Valve dispatchValve = new Valve(true); private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap(); @@ -71,24 +72,22 @@ public class Topic extends BaseDestination implements Task{ private final LinkedList messagesWaitingForSpace = new LinkedList(); private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { public void run() { - try { - Topic.this.taskRunner.wakeup(); - } catch (InterruptedException e) { - } + try { + Topic.this.taskRunner.wakeup(); + } catch (InterruptedException e) { + } }; }; - - public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, - TaskRunnerFactory taskFactory) throws Exception { + public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { super(brokerService, store, destination, parentStats); - this.topicStore=store; + this.topicStore = store; //set default subscription recovery policy - subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy(); + subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy(); this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); } - - public void initialize() throws Exception{ + + public void initialize() throws Exception { super.initialize(); if (store != null) { int messageCount = store.getMessageCount(); @@ -140,7 +139,7 @@ public class Topic extends BaseDestination implements Task{ } } else { sub.add(context, this); - DurableTopicSubscription dsub = (DurableTopicSubscription)sub; + DurableTopicSubscription dsub = (DurableTopicSubscription) sub; durableSubcribers.put(dsub.getSubscriptionKey(), dsub); } } @@ -171,7 +170,7 @@ public class Topic extends BaseDestination implements Task{ // we are recovering a subscription to avoid out of order messages. dispatchValve.turnOff(); try { - + if (topicStore == null) { return; } @@ -195,21 +194,20 @@ public class Topic extends BaseDestination implements Task{ } } // Do we need to create the subscription? - if(info==null){ - info=new SubscriptionInfo(); + if (info == null) { + info = new SubscriptionInfo(); info.setClientId(clientId); info.setSelector(selector); info.setSubscriptionName(subscriptionName); - info.setDestination(getActiveMQDestination()); + info.setDestination(getActiveMQDestination()); // This destination is an actual destination id. - info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); + info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); // This destination might be a pattern synchronized (consumers) { consumers.add(subscription); - topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive()); + topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive()); } } - final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); msgContext.setDestination(destination); @@ -223,7 +221,7 @@ public class Topic extends BaseDestination implements Task{ subscription.add(message); } } catch (IOException e) { - LOG.error("Failed to recover this message " + message); + LOG.error("Failed to recover this message " + message); } return true; } @@ -235,7 +233,7 @@ public class Topic extends BaseDestination implements Task{ public boolean hasSpace() { return true; } - + public boolean isDuplicate(MessageId id) { return false; } @@ -277,23 +275,24 @@ public class Topic extends BaseDestination implements Task{ return; } - if(memoryUsage.isFull()) { + if (memoryUsage.isFull()) { isFull(context, memoryUsage); fastProducer(context, producerInfo); - + if (isProducerFlowControl() && context.isProducerFlowControl()) { - - if(warnOnProducerFlowControl) { + + if (warnOnProducerFlowControl) { warnOnProducerFlowControl = false; - LOG.info("Usage Manager memory limit reached for " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + - " See http://activemq.apache.org/producer-flow-control.html for more info"); + LOG.info("Usage Manager memory limit reached for " + getActiveMQDestination().getQualifiedName() + + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - + if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." + - " See http://activemq.apache.org/producer-flow-control.html for more info"); + throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - + // We can avoid blocking due to low usage if the producer is sending // a sync message or // if it is using a producer window @@ -301,9 +300,9 @@ public class Topic extends BaseDestination implements Task{ synchronized (messagesWaitingForSpace) { messagesWaitingForSpace.add(new Runnable() { public void run() { - + try { - + // While waiting for space to free up... the // message may have expired. if (message.isExpired()) { @@ -312,7 +311,7 @@ public class Topic extends BaseDestination implements Task{ } else { doMessageSend(producerExchange, message); } - + if (sendProducerAck) { ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); context.getConnection().dispatchAsync(ack); @@ -321,7 +320,7 @@ public class Topic extends BaseDestination implements Task{ response.setCorrelationId(message.getCommandId()); context.getConnection().dispatchAsync(response); } - + } catch (Exception e) { if (!sendProducerAck && !context.isInRecoveryMode()) { ExceptionResponse response = new ExceptionResponse(e); @@ -329,10 +328,10 @@ public class Topic extends BaseDestination implements Task{ context.getConnection().dispatchAsync(response); } } - + } }); - + // If the user manager is not full, then the task will not // get called.. if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { @@ -342,24 +341,32 @@ public class Topic extends BaseDestination implements Task{ context.setDontSendReponse(true); return; } - + } else { - // Producer flow control cannot be used, so we have do the flow // control at the broker // by blocking this thread until there is space available. - int count = 0; - while (!memoryUsage.waitForSpace(1000)) { - if (context.getStopping().get()) { - throw new IOException("Connection closed, send aborted."); - } - if (count > 2 && context.isInTransaction()) { - count =0; - int size = context.getTransaction().size(); - LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message); + + if (memoryUsage.isFull()) { + if (context.isInTransaction()) { + + int count = 0; + while (!memoryUsage.waitForSpace(1000)) { + if (context.getStopping().get()) { + throw new IOException("Connection closed, send aborted."); + } + if (count > 2 && context.isInTransaction()) { + count = 0; + int size = context.getTransaction().size(); + LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message); + } + } + } else { + waitForSpace(context, memoryUsage, "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } } - + // The usage manager could have delayed us by the time // we unblock the message could have expired.. if (message.isExpired()) { @@ -382,35 +389,28 @@ public class Topic extends BaseDestination implements Task{ } /** - * do send the message - this needs to be synchronized to ensure messages are stored AND dispatched in - * the right order + * do send the message - this needs to be synchronized to ensure messages + * are stored AND dispatched in the right order + * * @param producerExchange * @param message * @throws IOException * @throws Exception */ - synchronized void doMessageSend( - final ProducerBrokerExchange producerExchange, final Message message) - throws IOException, Exception { - final ConnectionContext context = producerExchange - .getConnectionContext(); + synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { + final ConnectionContext context = producerExchange.getConnectionContext(); message.setRegionDestination(this); message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); - if (topicStore != null && message.isPersistent() - && !canOptimizeOutPersistence()) { + if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { if (systemUsage.getStoreUsage().isFull()) { - final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + - " See http://activemq.apache.org/producer-flow-control.html for more info"; - LOG.info(logMessage); + final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"; if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException(logMessage); - } - } - while (!systemUsage.getStoreUsage().waitForSpace(1000)) { - if (context.getStopping().get()) { - throw new IOException("Connection closed, send aborted."); + throw new javax.jms.ResourceAllocationException(logMessage); } + + waitForSpace(context, systemUsage.getStoreUsage(), logMessage); } topicStore.addMessage(context, message); } @@ -457,14 +457,13 @@ public class Topic extends BaseDestination implements Task{ public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException { if (topicStore != null && node.isPersistent()) { - DurableTopicSubscription dsub = (DurableTopicSubscription)sub; + DurableTopicSubscription dsub = (DurableTopicSubscription) sub; SubscriptionKey key = dsub.getSubscriptionKey(); topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId()); } messageConsumed(context, node); } - public void gc() { } @@ -488,7 +487,7 @@ public class Topic extends BaseDestination implements Task{ if (memoryUsage != null) { memoryUsage.stop(); } - if(this.topicStore != null) { + if (this.topicStore != null) { this.topicStore.stop(); } } @@ -510,7 +509,7 @@ public class Topic extends BaseDestination implements Task{ public boolean hasSpace() { return true; } - + public boolean isDuplicate(MessageId id) { return false; } @@ -527,9 +526,9 @@ public class Topic extends BaseDestination implements Task{ } return result.toArray(new Message[result.size()]); } - + public boolean iterate() { - synchronized(messagesWaitingForSpace) { + synchronized (messagesWaitingForSpace) { while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { Runnable op = messagesWaitingForSpace.removeFirst(); op.run(); @@ -538,12 +537,9 @@ public class Topic extends BaseDestination implements Task{ return false; } - // Properties // ------------------------------------------------------------------------- - - public DispatchPolicy getDispatchPolicy() { return dispatchPolicy; } @@ -560,17 +556,16 @@ public class Topic extends BaseDestination implements Task{ this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; } - // Implementation methods // ------------------------------------------------------------------------- - + public final void wakeup() { } - + protected void dispatch(final ConnectionContext context, Message message) throws Exception { destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); - dispatchValve.increment(); + dispatchValve.increment(); MessageEvaluationContext msgContext = null; try { if (!subscriptionRecoveryPolicy.add(context, message)) { @@ -587,17 +582,17 @@ public class Topic extends BaseDestination implements Task{ msgContext.setMessageReference(message); if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { onMessageWithNoConsumers(context, message); - } - + } + } finally { dispatchValve.decrement(); - if(msgContext != null) { + if (msgContext != null) { msgContext.clear(); } } } - - public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) { + + public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { broker.messageExpired(context, reference); destinationStatistics.getMessages().decrement(); destinationStatistics.getEnqueues().decrement(); @@ -609,9 +604,24 @@ public class Topic extends BaseDestination implements Task{ try { acknowledge(context, subs, ack, reference); } catch (IOException e) { - LOG.error("Failed to remove expired Message from the store ",e); + LOG.error("Failed to remove expired Message from the store ", e); } } + private final void waitForSpace(ConnectionContext context, Usage usage, String warning) throws IOException, InterruptedException { + long start = System.currentTimeMillis(); + long nextWarn = start + blockedProducerWarningInterval; + while (!usage.waitForSpace(1000)) { + if (context.getStopping().get()) { + throw new IOException("Connection closed, send aborted."); + } + + long now = System.currentTimeMillis(); + if (now >= nextWarn) { + LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); + nextWarn = now + blockedProducerWarningInterval; + } + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index d7029e4083..651954f0da 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -60,6 +60,7 @@ public class PolicyEntry extends DestinationMapEntry { private int maxQueueAuditDepth=2048; private boolean enableAudit=true; private boolean producerFlowControl = true; + private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; private boolean optimizedDispatch=false; private int maxPageSize=BaseDestination.MAX_PAGE_SIZE; private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE; @@ -125,6 +126,7 @@ public class PolicyEntry extends DestinationMapEntry { public void baseConfiguration(BaseDestination destination) { destination.setProducerFlowControl(isProducerFlowControl()); + destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); destination.setEnableAudit(isEnableAudit()); destination.setMaxAuditDepth(getMaxQueueAuditDepth()); destination.setMaxProducersToAudit(getMaxProducersToAudit()); @@ -372,6 +374,27 @@ public class PolicyEntry extends DestinationMapEntry { this.producerFlowControl = producerFlowControl; } + /** + * Set's the interval at which warnings about producers being blocked by + * resource usage will be triggered. Values of 0 or less will disable + * warnings + * + * @param blockedProducerWarningInterval the interval at which warning about + * blocked producers will be triggered. + */ + public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { + this.blockedProducerWarningInterval = blockedProducerWarningInterval; + } + + /** + * + * @return the interval at which warning about blocked producers will be + * triggered. + */ + public long getBlockedProducerWarningInterval() { + return blockedProducerWarningInterval; + } + /** * @return the maxProducersToAudit */