ARTEMIS-468 Amendments to how redelivery count is handled on openwire

This commit is contained in:
Clebert Suconic 2016-04-07 12:12:16 -04:00
parent 8a998ad805
commit 50eac7c824
12 changed files with 71 additions and 61 deletions

View File

@ -343,6 +343,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
public void sendProducerCreditsMessage(int credits, SimpleString address) { public void sendProducerCreditsMessage(int credits, SimpleString address) {
} }
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false;
}
@Override @Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) { public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
} }

View File

@ -53,6 +53,12 @@ public class MQTTSessionCallback implements SessionCallback {
return 1; return 1;
} }
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false;
}
@Override @Override
public int sendLargeMessageContinuation(ServerConsumer consumerID, public int sendLargeMessageContinuation(ServerConsumer consumerID,
byte[] body, byte[] body,

View File

@ -745,7 +745,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
throw new IllegalStateException("Session not exist! : " + sessionId); throw new IllegalStateException("Session not exist! : " + sessionId);
} }
List<AMQConsumer> consumersList = amqSession.createConsumer(info, amqSession, new SlowConsumerDetection()); List<AMQConsumer> consumersList = amqSession.createConsumer(info, new SlowConsumerDetection());
this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList); this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
ss.addConsumer(info); ss.addConsumer(info);

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
@ -87,7 +88,6 @@ public class OpenWireMessageConverter implements MessageConverter {
private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
public static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER";
private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID"; private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID";
@ -373,7 +373,6 @@ public class OpenWireMessageConverter implements MessageConverter {
} }
} }
coreMessage.putIntProperty(AMQ_MSG_REDELIVER_COUNTER, messageSend.getRedeliveryCounter());
ActiveMQDestination replyTo = messageSend.getReplyTo(); ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) { if (replyTo != null) {
ByteSequence replyToBytes = marshaller.marshal(replyTo); ByteSequence replyToBytes = marshaller.marshal(replyTo);
@ -445,15 +444,15 @@ public class OpenWireMessageConverter implements MessageConverter {
} }
} }
public static MessageDispatch createMessageDispatch(ServerMessage message, public static MessageDispatch createMessageDispatch(MessageReference reference, ServerMessage message,
AMQConsumer consumer) throws IOException, JMSException { AMQConsumer consumer) throws IOException, JMSException {
ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination()); ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination());
//we can use core message id for sequenceId //we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID()); amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
MessageDispatch md = new MessageDispatch(); MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId()); md.setConsumerId(consumer.getId());
md.setRedeliveryCounter(amqMessage.getRedeliveryCounter()); md.setRedeliveryCounter(reference.getDeliveryCount() - 1);
md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId()); md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
md.setMessage(amqMessage); md.setMessage(amqMessage);
ActiveMQDestination destination = amqMessage.getDestination(); ActiveMQDestination destination = amqMessage.getDestination();
@ -462,7 +461,7 @@ public class OpenWireMessageConverter implements MessageConverter {
return md; return md;
} }
private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { private static ActiveMQMessage toAMQMessage(MessageReference refernce, ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
ActiveMQMessage amqMsg = null; ActiveMQMessage amqMsg = null;
byte coreType = coreMessage.getType(); byte coreType = coreMessage.getType();
switch (coreType) { switch (coreType) {
@ -762,10 +761,7 @@ public class OpenWireMessageConverter implements MessageConverter {
amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes)); amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
} }
Integer redeliveryCounter = (Integer) coreMessage.getObjectProperty(AMQ_MSG_REDELIVER_COUNTER); amqMsg.setRedeliveryCounter(refernce.getDeliveryCount() - 1);
if (redeliveryCounter != null) {
amqMsg.setRedeliveryCounter(redeliveryCounter);
}
byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO); byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
if (replyToBytes != null) { if (replyToBytes != null) {

View File

@ -29,7 +29,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.ConsumerListener;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -44,10 +43,9 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
public class AMQConsumer implements ConsumerListener { public class AMQConsumer {
private AMQSession session; private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination openwireDestination; private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private ConsumerInfo info; private ConsumerInfo info;
@ -186,7 +184,7 @@ public class AMQConsumer implements ConsumerListener {
return 0; return 0;
} }
dispatch = OpenWireMessageConverter.createMessageDispatch(message, this); dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this);
int size = dispatch.getMessage().getSize(); int size = dispatch.getMessage().getSize();
reference.setProtocolData(dispatch.getMessage().getMessageId()); reference.setProtocolData(dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch); session.deliverMessage(dispatch);
@ -307,19 +305,13 @@ public class AMQConsumer implements ConsumerListener {
} }
} }
@Override public void updateDeliveryCountAfterCancel(MessageReference ref) {
public void updateForCanceledRef(MessageReference ref) {
long seqId = ref.getMessage().getMessageID(); long seqId = ref.getMessage().getMessageID();
long lastDelSeqId = info.getLastDeliveredSequenceId(); long lastDelSeqId = info.getLastDeliveredSequenceId();
ServerMessage coreMessage = ref.getMessage();
int redeliveryCounter = coreMessage.getIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER); // This is a specific rule of the protocol
if (openwireDestination.isTopic()) { if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) {
redeliveryCounter++; ref.decrementDeliveryCount();
coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
}
else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN || seqId <= lastDelSeqId) {
redeliveryCounter++;
coreMessage.putIntProperty(OpenWireMessageConverter.AMQ_MSG_REDELIVER_COUNTER, redeliveryCounter);
} }
} }

View File

@ -118,8 +118,19 @@ public class AMQSession implements SessionCallback {
} }
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
if (consumer.getProtocolData() != null) {
((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
return true;
}
else {
return false;
}
}
public List<AMQConsumer> createConsumer(ConsumerInfo info, public List<AMQConsumer> createConsumer(ConsumerInfo info,
AMQSession amqSession,
SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception { SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
//check destination //check destination
ActiveMQDestination dest = info.getDestination(); ActiveMQDestination dest = info.getDestination();

View File

@ -118,6 +118,11 @@ public class StompSession implements SessionCallback {
} }
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false;
}
@Override @Override
public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) { public int sendMessage(MessageReference ref, ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
LargeServerMessageImpl largeMessage = null; LargeServerMessageImpl largeMessage = null;

View File

@ -56,6 +56,11 @@ public final class CoreSessionCallback implements SessionCallback {
return connection.isWritable(callback); return connection.isWritable(callback);
} }
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false;
}
@Override @Override
public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { public int sendLargeMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount); Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount);

View File

@ -1,23 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
/**
*/
public interface ConsumerListener {
void updateForCanceledRef(MessageReference ref);
}

View File

@ -43,7 +43,6 @@ import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ConsumerListener;
import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -367,7 +366,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref.incrementDeliveryCount(); ref.incrementDeliveryCount();
// If updateDeliveries = false (set by strict-update), // If updateDeliveries = false (set by strict-update),
// the updateDeliveryCount would still be updated after c // the updateDeliveryCountAfterCancel would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged()) { if (strictUpdateDeliveryCount && !ref.isPaged()) {
if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
!ref.getQueue().isInternalQueue() && !ref.getQueue().isInternalQueue() &&
@ -596,15 +595,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
if (!failed) { // We first update the deliveryCount at the protocol callback...
// We don't decrement delivery count if the client failed, since there's a possibility that refs // if that wasn't updated (if there is no specific logic, then we apply the default logic used on most protocols
// were actually delivered but we just didn't get any acks for them if (!callback.updateDeliveryCountAfterCancel(this, ref, failed)) {
// before failure if (!failed) {
ref.decrementDeliveryCount(); // We don't decrement delivery count if the client failed, since there's a possibility that refs
} // were actually delivered but we just didn't get any acks for them
// before failure
if (this.protocolData instanceof ConsumerListener) { ref.decrementDeliveryCount();
((ConsumerListener)protocolData).updateForCanceledRef(ref); }
} }
} }

View File

@ -33,6 +33,15 @@ public interface SessionCallback {
* like acks or other operations. */ * like acks or other operations. */
void afterDelivery() throws Exception; void afterDelivery() throws Exception;
/**
* Use this to updates specifics on the message after a redelivery happened.
* Return true if there was specific logic applied on the protocol, so the ServerConsumer won't make any adjustments.
* @param consumer
* @param ref
* @param failed
*/
boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed);
void sendProducerCreditsMessage(int credits, SimpleString address); void sendProducerCreditsMessage(int credits, SimpleString address);
void sendProducerCreditsFailMessage(int credits, SimpleString address); void sendProducerCreditsFailMessage(int credits, SimpleString address);

View File

@ -483,6 +483,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
targetCallback.sendProducerCreditsMessage(credits, address); targetCallback.sendProducerCreditsMessage(credits, address);
} }
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false;
}
@Override @Override
public void browserFinished(ServerConsumer consumer) { public void browserFinished(ServerConsumer consumer) {