mirror of https://github.com/apache/activemq.git
Merge pull request #568 from graben/AMQ-8055
AMQ-8055 - rename posion to correct poison
This commit is contained in:
commit
6cb09e7c0b
|
@ -280,7 +280,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
// the DLQ. If a custom redelivery policy is used on the broker the message
|
// the DLQ. If a custom redelivery policy is used on the broker the message
|
||||||
// can still be redelivered based on the configation of that policy.
|
// can still be redelivered based on the configation of that policy.
|
||||||
LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state);
|
LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state);
|
||||||
settle(delivery, MessageAck.POSION_ACK_TYPE);
|
settle(delivery, MessageAck.POISON_ACK_TYPE);
|
||||||
} else if (state instanceof Released) {
|
} else if (state instanceof Released) {
|
||||||
LOG.trace("onDelivery: Released state = {}", state);
|
LOG.trace("onDelivery: Released state = {}", state);
|
||||||
// re-deliver && don't increment the counter.
|
// re-deliver && don't increment the counter.
|
||||||
|
@ -297,7 +297,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
if (undeliverableHere != null && undeliverableHere) {
|
if (undeliverableHere != null && undeliverableHere) {
|
||||||
// receiver does not want the message..
|
// receiver does not want the message..
|
||||||
// perhaps we should DLQ it?
|
// perhaps we should DLQ it?
|
||||||
ackType = MessageAck.POSION_ACK_TYPE;
|
ackType = MessageAck.POISON_ACK_TYPE;
|
||||||
}
|
}
|
||||||
settle(delivery, ackType);
|
settle(delivery, ackType);
|
||||||
}
|
}
|
||||||
|
|
|
@ -894,7 +894,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
|
Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
|
||||||
message.setRegionDestination(this);
|
message.setRegionDestination(this);
|
||||||
broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
|
broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
|
||||||
MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
|
MessageAck messageAck = new MessageAck(message, MessageAck.POISON_ACK_TYPE, 1);
|
||||||
messageAck.setPoisonCause(cause);
|
messageAck.setPoisonCause(cause);
|
||||||
try {
|
try {
|
||||||
acknowledge(connectionContext, subscription, messageAck, message);
|
acknowledge(connectionContext, subscription, messageAck, message);
|
||||||
|
|
|
@ -2093,7 +2093,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
dropMessage(ref);
|
dropMessage(ref);
|
||||||
if (gotToTheStore(ref.getMessage())) {
|
if (gotToTheStore(ref.getMessage())) {
|
||||||
LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
|
LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
|
||||||
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
|
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
|
||||||
}
|
}
|
||||||
broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination));
|
broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination));
|
||||||
}
|
}
|
||||||
|
|
|
@ -1093,11 +1093,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
if (messageDispatch != null) {
|
if (messageDispatch != null) {
|
||||||
LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
|
LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
|
||||||
try {
|
try {
|
||||||
MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
|
MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POISON_ACK_TYPE, 1);
|
||||||
poisonAck.setPoisonCause(error);
|
poisonAck.setPoisonCause(error);
|
||||||
localBroker.oneway(poisonAck);
|
localBroker.oneway(poisonAck);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.error("Failed to posion ack message following forward failure: ", ioe);
|
LOG.error("Failed to poison ack message following forward failure: ", ioe);
|
||||||
}
|
}
|
||||||
fireFailedForwardAdvisory(messageDispatch, error);
|
fireFailedForwardAdvisory(messageDispatch, error);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -519,7 +519,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
sendPullCommand(timeout);
|
sendPullCommand(timeout);
|
||||||
} else if (redeliveryExceeded(md)) {
|
} else if (redeliveryExceeded(md)) {
|
||||||
LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
|
LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
|
||||||
posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
|
poisonAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
|
||||||
if (timeout > 0) {
|
if (timeout > 0) {
|
||||||
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
|
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
|
||||||
}
|
}
|
||||||
|
@ -541,11 +541,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired();
|
return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void posionAck(MessageDispatch md, String cause) throws JMSException {
|
private void poisonAck(MessageDispatch md, String cause) throws JMSException {
|
||||||
MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
|
MessageAck poisonAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
|
||||||
posionAck.setFirstMessageId(md.getMessage().getMessageId());
|
poisonAck.setFirstMessageId(md.getMessage().getMessageId());
|
||||||
posionAck.setPoisonCause(new Throwable(cause));
|
poisonAck.setPoisonCause(new Throwable(cause));
|
||||||
session.sendAck(posionAck);
|
session.sendAck(poisonAck);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean redeliveryExceeded(MessageDispatch md) {
|
private boolean redeliveryExceeded(MessageDispatch md) {
|
||||||
|
@ -1271,7 +1271,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
// DLQ.
|
// DLQ.
|
||||||
// Acknowledge the last message.
|
// Acknowledge the last message.
|
||||||
|
|
||||||
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
|
MessageAck ack = new MessageAck(lastMd, MessageAck.POISON_ACK_TYPE, deliveredMessages.size());
|
||||||
ack.setFirstMessageId(firstMsgId);
|
ack.setFirstMessageId(firstMsgId);
|
||||||
ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy
|
ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy
|
||||||
+ ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
|
+ ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
|
||||||
|
@ -1422,7 +1422,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
|
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
|
||||||
if (listener != null && unconsumedMessages.isRunning()) {
|
if (listener != null && unconsumedMessages.isRunning()) {
|
||||||
if (redeliveryExceeded(md)) {
|
if (redeliveryExceeded(md)) {
|
||||||
posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
|
poisonAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ActiveMQMessage message = createActiveMQMessage(md);
|
ActiveMQMessage message = createActiveMQMessage(md);
|
||||||
|
@ -1483,7 +1483,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
dispatch(md);
|
dispatch(md);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
|
LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
|
||||||
posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
|
poisonAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -900,7 +900,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
earlyAck.setFirstMessageId(message.getMessageId());
|
earlyAck.setFirstMessageId(message.getMessageId());
|
||||||
} else if (connection.isDuplicate(ActiveMQSession.this, message)) {
|
} else if (connection.isDuplicate(ActiveMQSession.this, message)) {
|
||||||
LOG.debug("{} got duplicate: {}", this, message.getMessageId());
|
LOG.debug("{} got duplicate: {}", this, message.getMessageId());
|
||||||
earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
|
earlyAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
|
||||||
earlyAck.setFirstMessageId(md.getMessage().getMessageId());
|
earlyAck.setFirstMessageId(md.getMessage().getMessageId());
|
||||||
earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
|
earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
|
||||||
}
|
}
|
||||||
|
@ -986,7 +986,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
// sent to the
|
// sent to the
|
||||||
// DLQ.
|
// DLQ.
|
||||||
// Acknowledge the last message.
|
// Acknowledge the last message.
|
||||||
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
|
MessageAck ack = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
|
||||||
ack.setFirstMessageId(md.getMessage().getMessageId());
|
ack.setFirstMessageId(md.getMessage().getMessageId());
|
||||||
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
|
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
|
||||||
LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
|
LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
|
||||||
|
|
|
@ -44,7 +44,7 @@ public class MessageAck extends BaseCommand {
|
||||||
* message was not processed and the message was considered a poison
|
* message was not processed and the message was considered a poison
|
||||||
* message.
|
* message.
|
||||||
*/
|
*/
|
||||||
public static final byte POSION_ACK_TYPE = 1;
|
public static final byte POISON_ACK_TYPE = 1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In case the client want's to explicitly let the broker know that a
|
* In case the client want's to explicitly let the broker know that a
|
||||||
|
@ -117,7 +117,7 @@ public class MessageAck extends BaseCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isPoisonAck() {
|
public boolean isPoisonAck() {
|
||||||
return ackType == POSION_ACK_TYPE;
|
return ackType == POISON_ACK_TYPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isStandardAck() {
|
public boolean isStandardAck() {
|
||||||
|
|
|
@ -238,7 +238,7 @@ public class StompSubscription {
|
||||||
MessageAck ack = new MessageAck();
|
MessageAck ack = new MessageAck();
|
||||||
ack.setDestination(consumerInfo.getDestination());
|
ack.setDestination(consumerInfo.getDestination());
|
||||||
ack.setConsumerId(consumerInfo.getConsumerId());
|
ack.setConsumerId(consumerInfo.getConsumerId());
|
||||||
ack.setAckType(MessageAck.POSION_ACK_TYPE);
|
ack.setAckType(MessageAck.POISON_ACK_TYPE);
|
||||||
ack.setMessageID(msgId);
|
ack.setMessageID(msgId);
|
||||||
if (transactionId != null) {
|
if (transactionId != null) {
|
||||||
transactedMessages.add(ackEntry);
|
transactedMessages.add(ackEntry);
|
||||||
|
|
Loading…
Reference in New Issue