From 8a998ad805e88ef2bf4cdb01c19c1d0ba1217a30 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Wed, 6 Apr 2016 20:59:57 +0800 Subject: [PATCH] ARTEMIS-468 Fix openwire redelivery related regressions under integration-tests --- .../openwire/OpenWireMessageConverter.java | 8 +++-- .../protocol/openwire/amq/AMQConsumer.java | 30 +++++++++++++++---- .../artemis/core/server/ConsumerListener.java | 23 ++++++++++++++ .../core/server/impl/ServerConsumerImpl.java | 6 ++++ .../core/server/impl/ServerSessionImpl.java | 1 + 5 files changed, 60 insertions(+), 8 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 53464cce08..4516253493 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -87,7 +87,7 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; - private static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; + public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID"; @@ -446,14 +446,16 @@ public class OpenWireMessageConverter implements MessageConverter { } public static MessageDispatch createMessageDispatch(ServerMessage message, - int deliveryCount, AMQConsumer consumer) throws IOException, JMSException { ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination()); + //we can use core message id for sequenceId + amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID()); MessageDispatch md = new MessageDispatch(); md.setConsumerId(consumer.getId()); + md.setRedeliveryCounter(amqMessage.getRedeliveryCounter()); + md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId()); md.setMessage(amqMessage); - md.setRedeliveryCounter(deliveryCount); ActiveMQDestination destination = amqMessage.getDestination(); md.setDestination(destination); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 3093ed8689..81cdec8b56 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; +import org.apache.activemq.artemis.core.server.ConsumerListener; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -43,9 +44,10 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.wireformat.WireFormat; -public class AMQConsumer { +public class AMQConsumer implements ConsumerListener { private AMQSession session; private org.apache.activemq.command.ActiveMQDestination openwireDestination; private ConsumerInfo info; @@ -108,7 +110,6 @@ public class AMQConsumer { } serverConsumer.setProtocolData(this); - } private SimpleString createTopicSubscription(boolean isDurable, @@ -184,8 +185,8 @@ public class AMQConsumer { if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) { return 0; } - //decrement deliveryCount as AMQ client tends to add 1. - dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this); + + dispatch = OpenWireMessageConverter.createMessageDispatch(message, this); int size = dispatch.getMessage().getSize(); reference.setProtocolData(dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); @@ -215,7 +216,6 @@ public class AMQConsumer { * Notice that we will start a new transaction on the cases where there is no transaction. */ public void acknowledge(MessageAck ack) throws Exception { - MessageId first = ack.getFirstMessageId(); MessageId last = ack.getLastMessageId(); @@ -252,6 +252,10 @@ public class AMQConsumer { } else if (ack.isPoisonAck()) { for (MessageReference ref : ackList) { + Throwable poisonCause = ack.getPoisonCause(); + if (poisonCause != null) { + ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString()); + } ref.getQueue().sendToDeadLetterAddress(transaction, ref); } } @@ -303,6 +307,22 @@ public class AMQConsumer { } } + @Override + public void updateForCanceledRef(MessageReference ref) { + long seqId = ref.getMessage().getMessageID(); + long lastDelSeqId = info.getLastDeliveredSequenceId(); + ServerMessage coreMessage = ref.getMessage(); + int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER); + if (openwireDestination.isTopic()) { + redeliveryCounter++; + coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter); + } + else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) { + redeliveryCounter++; + coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter); + } + } + /** * The MessagePullHandler is used with slow consumer policies. * */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java new file mode 100644 index 0000000000..2b2be9c3da --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ConsumerListener.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +/** + */ +public interface ConsumerListener { + void updateForCanceledRef(MessageReference ref); +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 7860ed804a..12ca54c5bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.ConsumerListener; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; @@ -65,6 +66,7 @@ import org.apache.activemq.artemis.utils.TypedProperties; * Concrete implementation of a ClientConsumer. */ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { + //private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log"); // Constants ------------------------------------------------------------------------------------ private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -600,6 +602,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // before failure ref.decrementDeliveryCount(); } + + if (this.protocolData instanceof ConsumerListener) { + ((ConsumerListener)protocolData).updateForCanceledRef(ref); + } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 31102aa486..7a817f00b3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -317,6 +317,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { protected void doClose(final boolean failed) throws Exception { synchronized (this) { + this.setStarted(false); if (closed) return;