ARTEMIS-468 Fix openwire redelivery related regressions under integration-tests

This commit is contained in:
Howard Gao 2016-04-06 20:59:57 +08:00 committed by Clebert Suconic
parent cf4636e96c
commit 8a998ad805
5 changed files with 60 additions and 8 deletions

View File

@ -87,7 +87,7 @@ public class OpenWireMessageConverter implements MessageConverter {
private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
private static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID"; private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID";
@ -446,14 +446,16 @@ public class OpenWireMessageConverter implements MessageConverter {
} }
public static MessageDispatch createMessageDispatch(ServerMessage message, public static MessageDispatch createMessageDispatch(ServerMessage message,
int deliveryCount,
AMQConsumer consumer) throws IOException, JMSException { AMQConsumer consumer) throws IOException, JMSException {
ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination()); ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
//we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
MessageDispatch md = new MessageDispatch(); MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId()); md.setConsumerId(consumer.getId());
md.setRedeliveryCounter(amqMessage.getRedeliveryCounter());
md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
md.setMessage(amqMessage); md.setMessage(amqMessage);
md.setRedeliveryCounter(deliveryCount);
ActiveMQDestination destination = amqMessage.getDestination(); ActiveMQDestination destination = amqMessage.getDestination();
md.setDestination(destination); md.setDestination(destination);

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.ConsumerListener;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -43,9 +44,10 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
public class AMQConsumer { public class AMQConsumer implements ConsumerListener {
private AMQSession session; private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination openwireDestination; private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private ConsumerInfo info; private ConsumerInfo info;
@ -108,7 +110,6 @@ public class AMQConsumer {
} }
serverConsumer.setProtocolData(this); serverConsumer.setProtocolData(this);
} }
private SimpleString createTopicSubscription(boolean isDurable, private SimpleString createTopicSubscription(boolean isDurable,
@ -184,8 +185,8 @@ public class AMQConsumer {
if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) { if (messagePullHandler != null && !messagePullHandler.checkForcedConsumer(message)) {
return 0; return 0;
} }
//decrement deliveryCount as AMQ client tends to add 1.
dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this); dispatch = OpenWireMessageConverter.createMessageDispatch(message, this);
int size = dispatch.getMessage().getSize(); int size = dispatch.getMessage().getSize();
reference.setProtocolData(dispatch.getMessage().getMessageId()); reference.setProtocolData(dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch); session.deliverMessage(dispatch);
@ -215,7 +216,6 @@ public class AMQConsumer {
* Notice that we will start a new transaction on the cases where there is no transaction. */ * Notice that we will start a new transaction on the cases where there is no transaction. */
public void acknowledge(MessageAck ack) throws Exception { public void acknowledge(MessageAck ack) throws Exception {
MessageId first = ack.getFirstMessageId(); MessageId first = ack.getFirstMessageId();
MessageId last = ack.getLastMessageId(); MessageId last = ack.getLastMessageId();
@ -252,6 +252,10 @@ public class AMQConsumer {
} }
else if (ack.isPoisonAck()) { else if (ack.isPoisonAck()) {
for (MessageReference ref : ackList) { for (MessageReference ref : ackList) {
Throwable poisonCause = ack.getPoisonCause();
if (poisonCause != null) {
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString());
}
ref.getQueue().sendToDeadLetterAddress(transaction, ref); ref.getQueue().sendToDeadLetterAddress(transaction, ref);
} }
} }
@ -303,6 +307,22 @@ public class AMQConsumer {
} }
} }
@Override
public void updateForCanceledRef(MessageReference ref) {
long seqId = ref.getMessage().getMessageID();
long lastDelSeqId = info.getLastDeliveredSequenceId();
ServerMessage coreMessage = ref.getMessage();
int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER);
if (openwireDestination.isTopic()) {
redeliveryCounter++;
coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
}
else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) {
redeliveryCounter++;
coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
}
}
/** /**
* The MessagePullHandler is used with slow consumer policies. * The MessagePullHandler is used with slow consumer policies.
* */ * */

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
/**
*/
public interface ConsumerListener {
void updateForCanceledRef(MessageReference ref);
}

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ConsumerListener;
import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -65,6 +66,7 @@ import org.apache.activemq.artemis.utils.TypedProperties;
* Concrete implementation of a ClientConsumer. * Concrete implementation of a ClientConsumer.
*/ */
public class ServerConsumerImpl implements ServerConsumer, ReadyListener { public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
//private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log");
// Constants ------------------------------------------------------------------------------------ // Constants ------------------------------------------------------------------------------------
private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@ -600,6 +602,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// before failure // before failure
ref.decrementDeliveryCount(); ref.decrementDeliveryCount();
} }
if (this.protocolData instanceof ConsumerListener) {
((ConsumerListener)protocolData).updateForCanceledRef(ref);
}
} }
@Override @Override

View File

@ -317,6 +317,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected void doClose(final boolean failed) throws Exception { protected void doClose(final boolean failed) throws Exception {
synchronized (this) { synchronized (this) {
this.setStarted(false);
if (closed) if (closed)
return; return;