diff --git a/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java index e5e11355ec..c53b3d5892 100644 --- a/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java @@ -18,28 +18,31 @@ package org.apache.activemq; import java.io.Serializable; import java.util.Random; +import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.activemq.util.IntrospectionSupport; /** - * Configuration options used to control how messages are re-delivered when they + * Configuration options for a messageConsumer used to control how messages are re-delivered when they * are rolled back. + * May be used server side on a per destination basis via the Broker RedeliveryPlugin * * @org.apache.xbean.XBean element="redeliveryPolicy" * */ -public class RedeliveryPolicy implements Cloneable, Serializable { +public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable { public static final int NO_MAXIMUM_REDELIVERIES = -1; private static Random randomNumberGenerator; // +/-15% for a 30% spread -cgs - private double collisionAvoidanceFactor = 0.15d; - private int maximumRedeliveries = 6; - private long maximumRedeliveryDelay = -1; - private long initialRedeliveryDelay = 1000L; - private boolean useCollisionAvoidance; - private boolean useExponentialBackOff; - private double backOffMultiplier = 5.0; - private long redeliveryDelay = initialRedeliveryDelay; + protected double collisionAvoidanceFactor = 0.15d; + protected int maximumRedeliveries = 6; + protected long maximumRedeliveryDelay = -1; + protected long initialRedeliveryDelay = 1000L; + protected boolean useCollisionAvoidance; + protected boolean useExponentialBackOff; + protected double backOffMultiplier = 5.0; + protected long redeliveryDelay = initialRedeliveryDelay; public RedeliveryPolicy() { } @@ -150,4 +153,9 @@ public class RedeliveryPolicy implements Cloneable, Serializable { public long getRedeliveryDelay() { return redeliveryDelay; } + + @Override + public String toString() { + return IntrospectionSupport.toString(this, DestinationMapEntry.class, null); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java new file mode 100644 index 0000000000..098c317cf1 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RedeliveryPolicyMap.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import java.util.List; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.filter.DestinationMap; +import org.apache.activemq.filter.DestinationMapEntry; + +/** + * Represents a destination based configuration of policies so that individual + * destinations or wildcard hierarchies of destinations can be configured using + * different policies. + * + * @org.apache.xbean.XBean + * + * + */ +public class RedeliveryPolicyMap extends DestinationMap { + + private RedeliveryPolicy defaultEntry; + + public RedeliveryPolicy getEntryFor(ActiveMQDestination destination) { + RedeliveryPolicy answer = (RedeliveryPolicy) chooseValue(destination); + if (answer == null) { + answer = getDefaultEntry(); + } + return answer; + } + + /** + * Sets the individual entries on the redeliveryPolicyMap + * + * @org.apache.xbean.ElementType class="org.apache.activemq.RedeliveryPolicy" + */ + public void setRedeliveryPolicyEntries(List entries) { + super.setEntries(entries); + } + + public RedeliveryPolicy getDefaultEntry() { + return defaultEntry; + } + + public void setDefaultEntry(RedeliveryPolicy defaultEntry) { + this.defaultEntry = defaultEntry; + } + + protected Class getEntryClass() { + return RedeliveryPolicy.class; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java new file mode 100644 index 0000000000..04244b548b --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.util; + +import java.io.IOException; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.filter.AnyDestination; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.util.BrokerSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Replace regular DLQ handling with redelivery via a resend to the original destination + * after a delay + * A destination matching RedeliveryPolicy controls the quantity and delay for re-sends + * If there is no matching policy or an existing policy limit is exceeded by default + * regular DLQ processing resumes. This is controlled via sendToDlqIfMaxRetriesExceeded + * and fallbackToDeadLetter + * + * @org.apache.xbean.XBean element="redeliveryPlugin" + */ +public class RedeliveryPlugin extends BrokerPluginSupport { + private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPlugin.class); + public static final String REDELIVERY_DELAY = "redeliveryDelay"; + + RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); + boolean sendToDlqIfMaxRetriesExceeded = true; + private boolean fallbackToDeadLetter = true; + + @Override + public Broker installPlugin(Broker broker) throws Exception { + if (!broker.getBrokerService().isSchedulerSupport()) { + throw new IllegalStateException("RedeliveryPlugin requires schedulerSupport=true on the broker"); + } + validatePolicyDelay(1000); + return super.installPlugin(broker); + } + + /* + * sending to dlq is called as part of a poison ack processing, before the message is acknowledged and removed + * by the destination so a delay is vital to avoid resending before it has been consumed + */ + private void validatePolicyDelay(long limit) { + final ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")}); + for (Object entry : redeliveryPolicyMap.get(matchAll)) { + RedeliveryPolicy redeliveryPolicy = (RedeliveryPolicy) entry; + validateLimit(limit, redeliveryPolicy); + } + RedeliveryPolicy defaultEntry = redeliveryPolicyMap.getDefaultEntry(); + if (defaultEntry != null) { + validateLimit(limit, defaultEntry); + } + } + + private void validateLimit(long limit, RedeliveryPolicy redeliveryPolicy) { + if (redeliveryPolicy.getInitialRedeliveryDelay() < limit) { + throw new IllegalStateException("RedeliveryPolicy initialRedeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy); + } + if (redeliveryPolicy.getRedeliveryDelay() < limit) { + throw new IllegalStateException("RedeliveryPolicy redeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy); + } + } + + public RedeliveryPolicyMap getRedeliveryPolicyMap() { + return redeliveryPolicyMap; + } + + public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) { + this.redeliveryPolicyMap = redeliveryPolicyMap; + } + + public boolean isSendToDlqIfMaxRetriesExceeded() { + return sendToDlqIfMaxRetriesExceeded; + } + + /** + * What to do if the maxretries on a matching redelivery policy is exceeded. + * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue + * when false, there is no action + * @param sendToDlqIfMaxRetriesExceeded + */ + public void setSendToDlqIfMaxRetriesExceeded(boolean sendToDlqIfMaxRetriesExceeded) { + this.sendToDlqIfMaxRetriesExceeded = sendToDlqIfMaxRetriesExceeded; + } + + public boolean isFallbackToDeadLetter() { + return fallbackToDeadLetter; + } + + /** + * What to do if there is no matching redelivery policy for a destination. + * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue + * when false, there is no action + * @param fallbackToDeadLetter + */ + public void setFallbackToDeadLetter(boolean fallbackToDeadLetter) { + this.fallbackToDeadLetter = fallbackToDeadLetter; + } + + @Override + public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) { + if (next.get().isExpired(messageReference)) { + // there are two uses of sendToDeadLetterQueue, we are only interested in valid messages + super.sendToDeadLetterQueue(context, messageReference, subscription); + } else { + try { + final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(messageReference.getRegionDestination().getActiveMQDestination()); + if (redeliveryPolicy != null) { + int redeliveryCount = messageReference.getRedeliveryCounter(); + if (redeliveryCount < redeliveryPolicy.getMaximumRedeliveries()) { + + long delay = ( redeliveryCount == 0 ? + redeliveryPolicy.getInitialRedeliveryDelay() : + redeliveryPolicy.getNextRedeliveryDelay(getExistingDelay(messageReference))); + + scheduleRedelivery(context, messageReference, delay, ++redeliveryCount); + } else if (isSendToDlqIfMaxRetriesExceeded()) { + super.sendToDeadLetterQueue(context, messageReference, subscription); + } else { + LOG.debug("Discarding message that exceeds max redelivery count, " + messageReference.getMessageId()); + } + } else if (isFallbackToDeadLetter()) { + super.sendToDeadLetterQueue(context, messageReference, subscription); + } else { + LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId() + ", RedeliveryPolicy not found (and no fallback) for: " + messageReference.getRegionDestination().getActiveMQDestination()); + } + } catch (Exception exception) { + // abort the ack, will be effective if client use transactions or individual ack with sync send + RuntimeException toThrow = new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception); + LOG.error(toThrow.toString(), exception); + throw toThrow; + } + } + } + + private void scheduleRedelivery(ConnectionContext context, MessageReference messageReference, long delay, int redeliveryCount) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("redelivery #" + redeliveryCount + " of: " + messageReference.getMessageId() + " with delay: " + + delay + ", dest: " + messageReference.getRegionDestination().getActiveMQDestination()); + } + final Message old = messageReference.getMessage(); + Message message = old.copy(); + + message.setTransactionId(null); + message.setMemoryUsage(null); + message.setMarshalledProperties(null); + message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_ID); + + message.setProperty(REDELIVERY_DELAY, delay); + message.setProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); + message.setRedeliveryCounter(redeliveryCount); + + boolean originalFlowControl = context.isProducerFlowControl(); + try { + context.setProducerFlowControl(false); + ProducerInfo info = new ProducerInfo(); + ProducerState state = new ProducerState(info); + ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + producerExchange.setProducerState(state); + producerExchange.setMutable(true); + producerExchange.setConnectionContext(context); + context.getBroker().send(producerExchange, message); + } finally { + context.setProducerFlowControl(originalFlowControl); + } + } + + private int getExistingDelay(MessageReference messageReference) throws IOException { + Object val = messageReference.getMessage().getProperty(REDELIVERY_DELAY); + if (val instanceof Long) { + return ((Long)val).intValue(); + } + return 0; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java b/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java index 870f2a0d84..6c9c31bf72 100644 --- a/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/filter/DestinationMapEntry.java @@ -29,7 +29,7 @@ import org.apache.activemq.command.*; */ public abstract class DestinationMapEntry implements Comparable { - private ActiveMQDestination destination; + protected ActiveMQDestination destination; public int compareTo(Object that) { if (that instanceof DestinationMapEntry) { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java new file mode 100644 index 0000000000..5d6c17e3de --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.util.concurrent.TimeUnit; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.broker.util.RedeliveryPlugin; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { + + static final Logger LOG = LoggerFactory.getLogger(BrokerRedeliveryTest.class); + BrokerService broker = null; + + final ActiveMQQueue destination = new ActiveMQQueue("Redelivery"); + final String data = "hi"; + final long redeliveryDelayMillis = 2000; + final int maxBrokerRedeliveries = 2; + + public void testScheduledRedelivery() throws Exception { + + sendMessage(); + + ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection(); + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + redeliveryPolicy.setInitialRedeliveryDelay(0); + redeliveryPolicy.setMaximumRedeliveries(0); + consumerConnection.setRedeliveryPolicy(redeliveryPolicy); + consumerConnection.start(); + Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = consumerSession.createConsumer(destination); + Message message = consumer.receive(1000); + assertNotNull("got message", message); + LOG.info("got: " + message); + consumerSession.rollback(); + + for (int i=0;i