diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java index 5ab1c0cfd0..bc17ef9467 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java @@ -19,6 +19,8 @@ package org.apache.activemq.broker.util; import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.Message; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * A Broker interceptor which updates a JMS Client's timestamp on the message @@ -30,24 +32,70 @@ import org.apache.activemq.command.Message; * timestamp the consumer will observe when he receives the message. This plugin * is not enabled in the default ActiveMQ configuration. * + * 2 new attributes have been added which will allow the administrator some override control + * over the expiration time for incoming messages: + * + * Attribute 'zeroExpirationOverride' can be used to apply an expiration + * time to incoming messages with no expiration defined (messages that would never expire) + * + * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time + * * @org.apache.xbean.XBean element="timeStampingBrokerPlugin" * * @version $Revision$ */ public class TimeStampingBrokerPlugin extends BrokerPluginSupport { + + private static final Log LOG = LogFactory.getLog(TimeStampingBrokerPlugin.class); + + /** + * variable which (when non-zero) is used to override + * the expiration date for messages that arrive with + * no expiration date set (in Milliseconds). + */ + long zeroExpirationOverride = 0; + + /** + * variable which (when non-zero) is used to limit + * the expiration date (in Milliseconds). + */ + long ttlCeiling = 0; + + /** + * setter method for zeroExpirationOverride + */ + public void setZeroExpirationOverride(long ttl) + { + this.zeroExpirationOverride = ttl; + } + + /** + * setter method for ttlCeiling + */ + public void setTtlCeiling(long ttlCeiling) + { + this.ttlCeiling = ttlCeiling; + } + public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) { // timestamp not been disabled and has not passed through a network long oldExpiration = message.getExpiration(); long newTimeStamp = System.currentTimeMillis(); + message.setTimestamp(newTimeStamp); + long timeToLive = zeroExpirationOverride; if (oldExpiration > 0) { long oldTimestamp = message.getTimestamp(); - long timeToLive = oldExpiration-oldTimestamp; - long expiration = timeToLive+newTimeStamp; + timeToLive = oldExpiration - oldTimestamp; + } + if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) { + timeToLive = ttlCeiling; + } + long expiration = timeToLive + newTimeStamp; + if (timeToLive > 0 && expiration > 0) { message.setExpiration(expiration); } - message.setTimestamp(newTimeStamp); } super.send(producerExchange, message); }