[AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724)

- Default 'true' to match existing behavior
 - Added counter to DestinationView
This commit is contained in:
Matt Pavlovich 2021-12-20 08:38:32 -06:00
parent f2dbc92743
commit 6a25d654f2
7 changed files with 83 additions and 2 deletions

View File

@ -102,6 +102,11 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getDispatched().getCount();
}
@Override
public long getDuplicateFromStoreCount() {
return destination.getDestinationStatistics().getDuplicateFromStore().getCount();
}
@Override
public long getInFlightCount() {
return destination.getDestinationStatistics().getInflight().getCount();
@ -570,4 +575,8 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
}
@Override
public boolean isSendDuplicateFromStoreToDLQ() {
return destination.isSendDuplicateFromStoreToDLQ();
}
}

View File

@ -69,6 +69,26 @@ public interface DestinationViewMBean {
@MBeanInfo("Number of messages that has been acknowledged (and removed) from the destination.")
long getDequeueCount();
/**
* Returns the number of duplicate messages that have been paged-in
* from the store.
*
* @return The number of duplicate messages that have been paged-in
* from the store.
*/
@MBeanInfo("Number of duplicate messages that have been paged-in from the store.")
long getDuplicateFromStoreCount();
/**
* Returns the config setting to send a duplicate message from store
* to the dead letter queue.
*
* @return The config setting to send a duplicate message from store
* to the dead letter queue.
*/
@MBeanInfo("Config setting to send a duplicate from store message to the dead letter queue.")
boolean isSendDuplicateFromStoreToDLQ();
/**
* Returns the number of messages that have been acknowledged by network subscriptions from the
* destination.

View File

@ -88,6 +88,7 @@ public abstract class BaseDestination implements Destination {
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
private boolean sendAdvisoryIfNoConsumers;
private boolean sendDuplicateFromStoreToDLQ = true;
private boolean includeBodyForAdvisory;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected final BrokerService brokerService;
@ -477,6 +478,14 @@ public abstract class BaseDestination implements Destination {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
}
public boolean isSendDuplicateFromStoreToDLQ() {
return this.sendDuplicateFromStoreToDLQ;
}
public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
this.sendDuplicateFromStoreToDLQ = sendDuplicateFromStoreToDLQ;
}
public boolean isIncludeBodyForAdvisory() {
return includeBodyForAdvisory;
}
@ -889,11 +898,14 @@ public abstract class BaseDestination implements Destination {
@Override
public void duplicateFromStore(Message message, Subscription subscription) {
destinationStatistics.getDuplicateFromStore().increment();
ConnectionContext connectionContext = createConnectionContext();
getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId());
Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
message.setRegionDestination(this);
broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
if(this.isSendDuplicateFromStoreToDLQ()) {
broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
}
MessageAck messageAck = new MessageAck(message, MessageAck.POISON_ACK_TYPE, 1);
messageAck.setPoisonCause(cause);
try {

View File

@ -245,4 +245,8 @@ public interface Destination extends Service, Task, Message.MessageDestination {
public void clearPendingMessages(int pendingAdditionsCount);
void duplicateFromStore(Message message, Subscription subscription);
boolean isSendDuplicateFromStoreToDLQ();
void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
}

View File

@ -394,6 +394,16 @@ public class DestinationFilter implements Destination {
next.duplicateFromStore(message, subscription);
}
@Override
public boolean isSendDuplicateFromStoreToDLQ() {
return next.isSendDuplicateFromStoreToDLQ();
}
@Override
public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
}
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;

View File

@ -37,6 +37,7 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
protected CountStatisticImpl duplicateFromStore;
protected CountStatisticImpl inflight;
protected CountStatisticImpl expired;
protected TimeStatisticImpl processTime;
@ -50,6 +51,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
duplicateFromStore = new CountStatisticImpl("duplicateFromStore", "The number of duplicate messages that have been paged-in from the store for this destination");
forwards = new CountStatisticImpl("forwards", "The number of messages that have been forwarded to a networked broker from the destination");
inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
expired = new CountStatisticImpl("expired", "The number of messages that have expired");
@ -68,6 +70,7 @@ public class DestinationStatistics extends StatsImpl {
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
addStatistic("duplicateFromStore", duplicateFromStore);
addStatistic("inflight", inflight);
addStatistic("expired", expired);
addStatistic("consumers", consumers);
@ -124,6 +127,10 @@ public class DestinationStatistics extends StatsImpl {
return dispatched;
}
public CountStatisticImpl getDuplicateFromStore() {
return duplicateFromStore;
}
public TimeStatisticImpl getProcessTime() {
return this.processTime;
}
@ -145,6 +152,7 @@ public class DestinationStatistics extends StatsImpl {
dequeues.reset();
forwards.reset();
dispatched.reset();
duplicateFromStore.reset();
inflight.reset();
expired.reset();
blockedSends.reset();
@ -158,6 +166,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setEnabled(enabled);
dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled);
duplicateFromStore.setEnabled(enabled);
forwards.setEnabled(enabled);
inflight.setEnabled(enabled);
expired.setEnabled(true);
@ -177,6 +186,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setParent(parent.enqueues);
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues);
duplicateFromStore.setParent(parent.duplicateFromStore);
forwards.setParent(parent.forwards);
inflight.setParent(parent.inflight);
expired.setParent(parent.expired);
@ -192,6 +202,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setParent(null);
dispatched.setParent(null);
dequeues.setParent(null);
duplicateFromStore.setParent(null);
forwards.setParent(null);
inflight.setParent(null);
expired.setParent(null);

View File

@ -51,6 +51,7 @@ public class PolicyEntry extends DestinationMapEntry {
private DispatchPolicy dispatchPolicy;
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers;
private boolean sendDuplicateFromStoreToDLQ = true;
private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY;
private PendingMessageLimitStrategy pendingMessageLimitStrategy;
private MessageEvictionStrategy messageEvictionStrategy;
@ -241,7 +242,6 @@ public class PolicyEntry extends DestinationMapEntry {
if (isUpdate("maxBrowsePageSize", includedProperties)) {
destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
}
if (isUpdate("minimumMessageSize", includedProperties)) {
destination.setMinimumMessageSize((int) getMinimumMessageSize());
}
@ -296,6 +296,9 @@ public class PolicyEntry extends DestinationMapEntry {
if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) {
destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
}
if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
}
}
public void baseConfiguration(Broker broker, BaseDestination destination) {
@ -456,6 +459,18 @@ public class PolicyEntry extends DestinationMapEntry {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
}
public boolean isSendDuplicateFromStoreToDLQ() {
return sendDuplicateFromStoreToDLQ;
}
/**
* Sends a copy of message to DLQ if a duplicate messages are paged-in from
* the messages store
*/
public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
this.sendDuplicateFromStoreToDLQ = sendDuplicateFromStoreToDLQ;
}
public DeadLetterStrategy getDeadLetterStrategy() {
return deadLetterStrategy;
}