[AMQ-9455] DestinationPolicy support for MessageInterceptorStrategy

(cherry picked from commit c465330be5)
This commit is contained in:
Matt Pavlovich 2024-03-17 19:32:53 -05:00
parent a12f030090
commit 6cc7658b94
No known key found for this signature in database
9 changed files with 758 additions and 1 deletions

View File

@ -28,6 +28,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.MessageInterceptorStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
@ -35,7 +36,6 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
@ -99,6 +99,7 @@ public abstract class BaseDestination implements Destination {
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
protected int cursorMemoryHighWaterMark = 70;
protected int storeUsageHighWaterMark = 100;
private MessageInterceptorStrategy messageInterceptorStrategy;
private SlowConsumerStrategy slowConsumerStrategy;
private boolean prioritizedMessages;
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
@ -942,4 +943,12 @@ public abstract class BaseDestination implements Destination {
public SystemUsage getSystemUsage() {
return systemUsage;
}
public MessageInterceptorStrategy getMessageInterceptorStrategy() {
return this.messageInterceptorStrategy;
}
public void setMessageInterceptorStrategy(MessageInterceptorStrategy messageInterceptorStrategy) {
this.messageInterceptorStrategy = messageInterceptorStrategy;
}
}

View File

@ -49,6 +49,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
import javax.jms.MessageFormatException;
import javax.jms.MessageFormatRuntimeException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerStoppedException;
@ -625,6 +627,15 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
if(getMessageInterceptorStrategy() != null) {
try {
getMessageInterceptorStrategy().process(producerExchange, message);
} catch (MessageFormatRuntimeException e) {
throw new MessageFormatException(e.getMessage(), e.getErrorCode());
}
}
if (message.isExpired()) {
// message not stored - or added to stats yet - so chuck here
broker.getRoot().messageExpired(context, message, null);

View File

@ -64,6 +64,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageFormatRuntimeException;
import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
@ -371,6 +373,14 @@ public class Topic extends BaseDestination implements Task {
message.setRegionDestination(this);
if(getMessageInterceptorStrategy() != null) {
try {
getMessageInterceptorStrategy().process(producerExchange, message);
} catch (MessageFormatRuntimeException e) {
throw new MessageFormatException(e.getMessage(), e.getErrorCode());
}
}
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if (message.isExpired()) {

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import java.util.Arrays;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import javax.jms.MessageFormatRuntimeException;
/**
* Configurable chain of MessageInterceptorStrategies
*
* @org.apache.xbean.XBean
*/
public class ChainMessageInterceptorStrategy implements MessageInterceptorStrategy {
private MessageInterceptorStrategy[] messageInterceptorStrategies;
@Override
public void process(ProducerBrokerExchange producerBrokerExchange, Message message) throws MessageFormatRuntimeException {
if(messageInterceptorStrategies == null || messageInterceptorStrategies.length == 0) {
return;
}
Arrays.stream(messageInterceptorStrategies).forEach(m -> m.process(producerBrokerExchange, message));
}
public void setMessageStrategies(MessageInterceptorStrategy[] messageInterceptorStrategies) {
this.messageInterceptorStrategies = messageInterceptorStrategies;
}
public MessageInterceptorStrategy[] getMessageInterceptorStrategies() {
return this.messageInterceptorStrategies;
}
}

View File

@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.MessageFormatRuntimeException;
/**
* Enforce message policies for JMS Header values
*
* @org.apache.xbean.XBean
*/
public class HeaderMessageInterceptorStrategy implements MessageInterceptorStrategy {
private static final Logger LOG = LoggerFactory.getLogger(HeaderMessageInterceptorStrategy.class);
boolean forceDeliveryMode = false;
boolean persistent = true;
boolean forceExpiration = false;
/**
* variable which (when non-zero) is used to override
* the expiration date for messages that arrive with
* no expiration date set (in Milliseconds).
*/
long zeroExpirationOverride = 0;
/**
* variable which (when non-zero) is used to limit
* the expiration date (in Milliseconds).
*/
long expirationCeiling = 0;
/**
* If true, the plugin will not update timestamp to past values
* False by default
*/
boolean futureOnly = false;
/**
* if true, update timestamp even if message has passed through a network
* default false
*/
boolean processNetworkMessages = false;
/**
* setter method for zeroExpirationOverride
*/
public void setZeroExpirationOverride(long ttl)
{
this.zeroExpirationOverride = ttl;
}
/**
* setter method for expirationCeiling
*/
public void setExpirationCeiling(long expirationCeiling)
{
this.expirationCeiling = expirationCeiling;
}
public void setFutureOnly(boolean futureOnly) {
this.futureOnly = futureOnly;
}
public void setProcessNetworkMessages(Boolean processNetworkMessages) {
this.processNetworkMessages = processNetworkMessages;
}
@Override
public void process(final ProducerBrokerExchange producerBrokerExchange, final Message message) throws MessageFormatRuntimeException {
if(!isProcessNetworkMessages() && producerBrokerExchange.getConnectionContext().isNetworkConnection()) {
// Message passed through a network and processNetworkMessages=true is not set
return;
}
if(isForceExpiration()) {
if (message.getTimestamp() > 0 && !message.getDestination().isDLQ()) {
long oldExpiration = message.getExpiration();
long newTimeStamp = System.currentTimeMillis();
long timeToLive = zeroExpirationOverride;
long oldTimestamp = message.getTimestamp();
if (oldExpiration > 0) {
timeToLive = oldExpiration - oldTimestamp;
}
if (timeToLive > 0 && expirationCeiling > 0 && timeToLive > expirationCeiling) {
timeToLive = expirationCeiling;
}
long expiration = timeToLive + newTimeStamp;
// In the scenario that the Broker is behind the clients we never want to set the
// Timestamp and Expiration in the past
if(!futureOnly || (expiration > oldExpiration)) {
if (timeToLive > 0 && expiration > 0) {
message.setExpiration(expiration);
}
message.setTimestamp(newTimeStamp);
LOG.debug("Set message {} timestamp from {} to {}", message.getMessageId(), oldTimestamp, newTimeStamp);
}
}
}
if(forceDeliveryMode) {
message.setPersistent(isPersistent());
}
}
public void setForceDeliveryMode(boolean forceDeliveryMode) {
this.forceDeliveryMode = forceDeliveryMode;
}
public boolean isForceDeliveryMode() {
return this.forceDeliveryMode;
}
public void setForceExpiration(boolean forceExpiration) {
this.forceExpiration = forceExpiration;
}
public boolean isForceExpiration() {
return this.forceExpiration;
}
public void setPersistent(boolean persistent) {
this.persistent = persistent;
}
public boolean isPersistent() {
return this.persistent;
}
public void setProcessNetworkMessages(boolean processNetworkMessages) {
this.processNetworkMessages = processNetworkMessages;
}
public boolean isProcessNetworkMessages() {
return this.processNetworkMessages;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import javax.jms.MessageFormatRuntimeException;
public interface MessageInterceptorStrategy {
/**
* When a PolicyEntry is configured with a MessageInterceptorStrategy, the
* process method is invoked with the current ProducerBrokerExchange and Message before
* the message is stored in any destination cache or persistence store.
*
* Implementations may reference data from the ProducerBrokerExchange and may check or
* modify headers, properties, body or other metadata on the Message.
*
* Any change to the message must adhere to OpenWire and ActiveMQ requirements or risk
* issues with memory usage, compatibility, and general correct functioning.
*
* Implementations shall not copy, or clone the message.
*
* Implementations may throw a <tt>MessageFormatRuntimeException</tt>
* that is returned to the client to indicate a message should not be added to the queue.
*
* @param producerBrokerExchange
* @param message
* @return
* @throws MessageFormatRuntimeException
*/
void process(final ProducerBrokerExchange producerBrokerExchange, final Message message) throws MessageFormatRuntimeException;
}

View File

@ -115,6 +115,7 @@ public class PolicyEntry extends DestinationMapEntry {
private int sendFailIfNoSpace = -1;
private long sendFailIfNoSpaceAfterTimeout = -1;
private MessageInterceptorStrategy messageInterceptorStrategy = null;
public void configure(Broker broker,Queue queue) {
baseConfiguration(broker,queue);
@ -139,6 +140,7 @@ public class PolicyEntry extends DestinationMapEntry {
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
queue.setMessageInterceptorStrategy(getMessageInterceptorStrategy());
}
public void update(Queue queue) {
@ -201,6 +203,7 @@ public class PolicyEntry extends DestinationMapEntry {
topic.getMemoryUsage().setLimit(memoryLimit);
}
topic.setLazyDispatch(isLazyDispatch());
topic.setMessageInterceptorStrategy(getMessageInterceptorStrategy());
}
public void update(Topic topic) {
@ -1165,4 +1168,12 @@ public class PolicyEntry extends DestinationMapEntry {
public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
}
public void setMessageInterceptorStrategy(MessageInterceptorStrategy messageInterceptorStrategy) {
this.messageInterceptorStrategy = messageInterceptorStrategy;
}
public MessageInterceptorStrategy getMessageInterceptorStrategy() {
return this.messageInterceptorStrategy;
}
}

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.policy;
import java.io.File;
import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageFormatRuntimeException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.MessageInterceptorStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.TestSupport;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* This unit test is to test that MessageInterceptorStrategy features
*
*/
public class MessageInterceptorStrategyMemoryUsageTest extends TestSupport {
BrokerService broker;
ConnectionFactory factory;
Connection connection;
Session session;
MessageProducer producer;
QueueBrowser queueBrowser;
@Before
public void setUp() throws Exception {
broker = new BrokerService();
File testDataDir = new File("target/activemq-data/message-interceptor-strategy");
broker.setDataDirectoryFile(testDataDir);
broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(true);
broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
broker.addConnector("tcp://localhost:0");
broker.start();
factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
.get(0).getConnectUri().toString());
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
@After
public void tearDown() throws Exception {
if(producer != null) {
producer.close();
}
session.close();
connection.stop();
connection.close();
broker.stop();
}
/**
* Test sending messages that have body modified have correct usage
*
* Start with 10x 1k message bodies that get increased to 1mb
*/
@Test
public void testMemoryUsageBodyIncrease() throws Exception {
applyHeaderMessageInterceptor(1*1024*1024);
String queueName = "mis.bodySize.increase";
Queue queue = createQueue(queueName);
for (int i=0; i<10; i++) {
BytesMessage sendMessageP = session.createBytesMessage();
byte[] origBody = new byte[1*1024];
sendMessageP.writeBytes(origBody);
producer.send(queue, sendMessageP);
}
QueueViewMBean queueViewMBean = getProxyToQueue(queueName);
assertEquals(Long.valueOf(10_496_000l), Long.valueOf(queueViewMBean.getMemoryUsageByteCount()));
}
/**
* Test sending messages that have body modified have correct usage
*
* Start with 10x 1mb message bodies that get decreased to 1kb
*/
@Test
public void testMemoryUsageBodyDecrease() throws Exception {
applyHeaderMessageInterceptor(1*1024);
String queueName = "mis.bodySize.decrease";
Queue queue = createQueue(queueName);
for (int i=0; i<10; i++) {
BytesMessage sendMessageP = session.createBytesMessage();
byte[] origBody = new byte[1*1024*1024];
sendMessageP.writeBytes(origBody);
producer.send(queue, sendMessageP);
}
QueueViewMBean queueViewMBean = getProxyToQueue(queueName);
assertEquals(Long.valueOf(20_480), Long.valueOf(queueViewMBean.getMemoryUsageByteCount()));
}
private PolicyMap applyHeaderMessageInterceptor(final int bodySize) {
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
MessageInterceptorStrategy bodySizeMessageInterceptorStrategy= new MessageInterceptorStrategy() {
@Override
public void process(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) throws MessageFormatRuntimeException {
if(bodySize > 0) {
try {
message.clearBody();
} catch (JMSException e) {
fail(e.getMessage());
}
byte[] newBody = new byte[bodySize];
new Random().nextBytes(newBody);
message.setContent(new ByteSequence(newBody));
message.storeContent();
}
}
};
defaultEntry.setMessageInterceptorStrategy(bodySizeMessageInterceptorStrategy);
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
return policyMap;
}
private Queue createQueue(String queueName) throws Exception {
Queue queue = session.createQueue(queueName);
producer = session.createProducer(queue);
return queue;
}
}

View File

@ -0,0 +1,291 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.policy;
import java.io.File;
import java.time.Duration;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.HeaderMessageInterceptorStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.test.TestSupport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* This unit test is to test that MessageInterceptorStrategy features
*
*/
public class MessageInterceptorStrategyTest extends TestSupport {
BrokerService broker;
ConnectionFactory factory;
Connection connection;
Session session;
MessageProducer producer;
QueueBrowser queueBrowser;
@Before
public void setUp() throws Exception {
broker = new BrokerService();
File testDataDir = new File("target/activemq-data/message-interceptor-strategy");
broker.setDataDirectoryFile(testDataDir);
broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(true);
broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
broker.addConnector("tcp://localhost:0");
broker.start();
factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
.get(0).getConnectUri().toString());
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
@After
public void tearDown() throws Exception {
if(producer != null) {
producer.close();
}
session.close();
connection.stop();
connection.close();
broker.stop();
}
/**
* Test sending messages can be forced to Persistent
*/
@Test
public void testForceDeliveryModePersistent() throws Exception {
applyHeaderMessageInterceptor(true, true, false, 0l, Long.MAX_VALUE);
Queue queue = createQueue("mis.forceDeliveryMode.true");
Message sendMessageP = session.createTextMessage("forceDeliveryMode=true");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(queue, sendMessageP);
Message sendMessageNP = session.createTextMessage("forceDeliveryMode=true");
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(queue, sendMessageNP);
queueBrowser = session.createBrowser(queue);
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
int count = 0;
while(browseEnumeration.hasMoreElements()) {
Message message = (Message)browseEnumeration.nextElement();
assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
count++;
}
assertEquals(Integer.valueOf(2), Integer.valueOf(count));
}
/**
* Test sending messages can be forced to NonPersistent
*/
@Test
public void testForceDeliveryModeNonPersistent() throws Exception {
applyHeaderMessageInterceptor(true, false, false, 0l, Long.MAX_VALUE);
Queue queue = createQueue("mis.forceDeliveryMode.false");
Message sendMessageP = session.createTextMessage("forceDeliveryMode=false");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(queue, sendMessageP);
Message sendMessageNP = session.createTextMessage("forceDeliveryMode=false");
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(queue, sendMessageNP);
queueBrowser = session.createBrowser(queue);
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
int count = 0;
while(browseEnumeration.hasMoreElements()) {
Message message = (Message)browseEnumeration.nextElement();
assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
count++;
}
assertEquals(Integer.valueOf(2), Integer.valueOf(count));
}
/**
* Test not overriding expiration
*/
@Test
public void testForceExpirationDisabled() throws Exception {
applyHeaderMessageInterceptor(false, false, false, 100_000l, Long.MAX_VALUE);
Queue queue = createQueue("mis.forceExpiration.zero");
Message sendMessageP = session.createTextMessage("expiration=zero");
producer.setTimeToLive(0l);
producer.send(queue, sendMessageP);
queueBrowser = session.createBrowser(queue);
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
int count = 0;
while(browseEnumeration.hasMoreElements()) {
Message message = (Message)browseEnumeration.nextElement();
assertEquals(Long.valueOf(0l), Long.valueOf(message.getJMSExpiration()));
count++;
}
assertEquals(Integer.valueOf(1), Integer.valueOf(count));
}
/**
* Test overriding zero (0) expiration
*/
@Test
public void testForceExpirationZeroOverride() throws Exception {
long expiryTime = 100_000l;
applyHeaderMessageInterceptor(false, false, true, expiryTime, Long.MAX_VALUE);
long currentTime = System.currentTimeMillis();
Queue queue = createQueue("mis.forceExpiration.100k");
Message sendMessageP = session.createTextMessage("expiration=zero");
producer.setTimeToLive(100_000l);
producer.send(queue, sendMessageP);
queueBrowser = session.createBrowser(queue);
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
int count = 0;
while(browseEnumeration.hasMoreElements()) {
Message message = (Message)browseEnumeration.nextElement();
assertTrue(Long.valueOf(message.getJMSExpiration()) > currentTime + (expiryTime / 2));
count++;
}
assertEquals(Integer.valueOf(1), Integer.valueOf(count));
}
/**
* Test overriding zero (0) expiration
*/
@Test
public void testForceExpirationZeroOverrideDLQ() throws Exception {
long expiryTime = 1l;
applyHeaderMessageInterceptor(false, false, true, expiryTime, Long.MAX_VALUE);
Queue queue = createQueue("mis.forceExpiration.zero-no-dlq-expiry");
Message sendMessageP = session.createTextMessage("expiration=zero-no-dlq-expiry");
producer.send(queue, sendMessageP);
Thread.sleep(250l);
queueBrowser = session.createBrowser(queue);
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
int count = 0;
while(browseEnumeration.hasMoreElements()) {
count++;
}
assertEquals(Integer.valueOf(0), Integer.valueOf(count));
QueueBrowser dlqQueueBrowser = session.createBrowser(createQueue("mis.forceExpiration.zero-no-dlq-expiry.dlq"));
Enumeration<?> dlqBrowseEnumeration = dlqQueueBrowser.getEnumeration();
int dlqCount = 0;
while(dlqBrowseEnumeration.hasMoreElements()) {
Message dlqMessage = (Message)dlqBrowseEnumeration.nextElement();
assertEquals(sendMessageP.getJMSMessageID(), dlqMessage.getJMSMessageID());
assertEquals("Expiration should be zero" + dlqMessage.getJMSExpiration() + "\n", dlqMessage.getJMSExpiration(), 0);
dlqCount++;
}
assertEquals(Integer.valueOf(1), Integer.valueOf(dlqCount));
}
/**
* Test overriding expiration ceiling
*/
@Test
public void testForceExpirationCeilingOverride() throws Exception {
long zeroOverrideExpiryTime = 100_000l;
long expirationCeiling = Duration.ofDays(1).toMillis();
applyHeaderMessageInterceptor(false, false, true, zeroOverrideExpiryTime, expirationCeiling);
long currentTime = System.currentTimeMillis();
long expiryTime = Duration.ofDays(10).toMillis();
Queue queue = createQueue("mis.forceExpiration.maxValue");
Message sendMessageP = session.createTextMessage("expiration=ceiling");
producer.setTimeToLive(expiryTime);
producer.send(queue, sendMessageP);
queueBrowser = session.createBrowser(queue);
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
int count = 0;
while(browseEnumeration.hasMoreElements()) {
Message message = (Message)browseEnumeration.nextElement();
assertTrue(Long.valueOf(message.getJMSExpiration()) < (currentTime + Duration.ofDays(9).toMillis()));
count++;
}
assertEquals(Integer.valueOf(1), Integer.valueOf(count));
}
private PolicyMap applyHeaderMessageInterceptor(boolean forceDeliveryMode, boolean persistent, boolean forceExpiration, long zeroExpirationOverride, long expirationCeiling) {
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
HeaderMessageInterceptorStrategy headerMessageInterceptorStrategy = new HeaderMessageInterceptorStrategy();
// Persistence related fields
headerMessageInterceptorStrategy.setForceDeliveryMode(forceDeliveryMode);
headerMessageInterceptorStrategy.setPersistent(persistent);
// Expiration related fields
headerMessageInterceptorStrategy.setForceExpiration(forceExpiration);
headerMessageInterceptorStrategy.setZeroExpirationOverride(zeroExpirationOverride);
headerMessageInterceptorStrategy.setExpirationCeiling(expirationCeiling);
defaultEntry.setMessageInterceptorStrategy(headerMessageInterceptorStrategy);
IndividualDeadLetterStrategy individualDeadLetterStrategy = new IndividualDeadLetterStrategy();
individualDeadLetterStrategy.setQueuePrefix("");
individualDeadLetterStrategy.setQueueSuffix(".dlq");
defaultEntry.setDeadLetterStrategy(individualDeadLetterStrategy);
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
return policyMap;
}
private Queue createQueue(String queueName) throws Exception {
Queue queue = session.createQueue(queueName);
producer = session.createProducer(queue);
return queue;
}
}