mirror of https://github.com/apache/activemq.git
resolve duplicate issue with concurrent durable subs and dlq, suppress the duplicates in the default Dead letter strategy. With no duplicates, concurrentStoreAndDispatchQueues true is fine. resolve https://issues.apache.org/activemq/browse/AMQ-2584
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1030013 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0c3117aac5
commit
66a945a8f1
|
@ -832,7 +832,8 @@ public class RegionBroker extends EmptyBroker {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Expired message with no DLQ strategy in place");
|
LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
|
||||||
|
+ message.getMessageId() + ", destination: " + message.getDestination());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.region.policy;
|
package org.apache.activemq.broker.region.policy;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQMessageAudit;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A strategy for choosing which destination is used for dead letter queue
|
* A strategy for choosing which destination is used for dead letter queue
|
||||||
|
@ -25,13 +28,21 @@ import org.apache.activemq.command.Message;
|
||||||
* @version $Revision: 426366 $
|
* @version $Revision: 426366 $
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
|
public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
|
||||||
|
private static final Log LOG = LogFactory.getLog(AbstractDeadLetterStrategy.class);
|
||||||
private boolean processNonPersistent = false;
|
private boolean processNonPersistent = false;
|
||||||
private boolean processExpired = true;
|
private boolean processExpired = true;
|
||||||
|
private ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
|
||||||
|
|
||||||
public boolean isSendToDeadLetterQueue(Message message) {
|
public boolean isSendToDeadLetterQueue(Message message) {
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
if (message != null) {
|
if (message != null) {
|
||||||
result = true;
|
result = true;
|
||||||
|
if (audit.isDuplicate(message)) {
|
||||||
|
result = false;
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Not adding duplicate to DLQ: " + message.getMessageId() + ", dest: " + message.getDestination());
|
||||||
|
}
|
||||||
|
}
|
||||||
if (!message.isPersistent() && !processNonPersistent) {
|
if (!message.isPersistent() && !processNonPersistent) {
|
||||||
result = false;
|
result = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -224,7 +224,12 @@ public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport {
|
||||||
properties.put("maxFileLength", maxFileLengthVal);
|
properties.put("maxFileLength", maxFileLengthVal);
|
||||||
properties.put("cleanupInterval", "2000");
|
properties.put("cleanupInterval", "2000");
|
||||||
properties.put("checkpointInterval", "2000");
|
properties.put("checkpointInterval", "2000");
|
||||||
properties.put("concurrentStoreAndDispatchQueues", "false");
|
// there are problems with duplicate dispatch in the cursor, which maintain
|
||||||
|
// a map of messages. A dup dispatch can be dropped.
|
||||||
|
// see: org.apache.activemq.broker.region.cursors.OrderedPendingList
|
||||||
|
// Adding duplicate detection to the default DLQ strategy removes the problem
|
||||||
|
// which means we can leave the default for concurrent store and dispatch q
|
||||||
|
//properties.put("concurrentStoreAndDispatchQueues", "false");
|
||||||
|
|
||||||
IntrospectionSupport.setProperties(persistenceAdapter, properties);
|
IntrospectionSupport.setProperties(persistenceAdapter, properties);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue