fix for https://issues.apache.org/activemq/browse/AMQ-2221 - improvments for timestamp plugin

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@768219 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-04-24 08:27:10 +00:00
parent 6fc95f5ce8
commit 82d7182939
1 changed files with 51 additions and 3 deletions

View File

@ -19,6 +19,8 @@ package org.apache.activemq.broker.util;
import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**
* A Broker interceptor which updates a JMS Client's timestamp on the message * A Broker interceptor which updates a JMS Client's timestamp on the message
@ -30,24 +32,70 @@ import org.apache.activemq.command.Message;
* timestamp the consumer will observe when he receives the message. This plugin * timestamp the consumer will observe when he receives the message. This plugin
* is not enabled in the default ActiveMQ configuration. * is not enabled in the default ActiveMQ configuration.
* *
* 2 new attributes have been added which will allow the administrator some override control
* over the expiration time for incoming messages:
*
* Attribute 'zeroExpirationOverride' can be used to apply an expiration
* time to incoming messages with no expiration defined (messages that would never expire)
*
* Attribute 'ttlCeiling' can be used to apply a limit to the expiration time
*
* @org.apache.xbean.XBean element="timeStampingBrokerPlugin" * @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
* *
* @version $Revision$ * @version $Revision$
*/ */
public class TimeStampingBrokerPlugin extends BrokerPluginSupport { public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
private static final Log LOG = LogFactory.getLog(TimeStampingBrokerPlugin.class);
/**
* variable which (when non-zero) is used to override
* the expiration date for messages that arrive with
* no expiration date set (in Milliseconds).
*/
long zeroExpirationOverride = 0;
/**
* variable which (when non-zero) is used to limit
* the expiration date (in Milliseconds).
*/
long ttlCeiling = 0;
/**
* setter method for zeroExpirationOverride
*/
public void setZeroExpirationOverride(long ttl)
{
this.zeroExpirationOverride = ttl;
}
/**
* setter method for ttlCeiling
*/
public void setTtlCeiling(long ttlCeiling)
{
this.ttlCeiling = ttlCeiling;
}
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
if (message.getTimestamp() > 0 if (message.getTimestamp() > 0
&& (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) { && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) {
// timestamp not been disabled and has not passed through a network // timestamp not been disabled and has not passed through a network
long oldExpiration = message.getExpiration(); long oldExpiration = message.getExpiration();
long newTimeStamp = System.currentTimeMillis(); long newTimeStamp = System.currentTimeMillis();
message.setTimestamp(newTimeStamp);
long timeToLive = zeroExpirationOverride;
if (oldExpiration > 0) { if (oldExpiration > 0) {
long oldTimestamp = message.getTimestamp(); long oldTimestamp = message.getTimestamp();
long timeToLive = oldExpiration-oldTimestamp; timeToLive = oldExpiration - oldTimestamp;
long expiration = timeToLive+newTimeStamp; }
if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) {
timeToLive = ttlCeiling;
}
long expiration = timeToLive + newTimeStamp;
if (timeToLive > 0 && expiration > 0) {
message.setExpiration(expiration); message.setExpiration(expiration);
} }
message.setTimestamp(newTimeStamp);
} }
super.send(producerExchange, message); super.send(producerExchange, message);
} }