Adding a blockedProducerWarningInterval attribute to destinations to control the rate at which warnings about blocked producers are generated (otherwise the warnings can flood the log).

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@884234 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin W Macnaughton 2009-11-25 19:35:09 +00:00
parent 83df5cef54
commit 48764becbd
8 changed files with 718 additions and 628 deletions

View File

@ -82,11 +82,11 @@ public class DestinationView implements DestinationViewMBean {
public long getDispatchCount() {
return destination.getDestinationStatistics().getDispatched().getCount();
}
public long getInFlightCount() {
return destination.getDestinationStatistics().getInflight().getCount();
}
public long getExpiredCount() {
return destination.getDestinationStatistics().getExpired().getCount();
}
@ -220,7 +220,7 @@ public class DestinationView implements DestinationViewMBean {
OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
Message[] messages = destination.browse();
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
TabularDataSupport rc = new TabularDataSupport(tt);
MessageEvaluationContext ctx = new MessageEvaluationContext();
@ -248,16 +248,16 @@ public class DestinationView implements DestinationViewMBean {
public String sendTextMessage(String body) throws Exception {
return sendTextMessage(Collections.EMPTY_MAP, body);
}
public String sendTextMessage(Map headers, String body) throws Exception {
return sendTextMessage(headers,body,null,null);
return sendTextMessage(headers, body, null, null);
}
public String sendTextMessage(String body, String user, String password) throws Exception {
return sendTextMessage(Collections.EMPTY_MAP,body,user,password);
return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
}
public String sendTextMessage(Map headers, String body,String userName,String password) throws Exception {
public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception {
String brokerUrl = "vm://" + broker.getBrokerName();
ActiveMQDestination dest = destination.getActiveMQDestination();
@ -266,14 +266,14 @@ public class DestinationView implements DestinationViewMBean {
Connection connection = null;
try {
connection = cf.createConnection(userName,password);
connection = cf.createConnection(userName, password);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage msg = (ActiveMQTextMessage)session.createTextMessage(body);
ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry)iter.next();
msg.setObjectProperty((String)entry.getKey(), entry.getValue());
Map.Entry entry = (Map.Entry) iter.next();
msg.setObjectProperty((String) entry.getKey(), entry.getValue());
}
producer.setDeliveryMode(msg.getJMSDeliveryMode());
@ -292,30 +292,28 @@ public class DestinationView implements DestinationViewMBean {
public int getMaxAuditDepth() {
return destination.getMaxAuditDepth();
}
}
public int getMaxProducersToAudit() {
return destination.getMaxProducersToAudit();
}
public int getMaxProducersToAudit() {
return destination.getMaxProducersToAudit();
}
public boolean isEnableAudit() {
return destination.isEnableAudit();
}
public boolean isEnableAudit() {
return destination.isEnableAudit();
}
public void setEnableAudit(boolean enableAudit) {
destination.setEnableAudit(enableAudit);
}
public void setEnableAudit(boolean enableAudit) {
destination.setEnableAudit(enableAudit);
}
public void setMaxAuditDepth(int maxAuditDepth) {
destination.setMaxAuditDepth(maxAuditDepth);
}
public void setMaxProducersToAudit(int maxProducersToAudit) {
destination.setMaxProducersToAudit(maxProducersToAudit);
}
public void setMaxAuditDepth(int maxAuditDepth) {
destination.setMaxAuditDepth(maxAuditDepth);
}
public void setMaxProducersToAudit(int maxProducersToAudit) {
destination.setMaxProducersToAudit(maxProducersToAudit);
}
public float getMemoryUsagePortion() {
return destination.getMemoryUsage().getUsagePortion();
}
@ -325,31 +323,52 @@ public class DestinationView implements DestinationViewMBean {
}
public boolean isProducerFlowControl() {
return destination.isProducerFlowControl();
return destination.isProducerFlowControl();
}
public void setMemoryUsagePortion(float value) {
destination.getMemoryUsage().setUsagePortion(value);
}
public void setProducerFlowControl(boolean producerFlowControl) {
destination.setProducerFlowControl(producerFlowControl);
destination.setProducerFlowControl(producerFlowControl);
}
/**
* Set's the interval at which warnings about producers being blocked by
* resource usage will be triggered. Values of 0 or less will disable
* warnings
*
* @param blockedProducerWarningInterval the interval at which warning about
* blocked producers will be triggered.
*/
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
}
/**
*
* @return the interval at which warning about blocked producers will be
* triggered.
*/
public long getBlockedProducerWarningInterval() {
return destination.getBlockedProducerWarningInterval();
}
public int getMaxPageSize() {
return destination.getMaxPageSize();
}
public void setMaxPageSize(int pageSize) {
destination.setMaxPageSize(pageSize);
}
public boolean isUseCache() {
return destination.isUseCache();
}
public void setUseCache(boolean value) {
destination.setUseCache(value);
destination.setUseCache(value);
}
public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {

View File

@ -248,11 +248,30 @@ public interface DestinationViewMBean {
*/
@MBeanInfo("Producers are flow controlled")
boolean isProducerFlowControl();
/**
* @param producerFlowControl the producerFlowControl to set
*/
public void setProducerFlowControl(@MBeanInfo("producerFlowControl") boolean producerFlowControl);
/**
* Set's the interval at which warnings about producers being blocked by
* resource usage will be triggered. Values of 0 or less will disable
* warnings
*
* @param blockedProducerWarningInterval the interval at which warning about
* blocked producers will be triggered.
*/
public void setBlockedProducerWarningInterval(@MBeanInfo("blockedProducerWarningInterval") long blockedProducerWarningInterval);
/**
*
* @return the interval at which warning about blocked producers will be
* triggered.
*/
@MBeanInfo("Blocked Producer Warning Interval")
public long getBlockedProducerWarningInterval();
/**
* @return the maxProducersToAudit
*/

View File

@ -40,18 +40,21 @@ import org.apache.activemq.usage.Usage;
*/
public abstract class BaseDestination implements Destination {
/**
* The maximum number of messages to page in to the destination from persistent storage
* The maximum number of messages to page in to the destination from
* persistent storage
*/
public static final int MAX_PAGE_SIZE = 200;
public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
public static final long EXPIRE_MESSAGE_PERIOD = 30*1000;
public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
protected SystemUsage systemUsage;
protected MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
protected boolean warnOnProducerFlowControl = true;
protected boolean warnOnProducerFlowControl = true;
protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
private int maxProducersToAudit = 1024;
private int maxAuditDepth = 2048;
private boolean enableAudit = true;
@ -82,8 +85,7 @@ public abstract class BaseDestination implements Destination {
* @param parentStats
* @throws Exception
*/
public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination,
DestinationStatistics parentStats) throws Exception {
public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
this.brokerService = brokerService;
this.broker = brokerService.getBroker();
this.store = store;
@ -118,13 +120,33 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param producerFlowControl
* the producerFlowControl to set
* @param producerFlowControl the producerFlowControl to set
*/
public void setProducerFlowControl(boolean producerFlowControl) {
this.producerFlowControl = producerFlowControl;
}
/**
* Set's the interval at which warnings about producers being blocked by
* resource usage will be triggered. Values of 0 or less will disable
* warnings
*
* @param blockedProducerWarningInterval the interval at which warning about
* blocked producers will be triggered.
*/
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
this.blockedProducerWarningInterval = blockedProducerWarningInterval;
}
/**
*
* @return the interval at which warning about blocked producers will be
* triggered.
*/
public long getBlockedProducerWarningInterval() {
return blockedProducerWarningInterval;
}
/**
* @return the maxProducersToAudit
*/
@ -133,8 +155,7 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param maxProducersToAudit
* the maxProducersToAudit to set
* @param maxProducersToAudit the maxProducersToAudit to set
*/
public void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
@ -148,8 +169,7 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param maxAuditDepth
* the maxAuditDepth to set
* @param maxAuditDepth the maxAuditDepth to set
*/
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
@ -163,8 +183,7 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param enableAudit
* the enableAudit to set
* @param enableAudit the enableAudit to set
*/
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
@ -199,8 +218,7 @@ public abstract class BaseDestination implements Destination {
}
public final boolean isActive() {
return destinationStatistics.getConsumers().getCount() != 0
|| destinationStatistics.getProducers().getCount() != 0;
return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
}
public int getMaxPageSize() {
@ -218,13 +236,13 @@ public abstract class BaseDestination implements Destination {
public void setMaxBrowsePageSize(int maxPageSize) {
this.maxBrowsePageSize = maxPageSize;
}
public int getMaxExpirePageSize() {
return this.maxExpirePageSize;
}
public void setMaxExpirePageSize(int maxPageSize) {
this.maxExpirePageSize = maxPageSize;
this.maxExpirePageSize = maxPageSize;
}
public void setExpireMessagesPeriod(long expireMessagesPeriod) {
@ -234,7 +252,7 @@ public abstract class BaseDestination implements Destination {
public long getExpireMessagesPeriod() {
return expireMessagesPeriod;
}
public boolean isUseCache() {
return useCache;
}
@ -271,8 +289,7 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisoryForSlowConsumers
* the advisoryForSlowConsumers to set
* @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
*/
public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
this.advisoryForSlowConsumers = advisoryForSlowConsumers;
@ -286,8 +303,8 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisoryForDiscardingMessages
* the advisoryForDiscardingMessages to set
* @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
* set
*/
public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
@ -301,8 +318,7 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisoryWhenFull
* the advisoryWhenFull to set
* @param advisoryWhenFull the advisoryWhenFull to set
*/
public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
this.advisoryWhenFull = advisoryWhenFull;
@ -316,8 +332,7 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisoryForDelivery
* the advisoryForDelivery to set
* @param advisoryForDelivery the advisoryForDelivery to set
*/
public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
this.advisoryForDelivery = advisoryForDelivery;
@ -331,8 +346,7 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisoryForConsumed
* the advisoryForConsumed to set
* @param advisoryForConsumed the advisoryForConsumed to set
*/
public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
this.advisoryForConsumed = advisoryForConsumed;
@ -346,13 +360,12 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisdoryForFastProducers
* the advisdoryForFastProducers to set
* @param advisdoryForFastProducers the advisdoryForFastProducers to set
*/
public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
this.advisdoryForFastProducers = advisdoryForFastProducers;
}
public boolean isSendAdvisoryIfNoConsumers() {
return sendAdvisoryIfNoConsumers;
}
@ -376,14 +389,14 @@ public abstract class BaseDestination implements Destination {
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
public int getCursorMemoryHighWaterMark() {
return this.cursorMemoryHighWaterMark;
}
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
}
public int getCursorMemoryHighWaterMark() {
return this.cursorMemoryHighWaterMark;
}
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
}
/**
* called when message is consumed
@ -410,8 +423,8 @@ public abstract class BaseDestination implements Destination {
}
/**
* Called when a message is discarded - e.g. running low on memory This will happen only if the policy is enabled -
* e.g. non durable topics
* Called when a message is discarded - e.g. running low on memory This will
* happen only if the policy is enabled - e.g. non durable topics
*
* @param context
* @param messageReference
@ -460,19 +473,19 @@ public abstract class BaseDestination implements Destination {
public void dispose(ConnectionContext context) throws IOException {
if (this.store != null) {
this.store.removeAllMessages(context);
this.store.removeAllMessages(context);
this.store.dispose(context);
}
this.destinationStatistics.setParent(null);
this.memoryUsage.stop();
}
/**
* Provides a hook to allow messages with no consumer to be processed in
* some way - such as to send to a dead letter queue or something..
*/
protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
if (!msg.isPersistent()) {
if (!msg.isPersistent()) {
if (isSendAdvisoryIfNoConsumers()) {
// allow messages with no consumers to be dispatched to a dead
// letter queue
@ -489,12 +502,12 @@ public abstract class BaseDestination implements Destination {
if (message.getOriginalTransactionId() != null) {
message.setOriginalTransactionId(message.getTransactionId());
}
ActiveMQTopic advisoryTopic;
if (destination.isQueue()) {
advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
} else {
advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
}
message.setDestination(advisoryTopic);
message.setTransactionId(null);
@ -517,8 +530,9 @@ public abstract class BaseDestination implements Destination {
}
}
}
public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification) throws Exception {
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
}
}

View File

@ -41,10 +41,12 @@ import org.apache.activemq.usage.Usage;
public interface Destination extends Service, Task {
public static final DeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new SharedDeadLetterStrategy();
public static final long DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL = 30000;
void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception;
void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
@ -70,122 +72,146 @@ public interface Destination extends Service, Task {
String getName();
MessageStore getMessageStore();
boolean isProducerFlowControl();
void setProducerFlowControl(boolean value);
/**
* Set's the interval at which warnings about producers being blocked by
* resource usage will be triggered. Values of 0 or less will disable
* warnings
*
* @param blockedProducerWarningInterval the interval at which warning about
* blocked producers will be triggered.
*/
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval);
/**
*
* @return the interval at which warning about blocked producers will be
* triggered.
*/
public long getBlockedProducerWarningInterval();
int getMaxProducersToAudit();
void setMaxProducersToAudit(int maxProducersToAudit);
int getMaxAuditDepth();
void setMaxAuditDepth(int maxAuditDepth);
boolean isEnableAudit();
void setEnableAudit(boolean enableAudit);
boolean isActive();
boolean isActive();
int getMaxPageSize();
public void setMaxPageSize(int maxPageSize);
public int getMaxBrowsePageSize();
public void setMaxBrowsePageSize(int maxPageSize);
public boolean isUseCache();
public void setUseCache(boolean useCache);
public int getMinimumMessageSize();
public void setMinimumMessageSize(int minimumMessageSize);
public int getCursorMemoryHighWaterMark();
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
/**
* optionally called by a Subscriber - to inform the Destination its
* ready for more messages
* optionally called by a Subscriber - to inform the Destination its ready
* for more messages
*/
public void wakeup();
/**
* @return true if lazyDispatch is enabled
*/
public boolean isLazyDispatch();
/**
* set the lazy dispatch - default is false
*
* @param value
*/
public void setLazyDispatch(boolean value);
/**
* Inform the Destination a message has expired
*
* @param context
* @param subs
* @param subs
* @param node
*/
void messageExpired(ConnectionContext context, Subscription subs,MessageReference node);
void messageExpired(ConnectionContext context, Subscription subs, MessageReference node);
/**
* called when message is consumed
*
* @param context
* @param messageReference
*/
void messageConsumed(ConnectionContext context, MessageReference messageReference);
void messageConsumed(ConnectionContext context, MessageReference messageReference);
/**
* Called when message is delivered to the broker
*
* @param context
* @param messageReference
*/
void messageDelivered(ConnectionContext context, MessageReference messageReference);
void messageDelivered(ConnectionContext context, MessageReference messageReference);
/**
* Called when a message is discarded - e.g. running low on memory
* This will happen only if the policy is enabled - e.g. non durable topics
* Called when a message is discarded - e.g. running low on memory This will
* happen only if the policy is enabled - e.g. non durable topics
*
* @param context
* @param messageReference
*/
void messageDiscarded(ConnectionContext context, MessageReference messageReference);
void messageDiscarded(ConnectionContext context, MessageReference messageReference);
/**
* Called when there is a slow consumer
*
* @param context
* @param subs
*/
void slowConsumer(ConnectionContext context, Subscription subs);
void slowConsumer(ConnectionContext context, Subscription subs);
/**
* Called to notify a producer is too fast
*
* @param context
* @param producerInfo
*/
void fastProducer(ConnectionContext context,ProducerInfo producerInfo);
void fastProducer(ConnectionContext context, ProducerInfo producerInfo);
/**
* Called when a Usage reaches a limit
*
* @param context
* @param usage
*/
void isFull(ConnectionContext context,Usage usage);
void isFull(ConnectionContext context, Usage usage);
List<Subscription> getConsumers();
/**
* called on Queues in slave mode to allow dispatch to follow subscription choice of master
* called on Queues in slave mode to allow dispatch to follow subscription
* choice of master
*
* @param messageDispatchNotification
* @throws Exception
*/
void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification) throws Exception;
void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
}

View File

@ -45,8 +45,7 @@ public class DestinationFilter implements Destination {
this.next = next;
}
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
throws IOException {
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
next.acknowledge(context, sub, ack, node);
}
@ -108,13 +107,13 @@ public class DestinationFilter implements Destination {
/**
* Sends a message to the given destination which may be a wildcard
*
* @param context broker context
* @param message message to send
* @param destination possibly wildcard destination to send the message to
* @throws Exception on error
*/
protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination)
throws Exception {
protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
Broker broker = context.getConnectionContext().getBroker();
Set<Destination> destinations = broker.getDestinations(destination);
@ -130,24 +129,30 @@ public class DestinationFilter implements Destination {
public boolean isProducerFlowControl() {
return next.isProducerFlowControl();
}
public void setProducerFlowControl(boolean value){
public void setProducerFlowControl(boolean value) {
next.setProducerFlowControl(value);
}
public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
next.addProducer(context, info);
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
}
public long getBlockedProducerWarningInterval() {
return next.getBlockedProducerWarningInterval();
}
public void removeProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
next.removeProducer(context, info);
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.addProducer(context, info);
}
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.removeProducer(context, info);
}
public int getMaxAuditDepth() {
return next.getMaxAuditDepth();
return next.getMaxAuditDepth();
}
public int getMaxProducersToAudit() {
@ -157,20 +162,19 @@ public class DestinationFilter implements Destination {
public boolean isEnableAudit() {
return next.isEnableAudit();
}
public void setEnableAudit(boolean enableAudit) {
next.setEnableAudit(enableAudit);
}
public void setMaxAuditDepth(int maxAuditDepth) {
next.setMaxAuditDepth(maxAuditDepth);
next.setMaxAuditDepth(maxAuditDepth);
}
public void setMaxProducersToAudit(int maxProducersToAudit) {
next.setMaxProducersToAudit(maxProducersToAudit);
next.setMaxProducersToAudit(maxProducersToAudit);
}
public boolean isActive() {
return next.isActive();
}
@ -189,88 +193,81 @@ public class DestinationFilter implements Destination {
public void setUseCache(boolean useCache) {
next.setUseCache(useCache);
}
}
public int getMinimumMessageSize() {
return next.getMinimumMessageSize();
}
public void setMinimumMessageSize(int minimumMessageSize) {
next.setMinimumMessageSize(minimumMessageSize);
}
}
public void wakeup() {
next.wakeup();
}
public boolean isLazyDispatch() {
return next.isLazyDispatch();
return next.isLazyDispatch();
}
public void setLazyDispatch(boolean value) {
next.setLazyDispatch(value);
next.setLazyDispatch(value);
}
public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
next.messageExpired(context, prefetchSubscription, node);
next.messageExpired(context, prefetchSubscription, node);
}
public boolean iterate() {
return next.iterate();
}
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
next.fastProducer(context, producerInfo);
public boolean iterate() {
return next.iterate();
}
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
next.fastProducer(context, producerInfo);
}
public void isFull(ConnectionContext context, Usage usage) {
next.isFull(context, usage);
next.isFull(context, usage);
}
public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
next.messageConsumed(context, messageReference);
}
public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
next.messageDelivered(context, messageReference);
}
public void messageDiscarded(ConnectionContext context,MessageReference messageReference) {
public void messageDiscarded(ConnectionContext context, MessageReference messageReference) {
next.messageDiscarded(context, messageReference);
}
public void slowConsumer(ConnectionContext context, Subscription subs) {
next.slowConsumer(context, subs);
next.slowConsumer(context, subs);
}
public void messageExpired(ConnectionContext context, Subscription subs,MessageReference node) {
next.messageExpired(context,subs, node);
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) {
next.messageExpired(context, subs, node);
}
public int getMaxBrowsePageSize() {
return next.getMaxBrowsePageSize();
return next.getMaxBrowsePageSize();
}
public void setMaxBrowsePageSize(int maxPageSize) {
next.setMaxBrowsePageSize(maxPageSize);
}
public void processDispatchNotification(
MessageDispatchNotification messageDispatchNotification) throws Exception {
next.processDispatchNotification(messageDispatchNotification);
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
next.processDispatchNotification(messageDispatchNotification);
}
public int getCursorMemoryHighWaterMark() {
return next.getCursorMemoryHighWaterMark();
}
public int getCursorMemoryHighWaterMark() {
return next.getCursorMemoryHighWaterMark();
}
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
}
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
}
}

View File

@ -41,6 +41,7 @@ import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -59,11 +60,11 @@ import java.util.concurrent.CopyOnWriteArraySet;
*
* @version $Revision: 1.21 $
*/
public class Topic extends BaseDestination implements Task{
public class Topic extends BaseDestination implements Task {
protected static final Log LOG = LogFactory.getLog(Topic.class);
private final TopicMessageStore topicStore;
protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
protected final Valve dispatchValve = new Valve(true);
protected final Valve dispatchValve = new Valve(true);
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
@ -71,24 +72,22 @@ public class Topic extends BaseDestination implements Task{
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
try {
Topic.this.taskRunner.wakeup();
} catch (InterruptedException e) {
}
try {
Topic.this.taskRunner.wakeup();
} catch (InterruptedException e) {
}
};
};
public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats);
this.topicStore=store;
this.topicStore = store;
//set default subscription recovery policy
subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy();
subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}
public void initialize() throws Exception{
public void initialize() throws Exception {
super.initialize();
if (store != null) {
int messageCount = store.getMessageCount();
@ -140,7 +139,7 @@ public class Topic extends BaseDestination implements Task{
}
} else {
sub.add(context, this);
DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
}
}
@ -171,7 +170,7 @@ public class Topic extends BaseDestination implements Task{
// we are recovering a subscription to avoid out of order messages.
dispatchValve.turnOff();
try {
if (topicStore == null) {
return;
}
@ -195,21 +194,20 @@ public class Topic extends BaseDestination implements Task{
}
}
// Do we need to create the subscription?
if(info==null){
info=new SubscriptionInfo();
if (info == null) {
info = new SubscriptionInfo();
info.setClientId(clientId);
info.setSelector(selector);
info.setSubscriptionName(subscriptionName);
info.setDestination(getActiveMQDestination());
info.setDestination(getActiveMQDestination());
// This destination is an actual destination id.
info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
// This destination might be a pattern
synchronized (consumers) {
consumers.add(subscription);
topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
}
}
final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
@ -223,7 +221,7 @@ public class Topic extends BaseDestination implements Task{
subscription.add(message);
}
} catch (IOException e) {
LOG.error("Failed to recover this message " + message);
LOG.error("Failed to recover this message " + message);
}
return true;
}
@ -235,7 +233,7 @@ public class Topic extends BaseDestination implements Task{
public boolean hasSpace() {
return true;
}
public boolean isDuplicate(MessageId id) {
return false;
}
@ -277,23 +275,24 @@ public class Topic extends BaseDestination implements Task{
return;
}
if(memoryUsage.isFull()) {
if (memoryUsage.isFull()) {
isFull(context, memoryUsage);
fastProducer(context, producerInfo);
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if(warnOnProducerFlowControl) {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
LOG.info("Usage Manager memory limit reached for " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." +
" See http://activemq.apache.org/producer-flow-control.html for more info");
LOG.info("Usage Manager memory limit reached for " + getActiveMQDestination().getQualifiedName()
+ ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." +
" See http://activemq.apache.org/producer-flow-control.html for more info");
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
// We can avoid blocking due to low usage if the producer is sending
// a sync message or
// if it is using a producer window
@ -301,9 +300,9 @@ public class Topic extends BaseDestination implements Task{
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
public void run() {
try {
// While waiting for space to free up... the
// message may have expired.
if (message.isExpired()) {
@ -312,7 +311,7 @@ public class Topic extends BaseDestination implements Task{
} else {
doMessageSend(producerExchange, message);
}
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
@ -321,7 +320,7 @@ public class Topic extends BaseDestination implements Task{
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
} catch (Exception e) {
if (!sendProducerAck && !context.isInRecoveryMode()) {
ExceptionResponse response = new ExceptionResponse(e);
@ -329,10 +328,10 @@ public class Topic extends BaseDestination implements Task{
context.getConnection().dispatchAsync(response);
}
}
}
});
// If the user manager is not full, then the task will not
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
@ -342,24 +341,32 @@ public class Topic extends BaseDestination implements Task{
context.setDontSendReponse(true);
return;
}
} else {
// Producer flow control cannot be used, so we have do the flow
// control at the broker
// by blocking this thread until there is space available.
int count = 0;
while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
if (count > 2 && context.isInTransaction()) {
count =0;
int size = context.getTransaction().size();
LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
if (memoryUsage.isFull()) {
if (context.isInTransaction()) {
int count = 0;
while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
if (count > 2 && context.isInTransaction()) {
count = 0;
int size = context.getTransaction().size();
LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
}
}
} else {
waitForSpace(context, memoryUsage, "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
}
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
@ -382,35 +389,28 @@ public class Topic extends BaseDestination implements Task{
}
/**
* do send the message - this needs to be synchronized to ensure messages are stored AND dispatched in
* the right order
* do send the message - this needs to be synchronized to ensure messages
* are stored AND dispatched in the right order
*
* @param producerExchange
* @param message
* @throws IOException
* @throws Exception
*/
synchronized void doMessageSend(
final ProducerBrokerExchange producerExchange, final Message message)
throws IOException, Exception {
final ConnectionContext context = producerExchange
.getConnectionContext();
synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (topicStore != null && message.isPersistent()
&& !canOptimizeOutPersistence()) {
if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
if (systemUsage.getStoreUsage().isFull()) {
final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." +
" See http://activemq.apache.org/producer-flow-control.html for more info";
LOG.info(logMessage);
final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException(logMessage);
}
}
while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
throw new javax.jms.ResourceAllocationException(logMessage);
}
waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
}
topicStore.addMessage(context, message);
}
@ -457,14 +457,13 @@ public class Topic extends BaseDestination implements Task{
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
}
messageConsumed(context, node);
}
public void gc() {
}
@ -488,7 +487,7 @@ public class Topic extends BaseDestination implements Task{
if (memoryUsage != null) {
memoryUsage.stop();
}
if(this.topicStore != null) {
if (this.topicStore != null) {
this.topicStore.stop();
}
}
@ -510,7 +509,7 @@ public class Topic extends BaseDestination implements Task{
public boolean hasSpace() {
return true;
}
public boolean isDuplicate(MessageId id) {
return false;
}
@ -527,9 +526,9 @@ public class Topic extends BaseDestination implements Task{
}
return result.toArray(new Message[result.size()]);
}
public boolean iterate() {
synchronized(messagesWaitingForSpace) {
synchronized (messagesWaitingForSpace) {
while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
@ -538,12 +537,9 @@ public class Topic extends BaseDestination implements Task{
return false;
}
// Properties
// -------------------------------------------------------------------------
public DispatchPolicy getDispatchPolicy() {
return dispatchPolicy;
}
@ -560,17 +556,16 @@ public class Topic extends BaseDestination implements Task{
this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
}
// Implementation methods
// -------------------------------------------------------------------------
public final void wakeup() {
}
protected void dispatch(final ConnectionContext context, Message message) throws Exception {
destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
dispatchValve.increment();
dispatchValve.increment();
MessageEvaluationContext msgContext = null;
try {
if (!subscriptionRecoveryPolicy.add(context, message)) {
@ -587,17 +582,17 @@ public class Topic extends BaseDestination implements Task{
msgContext.setMessageReference(message);
if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
onMessageWithNoConsumers(context, message);
}
}
} finally {
dispatchValve.decrement();
if(msgContext != null) {
if (msgContext != null) {
msgContext.clear();
}
}
}
public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
broker.messageExpired(context, reference);
destinationStatistics.getMessages().decrement();
destinationStatistics.getEnqueues().decrement();
@ -609,9 +604,24 @@ public class Topic extends BaseDestination implements Task{
try {
acknowledge(context, subs, ack, reference);
} catch (IOException e) {
LOG.error("Failed to remove expired Message from the store ",e);
LOG.error("Failed to remove expired Message from the store ", e);
}
}
private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
long start = System.currentTimeMillis();
long nextWarn = start + blockedProducerWarningInterval;
while (!usage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
long now = System.currentTimeMillis();
if (now >= nextWarn) {
LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
nextWarn = now + blockedProducerWarningInterval;
}
}
}
}

View File

@ -60,6 +60,7 @@ public class PolicyEntry extends DestinationMapEntry {
private int maxQueueAuditDepth=2048;
private boolean enableAudit=true;
private boolean producerFlowControl = true;
private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
private boolean optimizedDispatch=false;
private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
@ -125,6 +126,7 @@ public class PolicyEntry extends DestinationMapEntry {
public void baseConfiguration(BaseDestination destination) {
destination.setProducerFlowControl(isProducerFlowControl());
destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
destination.setEnableAudit(isEnableAudit());
destination.setMaxAuditDepth(getMaxQueueAuditDepth());
destination.setMaxProducersToAudit(getMaxProducersToAudit());
@ -372,6 +374,27 @@ public class PolicyEntry extends DestinationMapEntry {
this.producerFlowControl = producerFlowControl;
}
/**
* Set's the interval at which warnings about producers being blocked by
* resource usage will be triggered. Values of 0 or less will disable
* warnings
*
* @param blockedProducerWarningInterval the interval at which warning about
* blocked producers will be triggered.
*/
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
this.blockedProducerWarningInterval = blockedProducerWarningInterval;
}
/**
*
* @return the interval at which warning about blocked producers will be
* triggered.
*/
public long getBlockedProducerWarningInterval() {
return blockedProducerWarningInterval;
}
/**
* @return the maxProducersToAudit
*/