mirror of https://github.com/apache/activemq.git
Apply patch for https://issues.apache.org/jira/browse/AMQ-3541
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1189980 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
48bf681246
commit
8340decaf3
|
@ -18,6 +18,9 @@ package org.apache.activemq.broker.util;
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerPluginSupport;
|
import org.apache.activemq.broker.BrokerPluginSupport;
|
||||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -26,12 +29,12 @@ import org.slf4j.LoggerFactory;
|
||||||
* A Broker interceptor which updates a JMS Client's timestamp on the message
|
* A Broker interceptor which updates a JMS Client's timestamp on the message
|
||||||
* with a broker timestamp. Useful when the clocks on client machines are known
|
* with a broker timestamp. Useful when the clocks on client machines are known
|
||||||
* to not be correct and you can only trust the time set on the broker machines.
|
* to not be correct and you can only trust the time set on the broker machines.
|
||||||
*
|
*
|
||||||
* Enabling this plugin will break JMS compliance since the timestamp that the
|
* Enabling this plugin will break JMS compliance since the timestamp that the
|
||||||
* producer sees on the messages after as send() will be different from the
|
* producer sees on the messages after as send() will be different from the
|
||||||
* timestamp the consumer will observe when he receives the message. This plugin
|
* timestamp the consumer will observe when he receives the message. This plugin
|
||||||
* is not enabled in the default ActiveMQ configuration.
|
* is not enabled in the default ActiveMQ configuration.
|
||||||
*
|
*
|
||||||
* 2 new attributes have been added which will allow the administrator some override control
|
* 2 new attributes have been added which will allow the administrator some override control
|
||||||
* over the expiration time for incoming messages:
|
* over the expiration time for incoming messages:
|
||||||
*
|
*
|
||||||
|
@ -41,38 +44,38 @@ import org.slf4j.LoggerFactory;
|
||||||
* Attribute 'ttlCeiling' can be used to apply a limit to the expiration time
|
* Attribute 'ttlCeiling' can be used to apply a limit to the expiration time
|
||||||
*
|
*
|
||||||
* @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
|
* @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
|
public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class);
|
||||||
/**
|
/**
|
||||||
* variable which (when non-zero) is used to override
|
* variable which (when non-zero) is used to override
|
||||||
* the expiration date for messages that arrive with
|
* the expiration date for messages that arrive with
|
||||||
* no expiration date set (in Milliseconds).
|
* no expiration date set (in Milliseconds).
|
||||||
*/
|
*/
|
||||||
long zeroExpirationOverride = 0;
|
long zeroExpirationOverride = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* variable which (when non-zero) is used to limit
|
* variable which (when non-zero) is used to limit
|
||||||
* the expiration date (in Milliseconds).
|
* the expiration date (in Milliseconds).
|
||||||
*/
|
*/
|
||||||
long ttlCeiling = 0;
|
long ttlCeiling = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If true, the plugin will not update timestamp to past values
|
* If true, the plugin will not update timestamp to past values
|
||||||
* False by default
|
* False by default
|
||||||
*/
|
*/
|
||||||
boolean futureOnly = false;
|
boolean futureOnly = false;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* if true, update timestamp even if message has passed through a network
|
* if true, update timestamp even if message has passed through a network
|
||||||
* default false
|
* default false
|
||||||
*/
|
*/
|
||||||
boolean processNetworkMessages = false;
|
boolean processNetworkMessages = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* setter method for zeroExpirationOverride
|
* setter method for zeroExpirationOverride
|
||||||
*/
|
*/
|
||||||
public void setZeroExpirationOverride(long ttl)
|
public void setZeroExpirationOverride(long ttl)
|
||||||
|
@ -80,7 +83,7 @@ public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
|
||||||
this.zeroExpirationOverride = ttl;
|
this.zeroExpirationOverride = ttl;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* setter method for ttlCeiling
|
* setter method for ttlCeiling
|
||||||
*/
|
*/
|
||||||
public void setTtlCeiling(long ttlCeiling)
|
public void setTtlCeiling(long ttlCeiling)
|
||||||
|
@ -88,19 +91,21 @@ public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
|
||||||
this.ttlCeiling = ttlCeiling;
|
this.ttlCeiling = ttlCeiling;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setFutureOnly(boolean futureOnly) {
|
public void setFutureOnly(boolean futureOnly) {
|
||||||
this.futureOnly = futureOnly;
|
this.futureOnly = futureOnly;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setProcessNetworkMessages(Boolean processNetworkMessages) {
|
|
||||||
this.processNetworkMessages = processNetworkMessages;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
public void setProcessNetworkMessages(Boolean processNetworkMessages) {
|
||||||
|
this.processNetworkMessages = processNetworkMessages;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
|
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
|
||||||
if (message.getTimestamp() > 0
|
|
||||||
&& (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length == 0))) {
|
if (message.getTimestamp() > 0 && !isDestinationDLQ(message) &&
|
||||||
|
(processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length == 0))) {
|
||||||
// timestamp not been disabled and has not passed through a network or processNetworkMessages=true
|
// timestamp not been disabled and has not passed through a network or processNetworkMessages=true
|
||||||
|
|
||||||
long oldExpiration = message.getExpiration();
|
long oldExpiration = message.getExpiration();
|
||||||
long newTimeStamp = System.currentTimeMillis();
|
long newTimeStamp = System.currentTimeMillis();
|
||||||
long timeToLive = zeroExpirationOverride;
|
long timeToLive = zeroExpirationOverride;
|
||||||
|
@ -112,17 +117,40 @@ public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
|
||||||
timeToLive = ttlCeiling;
|
timeToLive = ttlCeiling;
|
||||||
}
|
}
|
||||||
long expiration = timeToLive + newTimeStamp;
|
long expiration = timeToLive + newTimeStamp;
|
||||||
//In the scenario that the Broker is behind the clients we never want to set the Timestamp and Expiration in the past
|
// In the scenario that the Broker is behind the clients we never want to set the
|
||||||
if(!futureOnly || (expiration > oldExpiration)) {
|
// Timestamp and Expiration in the past
|
||||||
if (timeToLive > 0 && expiration > 0) {
|
if(!futureOnly || (expiration > oldExpiration)) {
|
||||||
message.setExpiration(expiration);
|
if (timeToLive > 0 && expiration > 0) {
|
||||||
}
|
message.setExpiration(expiration);
|
||||||
message.setTimestamp(newTimeStamp);
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
message.setTimestamp(newTimeStamp);
|
||||||
LOG.debug("Set message " + message.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
|
if (LOG.isDebugEnabled()) {
|
||||||
}
|
LOG.debug("Set message " + message.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
super.send(producerExchange, message);
|
super.send(producerExchange, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isDestinationDLQ(Message message) {
|
||||||
|
DeadLetterStrategy deadLetterStrategy;
|
||||||
|
Message tmp;
|
||||||
|
|
||||||
|
if (message != null && message.getRegionDestination() != null) {
|
||||||
|
deadLetterStrategy = message.getRegionDestination().getDeadLetterStrategy();
|
||||||
|
if (deadLetterStrategy != null) {
|
||||||
|
// Cheap copy, since we only need two fields
|
||||||
|
tmp = new ActiveMQMessage();
|
||||||
|
tmp.setDestination(message.getOriginalDestination());
|
||||||
|
tmp.setRegionDestination(message.getRegionDestination());
|
||||||
|
|
||||||
|
// Determine if we are headed for a DLQ
|
||||||
|
ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(tmp, null);
|
||||||
|
if (deadLetterDestination.equals(message.getDestination())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,197 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.util;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerPlugin;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TimeStampingBrokerPluginTest extends TestCase {
|
||||||
|
|
||||||
|
BrokerService broker;
|
||||||
|
TransportConnector tcpConnector;
|
||||||
|
MessageProducer producer;
|
||||||
|
MessageConsumer consumer;
|
||||||
|
Connection connection;
|
||||||
|
Session session;
|
||||||
|
Destination destination;
|
||||||
|
String queue = "TEST.FOO";
|
||||||
|
long expiry = 500;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin();
|
||||||
|
tsbp.setZeroExpirationOverride(expiry);
|
||||||
|
tsbp.setTtlCeiling(expiry);
|
||||||
|
|
||||||
|
broker = new BrokerService();
|
||||||
|
broker.setPersistent(false);
|
||||||
|
broker.setUseJmx(true);
|
||||||
|
broker.setPlugins(new BrokerPlugin[] {tsbp});
|
||||||
|
tcpConnector = broker.addConnector("tcp://localhost:0");
|
||||||
|
|
||||||
|
// Add policy and individual DLQ strategy
|
||||||
|
PolicyEntry policy = new PolicyEntry();
|
||||||
|
DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
|
||||||
|
strategy.setProcessExpired(true);
|
||||||
|
((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
|
||||||
|
((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
|
||||||
|
strategy.setProcessNonPersistent(true);
|
||||||
|
policy.setDeadLetterStrategy(strategy);
|
||||||
|
|
||||||
|
PolicyMap pMap = new PolicyMap();
|
||||||
|
pMap.setDefaultEntry(policy);
|
||||||
|
|
||||||
|
broker.setDestinationPolicy(pMap);
|
||||||
|
|
||||||
|
broker.start();
|
||||||
|
// Create a ConnectionFactory
|
||||||
|
ActiveMQConnectionFactory connectionFactory =
|
||||||
|
new ActiveMQConnectionFactory(tcpConnector.getConnectUri());
|
||||||
|
|
||||||
|
// Create a Connection
|
||||||
|
connection = connectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
// Create a Session
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
// Create the destination Queue
|
||||||
|
destination = session.createQueue(queue);
|
||||||
|
|
||||||
|
// Create a MessageProducer from the Session to the Topic or Queue
|
||||||
|
producer = session.createProducer(destination);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
// Clean up
|
||||||
|
producer.close();
|
||||||
|
consumer.close();
|
||||||
|
session.close();
|
||||||
|
connection.close();
|
||||||
|
broker.stop();
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testExpirationSet() throws Exception {
|
||||||
|
|
||||||
|
// Create a messages
|
||||||
|
Message sentMessage = session.createMessage();
|
||||||
|
|
||||||
|
// Tell the producer to send the message
|
||||||
|
long beforeSend = System.currentTimeMillis();
|
||||||
|
producer.send(sentMessage);
|
||||||
|
|
||||||
|
// Create a MessageConsumer from the Session to the Topic or Queue
|
||||||
|
consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
// Wait for a message
|
||||||
|
Message receivedMessage = consumer.receive(1000);
|
||||||
|
|
||||||
|
// assert we got the same message ID we sent
|
||||||
|
assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
|
||||||
|
|
||||||
|
// assert message timestamp is in window
|
||||||
|
assertTrue("Expiration should be not null" + receivedMessage.getJMSExpiration() + "\n", Long.valueOf(receivedMessage.getJMSExpiration()) != null);
|
||||||
|
|
||||||
|
// assert message expiration is in window
|
||||||
|
assertTrue("Before send: " + beforeSend + " Msg ts: " + receivedMessage.getJMSTimestamp() + " Msg Expiry: " + receivedMessage.getJMSExpiration(), beforeSend <= receivedMessage.getJMSExpiration() && receivedMessage.getJMSExpiration() <= (receivedMessage.getJMSTimestamp() + expiry));
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testExpirationCelingSet() throws Exception {
|
||||||
|
|
||||||
|
// Create a messages
|
||||||
|
Message sentMessage = session.createMessage();
|
||||||
|
// Tell the producer to send the message
|
||||||
|
long beforeSend = System.currentTimeMillis();
|
||||||
|
long sendExpiry = beforeSend + (expiry*22);
|
||||||
|
sentMessage.setJMSExpiration(sendExpiry);
|
||||||
|
|
||||||
|
producer.send(sentMessage);
|
||||||
|
|
||||||
|
// Create a MessageConsumer from the Session to the Topic or Queue
|
||||||
|
consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
// Wait for a message
|
||||||
|
Message receivedMessage = consumer.receive(1000);
|
||||||
|
|
||||||
|
// assert we got the same message ID we sent
|
||||||
|
assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
|
||||||
|
|
||||||
|
// assert message timestamp is in window
|
||||||
|
assertTrue("Expiration should be not null" + receivedMessage.getJMSExpiration() + "\n", Long.valueOf(receivedMessage.getJMSExpiration()) != null);
|
||||||
|
|
||||||
|
// assert message expiration is in window
|
||||||
|
assertTrue("Sent expiry: " + sendExpiry + " Recv ts: " + receivedMessage.getJMSTimestamp() + " Recv expiry: " + receivedMessage.getJMSExpiration(), beforeSend <= receivedMessage.getJMSExpiration() && receivedMessage.getJMSExpiration() <= (receivedMessage.getJMSTimestamp() + expiry));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpirationDLQ() throws Exception {
|
||||||
|
|
||||||
|
// Create a messages
|
||||||
|
Message sentMessage = session.createMessage();
|
||||||
|
// Tell the producer to send the message
|
||||||
|
long beforeSend = System.currentTimeMillis();
|
||||||
|
long sendExpiry = beforeSend + expiry;
|
||||||
|
sentMessage.setJMSExpiration(sendExpiry);
|
||||||
|
|
||||||
|
producer.send(sentMessage);
|
||||||
|
|
||||||
|
// Create a MessageConsumer from the Session to the Topic or Queue
|
||||||
|
consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
Thread.sleep(expiry+250);
|
||||||
|
|
||||||
|
// Wait for a message
|
||||||
|
Message receivedMessage = consumer.receive(1000);
|
||||||
|
|
||||||
|
// Message should roll to DLQ
|
||||||
|
assertNull(receivedMessage);
|
||||||
|
|
||||||
|
// Close old consumer, setup DLQ listener
|
||||||
|
consumer.close();
|
||||||
|
consumer = session.createConsumer(session.createQueue("DLQ."+queue));
|
||||||
|
|
||||||
|
// Get mesage from DLQ
|
||||||
|
receivedMessage = consumer.receive(1000);
|
||||||
|
|
||||||
|
// assert we got the same message ID we sent
|
||||||
|
assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
|
||||||
|
|
||||||
|
// assert message timestamp is in window
|
||||||
|
//System.out.println("Recv: " + receivedMessage.getJMSExpiration());
|
||||||
|
assertEquals("Expiration should be zero" + receivedMessage.getJMSExpiration() + "\n", receivedMessage.getJMSExpiration(), 0);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue