mirror of
https://github.com/apache/activemq.git
synced 2025-02-09 19:45:55 +00:00
[AMQ-9455] DestinationPolicy support for MessageInterceptorStrategy
This commit is contained in:
parent
9d11e0d7e9
commit
c465330be5
@ -28,6 +28,7 @@ import org.apache.activemq.broker.BrokerService;
|
|||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.MessageInterceptorStrategy;
|
||||||
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
|
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
@ -35,7 +36,6 @@ import org.apache.activemq.command.Message;
|
|||||||
import org.apache.activemq.command.MessageAck;
|
import org.apache.activemq.command.MessageAck;
|
||||||
import org.apache.activemq.command.MessageDispatchNotification;
|
import org.apache.activemq.command.MessageDispatchNotification;
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
import org.apache.activemq.command.ProducerInfo;
|
||||||
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
|
|
||||||
import org.apache.activemq.security.SecurityContext;
|
import org.apache.activemq.security.SecurityContext;
|
||||||
import org.apache.activemq.state.ProducerState;
|
import org.apache.activemq.state.ProducerState;
|
||||||
import org.apache.activemq.store.MessageStore;
|
import org.apache.activemq.store.MessageStore;
|
||||||
@ -99,6 +99,7 @@ public abstract class BaseDestination implements Destination {
|
|||||||
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
|
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
|
||||||
protected int cursorMemoryHighWaterMark = 70;
|
protected int cursorMemoryHighWaterMark = 70;
|
||||||
protected int storeUsageHighWaterMark = 100;
|
protected int storeUsageHighWaterMark = 100;
|
||||||
|
private MessageInterceptorStrategy messageInterceptorStrategy;
|
||||||
private SlowConsumerStrategy slowConsumerStrategy;
|
private SlowConsumerStrategy slowConsumerStrategy;
|
||||||
private boolean prioritizedMessages;
|
private boolean prioritizedMessages;
|
||||||
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
|
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
|
||||||
@ -942,4 +943,12 @@ public abstract class BaseDestination implements Destination {
|
|||||||
public SystemUsage getSystemUsage() {
|
public SystemUsage getSystemUsage() {
|
||||||
return systemUsage;
|
return systemUsage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MessageInterceptorStrategy getMessageInterceptorStrategy() {
|
||||||
|
return this.messageInterceptorStrategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessageInterceptorStrategy(MessageInterceptorStrategy messageInterceptorStrategy) {
|
||||||
|
this.messageInterceptorStrategy = messageInterceptorStrategy;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -48,6 +48,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||||||
|
|
||||||
import jakarta.jms.InvalidSelectorException;
|
import jakarta.jms.InvalidSelectorException;
|
||||||
import jakarta.jms.JMSException;
|
import jakarta.jms.JMSException;
|
||||||
|
import jakarta.jms.MessageFormatException;
|
||||||
|
import jakarta.jms.MessageFormatRuntimeException;
|
||||||
import jakarta.jms.ResourceAllocationException;
|
import jakarta.jms.ResourceAllocationException;
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
@ -625,6 +627,15 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||||||
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
|
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
|
||||||
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
|
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
|
||||||
&& !context.isInRecoveryMode();
|
&& !context.isInRecoveryMode();
|
||||||
|
|
||||||
|
if(getMessageInterceptorStrategy() != null) {
|
||||||
|
try {
|
||||||
|
getMessageInterceptorStrategy().process(producerExchange, message);
|
||||||
|
} catch (MessageFormatRuntimeException e) {
|
||||||
|
throw new MessageFormatException(e.getMessage(), e.getErrorCode());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (message.isExpired()) {
|
if (message.isExpired()) {
|
||||||
// message not stored - or added to stats yet - so chuck here
|
// message not stored - or added to stats yet - so chuck here
|
||||||
broker.getRoot().messageExpired(context, message, null);
|
broker.getRoot().messageExpired(context, message, null);
|
||||||
|
@ -64,6 +64,8 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import jakarta.jms.JMSException;
|
import jakarta.jms.JMSException;
|
||||||
|
import jakarta.jms.MessageFormatException;
|
||||||
|
import jakarta.jms.MessageFormatRuntimeException;
|
||||||
|
|
||||||
import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
|
import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
|
||||||
|
|
||||||
@ -371,6 +373,14 @@ public class Topic extends BaseDestination implements Task {
|
|||||||
|
|
||||||
message.setRegionDestination(this);
|
message.setRegionDestination(this);
|
||||||
|
|
||||||
|
if(getMessageInterceptorStrategy() != null) {
|
||||||
|
try {
|
||||||
|
getMessageInterceptorStrategy().process(producerExchange, message);
|
||||||
|
} catch (MessageFormatRuntimeException e) {
|
||||||
|
throw new MessageFormatException(e.getMessage(), e.getErrorCode());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// There is delay between the client sending it and it arriving at the
|
// There is delay between the client sending it and it arriving at the
|
||||||
// destination.. it may have expired.
|
// destination.. it may have expired.
|
||||||
if (message.isExpired()) {
|
if (message.isExpired()) {
|
||||||
|
@ -0,0 +1,51 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.region.policy;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
|
||||||
|
import jakarta.jms.MessageFormatRuntimeException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configurable chain of MessageInterceptorStrategies
|
||||||
|
*
|
||||||
|
* @org.apache.xbean.XBean
|
||||||
|
*/
|
||||||
|
public class ChainMessageInterceptorStrategy implements MessageInterceptorStrategy {
|
||||||
|
|
||||||
|
private MessageInterceptorStrategy[] messageInterceptorStrategies;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(ProducerBrokerExchange producerBrokerExchange, Message message) throws MessageFormatRuntimeException {
|
||||||
|
if(messageInterceptorStrategies == null || messageInterceptorStrategies.length == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Arrays.stream(messageInterceptorStrategies).forEach(m -> m.process(producerBrokerExchange, message));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessageStrategies(MessageInterceptorStrategy[] messageInterceptorStrategies) {
|
||||||
|
this.messageInterceptorStrategies = messageInterceptorStrategies;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageInterceptorStrategy[] getMessageInterceptorStrategies() {
|
||||||
|
return this.messageInterceptorStrategies;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,158 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.region.policy;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import jakarta.jms.MessageFormatRuntimeException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enforce message policies for JMS Header values
|
||||||
|
*
|
||||||
|
* @org.apache.xbean.XBean
|
||||||
|
*/
|
||||||
|
public class HeaderMessageInterceptorStrategy implements MessageInterceptorStrategy {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(HeaderMessageInterceptorStrategy.class);
|
||||||
|
|
||||||
|
boolean forceDeliveryMode = false;
|
||||||
|
|
||||||
|
boolean persistent = true;
|
||||||
|
|
||||||
|
boolean forceExpiration = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* variable which (when non-zero) is used to override
|
||||||
|
* the expiration date for messages that arrive with
|
||||||
|
* no expiration date set (in Milliseconds).
|
||||||
|
*/
|
||||||
|
long zeroExpirationOverride = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* variable which (when non-zero) is used to limit
|
||||||
|
* the expiration date (in Milliseconds).
|
||||||
|
*/
|
||||||
|
long expirationCeiling = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If true, the plugin will not update timestamp to past values
|
||||||
|
* False by default
|
||||||
|
*/
|
||||||
|
boolean futureOnly = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* if true, update timestamp even if message has passed through a network
|
||||||
|
* default false
|
||||||
|
*/
|
||||||
|
boolean processNetworkMessages = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* setter method for zeroExpirationOverride
|
||||||
|
*/
|
||||||
|
public void setZeroExpirationOverride(long ttl)
|
||||||
|
{
|
||||||
|
this.zeroExpirationOverride = ttl;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* setter method for expirationCeiling
|
||||||
|
*/
|
||||||
|
public void setExpirationCeiling(long expirationCeiling)
|
||||||
|
{
|
||||||
|
this.expirationCeiling = expirationCeiling;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFutureOnly(boolean futureOnly) {
|
||||||
|
this.futureOnly = futureOnly;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProcessNetworkMessages(Boolean processNetworkMessages) {
|
||||||
|
this.processNetworkMessages = processNetworkMessages;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(final ProducerBrokerExchange producerBrokerExchange, final Message message) throws MessageFormatRuntimeException {
|
||||||
|
if(!isProcessNetworkMessages() && producerBrokerExchange.getConnectionContext().isNetworkConnection()) {
|
||||||
|
// Message passed through a network and processNetworkMessages=true is not set
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(isForceExpiration()) {
|
||||||
|
if (message.getTimestamp() > 0 && !message.getDestination().isDLQ()) {
|
||||||
|
long oldExpiration = message.getExpiration();
|
||||||
|
long newTimeStamp = System.currentTimeMillis();
|
||||||
|
long timeToLive = zeroExpirationOverride;
|
||||||
|
long oldTimestamp = message.getTimestamp();
|
||||||
|
if (oldExpiration > 0) {
|
||||||
|
timeToLive = oldExpiration - oldTimestamp;
|
||||||
|
}
|
||||||
|
if (timeToLive > 0 && expirationCeiling > 0 && timeToLive > expirationCeiling) {
|
||||||
|
timeToLive = expirationCeiling;
|
||||||
|
}
|
||||||
|
long expiration = timeToLive + newTimeStamp;
|
||||||
|
// In the scenario that the Broker is behind the clients we never want to set the
|
||||||
|
// Timestamp and Expiration in the past
|
||||||
|
if(!futureOnly || (expiration > oldExpiration)) {
|
||||||
|
if (timeToLive > 0 && expiration > 0) {
|
||||||
|
message.setExpiration(expiration);
|
||||||
|
}
|
||||||
|
message.setTimestamp(newTimeStamp);
|
||||||
|
LOG.debug("Set message {} timestamp from {} to {}", message.getMessageId(), oldTimestamp, newTimeStamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(forceDeliveryMode) {
|
||||||
|
message.setPersistent(isPersistent());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setForceDeliveryMode(boolean forceDeliveryMode) {
|
||||||
|
this.forceDeliveryMode = forceDeliveryMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isForceDeliveryMode() {
|
||||||
|
return this.forceDeliveryMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setForceExpiration(boolean forceExpiration) {
|
||||||
|
this.forceExpiration = forceExpiration;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isForceExpiration() {
|
||||||
|
return this.forceExpiration;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPersistent(boolean persistent) {
|
||||||
|
this.persistent = persistent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isPersistent() {
|
||||||
|
return this.persistent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProcessNetworkMessages(boolean processNetworkMessages) {
|
||||||
|
this.processNetworkMessages = processNetworkMessages;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isProcessNetworkMessages() {
|
||||||
|
return this.processNetworkMessages;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,48 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.region.policy;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import jakarta.jms.MessageFormatRuntimeException;
|
||||||
|
|
||||||
|
public interface MessageInterceptorStrategy {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When a PolicyEntry is configured with a MessageInterceptorStrategy, the
|
||||||
|
* process method is invoked with the current ProducerBrokerExchange and Message before
|
||||||
|
* the message is stored in any destination cache or persistence store.
|
||||||
|
*
|
||||||
|
* Implementations may reference data from the ProducerBrokerExchange and may check or
|
||||||
|
* modify headers, properties, body or other metadata on the Message.
|
||||||
|
*
|
||||||
|
* Any change to the message must adhere to OpenWire and ActiveMQ requirements or risk
|
||||||
|
* issues with memory usage, compatibility, and general correct functioning.
|
||||||
|
*
|
||||||
|
* Implementations shall not copy, or clone the message.
|
||||||
|
*
|
||||||
|
* Implementations may throw a <tt>MessageFormatRuntimeException</tt>
|
||||||
|
* that is returned to the client to indicate a message should not be added to the queue.
|
||||||
|
*
|
||||||
|
* @param producerBrokerExchange
|
||||||
|
* @param message
|
||||||
|
* @return
|
||||||
|
* @throws MessageFormatRuntimeException
|
||||||
|
*/
|
||||||
|
void process(final ProducerBrokerExchange producerBrokerExchange, final Message message) throws MessageFormatRuntimeException;
|
||||||
|
|
||||||
|
}
|
@ -115,6 +115,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||||||
private int sendFailIfNoSpace = -1;
|
private int sendFailIfNoSpace = -1;
|
||||||
private long sendFailIfNoSpaceAfterTimeout = -1;
|
private long sendFailIfNoSpaceAfterTimeout = -1;
|
||||||
|
|
||||||
|
private MessageInterceptorStrategy messageInterceptorStrategy = null;
|
||||||
|
|
||||||
public void configure(Broker broker,Queue queue) {
|
public void configure(Broker broker,Queue queue) {
|
||||||
baseConfiguration(broker,queue);
|
baseConfiguration(broker,queue);
|
||||||
@ -139,6 +140,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||||||
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
|
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
|
||||||
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
|
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
|
||||||
queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
|
queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
|
||||||
|
queue.setMessageInterceptorStrategy(getMessageInterceptorStrategy());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void update(Queue queue) {
|
public void update(Queue queue) {
|
||||||
@ -201,6 +203,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||||||
topic.getMemoryUsage().setLimit(memoryLimit);
|
topic.getMemoryUsage().setLimit(memoryLimit);
|
||||||
}
|
}
|
||||||
topic.setLazyDispatch(isLazyDispatch());
|
topic.setLazyDispatch(isLazyDispatch());
|
||||||
|
topic.setMessageInterceptorStrategy(getMessageInterceptorStrategy());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void update(Topic topic) {
|
public void update(Topic topic) {
|
||||||
@ -1165,4 +1168,12 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||||||
public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
|
public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
|
||||||
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
|
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMessageInterceptorStrategy(MessageInterceptorStrategy messageInterceptorStrategy) {
|
||||||
|
this.messageInterceptorStrategy = messageInterceptorStrategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageInterceptorStrategy getMessageInterceptorStrategy() {
|
||||||
|
return this.messageInterceptorStrategy;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,168 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.policy;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import jakarta.jms.BytesMessage;
|
||||||
|
import jakarta.jms.Connection;
|
||||||
|
import jakarta.jms.ConnectionFactory;
|
||||||
|
import jakarta.jms.JMSException;
|
||||||
|
import jakarta.jms.MessageFormatRuntimeException;
|
||||||
|
import jakarta.jms.MessageProducer;
|
||||||
|
import jakarta.jms.Queue;
|
||||||
|
import jakarta.jms.QueueBrowser;
|
||||||
|
import jakarta.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
|
import org.apache.activemq.broker.region.policy.MessageInterceptorStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
|
||||||
|
import org.apache.activemq.TestSupport;
|
||||||
|
import org.apache.activemq.util.ByteSequence;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This unit test is to test that MessageInterceptorStrategy features
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class MessageInterceptorStrategyMemoryUsageTest extends TestSupport {
|
||||||
|
|
||||||
|
BrokerService broker;
|
||||||
|
ConnectionFactory factory;
|
||||||
|
Connection connection;
|
||||||
|
Session session;
|
||||||
|
MessageProducer producer;
|
||||||
|
QueueBrowser queueBrowser;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
broker = new BrokerService();
|
||||||
|
|
||||||
|
File testDataDir = new File("target/activemq-data/message-interceptor-strategy");
|
||||||
|
broker.setDataDirectoryFile(testDataDir);
|
||||||
|
broker.setUseJmx(true);
|
||||||
|
broker.setDeleteAllMessagesOnStartup(true);
|
||||||
|
broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
|
||||||
|
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
|
||||||
|
broker.addConnector("tcp://localhost:0");
|
||||||
|
broker.start();
|
||||||
|
factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
|
||||||
|
.get(0).getConnectUri().toString());
|
||||||
|
connection = factory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if(producer != null) {
|
||||||
|
producer.close();
|
||||||
|
}
|
||||||
|
session.close();
|
||||||
|
connection.stop();
|
||||||
|
connection.close();
|
||||||
|
broker.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test sending messages that have body modified have correct usage
|
||||||
|
*
|
||||||
|
* Start with 10x 1k message bodies that get increased to 1mb
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMemoryUsageBodyIncrease() throws Exception {
|
||||||
|
applyHeaderMessageInterceptor(1*1024*1024);
|
||||||
|
String queueName = "mis.bodySize.increase";
|
||||||
|
Queue queue = createQueue(queueName);
|
||||||
|
|
||||||
|
for (int i=0; i<10; i++) {
|
||||||
|
BytesMessage sendMessageP = session.createBytesMessage();
|
||||||
|
byte[] origBody = new byte[1*1024];
|
||||||
|
sendMessageP.writeBytes(origBody);
|
||||||
|
producer.send(queue, sendMessageP);
|
||||||
|
}
|
||||||
|
|
||||||
|
QueueViewMBean queueViewMBean = getProxyToQueue(queueName);
|
||||||
|
assertEquals(Long.valueOf(10_496_000l), Long.valueOf(queueViewMBean.getMemoryUsageByteCount()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test sending messages that have body modified have correct usage
|
||||||
|
*
|
||||||
|
* Start with 10x 1mb message bodies that get decreased to 1kb
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMemoryUsageBodyDecrease() throws Exception {
|
||||||
|
applyHeaderMessageInterceptor(1*1024);
|
||||||
|
String queueName = "mis.bodySize.decrease";
|
||||||
|
Queue queue = createQueue(queueName);
|
||||||
|
|
||||||
|
for (int i=0; i<10; i++) {
|
||||||
|
BytesMessage sendMessageP = session.createBytesMessage();
|
||||||
|
byte[] origBody = new byte[1*1024*1024];
|
||||||
|
sendMessageP.writeBytes(origBody);
|
||||||
|
producer.send(queue, sendMessageP);
|
||||||
|
}
|
||||||
|
|
||||||
|
QueueViewMBean queueViewMBean = getProxyToQueue(queueName);
|
||||||
|
assertEquals(Long.valueOf(20_480), Long.valueOf(queueViewMBean.getMemoryUsageByteCount()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private PolicyMap applyHeaderMessageInterceptor(final int bodySize) {
|
||||||
|
PolicyMap policyMap = new PolicyMap();
|
||||||
|
PolicyEntry defaultEntry = new PolicyEntry();
|
||||||
|
|
||||||
|
MessageInterceptorStrategy bodySizeMessageInterceptorStrategy= new MessageInterceptorStrategy() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) throws MessageFormatRuntimeException {
|
||||||
|
if(bodySize > 0) {
|
||||||
|
try {
|
||||||
|
message.clearBody();
|
||||||
|
} catch (JMSException e) {
|
||||||
|
fail(e.getMessage());
|
||||||
|
}
|
||||||
|
byte[] newBody = new byte[bodySize];
|
||||||
|
new Random().nextBytes(newBody);
|
||||||
|
message.setContent(new ByteSequence(newBody));
|
||||||
|
message.storeContent();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
defaultEntry.setMessageInterceptorStrategy(bodySizeMessageInterceptorStrategy);
|
||||||
|
|
||||||
|
policyMap.setDefaultEntry(defaultEntry);
|
||||||
|
broker.setDestinationPolicy(policyMap);
|
||||||
|
return policyMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Queue createQueue(String queueName) throws Exception {
|
||||||
|
Queue queue = session.createQueue(queueName);
|
||||||
|
producer = session.createProducer(queue);
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,291 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.policy;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
|
||||||
|
import jakarta.jms.Connection;
|
||||||
|
import jakarta.jms.ConnectionFactory;
|
||||||
|
import jakarta.jms.DeliveryMode;
|
||||||
|
import jakarta.jms.Message;
|
||||||
|
import jakarta.jms.MessageProducer;
|
||||||
|
import jakarta.jms.Queue;
|
||||||
|
import jakarta.jms.QueueBrowser;
|
||||||
|
import jakarta.jms.Session;
|
||||||
|
import jakarta.jms.Topic;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.policy.HeaderMessageInterceptorStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
|
||||||
|
import org.apache.activemq.test.TestSupport;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This unit test is to test that MessageInterceptorStrategy features
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class MessageInterceptorStrategyTest extends TestSupport {
|
||||||
|
|
||||||
|
BrokerService broker;
|
||||||
|
ConnectionFactory factory;
|
||||||
|
Connection connection;
|
||||||
|
Session session;
|
||||||
|
MessageProducer producer;
|
||||||
|
QueueBrowser queueBrowser;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
broker = new BrokerService();
|
||||||
|
|
||||||
|
File testDataDir = new File("target/activemq-data/message-interceptor-strategy");
|
||||||
|
broker.setDataDirectoryFile(testDataDir);
|
||||||
|
broker.setUseJmx(true);
|
||||||
|
broker.setDeleteAllMessagesOnStartup(true);
|
||||||
|
broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
|
||||||
|
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
|
||||||
|
broker.addConnector("tcp://localhost:0");
|
||||||
|
broker.start();
|
||||||
|
factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
|
||||||
|
.get(0).getConnectUri().toString());
|
||||||
|
connection = factory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if(producer != null) {
|
||||||
|
producer.close();
|
||||||
|
}
|
||||||
|
session.close();
|
||||||
|
connection.stop();
|
||||||
|
connection.close();
|
||||||
|
broker.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test sending messages can be forced to Persistent
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testForceDeliveryModePersistent() throws Exception {
|
||||||
|
applyHeaderMessageInterceptor(true, true, false, 0l, Long.MAX_VALUE);
|
||||||
|
|
||||||
|
Queue queue = createQueue("mis.forceDeliveryMode.true");
|
||||||
|
Message sendMessageP = session.createTextMessage("forceDeliveryMode=true");
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
producer.send(queue, sendMessageP);
|
||||||
|
|
||||||
|
Message sendMessageNP = session.createTextMessage("forceDeliveryMode=true");
|
||||||
|
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||||
|
producer.send(queue, sendMessageNP);
|
||||||
|
|
||||||
|
queueBrowser = session.createBrowser(queue);
|
||||||
|
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while(browseEnumeration.hasMoreElements()) {
|
||||||
|
Message message = (Message)browseEnumeration.nextElement();
|
||||||
|
assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
assertEquals(Integer.valueOf(2), Integer.valueOf(count));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test sending messages can be forced to NonPersistent
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testForceDeliveryModeNonPersistent() throws Exception {
|
||||||
|
applyHeaderMessageInterceptor(true, false, false, 0l, Long.MAX_VALUE);
|
||||||
|
|
||||||
|
Queue queue = createQueue("mis.forceDeliveryMode.false");
|
||||||
|
Message sendMessageP = session.createTextMessage("forceDeliveryMode=false");
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
producer.send(queue, sendMessageP);
|
||||||
|
|
||||||
|
Message sendMessageNP = session.createTextMessage("forceDeliveryMode=false");
|
||||||
|
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||||
|
producer.send(queue, sendMessageNP);
|
||||||
|
|
||||||
|
queueBrowser = session.createBrowser(queue);
|
||||||
|
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while(browseEnumeration.hasMoreElements()) {
|
||||||
|
Message message = (Message)browseEnumeration.nextElement();
|
||||||
|
assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
assertEquals(Integer.valueOf(2), Integer.valueOf(count));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test not overriding expiration
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testForceExpirationDisabled() throws Exception {
|
||||||
|
applyHeaderMessageInterceptor(false, false, false, 100_000l, Long.MAX_VALUE);
|
||||||
|
|
||||||
|
Queue queue = createQueue("mis.forceExpiration.zero");
|
||||||
|
Message sendMessageP = session.createTextMessage("expiration=zero");
|
||||||
|
producer.setTimeToLive(0l);
|
||||||
|
producer.send(queue, sendMessageP);
|
||||||
|
|
||||||
|
queueBrowser = session.createBrowser(queue);
|
||||||
|
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while(browseEnumeration.hasMoreElements()) {
|
||||||
|
Message message = (Message)browseEnumeration.nextElement();
|
||||||
|
assertEquals(Long.valueOf(0l), Long.valueOf(message.getJMSExpiration()));
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
assertEquals(Integer.valueOf(1), Integer.valueOf(count));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test overriding zero (0) expiration
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testForceExpirationZeroOverride() throws Exception {
|
||||||
|
long expiryTime = 100_000l;
|
||||||
|
applyHeaderMessageInterceptor(false, false, true, expiryTime, Long.MAX_VALUE);
|
||||||
|
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
|
Queue queue = createQueue("mis.forceExpiration.100k");
|
||||||
|
Message sendMessageP = session.createTextMessage("expiration=zero");
|
||||||
|
producer.setTimeToLive(100_000l);
|
||||||
|
producer.send(queue, sendMessageP);
|
||||||
|
|
||||||
|
queueBrowser = session.createBrowser(queue);
|
||||||
|
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while(browseEnumeration.hasMoreElements()) {
|
||||||
|
Message message = (Message)browseEnumeration.nextElement();
|
||||||
|
assertTrue(Long.valueOf(message.getJMSExpiration()) > currentTime + (expiryTime / 2));
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
assertEquals(Integer.valueOf(1), Integer.valueOf(count));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test overriding zero (0) expiration
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testForceExpirationZeroOverrideDLQ() throws Exception {
|
||||||
|
long expiryTime = 1l;
|
||||||
|
applyHeaderMessageInterceptor(false, false, true, expiryTime, Long.MAX_VALUE);
|
||||||
|
|
||||||
|
Queue queue = createQueue("mis.forceExpiration.zero-no-dlq-expiry");
|
||||||
|
Message sendMessageP = session.createTextMessage("expiration=zero-no-dlq-expiry");
|
||||||
|
producer.send(queue, sendMessageP);
|
||||||
|
|
||||||
|
Thread.sleep(250l);
|
||||||
|
|
||||||
|
queueBrowser = session.createBrowser(queue);
|
||||||
|
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while(browseEnumeration.hasMoreElements()) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
assertEquals(Integer.valueOf(0), Integer.valueOf(count));
|
||||||
|
|
||||||
|
QueueBrowser dlqQueueBrowser = session.createBrowser(createQueue("mis.forceExpiration.zero-no-dlq-expiry.dlq"));
|
||||||
|
Enumeration<?> dlqBrowseEnumeration = dlqQueueBrowser.getEnumeration();
|
||||||
|
|
||||||
|
int dlqCount = 0;
|
||||||
|
while(dlqBrowseEnumeration.hasMoreElements()) {
|
||||||
|
Message dlqMessage = (Message)dlqBrowseEnumeration.nextElement();
|
||||||
|
assertEquals(sendMessageP.getJMSMessageID(), dlqMessage.getJMSMessageID());
|
||||||
|
assertEquals("Expiration should be zero" + dlqMessage.getJMSExpiration() + "\n", dlqMessage.getJMSExpiration(), 0);
|
||||||
|
dlqCount++;
|
||||||
|
}
|
||||||
|
assertEquals(Integer.valueOf(1), Integer.valueOf(dlqCount));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test overriding expiration ceiling
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testForceExpirationCeilingOverride() throws Exception {
|
||||||
|
long zeroOverrideExpiryTime = 100_000l;
|
||||||
|
long expirationCeiling = Duration.ofDays(1).toMillis();
|
||||||
|
applyHeaderMessageInterceptor(false, false, true, zeroOverrideExpiryTime, expirationCeiling);
|
||||||
|
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
|
long expiryTime = Duration.ofDays(10).toMillis();
|
||||||
|
Queue queue = createQueue("mis.forceExpiration.maxValue");
|
||||||
|
Message sendMessageP = session.createTextMessage("expiration=ceiling");
|
||||||
|
producer.setTimeToLive(expiryTime);
|
||||||
|
producer.send(queue, sendMessageP);
|
||||||
|
|
||||||
|
queueBrowser = session.createBrowser(queue);
|
||||||
|
Enumeration<?> browseEnumeration = queueBrowser.getEnumeration();
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while(browseEnumeration.hasMoreElements()) {
|
||||||
|
Message message = (Message)browseEnumeration.nextElement();
|
||||||
|
assertTrue(Long.valueOf(message.getJMSExpiration()) < (currentTime + Duration.ofDays(9).toMillis()));
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
assertEquals(Integer.valueOf(1), Integer.valueOf(count));
|
||||||
|
}
|
||||||
|
|
||||||
|
private PolicyMap applyHeaderMessageInterceptor(boolean forceDeliveryMode, boolean persistent, boolean forceExpiration, long zeroExpirationOverride, long expirationCeiling) {
|
||||||
|
PolicyMap policyMap = new PolicyMap();
|
||||||
|
PolicyEntry defaultEntry = new PolicyEntry();
|
||||||
|
|
||||||
|
HeaderMessageInterceptorStrategy headerMessageInterceptorStrategy = new HeaderMessageInterceptorStrategy();
|
||||||
|
|
||||||
|
// Persistence related fields
|
||||||
|
headerMessageInterceptorStrategy.setForceDeliveryMode(forceDeliveryMode);
|
||||||
|
headerMessageInterceptorStrategy.setPersistent(persistent);
|
||||||
|
|
||||||
|
// Expiration related fields
|
||||||
|
headerMessageInterceptorStrategy.setForceExpiration(forceExpiration);
|
||||||
|
headerMessageInterceptorStrategy.setZeroExpirationOverride(zeroExpirationOverride);
|
||||||
|
headerMessageInterceptorStrategy.setExpirationCeiling(expirationCeiling);
|
||||||
|
defaultEntry.setMessageInterceptorStrategy(headerMessageInterceptorStrategy);
|
||||||
|
|
||||||
|
IndividualDeadLetterStrategy individualDeadLetterStrategy = new IndividualDeadLetterStrategy();
|
||||||
|
individualDeadLetterStrategy.setQueuePrefix("");
|
||||||
|
individualDeadLetterStrategy.setQueueSuffix(".dlq");
|
||||||
|
defaultEntry.setDeadLetterStrategy(individualDeadLetterStrategy);
|
||||||
|
|
||||||
|
policyMap.setDefaultEntry(defaultEntry);
|
||||||
|
broker.setDestinationPolicy(policyMap);
|
||||||
|
return policyMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Queue createQueue(String queueName) throws Exception {
|
||||||
|
Queue queue = session.createQueue(queueName);
|
||||||
|
producer = session.createProducer(queue);
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user