diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index ab968b8aa2..e3586e230a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -280,7 +280,7 @@ public class AmqpSender extends AmqpAbstractLink { // the DLQ. If a custom redelivery policy is used on the broker the message // can still be redelivered based on the configation of that policy. LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state); - settle(delivery, MessageAck.POSION_ACK_TYPE); + settle(delivery, MessageAck.POISON_ACK_TYPE); } else if (state instanceof Released) { LOG.trace("onDelivery: Released state = {}", state); // re-deliver && don't increment the counter. @@ -297,7 +297,7 @@ public class AmqpSender extends AmqpAbstractLink { if (undeliverableHere != null && undeliverableHere) { // receiver does not want the message.. // perhaps we should DLQ it? - ackType = MessageAck.POSION_ACK_TYPE; + ackType = MessageAck.POISON_ACK_TYPE; } settle(delivery, ackType); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 805ef6f61a..443a8f23e2 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -894,7 +894,7 @@ public abstract class BaseDestination implements Destination { Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination); message.setRegionDestination(this); broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); - MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1); + MessageAck messageAck = new MessageAck(message, MessageAck.POISON_ACK_TYPE, 1); messageAck.setPoisonCause(cause); try { acknowledge(connectionContext, subscription, messageAck, message); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index b7761edf49..7385e4ff8a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -2093,7 +2093,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index dropMessage(ref); if (gotToTheStore(ref.getMessage())) { LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage()); - store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1)); + store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1)); } broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination)); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 602e6eb56a..a9cbd8fc1c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -1093,11 +1093,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (messageDispatch != null) { LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error); try { - MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1); + MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POISON_ACK_TYPE, 1); poisonAck.setPoisonCause(error); localBroker.oneway(poisonAck); } catch (IOException ioe) { - LOG.error("Failed to posion ack message following forward failure: ", ioe); + LOG.error("Failed to poison ack message following forward failure: ", ioe); } fireFailedForwardAdvisory(messageDispatch, error); } else { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a1e7d0aa44..764f8c93b5 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -519,7 +519,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC sendPullCommand(timeout); } else if (redeliveryExceeded(md)) { LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md); - posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); + poisonAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); if (timeout > 0) { timeout = Math.max(deadline - System.currentTimeMillis(), 0); } @@ -541,11 +541,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired(); } - private void posionAck(MessageDispatch md, String cause) throws JMSException { - MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); - posionAck.setFirstMessageId(md.getMessage().getMessageId()); - posionAck.setPoisonCause(new Throwable(cause)); - session.sendAck(posionAck); + private void poisonAck(MessageDispatch md, String cause) throws JMSException { + MessageAck poisonAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1); + poisonAck.setFirstMessageId(md.getMessage().getMessageId()); + poisonAck.setPoisonCause(new Throwable(cause)); + session.sendAck(poisonAck); } private boolean redeliveryExceeded(MessageDispatch md) { @@ -1271,7 +1271,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // DLQ. // Acknowledge the last message. - MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); + MessageAck ack = new MessageAck(lastMd, MessageAck.POISON_ACK_TYPE, deliveredMessages.size()); ack.setFirstMessageId(firstMsgId); ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause())); @@ -1422,7 +1422,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) { if (listener != null && unconsumedMessages.isRunning()) { if (redeliveryExceeded(md)) { - posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); + poisonAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); return; } ActiveMQMessage message = createActiveMQMessage(md); @@ -1483,7 +1483,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC dispatch(md); } else { LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md); - posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId()); + poisonAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId()); } } } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 73c161671e..35bddf2852 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -900,7 +900,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta earlyAck.setFirstMessageId(message.getMessageId()); } else if (connection.isDuplicate(ActiveMQSession.this, message)) { LOG.debug("{} got duplicate: {}", this, message.getMessageId()); - earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); + earlyAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1); earlyAck.setFirstMessageId(md.getMessage().getMessageId()); earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); } @@ -986,7 +986,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta // sent to the // DLQ. // Acknowledge the last message. - MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); + MessageAck ack = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1); ack.setFirstMessageId(md.getMessage().getMessageId()); ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack); diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java b/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java index bb0a72f9c1..dc575d477e 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java @@ -44,7 +44,7 @@ public class MessageAck extends BaseCommand { * message was not processed and the message was considered a poison * message. */ - public static final byte POSION_ACK_TYPE = 1; + public static final byte POISON_ACK_TYPE = 1; /** * In case the client want's to explicitly let the broker know that a @@ -117,7 +117,7 @@ public class MessageAck extends BaseCommand { } public boolean isPoisonAck() { - return ackType == POSION_ACK_TYPE; + return ackType == POISON_ACK_TYPE; } public boolean isStandardAck() { diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 95fe986b25..2f66770ed3 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -238,7 +238,7 @@ public class StompSubscription { MessageAck ack = new MessageAck(); ack.setDestination(consumerInfo.getDestination()); ack.setConsumerId(consumerInfo.getConsumerId()); - ack.setAckType(MessageAck.POSION_ACK_TYPE); + ack.setAckType(MessageAck.POISON_ACK_TYPE); ack.setMessageID(msgId); if (transactionId != null) { transactedMessages.add(ackEntry);