diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 5ab6737fd0..95c7e159a3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -59,6 +59,7 @@ public abstract class BaseDestination implements Destination { public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; public static final int MAX_PRODUCERS_TO_AUDIT = 64; public static final int MAX_AUDIT_DEPTH = 10000; + public static final String DUPLICATE_FROM_STORE_MSG_PREFIX = "duplicate from store for "; protected final AtomicBoolean started = new AtomicBoolean(); protected final ActiveMQDestination destination; @@ -881,16 +882,16 @@ public abstract class BaseDestination implements Destination { } @Override - public void duplicateFromStore(Message message, Subscription durableSub) { + public void duplicateFromStore(Message message, Subscription subscription) { ConnectionContext connectionContext = createConnectionContext(); - getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId()); - Throwable cause = new Throwable("duplicate from store for " + destination); + getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId()); + Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination); message.setRegionDestination(this); broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1); messageAck.setPoisonCause(cause); try { - acknowledge(connectionContext, durableSub, messageAck, message); + acknowledge(connectionContext, subscription, messageAck, message); } catch (IOException e) { getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java b/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java index f270744d16..8f66890eb1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java @@ -38,6 +38,8 @@ import org.apache.activemq.state.ProducerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.broker.region.BaseDestination.DUPLICATE_FROM_STORE_MSG_PREFIX; + /** * Replace regular DLQ handling with redelivery via a resend to the original destination * after a delay @@ -128,8 +130,8 @@ public class RedeliveryPlugin extends BrokerPluginSupport { @Override public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) { - if (messageReference.isExpired()) { - // there are two uses of sendToDeadLetterQueue, we are only interested in valid messages + if (messageReference.isExpired() || (poisonCause != null && poisonCause.getMessage() != null && poisonCause.getMessage().contains(DUPLICATE_FROM_STORE_MSG_PREFIX))) { + // there are three uses of sendToDeadLetterQueue, we are only interested in valid messages return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); } else { try { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java index 9971e8efe5..90e33abbd3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java @@ -26,6 +26,8 @@ import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.broker.util.RedeliveryPlugin; @@ -129,6 +131,46 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data")); } + public void testNoScheduledRedeliveryOfDuplicates() throws Exception { + broker = createBroker(true); + + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setUseCache(false); // disable the cache such that duplicates are not suppressed on send + + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + + ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection(); + consumerConnection.start(); + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(destination); + + ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection(); + producerConnection.start(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(destination); + Message message = producerSession.createMessage(); + message.setStringProperty("data", data); + producer.send(message); + + message = consumer.receive(1000); + assertNotNull("got message", message); + message.acknowledge(); + + // send it again + // should go to dlq as a duplicate from the store + producerConnection.getTransport().request(message); + + // validate DLQ + MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME)); + Message dlqMessage = dlqConsumer.receive(4000); + assertNotNull("Got message from dql", dlqMessage); + assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data")); + } + private void sendMessage(int timeToLive) throws Exception { ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection(); producerConnection.start(); @@ -144,8 +186,16 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { } private void startBroker(boolean deleteMessages) throws Exception { + broker = createBroker(false); + if (deleteMessages) { + broker.setDeleteAllMessagesOnStartup(true); + } + broker.start(); + } + + private BrokerService createBroker(boolean persistent) throws Exception { broker = new BrokerService(); - broker.setPersistent(false); + broker.setPersistent(persistent); broker.setSchedulerSupport(true); RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin(); @@ -160,11 +210,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap); broker.setPlugins(new BrokerPlugin[]{redeliveryPlugin}); - - if (deleteMessages) { - broker.setDeleteAllMessagesOnStartup(true); - } - broker.start(); + return broker; } private void stopBroker() throws Exception {