implement https://issues.apache.org/jira/browse/AMQ-3894 - broker based redelivery via schedular resend and a per destination redelivery policy, plugin and tests

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1352902 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-06-22 14:31:40 +00:00
parent dc35766d0e
commit ac8c8d1bc9
6 changed files with 503 additions and 11 deletions

View File

@ -18,28 +18,31 @@ package org.apache.activemq;
import java.io.Serializable;
import java.util.Random;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.util.IntrospectionSupport;
/**
* Configuration options used to control how messages are re-delivered when they
* Configuration options for a messageConsumer used to control how messages are re-delivered when they
* are rolled back.
* May be used server side on a per destination basis via the Broker RedeliveryPlugin
*
* @org.apache.xbean.XBean element="redeliveryPolicy"
*
*/
public class RedeliveryPolicy implements Cloneable, Serializable {
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {
public static final int NO_MAXIMUM_REDELIVERIES = -1;
private static Random randomNumberGenerator;
// +/-15% for a 30% spread -cgs
private double collisionAvoidanceFactor = 0.15d;
private int maximumRedeliveries = 6;
private long maximumRedeliveryDelay = -1;
private long initialRedeliveryDelay = 1000L;
private boolean useCollisionAvoidance;
private boolean useExponentialBackOff;
private double backOffMultiplier = 5.0;
private long redeliveryDelay = initialRedeliveryDelay;
protected double collisionAvoidanceFactor = 0.15d;
protected int maximumRedeliveries = 6;
protected long maximumRedeliveryDelay = -1;
protected long initialRedeliveryDelay = 1000L;
protected boolean useCollisionAvoidance;
protected boolean useExponentialBackOff;
protected double backOffMultiplier = 5.0;
protected long redeliveryDelay = initialRedeliveryDelay;
public RedeliveryPolicy() {
}
@ -150,4 +153,9 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
public long getRedeliveryDelay() {
return redeliveryDelay;
}
@Override
public String toString() {
return IntrospectionSupport.toString(this, DestinationMapEntry.class, null);
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import java.util.List;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.filter.DestinationMapEntry;
/**
* Represents a destination based configuration of policies so that individual
* destinations or wildcard hierarchies of destinations can be configured using
* different policies.
*
* @org.apache.xbean.XBean
*
*
*/
public class RedeliveryPolicyMap extends DestinationMap {
private RedeliveryPolicy defaultEntry;
public RedeliveryPolicy getEntryFor(ActiveMQDestination destination) {
RedeliveryPolicy answer = (RedeliveryPolicy) chooseValue(destination);
if (answer == null) {
answer = getDefaultEntry();
}
return answer;
}
/**
* Sets the individual entries on the redeliveryPolicyMap
*
* @org.apache.xbean.ElementType class="org.apache.activemq.RedeliveryPolicy"
*/
public void setRedeliveryPolicyEntries(List entries) {
super.setEntries(entries);
}
public RedeliveryPolicy getDefaultEntry() {
return defaultEntry;
}
public void setDefaultEntry(RedeliveryPolicy defaultEntry) {
this.defaultEntry = defaultEntry;
}
protected Class<? extends DestinationMapEntry> getEntryClass() {
return RedeliveryPolicy.class;
}
}

View File

@ -0,0 +1,204 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.util;
import java.io.IOException;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.AnyDestination;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.util.BrokerSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Replace regular DLQ handling with redelivery via a resend to the original destination
* after a delay
* A destination matching RedeliveryPolicy controls the quantity and delay for re-sends
* If there is no matching policy or an existing policy limit is exceeded by default
* regular DLQ processing resumes. This is controlled via sendToDlqIfMaxRetriesExceeded
* and fallbackToDeadLetter
*
* @org.apache.xbean.XBean element="redeliveryPlugin"
*/
public class RedeliveryPlugin extends BrokerPluginSupport {
private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPlugin.class);
public static final String REDELIVERY_DELAY = "redeliveryDelay";
RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
boolean sendToDlqIfMaxRetriesExceeded = true;
private boolean fallbackToDeadLetter = true;
@Override
public Broker installPlugin(Broker broker) throws Exception {
if (!broker.getBrokerService().isSchedulerSupport()) {
throw new IllegalStateException("RedeliveryPlugin requires schedulerSupport=true on the broker");
}
validatePolicyDelay(1000);
return super.installPlugin(broker);
}
/*
* sending to dlq is called as part of a poison ack processing, before the message is acknowledged and removed
* by the destination so a delay is vital to avoid resending before it has been consumed
*/
private void validatePolicyDelay(long limit) {
final ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
for (Object entry : redeliveryPolicyMap.get(matchAll)) {
RedeliveryPolicy redeliveryPolicy = (RedeliveryPolicy) entry;
validateLimit(limit, redeliveryPolicy);
}
RedeliveryPolicy defaultEntry = redeliveryPolicyMap.getDefaultEntry();
if (defaultEntry != null) {
validateLimit(limit, defaultEntry);
}
}
private void validateLimit(long limit, RedeliveryPolicy redeliveryPolicy) {
if (redeliveryPolicy.getInitialRedeliveryDelay() < limit) {
throw new IllegalStateException("RedeliveryPolicy initialRedeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
}
if (redeliveryPolicy.getRedeliveryDelay() < limit) {
throw new IllegalStateException("RedeliveryPolicy redeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
}
}
public RedeliveryPolicyMap getRedeliveryPolicyMap() {
return redeliveryPolicyMap;
}
public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
this.redeliveryPolicyMap = redeliveryPolicyMap;
}
public boolean isSendToDlqIfMaxRetriesExceeded() {
return sendToDlqIfMaxRetriesExceeded;
}
/**
* What to do if the maxretries on a matching redelivery policy is exceeded.
* when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
* when false, there is no action
* @param sendToDlqIfMaxRetriesExceeded
*/
public void setSendToDlqIfMaxRetriesExceeded(boolean sendToDlqIfMaxRetriesExceeded) {
this.sendToDlqIfMaxRetriesExceeded = sendToDlqIfMaxRetriesExceeded;
}
public boolean isFallbackToDeadLetter() {
return fallbackToDeadLetter;
}
/**
* What to do if there is no matching redelivery policy for a destination.
* when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
* when false, there is no action
* @param fallbackToDeadLetter
*/
public void setFallbackToDeadLetter(boolean fallbackToDeadLetter) {
this.fallbackToDeadLetter = fallbackToDeadLetter;
}
@Override
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
if (next.get().isExpired(messageReference)) {
// there are two uses of sendToDeadLetterQueue, we are only interested in valid messages
super.sendToDeadLetterQueue(context, messageReference, subscription);
} else {
try {
final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(messageReference.getRegionDestination().getActiveMQDestination());
if (redeliveryPolicy != null) {
int redeliveryCount = messageReference.getRedeliveryCounter();
if (redeliveryCount < redeliveryPolicy.getMaximumRedeliveries()) {
long delay = ( redeliveryCount == 0 ?
redeliveryPolicy.getInitialRedeliveryDelay() :
redeliveryPolicy.getNextRedeliveryDelay(getExistingDelay(messageReference)));
scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
} else if (isSendToDlqIfMaxRetriesExceeded()) {
super.sendToDeadLetterQueue(context, messageReference, subscription);
} else {
LOG.debug("Discarding message that exceeds max redelivery count, " + messageReference.getMessageId());
}
} else if (isFallbackToDeadLetter()) {
super.sendToDeadLetterQueue(context, messageReference, subscription);
} else {
LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId() + ", RedeliveryPolicy not found (and no fallback) for: " + messageReference.getRegionDestination().getActiveMQDestination());
}
} catch (Exception exception) {
// abort the ack, will be effective if client use transactions or individual ack with sync send
RuntimeException toThrow = new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception);
LOG.error(toThrow.toString(), exception);
throw toThrow;
}
}
}
private void scheduleRedelivery(ConnectionContext context, MessageReference messageReference, long delay, int redeliveryCount) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("redelivery #" + redeliveryCount + " of: " + messageReference.getMessageId() + " with delay: "
+ delay + ", dest: " + messageReference.getRegionDestination().getActiveMQDestination());
}
final Message old = messageReference.getMessage();
Message message = old.copy();
message.setTransactionId(null);
message.setMemoryUsage(null);
message.setMarshalledProperties(null);
message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
message.setProperty(REDELIVERY_DELAY, delay);
message.setProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setRedeliveryCounter(redeliveryCount);
boolean originalFlowControl = context.isProducerFlowControl();
try {
context.setProducerFlowControl(false);
ProducerInfo info = new ProducerInfo();
ProducerState state = new ProducerState(info);
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setProducerState(state);
producerExchange.setMutable(true);
producerExchange.setConnectionContext(context);
context.getBroker().send(producerExchange, message);
} finally {
context.setProducerFlowControl(originalFlowControl);
}
}
private int getExistingDelay(MessageReference messageReference) throws IOException {
Object val = messageReference.getMessage().getProperty(REDELIVERY_DELAY);
if (val instanceof Long) {
return ((Long)val).intValue();
}
return 0;
}
}

View File

@ -29,7 +29,7 @@ import org.apache.activemq.command.*;
*/
public abstract class DestinationMapEntry<T> implements Comparable<T> {
private ActiveMQDestination destination;
protected ActiveMQDestination destination;
public int compareTo(Object that) {
if (that instanceof DestinationMapEntry) {

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import java.util.concurrent.TimeUnit;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.util.RedeliveryPlugin;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
static final Logger LOG = LoggerFactory.getLogger(BrokerRedeliveryTest.class);
BrokerService broker = null;
final ActiveMQQueue destination = new ActiveMQQueue("Redelivery");
final String data = "hi";
final long redeliveryDelayMillis = 2000;
final int maxBrokerRedeliveries = 2;
public void testScheduledRedelivery() throws Exception {
sendMessage();
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(0);
redeliveryPolicy.setMaximumRedeliveries(0);
consumerConnection.setRedeliveryPolicy(redeliveryPolicy);
consumerConnection.start();
Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
Message message = consumer.receive(1000);
assertNotNull("got message", message);
LOG.info("got: " + message);
consumerSession.rollback();
for (int i=0;i<maxBrokerRedeliveries;i++) {
Message shouldBeNull = consumer.receive(500);
assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull);
TimeUnit.SECONDS.sleep(3);
Message brokerRedeliveryMessage = consumer.receive(500);
LOG.info("got: " + brokerRedeliveryMessage);
assertNotNull("got message via broker redelivery after delay", brokerRedeliveryMessage);
assertEquals("message matches", message.getStringProperty("data"), brokerRedeliveryMessage.getStringProperty("data"));
assertEquals("has expiryDelay specified", redeliveryDelayMillis, brokerRedeliveryMessage.getLongProperty(RedeliveryPlugin.REDELIVERY_DELAY));
consumerSession.rollback();
}
// validate DLQ
MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
Message dlqMessage = dlqConsumer.receive(2000);
assertNotNull("Got message from dql", dlqMessage);
assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
consumerSession.commit();
}
private void sendMessage() throws Exception {
ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
producerConnection.start();
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
Message message = producerSession.createMessage();
message.setStringProperty("data", data);
producer.send(message);
producerConnection.close();
}
private void startBroker(boolean deleteMessages) throws Exception {
broker = new BrokerService();
broker.setSchedulerSupport(true);
RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
RedeliveryPolicy brokerRedeliveryPolicy = new RedeliveryPolicy();
brokerRedeliveryPolicy.setRedeliveryDelay(redeliveryDelayMillis);
brokerRedeliveryPolicy.setInitialRedeliveryDelay(redeliveryDelayMillis);
brokerRedeliveryPolicy.setMaximumRedeliveries(maxBrokerRedeliveries);
RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
redeliveryPolicyMap.setDefaultEntry(brokerRedeliveryPolicy);
redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
broker.setPlugins(new BrokerPlugin[]{redeliveryPlugin});
if (deleteMessages) {
broker.setDeleteAllMessagesOnStartup(true);
}
broker.start();
}
private void stopBroker() throws Exception {
if (broker != null)
broker.stop();
broker = null;
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://localhost");
}
@Override
protected void setUp() throws Exception {
super.setUp();
startBroker(true);
}
@Override
protected void tearDown() throws Exception {
stopBroker();
super.tearDown();
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.util;
import junit.framework.TestCase;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ErrorBroker;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RedeliveryPluginTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPluginTest.class);
RedeliveryPlugin underTest = new RedeliveryPlugin();
public void testInstallPluginValidation() throws Exception {
RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
RedeliveryPolicy defaultEntry = new RedeliveryPolicy();
defaultEntry.setInitialRedeliveryDelay(500);
redeliveryPolicyMap.setDefaultEntry(defaultEntry);
underTest.setRedeliveryPolicyMap(redeliveryPolicyMap);
final BrokerService brokerService = new BrokerService();
brokerService.setSchedulerSupport(false);
Broker broker = new ErrorBroker("hi") {
@Override
public BrokerService getBrokerService() {
return brokerService;
}
};
try {
underTest.installPlugin(broker);
fail("expect exception on no scheduler support");
} catch (Exception expected) {
LOG.info("expected: " + expected);
}
brokerService.setSchedulerSupport(true);
try {
underTest.installPlugin(broker);
fail("expect exception on small initial delay");
} catch (Exception expected) {
LOG.info("expected: " + expected);
}
defaultEntry.setInitialRedeliveryDelay(5000);
defaultEntry.setRedeliveryDelay(500);
brokerService.setSchedulerSupport(true);
try {
underTest.installPlugin(broker);
fail("expect exception on small redelivery delay");
} catch (Exception expected) {
LOG.info("expected: " + expected);
}
}
}