From 8a998ad805e88ef2bf4cdb01c19c1d0ba1217a30 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Wed, 6 Apr 2016 20:59:57 +0800 Subject: [PATCH 1/2] ARTEMIS-468 Fix openwire redelivery related regressions under integration-tests --- .../openwire/OpenWireMessageConverter.java | 8 +++-- .../protocol/openwire/amq/AMQConsumer.java | 30 +++++++++++++++---- .../artemis/core/server/ConsumerListener.java | 23 ++++++++++++++ .../core/server/impl/ServerConsumerImpl.java | 6 ++++ .../core/server/impl/ServerSessionImpl.java | 1 + 5 files changed, 60 insertions(+), 8 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 53464cce08..4516253493 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -87,7 +87,7 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; - private static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; + public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID"; @@ -446,14 +446,16 @@ public class OpenWireMessageConverter implements MessageConverter { } public static MessageDispatch createMessageDispatch(ServerMessage message, - int deliveryCount, AMQConsumer consumer) throws IOException, JMSException { ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination()); + //we can use core message id for sequenceId + amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID()); MessageDispatch md = new MessageDispatch(); md.setConsumerId(consumer.getId()); + md.setRedeliveryCounter(amqMessage.getRedeliveryCounter()); + md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId()); md.setMessage(amqMessage); - md.setRedeliveryCounter(deliveryCount); ActiveMQDestination destination = amqMessage.getDestination(); md.setDestination(destination); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 3093ed8689..81cdec8b56 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; +import org.apache.activemq.artemis.core.server.ConsumerListener; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -43,9 +44,10 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.wireformat.WireFormat; -public class AMQConsumer { +public class AMQConsumer implements ConsumerListener { private AMQSession session; private org.apache.activemq.command.ActiveMQDestination openwireDestination; private ConsumerInfo info; @@ -108,7 +110,6 @@ public class AMQConsumer { } serverConsumer.setProtocolData(this); - } private SimpleString createTopicSubscription(boolean isDurable, @@ -184,8 +185,8 @@ public class AMQConsumer { if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) { return 0; } - //decrement deliveryCount as AMQ client tends to add 1. - dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this); + + dispatch = OpenWireMessageConverter.createMessageDispatch(message, this); int size = dispatch.getMessage().getSize(); reference.setProtocolData(dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); @@ -215,7 +216,6 @@ public class AMQConsumer { * Notice that we will start a new transaction on the cases where there is no transaction. */ public void acknowledge(MessageAck ack) throws Exception { - MessageId first = ack.getFirstMessageId(); MessageId last = ack.getLastMessageId(); @@ -252,6 +252,10 @@ public class AMQConsumer { } else if (ack.isPoisonAck()) { for (MessageReference ref : ackList) { + Throwable poisonCause = ack.getPoisonCause(); + if (poisonCause != null) { + ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString()); + } ref.getQueue().sendToDeadLetterAddress(transaction, ref); } } @@ -303,6 +307,22 @@ public class AMQConsumer { } } + @Override + public void updateForCanceledRef(MessageReference ref) { + long seqId = ref.getMessage().getMessageID(); + long lastDelSeqId = info.getLastDeliveredSequenceId(); + ServerMessage coreMessage = ref.getMessage(); + int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER); + if (openwireDestination.isTopic()) { + redeliveryCounter++; + coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter); + } + else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) { + redeliveryCounter++; + coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter); + } + } + /** * The MessagePullHandler is used with slow consumer policies. * */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java new file mode 100644 index 0000000000..2b2be9c3da --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +/** + */ +public interface ConsumerListener { + void updateForCanceledRef(MessageReference ref); +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 7860ed804a..12ca54c5bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.ConsumerListener; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; @@ -65,6 +66,7 @@ import org.apache.activemq.artemis.utils.TypedProperties; * Concrete implementation of a ClientConsumer. */ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { + //private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log"); // Constants ------------------------------------------------------------------------------------ private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -600,6 +602,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // before failure ref.decrementDeliveryCount(); } + + if (this.protocolData instanceof ConsumerListener) { + ((ConsumerListener)protocolData).updateForCanceledRef(ref); + } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 31102aa486..7a817f00b3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -317,6 +317,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { protected void doClose(final boolean failed) throws Exception { synchronized (this) { + this.setStarted(false); if (closed) return; From 50eac7c824e586aa858fb1f56676feffd4be7523 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 7 Apr 2016 12:12:16 -0400 Subject: [PATCH 2/2] ARTEMIS-468 Amendments to how redelivery count is handled on openwire --- .../ProtonSessionIntegrationCallback.java | 5 ++++ .../protocol/mqtt/MQTTSessionCallback.java | 6 +++++ .../protocol/openwire/OpenWireConnection.java | 2 +- .../openwire/OpenWireMessageConverter.java | 16 +++++-------- .../protocol/openwire/amq/AMQConsumer.java | 22 ++++++------------ .../protocol/openwire/amq/AMQSession.java | 13 ++++++++++- .../core/protocol/stomp/StompSession.java | 5 ++++ .../core/impl/CoreSessionCallback.java | 5 ++++ .../artemis/core/server/ConsumerListener.java | 23 ------------------- .../core/server/impl/ServerConsumerImpl.java | 21 ++++++++--------- .../spi/core/protocol/SessionCallback.java | 9 ++++++++ .../integration/client/HangConsumerTest.java | 5 ++++ 12 files changed, 71 insertions(+), 61 deletions(-) delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 55bade919d..1c6ea01f3d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -343,6 +343,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se public void sendProducerCreditsMessage(int credits, SimpleString address) { } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public void sendProducerCreditsFailMessage(int credits, SimpleString address) { } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 82b1ed695f..57cb7fef56 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -53,6 +53,12 @@ public class MQTTSessionCallback implements SessionCallback { return 1; } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + + @Override public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 3ccb98d3b4..818d3051a4 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -745,7 +745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se throw new IllegalStateException("Session not exist! : " + sessionId); } - List consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection()); + List consumersList = amqSession.createConsumer(info, new SlowConsumerDetection()); this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList); ss.addConsumer(info); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 4516253493..cfe5b47d0d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; @@ -87,7 +88,6 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; - public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID"; @@ -373,7 +373,6 @@ public class OpenWireMessageConverter implements MessageConverter { } } - coreMessage.putIntProperty(AMQ_MSG_REDELIVER_COUNTER, messageSend.getRedeliveryCounter()); ActiveMQDestination replyTo = messageSend.getReplyTo(); if (replyTo != null) { ByteSequence replyToBytes = marshaller.marshal(replyTo); @@ -445,15 +444,15 @@ public class OpenWireMessageConverter implements MessageConverter { } } - public static MessageDispatch createMessageDispatch(ServerMessage message, + public static MessageDispatch createMessageDispatch(MessageReference reference, ServerMessage message, AMQConsumer consumer) throws IOException, JMSException { - ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination()); + ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination()); //we can use core message id for sequenceId amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID()); MessageDispatch md = new MessageDispatch(); md.setConsumerId(consumer.getId()); - md.setRedeliveryCounter(amqMessage.getRedeliveryCounter()); + md.setRedeliveryCounter(reference.getDeliveryCount() - 1); md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId()); md.setMessage(amqMessage); ActiveMQDestination destination = amqMessage.getDestination(); @@ -462,7 +461,7 @@ public class OpenWireMessageConverter implements MessageConverter { return md; } - private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { + private static ActiveMQMessage toAMQMessage(MessageReference refernce, ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { ActiveMQMessage amqMsg = null; byte coreType = coreMessage.getType(); switch (coreType) { @@ -762,10 +761,7 @@ public class OpenWireMessageConverter implements MessageConverter { amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes)); } - Integer redeliveryCounter = (Integer) coreMessage.getObjectProperty(AMQ_MSG_REDELIVER_COUNTER); - if (redeliveryCounter != null) { - amqMsg.setRedeliveryCounter(redeliveryCounter); - } + amqMsg.setRedeliveryCounter(refernce.getDeliveryCount() - 1); byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO); if (replyToBytes != null) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 81cdec8b56..01820d6a8f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; -import org.apache.activemq.artemis.core.server.ConsumerListener; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -44,10 +43,9 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; -import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.wireformat.WireFormat; -public class AMQConsumer implements ConsumerListener { +public class AMQConsumer { private AMQSession session; private org.apache.activemq.command.ActiveMQDestination openwireDestination; private ConsumerInfo info; @@ -186,7 +184,7 @@ public class AMQConsumer implements ConsumerListener { return 0; } - dispatch = OpenWireMessageConverter.createMessageDispatch(message, this); + dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this); int size = dispatch.getMessage().getSize(); reference.setProtocolData(dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); @@ -307,19 +305,13 @@ public class AMQConsumer implements ConsumerListener { } } - @Override - public void updateForCanceledRef(MessageReference ref) { + public void updateDeliveryCountAfterCancel(MessageReference ref) { long seqId = ref.getMessage().getMessageID(); long lastDelSeqId = info.getLastDeliveredSequenceId(); - ServerMessage coreMessage = ref.getMessage(); - int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER); - if (openwireDestination.isTopic()) { - redeliveryCounter++; - coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter); - } - else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) { - redeliveryCounter++; - coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter); + + // This is a specific rule of the protocol + if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) { + ref.decrementDeliveryCount(); } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 74dd951512..84354cdff0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -118,8 +118,19 @@ public class AMQSession implements SessionCallback { } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + if (consumer.getProtocolData() != null) { + ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref); + return true; + } + else { + return false; + } + + } + public List createConsumer(ConsumerInfo info, - AMQSession amqSession, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { //check destination ActiveMQDestination dest = info.getDestination(); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 9b5c70d1e9..8db5720424 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -118,6 +118,11 @@ public class StompSession implements SessionCallback { } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) { LargeServerMessageImpl largeMessage = null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 9d6125b34b..f4d69d1b6e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -56,6 +56,11 @@ public final class CoreSessionCallback implements SessionCallback { return connection.isWritable(callback); } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java deleted file mode 100644 index 2b2be9c3da..0000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.server; - -/** - */ -public interface ConsumerListener { - void updateForCanceledRef(MessageReference ref); -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 12ca54c5bd..5fb6018fdb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -43,7 +43,6 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.ConsumerListener; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; @@ -367,7 +366,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ref.incrementDeliveryCount(); // If updateDeliveries = false (set by strict-update), - // the updateDeliveryCount would still be updated after c + // the updateDeliveryCountAfterCancel would still be updated after c if (strictUpdateDeliveryCount && !ref.isPaged()) { if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue() && @@ -596,15 +595,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { - if (!failed) { - // We don't decrement delivery count if the client failed, since there's a possibility that refs - // were actually delivered but we just didn't get any acks for them - // before failure - ref.decrementDeliveryCount(); - } - - if (this.protocolData instanceof ConsumerListener) { - ((ConsumerListener)protocolData).updateForCanceledRef(ref); + // We first update the deliveryCount at the protocol callback... + // if that wasn't updated (if there is no specific logic, then we apply the default logic used on most protocols + if (!callback.updateDeliveryCountAfterCancel(this, ref, failed)) { + if (!failed) { + // We don't decrement delivery count if the client failed, since there's a possibility that refs + // were actually delivered but we just didn't get any acks for them + // before failure + ref.decrementDeliveryCount(); + } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index cf0ec694b5..9f23f802d0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -33,6 +33,15 @@ public interface SessionCallback { * like acks or other operations. */ void afterDelivery() throws Exception; + /** + * Use this to updates specifics on the message after a redelivery happened. + * Return true if there was specific logic applied on the protocol, so the ServerConsumer won't make any adjustments. + * @param consumer + * @param ref + * @param failed + */ + boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed); + void sendProducerCreditsMessage(int credits, SimpleString address); void sendProducerCreditsFailMessage(int credits, SimpleString address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index a3bae652f0..2c888964b7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -483,6 +483,11 @@ public class HangConsumerTest extends ActiveMQTestBase { targetCallback.sendProducerCreditsMessage(credits, address); } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public void browserFinished(ServerConsumer consumer) {