diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 55bade919d..1c6ea01f3d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -343,6 +343,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se public void sendProducerCreditsMessage(int credits, SimpleString address) { } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public void sendProducerCreditsFailMessage(int credits, SimpleString address) { } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 82b1ed695f..57cb7fef56 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -53,6 +53,12 @@ public class MQTTSessionCallback implements SessionCallback { return 1; } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + + @Override public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 3ccb98d3b4..818d3051a4 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -745,7 +745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se throw new IllegalStateException("Session not exist! : " + sessionId); } - List consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection()); + List consumersList = amqSession.createConsumer(info, new SlowConsumerDetection()); this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList); ss.addConsumer(info); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 53464cce08..cfe5b47d0d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; @@ -87,7 +88,6 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; - private static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID"; @@ -373,7 +373,6 @@ public class OpenWireMessageConverter implements MessageConverter { } } - coreMessage.putIntProperty(AMQ_MSG_REDELIVER_COUNTER, messageSend.getRedeliveryCounter()); ActiveMQDestination replyTo = messageSend.getReplyTo(); if (replyTo != null) { ByteSequence replyToBytes = marshaller.marshal(replyTo); @@ -445,22 +444,24 @@ public class OpenWireMessageConverter implements MessageConverter { } } - public static MessageDispatch createMessageDispatch(ServerMessage message, - int deliveryCount, + public static MessageDispatch createMessageDispatch(MessageReference reference, ServerMessage message, AMQConsumer consumer) throws IOException, JMSException { - ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination()); + ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination()); + //we can use core message id for sequenceId + amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID()); MessageDispatch md = new MessageDispatch(); md.setConsumerId(consumer.getId()); + md.setRedeliveryCounter(reference.getDeliveryCount() - 1); + md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId()); md.setMessage(amqMessage); - md.setRedeliveryCounter(deliveryCount); ActiveMQDestination destination = amqMessage.getDestination(); md.setDestination(destination); return md; } - private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { + private static ActiveMQMessage toAMQMessage(MessageReference refernce, ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { ActiveMQMessage amqMsg = null; byte coreType = coreMessage.getType(); switch (coreType) { @@ -760,10 +761,7 @@ public class OpenWireMessageConverter implements MessageConverter { amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes)); } - Integer redeliveryCounter = (Integer) coreMessage.getObjectProperty(AMQ_MSG_REDELIVER_COUNTER); - if (redeliveryCounter != null) { - amqMsg.setRedeliveryCounter(redeliveryCounter); - } + amqMsg.setRedeliveryCounter(refernce.getDeliveryCount() - 1); byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO); if (replyToBytes != null) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 3093ed8689..01820d6a8f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -108,7 +108,6 @@ public class AMQConsumer { } serverConsumer.setProtocolData(this); - } private SimpleString createTopicSubscription(boolean isDurable, @@ -184,8 +183,8 @@ public class AMQConsumer { if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) { return 0; } - //decrement deliveryCount as AMQ client tends to add 1. - dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this); + + dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this); int size = dispatch.getMessage().getSize(); reference.setProtocolData(dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); @@ -215,7 +214,6 @@ public class AMQConsumer { * Notice that we will start a new transaction on the cases where there is no transaction. */ public void acknowledge(MessageAck ack) throws Exception { - MessageId first = ack.getFirstMessageId(); MessageId last = ack.getLastMessageId(); @@ -252,6 +250,10 @@ public class AMQConsumer { } else if (ack.isPoisonAck()) { for (MessageReference ref : ackList) { + Throwable poisonCause = ack.getPoisonCause(); + if (poisonCause != null) { + ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString()); + } ref.getQueue().sendToDeadLetterAddress(transaction, ref); } } @@ -303,6 +305,16 @@ public class AMQConsumer { } } + public void updateDeliveryCountAfterCancel(MessageReference ref) { + long seqId = ref.getMessage().getMessageID(); + long lastDelSeqId = info.getLastDeliveredSequenceId(); + + // This is a specific rule of the protocol + if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) { + ref.decrementDeliveryCount(); + } + } + /** * The MessagePullHandler is used with slow consumer policies. * */ diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 74dd951512..84354cdff0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -118,8 +118,19 @@ public class AMQSession implements SessionCallback { } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + if (consumer.getProtocolData() != null) { + ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref); + return true; + } + else { + return false; + } + + } + public List createConsumer(ConsumerInfo info, - AMQSession amqSession, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { //check destination ActiveMQDestination dest = info.getDestination(); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 9b5c70d1e9..8db5720424 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -118,6 +118,11 @@ public class StompSession implements SessionCallback { } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) { LargeServerMessageImpl largeMessage = null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 9d6125b34b..f4d69d1b6e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -56,6 +56,11 @@ public final class CoreSessionCallback implements SessionCallback { return connection.isWritable(callback); } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 7860ed804a..5fb6018fdb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -65,6 +65,7 @@ import org.apache.activemq.artemis.utils.TypedProperties; * Concrete implementation of a ClientConsumer. */ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { + //private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log"); // Constants ------------------------------------------------------------------------------------ private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -365,7 +366,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ref.incrementDeliveryCount(); // If updateDeliveries = false (set by strict-update), - // the updateDeliveryCount would still be updated after c + // the updateDeliveryCountAfterCancel would still be updated after c if (strictUpdateDeliveryCount && !ref.isPaged()) { if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue() && @@ -594,11 +595,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { - if (!failed) { - // We don't decrement delivery count if the client failed, since there's a possibility that refs - // were actually delivered but we just didn't get any acks for them - // before failure - ref.decrementDeliveryCount(); + // We first update the deliveryCount at the protocol callback... + // if that wasn't updated (if there is no specific logic, then we apply the default logic used on most protocols + if (!callback.updateDeliveryCountAfterCancel(this, ref, failed)) { + if (!failed) { + // We don't decrement delivery count if the client failed, since there's a possibility that refs + // were actually delivered but we just didn't get any acks for them + // before failure + ref.decrementDeliveryCount(); + } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 31102aa486..7a817f00b3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -317,6 +317,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { protected void doClose(final boolean failed) throws Exception { synchronized (this) { + this.setStarted(false); if (closed) return; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index cf0ec694b5..9f23f802d0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -33,6 +33,15 @@ public interface SessionCallback { * like acks or other operations. */ void afterDelivery() throws Exception; + /** + * Use this to updates specifics on the message after a redelivery happened. + * Return true if there was specific logic applied on the protocol, so the ServerConsumer won't make any adjustments. + * @param consumer + * @param ref + * @param failed + */ + boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed); + void sendProducerCreditsMessage(int credits, SimpleString address); void sendProducerCreditsFailMessage(int credits, SimpleString address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index a3bae652f0..2c888964b7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -483,6 +483,11 @@ public class HangConsumerTest extends ActiveMQTestBase { targetCallback.sendProducerCreditsMessage(credits, address); } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public void browserFinished(ServerConsumer consumer) {