From 50eac7c824e586aa858fb1f56676feffd4be7523 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 7 Apr 2016 12:12:16 -0400 Subject: [PATCH] ARTEMIS-468 Amendments to how redelivery count is handled on openwire --- .../ProtonSessionIntegrationCallback.java | 5 ++++ .../protocol/mqtt/MQTTSessionCallback.java | 6 +++++ .../protocol/openwire/OpenWireConnection.java | 2 +- .../openwire/OpenWireMessageConverter.java | 16 +++++-------- .../protocol/openwire/amq/AMQConsumer.java | 22 ++++++------------ .../protocol/openwire/amq/AMQSession.java | 13 ++++++++++- .../core/protocol/stomp/StompSession.java | 5 ++++ .../core/impl/CoreSessionCallback.java | 5 ++++ .../artemis/core/server/ConsumerListener.java | 23 ------------------- .../core/server/impl/ServerConsumerImpl.java | 21 ++++++++--------- .../spi/core/protocol/SessionCallback.java | 9 ++++++++ .../integration/client/HangConsumerTest.java | 5 ++++ 12 files changed, 71 insertions(+), 61 deletions(-) delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 55bade919d..1c6ea01f3d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -343,6 +343,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se public void sendProducerCreditsMessage(int credits, SimpleString address) { } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public void sendProducerCreditsFailMessage(int credits, SimpleString address) { } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 82b1ed695f..57cb7fef56 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -53,6 +53,12 @@ public class MQTTSessionCallback implements SessionCallback { return 1; } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + + @Override public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 3ccb98d3b4..818d3051a4 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -745,7 +745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se throw new IllegalStateException("Session not exist! : " + sessionId); } - List consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection()); + List consumersList = amqSession.createConsumer(info, new SlowConsumerDetection()); this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList); ss.addConsumer(info); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 4516253493..cfe5b47d0d 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; @@ -87,7 +88,6 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; - public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID"; @@ -373,7 +373,6 @@ public class OpenWireMessageConverter implements MessageConverter { } } - coreMessage.putIntProperty(AMQ_MSG_REDELIVER_COUNTER, messageSend.getRedeliveryCounter()); ActiveMQDestination replyTo = messageSend.getReplyTo(); if (replyTo != null) { ByteSequence replyToBytes = marshaller.marshal(replyTo); @@ -445,15 +444,15 @@ public class OpenWireMessageConverter implements MessageConverter { } } - public static MessageDispatch createMessageDispatch(ServerMessage message, + public static MessageDispatch createMessageDispatch(MessageReference reference, ServerMessage message, AMQConsumer consumer) throws IOException, JMSException { - ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination()); + ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination()); //we can use core message id for sequenceId amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID()); MessageDispatch md = new MessageDispatch(); md.setConsumerId(consumer.getId()); - md.setRedeliveryCounter(amqMessage.getRedeliveryCounter()); + md.setRedeliveryCounter(reference.getDeliveryCount() - 1); md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId()); md.setMessage(amqMessage); ActiveMQDestination destination = amqMessage.getDestination(); @@ -462,7 +461,7 @@ public class OpenWireMessageConverter implements MessageConverter { return md; } - private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { + private static ActiveMQMessage toAMQMessage(MessageReference refernce, ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { ActiveMQMessage amqMsg = null; byte coreType = coreMessage.getType(); switch (coreType) { @@ -762,10 +761,7 @@ public class OpenWireMessageConverter implements MessageConverter { amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes)); } - Integer redeliveryCounter = (Integer) coreMessage.getObjectProperty(AMQ_MSG_REDELIVER_COUNTER); - if (redeliveryCounter != null) { - amqMsg.setRedeliveryCounter(redeliveryCounter); - } + amqMsg.setRedeliveryCounter(refernce.getDeliveryCount() - 1); byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO); if (replyToBytes != null) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 81cdec8b56..01820d6a8f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; -import org.apache.activemq.artemis.core.server.ConsumerListener; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -44,10 +43,9 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; -import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.wireformat.WireFormat; -public class AMQConsumer implements ConsumerListener { +public class AMQConsumer { private AMQSession session; private org.apache.activemq.command.ActiveMQDestination openwireDestination; private ConsumerInfo info; @@ -186,7 +184,7 @@ public class AMQConsumer implements ConsumerListener { return 0; } - dispatch = OpenWireMessageConverter.createMessageDispatch(message, this); + dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this); int size = dispatch.getMessage().getSize(); reference.setProtocolData(dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); @@ -307,19 +305,13 @@ public class AMQConsumer implements ConsumerListener { } } - @Override - public void updateForCanceledRef(MessageReference ref) { + public void updateDeliveryCountAfterCancel(MessageReference ref) { long seqId = ref.getMessage().getMessageID(); long lastDelSeqId = info.getLastDeliveredSequenceId(); - ServerMessage coreMessage = ref.getMessage(); - int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER); - if (openwireDestination.isTopic()) { - redeliveryCounter++; - coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter); - } - else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) { - redeliveryCounter++; - coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter); + + // This is a specific rule of the protocol + if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) { + ref.decrementDeliveryCount(); } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 74dd951512..84354cdff0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -118,8 +118,19 @@ public class AMQSession implements SessionCallback { } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + if (consumer.getProtocolData() != null) { + ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref); + return true; + } + else { + return false; + } + + } + public List createConsumer(ConsumerInfo info, - AMQSession amqSession, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { //check destination ActiveMQDestination dest = info.getDestination(); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 9b5c70d1e9..8db5720424 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -118,6 +118,11 @@ public class StompSession implements SessionCallback { } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) { LargeServerMessageImpl largeMessage = null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 9d6125b34b..f4d69d1b6e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -56,6 +56,11 @@ public final class CoreSessionCallback implements SessionCallback { return connection.isWritable(callback); } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java deleted file mode 100644 index 2b2be9c3da..0000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.server; - -/** - */ -public interface ConsumerListener { - void updateForCanceledRef(MessageReference ref); -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 12ca54c5bd..5fb6018fdb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -43,7 +43,6 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.ConsumerListener; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; @@ -367,7 +366,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ref.incrementDeliveryCount(); // If updateDeliveries = false (set by strict-update), - // the updateDeliveryCount would still be updated after c + // the updateDeliveryCountAfterCancel would still be updated after c if (strictUpdateDeliveryCount && !ref.isPaged()) { if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue() && @@ -596,15 +595,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { - if (!failed) { - // We don't decrement delivery count if the client failed, since there's a possibility that refs - // were actually delivered but we just didn't get any acks for them - // before failure - ref.decrementDeliveryCount(); - } - - if (this.protocolData instanceof ConsumerListener) { - ((ConsumerListener)protocolData).updateForCanceledRef(ref); + // We first update the deliveryCount at the protocol callback... + // if that wasn't updated (if there is no specific logic, then we apply the default logic used on most protocols + if (!callback.updateDeliveryCountAfterCancel(this, ref, failed)) { + if (!failed) { + // We don't decrement delivery count if the client failed, since there's a possibility that refs + // were actually delivered but we just didn't get any acks for them + // before failure + ref.decrementDeliveryCount(); + } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index cf0ec694b5..9f23f802d0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -33,6 +33,15 @@ public interface SessionCallback { * like acks or other operations. */ void afterDelivery() throws Exception; + /** + * Use this to updates specifics on the message after a redelivery happened. + * Return true if there was specific logic applied on the protocol, so the ServerConsumer won't make any adjustments. + * @param consumer + * @param ref + * @param failed + */ + boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed); + void sendProducerCreditsMessage(int credits, SimpleString address); void sendProducerCreditsFailMessage(int credits, SimpleString address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index a3bae652f0..2c888964b7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -483,6 +483,11 @@ public class HangConsumerTest extends ActiveMQTestBase { targetCallback.sendProducerCreditsMessage(credits, address); } + @Override + public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { + return false; + } + @Override public void browserFinished(ServerConsumer consumer) {