ARTEMIS-4418 use consumer delivery sequence in messageId for openwire broker sequence id, makes delivery count calculation independent of message order

This commit is contained in:
Gary Tully 2023-09-06 17:51:47 +01:00
parent 9864e005d2
commit 91debf25db
5 changed files with 33 additions and 19 deletions

View File

@ -481,15 +481,16 @@ public final class OpenWireMessageConverter {
public static MessageDispatch createMessageDispatch(MessageReference reference,
ICoreMessage message,
WireFormat marshaller,
AMQConsumer consumer, UUID serverNodeUUID) throws IOException {
AMQConsumer consumer,
UUID serverNodeUUID,
long consumerDeliverySequenceId) throws IOException {
ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer, serverNodeUUID);
//we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
amqMessage.getMessageId().setBrokerSequenceId(consumerDeliverySequenceId);
MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId());
md.setRedeliveryCounter(reference.getDeliveryCount() - 1);
md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
md.setDeliverySequenceId(consumerDeliverySequenceId);
md.setMessage(amqMessage);
ActiveMQDestination destination = amqMessage.getDestination();
md.setDestination(destination);

View File

@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@ -81,6 +82,7 @@ public class AMQConsumer {
private boolean internalAddress = false;
private volatile Set<MessageReference> rolledbackMessageRefs;
private ScheduledFuture<?> delayedDispatchPrompter;
private AtomicLong deliveredSequenceId = new AtomicLong(0);
public AMQConsumer(AMQSession amqSession,
org.apache.activemq.command.ActiveMQDestination d,
@ -292,7 +294,7 @@ public class AMQConsumer {
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
}
//handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat()
dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this, session.getCoreServer().getNodeManager().getUUID());
dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this, session.getCoreServer().getNodeManager().getUUID(), deliveredSequenceId.getAndIncrement());
int size = dispatch.getMessage().getSize();
reference.setProtocolData(MessageId.class, dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
@ -446,7 +448,7 @@ public class AMQConsumer {
// treat as delivered
return true;
}
if (ref.getMessageID() <= info.getLastDeliveredSequenceId() && !isRolledBack(ref)) {
if (ref.getProtocolData(MessageId.class).getBrokerSequenceId() <= info.getLastDeliveredSequenceId() && !isRolledBack(ref)) {
// treat as delivered
return true;
}

View File

@ -73,7 +73,7 @@ public class OpenWireMessageConverterTest {
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID);
MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID, i);
MessageId messageId = dispatch.getMessage().getMessageId();
assertFalse(mqMessageAuditNoSync.isDuplicate(messageId));
@ -95,7 +95,7 @@ public class OpenWireMessageConverterTest {
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID);
MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID, i);
MessageId messageId = dispatch.getMessage().getMessageId();
assertFalse(mqMessageAuditNoSync.isDuplicate(messageId));
@ -114,7 +114,7 @@ public class OpenWireMessageConverterTest {
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) instanceof String);
}
@ -130,7 +130,7 @@ public class OpenWireMessageConverterTest {
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch marshalled = (MessageDispatch) openWireFormat.unmarshal(openWireFormat.marshal(OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID)));
MessageDispatch marshalled = (MessageDispatch) openWireFormat.unmarshal(openWireFormat.marshal(OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0)));
assertEquals(5, marshalled.getMessage().getProperties().keySet().size());
Message converted = OpenWireMessageConverter.inbound(marshalled.getMessage(), openWireFormat, null);
for (int i = 0; i < 5; i++) {
@ -161,7 +161,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID);
MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(PRODUCER_ID, classicMessageDispatch.getMessage().getProducerId().toString());
}
@ -178,7 +178,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(PRODUCER_ID, messageDispatch.getMessage().getProducerId().toString());
}
@ -194,7 +194,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID);
MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(MESSAGE_ID, classicMessageDispatch.getMessage().getMessageId().toString());
}
@ -211,7 +211,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(MESSAGE_ID, messageDispatch.getMessage().getMessageId().toString());
}
@ -228,7 +228,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals("queue://" + DESTINATION, messageDispatch.getMessage().getOriginalDestination().toString());
}
@ -245,7 +245,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals("queue://" + DESTINATION, messageDispatch.getMessage().getReplyTo().toString());
}
@ -269,7 +269,7 @@ public class OpenWireMessageConverterTest {
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertNull(messageDispatch.getMessage().getProperty(hdrArrival));
assertNull(messageDispatch.getMessage().getProperty(hdrBrokerInTime));

View File

@ -656,6 +656,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
browserDeliverer.close();
} else {
messageQueue.removeConsumer(this);
messageQueue.deliverAsync();
}
session.removeConsumer(id);

View File

@ -55,11 +55,20 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
}
@Test(timeout = 60_000)
public void testConsumerSingleMessageLoop() throws Exception {
public void testConsumerSingleMessageLoopExclusive() throws Exception {
doTestConsumerSingleMessageLoop(true);
}
@Test(timeout = 60_000)
public void testConsumerSingleMessageLoopNonExclusive() throws Exception {
doTestConsumerSingleMessageLoop(false);
}
public void doTestConsumerSingleMessageLoop(boolean exclusive) throws Exception {
Connection exConn = null;
SimpleString durableQueue = new SimpleString("exampleQueue");
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(exclusive));
try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
@ -157,6 +166,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
}
session.commit();
// force a local socket close such that the broker sees an exception on the connection and fails the consumer via close
((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop();
exConn.close();
}