ARTEMIS-2378 respect openwire removeInfo lastSequenceId when dealing with delivery count

This commit is contained in:
gtully 2019-06-13 12:28:51 +01:00 committed by Clebert Suconic
parent 9021c89f05
commit d1add00b00
4 changed files with 72 additions and 20 deletions

View File

@ -113,6 +113,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
@ -866,6 +867,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
ss.addConsumer(info);
info.setLastDeliveredSequenceId(RemoveInfo.LAST_DELIVERED_UNKNOWN);
if (consumersList.size() == 0) {
return;
@ -1075,6 +1077,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
private void propagateLastSequenceId(SessionState sessionState, long lastDeliveredSequenceId) {
for (ConsumerState consumerState : sessionState.getConsumerStates()) {
consumerState.getInfo().setLastDeliveredSequenceId(lastDeliveredSequenceId);
}
}
CommandProcessor commandProcessorInstance = new CommandProcessor();
// This will listen for commands through the protocolmanager
@ -1181,6 +1189,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
state.removeSession(id);
propagateLastSequenceId(session, lastDeliveredSequenceId);
removeSession(context, session.getInfo());
return null;
}
@ -1632,6 +1641,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
//we let protocol manager to handle connection add/remove
try {
for (SessionState sessionState : state.getSessionStates()) {
propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
}
protocolManager.removeConnection(state.getInfo(), null);
} catch (Throwable e) {
// log

View File

@ -386,30 +386,17 @@ public class AMQConsumer {
}
public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
long lastDelSeqId = info.getLastDeliveredSequenceId();
//in activemq5, closing a durable subscription won't close the consumer
//at broker. Messages will be treated as if being redelivered to
//the same consumer.
if (this.info.isDurable() && this.getOpenwireDestination().isTopic()) {
if (RemoveInfo.LAST_DELIVERED_UNKNOWN == info.getLastDeliveredSequenceId()) {
// treat as delivered
return true;
}
//because delivering count is always one greater than redelivery count
//we adjust it down before further calculating.
ref.decrementDeliveryCount();
// This is a specific rule of the protocol
if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
// this takes care of un-acked messages in non-tx deliveries
// tx cases are handled by
// org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
ref.incrementDeliveryCount();
} else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !isRolledBack(ref)) {
ref.incrementDeliveryCount();
if (ref.getMessageID() <= info.getLastDeliveredSequenceId() && !isRolledBack(ref)) {
// treat as delivered
return true;
}
return true;
// default behaviour
return false;
}
/**

View File

@ -148,6 +148,7 @@ public class JMSConsumer2Test extends BasicOpenWireTest {
m = consumer.receive(5000);
System.out.println("m2 received: " + m);
assertNotNull(m);
assertFalse("redelivered flag set", m.getJMSRedelivered());
// install another consumer while message dispatch is unacked/uncommitted
Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);

View File

@ -25,6 +25,8 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
@ -685,4 +687,54 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
}
@Test
public void verifyNoRedeliveryFlagAfterCloseNoReceive() throws Exception {
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
this.makeSureCoreQueueExist("TEST");
Queue queue = session.createQueue("TEST");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("test"));
} finally {
connection.close();
}
connection = (ActiveMQConnection) factory.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST");
MessageConsumer consumer = session.createConsumer(queue);
TimeUnit.MILLISECONDS.sleep(500);
// nothing received
consumer.close();
// try again, expect no redelivery flag
consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
assertNotNull("Message null", message);
System.out.println("received message: " + message);
System.out.println("is redelivered: " + message.getJMSRedelivered());
assertFalse(message.getJMSRedelivered());
} finally {
connection.close();
}
}
}