AMQ-8055 - rename posion to correct poison

This commit is contained in:
Benjamin Graf 2020-10-23 09:50:18 +02:00
parent fa8b4c5215
commit a2e718f259
8 changed files with 20 additions and 20 deletions

View File

@ -280,7 +280,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
// the DLQ. If a custom redelivery policy is used on the broker the message // the DLQ. If a custom redelivery policy is used on the broker the message
// can still be redelivered based on the configation of that policy. // can still be redelivered based on the configation of that policy.
LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state); LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state);
settle(delivery, MessageAck.POSION_ACK_TYPE); settle(delivery, MessageAck.POISON_ACK_TYPE);
} else if (state instanceof Released) { } else if (state instanceof Released) {
LOG.trace("onDelivery: Released state = {}", state); LOG.trace("onDelivery: Released state = {}", state);
// re-deliver && don't increment the counter. // re-deliver && don't increment the counter.
@ -297,7 +297,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
if (undeliverableHere != null && undeliverableHere) { if (undeliverableHere != null && undeliverableHere) {
// receiver does not want the message.. // receiver does not want the message..
// perhaps we should DLQ it? // perhaps we should DLQ it?
ackType = MessageAck.POSION_ACK_TYPE; ackType = MessageAck.POISON_ACK_TYPE;
} }
settle(delivery, ackType); settle(delivery, ackType);
} }

View File

@ -894,7 +894,7 @@ public abstract class BaseDestination implements Destination {
Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination); Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
message.setRegionDestination(this); message.setRegionDestination(this);
broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1); MessageAck messageAck = new MessageAck(message, MessageAck.POISON_ACK_TYPE, 1);
messageAck.setPoisonCause(cause); messageAck.setPoisonCause(cause);
try { try {
acknowledge(connectionContext, subscription, messageAck, message); acknowledge(connectionContext, subscription, messageAck, message);

View File

@ -2093,7 +2093,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
dropMessage(ref); dropMessage(ref);
if (gotToTheStore(ref.getMessage())) { if (gotToTheStore(ref.getMessage())) {
LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage()); LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1)); store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
} }
broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination)); broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination));
} }

View File

@ -1093,11 +1093,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (messageDispatch != null) { if (messageDispatch != null) {
LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error); LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
try { try {
MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1); MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POISON_ACK_TYPE, 1);
poisonAck.setPoisonCause(error); poisonAck.setPoisonCause(error);
localBroker.oneway(poisonAck); localBroker.oneway(poisonAck);
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.error("Failed to posion ack message following forward failure: ", ioe); LOG.error("Failed to poison ack message following forward failure: ", ioe);
} }
fireFailedForwardAdvisory(messageDispatch, error); fireFailedForwardAdvisory(messageDispatch, error);
} else { } else {

View File

@ -519,7 +519,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
sendPullCommand(timeout); sendPullCommand(timeout);
} else if (redeliveryExceeded(md)) { } else if (redeliveryExceeded(md)) {
LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md); LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); poisonAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
if (timeout > 0) { if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0); timeout = Math.max(deadline - System.currentTimeMillis(), 0);
} }
@ -541,11 +541,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired(); return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired();
} }
private void posionAck(MessageDispatch md, String cause) throws JMSException { private void poisonAck(MessageDispatch md, String cause) throws JMSException {
MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); MessageAck poisonAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
posionAck.setFirstMessageId(md.getMessage().getMessageId()); poisonAck.setFirstMessageId(md.getMessage().getMessageId());
posionAck.setPoisonCause(new Throwable(cause)); poisonAck.setPoisonCause(new Throwable(cause));
session.sendAck(posionAck); session.sendAck(poisonAck);
} }
private boolean redeliveryExceeded(MessageDispatch md) { private boolean redeliveryExceeded(MessageDispatch md) {
@ -1271,7 +1271,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// DLQ. // DLQ.
// Acknowledge the last message. // Acknowledge the last message.
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); MessageAck ack = new MessageAck(lastMd, MessageAck.POISON_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId); ack.setFirstMessageId(firstMsgId);
ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy
+ ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause())); + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
@ -1422,7 +1422,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) { if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) { if (listener != null && unconsumedMessages.isRunning()) {
if (redeliveryExceeded(md)) { if (redeliveryExceeded(md)) {
posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); poisonAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
return; return;
} }
ActiveMQMessage message = createActiveMQMessage(md); ActiveMQMessage message = createActiveMQMessage(md);
@ -1483,7 +1483,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
dispatch(md); dispatch(md);
} else { } else {
LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md); LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId()); poisonAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
} }
} }
} }

View File

@ -900,7 +900,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
earlyAck.setFirstMessageId(message.getMessageId()); earlyAck.setFirstMessageId(message.getMessageId());
} else if (connection.isDuplicate(ActiveMQSession.this, message)) { } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
LOG.debug("{} got duplicate: {}", this, message.getMessageId()); LOG.debug("{} got duplicate: {}", this, message.getMessageId());
earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); earlyAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
earlyAck.setFirstMessageId(md.getMessage().getMessageId()); earlyAck.setFirstMessageId(md.getMessage().getMessageId());
earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
} }
@ -986,7 +986,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
// sent to the // sent to the
// DLQ. // DLQ.
// Acknowledge the last message. // Acknowledge the last message.
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); MessageAck ack = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId()); ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack); LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);

View File

@ -44,7 +44,7 @@ public class MessageAck extends BaseCommand {
* message was not processed and the message was considered a poison * message was not processed and the message was considered a poison
* message. * message.
*/ */
public static final byte POSION_ACK_TYPE = 1; public static final byte POISON_ACK_TYPE = 1;
/** /**
* In case the client want's to explicitly let the broker know that a * In case the client want's to explicitly let the broker know that a
@ -117,7 +117,7 @@ public class MessageAck extends BaseCommand {
} }
public boolean isPoisonAck() { public boolean isPoisonAck() {
return ackType == POSION_ACK_TYPE; return ackType == POISON_ACK_TYPE;
} }
public boolean isStandardAck() { public boolean isStandardAck() {

View File

@ -238,7 +238,7 @@ public class StompSubscription {
MessageAck ack = new MessageAck(); MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination()); ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId()); ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.POSION_ACK_TYPE); ack.setAckType(MessageAck.POISON_ACK_TYPE);
ack.setMessageID(msgId); ack.setMessageID(msgId);
if (transactionId != null) { if (transactionId != null) {
transactedMessages.add(ackEntry); transactedMessages.add(ackEntry);