AMQ-7062 - have redelivery plugin ignore messages detected as duplicates

This commit is contained in:
gtully 2018-09-26 11:22:59 +01:00
parent cae66f5d37
commit 5246151288
3 changed files with 61 additions and 12 deletions

View File

@ -59,6 +59,7 @@ public abstract class BaseDestination implements Destination {
public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
public static final int MAX_PRODUCERS_TO_AUDIT = 64;
public static final int MAX_AUDIT_DEPTH = 10000;
public static final String DUPLICATE_FROM_STORE_MSG_PREFIX = "duplicate from store for ";
protected final AtomicBoolean started = new AtomicBoolean();
protected final ActiveMQDestination destination;
@ -881,16 +882,16 @@ public abstract class BaseDestination implements Destination {
}
@Override
public void duplicateFromStore(Message message, Subscription durableSub) {
public void duplicateFromStore(Message message, Subscription subscription) {
ConnectionContext connectionContext = createConnectionContext();
getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
Throwable cause = new Throwable("duplicate from store for " + destination);
getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId());
Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
message.setRegionDestination(this);
broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
messageAck.setPoisonCause(cause);
try {
acknowledge(connectionContext, durableSub, messageAck, message);
acknowledge(connectionContext, subscription, messageAck, message);
} catch (IOException e) {
getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck);
}

View File

@ -38,6 +38,8 @@ import org.apache.activemq.state.ProducerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.broker.region.BaseDestination.DUPLICATE_FROM_STORE_MSG_PREFIX;
/**
* Replace regular DLQ handling with redelivery via a resend to the original destination
* after a delay
@ -128,8 +130,8 @@ public class RedeliveryPlugin extends BrokerPluginSupport {
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
if (messageReference.isExpired()) {
// there are two uses of sendToDeadLetterQueue, we are only interested in valid messages
if (messageReference.isExpired() || (poisonCause != null && poisonCause.getMessage() != null && poisonCause.getMessage().contains(DUPLICATE_FROM_STORE_MSG_PREFIX))) {
// there are three uses of sendToDeadLetterQueue, we are only interested in valid messages
return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
} else {
try {

View File

@ -26,6 +26,8 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.util.RedeliveryPlugin;
@ -129,6 +131,46 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
}
public void testNoScheduledRedeliveryOfDuplicates() throws Exception {
broker = createBroker(true);
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setUseCache(false); // disable the cache such that duplicates are not suppressed on send
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policyEntry);
broker.setDestinationPolicy(policyMap);
broker.setDeleteAllMessagesOnStartup(true);
broker.start();
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
consumerConnection.start();
Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(destination);
ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
producerConnection.start();
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
Message message = producerSession.createMessage();
message.setStringProperty("data", data);
producer.send(message);
message = consumer.receive(1000);
assertNotNull("got message", message);
message.acknowledge();
// send it again
// should go to dlq as a duplicate from the store
producerConnection.getTransport().request(message);
// validate DLQ
MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
Message dlqMessage = dlqConsumer.receive(4000);
assertNotNull("Got message from dql", dlqMessage);
assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
}
private void sendMessage(int timeToLive) throws Exception {
ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
producerConnection.start();
@ -144,8 +186,16 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
}
private void startBroker(boolean deleteMessages) throws Exception {
broker = createBroker(false);
if (deleteMessages) {
broker.setDeleteAllMessagesOnStartup(true);
}
broker.start();
}
private BrokerService createBroker(boolean persistent) throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
broker.setPersistent(persistent);
broker.setSchedulerSupport(true);
RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
@ -160,11 +210,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
broker.setPlugins(new BrokerPlugin[]{redeliveryPlugin});
if (deleteMessages) {
broker.setDeleteAllMessagesOnStartup(true);
}
broker.start();
return broker;
}
private void stopBroker() throws Exception {