From f30ca44c825efa365380b1a977295637c7cbff92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Mon, 22 Oct 2018 16:31:00 +0100 Subject: [PATCH] ARTEMIS-2142 Support JMSXGroupSeq -1 to close/reset group. Add test cases Add GroupSequence to Message Interface Implement Support closing/reset group in queue impl Update Documentation (copy from activemq5) Change/Fix OpenWireMessageConverter to use default of 0 if not set, for OpenWire as per documentation http://activemq.apache.org/activemq-message-properties.html --- .../activemq/artemis/api/core/Message.java | 6 + .../core/message/impl/CoreMessage.java | 6 + .../activemq/artemis/reader/MessageUtil.java | 6 +- .../artemis/jms/client/ActiveMQMessage.java | 123 +++++++++---- .../protocol/amqp/broker/AMQPMessage.java | 11 ++ .../openwire/OpenWireMessageConverter.java | 4 +- .../core/server/ActiveMQServerLogger.java | 4 + .../artemis/core/server/impl/QueueImpl.java | 34 +++- docs/user-manual/en/message-grouping.md | 17 ++ .../amqp/JMSMessageGroupsTest.java | 161 ++++++++++++++++-- 10 files changed, 323 insertions(+), 49 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 667f95f673..953e11264a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -113,6 +113,8 @@ public interface Message { */ SimpleString HDR_GROUP_ID = new SimpleString("_AMQ_GROUP_ID"); + SimpleString HDR_GROUP_SEQUENCE = new SimpleString("_AMQ_GROUP_SEQUENCE"); + /** * to determine if the Large Message was compressed. */ @@ -248,6 +250,10 @@ public interface Message { return null; } + default int getGroupSequence() { + return 0; + } + SimpleString getReplyTo(); Message setReplyTo(SimpleString address); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 62a81a17df..d7165598c2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -289,6 +289,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return this.getSimpleStringProperty(Message.HDR_GROUP_ID); } + @Override + public int getGroupSequence() { + final Integer integer = this.getIntProperty(Message.HDR_GROUP_SEQUENCE); + return integer == null ? 0 : integer; + } + /** * @param sendBuffer * @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java index db1f3dc335..0eceb61b2a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; - import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; @@ -49,6 +48,8 @@ public class MessageUtil { public static final String JMSXGROUPID = "JMSXGroupID"; + public static final String JMSXGROUPSEQ = "JMSXGroupSeq"; + public static final String JMSXUSERID = "JMSXUserID"; public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__AMQ_CID"); @@ -154,6 +155,8 @@ public class MessageUtil { for (SimpleString propName : message.getPropertyNames()) { if (propName.equals(Message.HDR_GROUP_ID)) { set.add(MessageUtil.JMSXGROUPID); + } else if (propName.equals(Message.HDR_GROUP_SEQUENCE)) { + set.add(MessageUtil.JMSXGROUPSEQ); } else if (propName.equals(Message.HDR_VALIDATED_USER)) { set.add(MessageUtil.JMSXUSERID); } else if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE) && !propName.startsWith(Message.HDR_ROUTE_TO_IDS)) { @@ -169,6 +172,7 @@ public class MessageUtil { public static boolean propertyExists(Message message, String name) { return message.containsProperty(new SimpleString(name)) || name.equals(MessageUtil.JMSXDELIVERYCOUNT) || (MessageUtil.JMSXGROUPID.equals(name) && message.containsProperty(Message.HDR_GROUP_ID)) || + (MessageUtil.JMSXGROUPSEQ.equals(name) && message.containsProperty(Message.HDR_GROUP_SEQUENCE)) || (MessageUtil.JMSXUSERID.equals(name) && message.containsProperty(Message.HDR_VALIDATED_USER)); } } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index a3360ef135..791caa8c56 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.artemis.jms.client; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.IllegalStateException; @@ -30,11 +36,6 @@ import javax.management.openmbean.CompositeDataSupport; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.OutputStream; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -561,12 +562,14 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public int getIntProperty(final String name) throws JMSException { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return message.getDeliveryCount(); - } - try { - return message.getIntProperty(name); + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { + return message.getDeliveryCount(); + } else if (MessageUtil.JMSXGROUPSEQ.equals(name)) { + return message.getGroupSequence(); + } else { + return message.getIntProperty(name); + } } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -574,12 +577,14 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public long getLongProperty(final String name) throws JMSException { - if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return message.getDeliveryCount(); - } - try { - return message.getLongProperty(name); + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { + return message.getDeliveryCount(); + } else if (MessageUtil.JMSXGROUPSEQ.equals(name)) { + return message.getGroupSequence(); + } else { + return message.getLongProperty(name); + } } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -611,7 +616,9 @@ public class ActiveMQMessage implements javax.jms.Message { try { if (MessageUtil.JMSXGROUPID.equals(name)) { - return message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID); + return Objects.toString(message.getGroupID(), null); + } else if (MessageUtil.JMSXGROUPSEQ.equals(name)) { + return Integer.toString(message.getGroupSequence()); } else if (MessageUtil.JMSXUSERID.equals(name)) { return message.getValidatedUserID(); } else { @@ -624,13 +631,20 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public Object getObjectProperty(final String name) throws JMSException { + final Object val; if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { - return String.valueOf(message.getDeliveryCount()); + val = message.getDeliveryCount(); + } else if (MessageUtil.JMSXGROUPID.equals(name)) { + val = message.getGroupID(); + } else if (MessageUtil.JMSXGROUPSEQ.equals(name)) { + val = message.getGroupSequence(); + } else if (MessageUtil.JMSXUSERID.equals(name)) { + val = message.getValidatedUserID(); + } else { + val = message.getObjectProperty(name); } - - Object val = message.getObjectProperty(name); if (val instanceof SimpleString) { - val = val.toString(); + return val.toString(); } return val; } @@ -662,12 +676,18 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public void setIntProperty(final String name, final int value) throws JMSException { checkProperty(name); + if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) { + return; + } message.putIntProperty(name, value); } @Override public void setLongProperty(final String name, final long value) throws JMSException { checkProperty(name); + if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) { + return; + } message.putLongProperty(name, value); } @@ -687,9 +707,11 @@ public class ActiveMQMessage implements javax.jms.Message { public void setStringProperty(final String name, final String value) throws JMSException { checkProperty(name); - if (handleCoreProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) { + if (handleCoreStringProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) { return; - } else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) { + } else if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) { + return; + } else if (handleCoreStringProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) { return; } else { message.putStringProperty(name, value); @@ -698,11 +720,13 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public void setObjectProperty(final String name, final Object value) throws JMSException { - if (handleCoreProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) { + if (handleCoreStringProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) { return; } - - if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) { + if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) { + return; + } + if (handleCoreStringProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) { return; } @@ -716,14 +740,14 @@ public class ActiveMQMessage implements javax.jms.Message { return; } - checkProperty(name); - if (ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM.equals(name)) { setInputStream((InputStream) value); return; } + checkProperty(name); + try { message.putObjectProperty(name, value); } catch (ActiveMQPropertyConversionException e) { @@ -979,10 +1003,47 @@ public class ActiveMQMessage implements javax.jms.Message { } } - private boolean handleCoreProperty(final String name, - final Object value, - String jmsPropertyName, - SimpleString corePropertyName) { + private boolean handleCoreIntegerProperty(final String name, + final Object value, + String jmsPropertyName, + SimpleString corePropertyName) { + if (jmsPropertyName.equals(name)) { + return handleCoreIntegerProperty(name, getInteger(value), jmsPropertyName, corePropertyName); + } + return false; + } + + private boolean handleCoreIntegerProperty(final String name, + final int value, + String jmsPropertyName, + SimpleString corePropertyName) { + boolean result = false; + + if (jmsPropertyName.equals(name)) { + message.putIntProperty(corePropertyName, value); + result = true; + } + + return result; + } + + private static int getInteger(final Object value) { + Objects.requireNonNull(value); + final int integer; + if (value instanceof Integer) { + integer = (Integer) value; + } else if (value instanceof Number) { + integer = ((Number) value).intValue(); + } else { + integer = Integer.parseInt(value.toString()); + } + return integer; + } + + private boolean handleCoreStringProperty(final String name, + final Object value, + String jmsPropertyName, + SimpleString corePropertyName) { boolean result = false; if (jmsPropertyName.equals(name)) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index c7f1b506f8..ab6733052d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -1099,6 +1099,17 @@ public class AMQPMessage extends RefCountMessage { } } + @Override + public int getGroupSequence() { + ensureMessageDataScanned(); + + if (properties != null && properties.getGroupSequence() != null) { + return properties.getGroupSequence().intValue(); + } else { + return 0; + } + } + @Override public Long getScheduledDeliveryTime() { if (scheduledTime < 0) { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 49d897de58..e958267f5c 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -83,7 +83,7 @@ public final class OpenWireMessageConverter { private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID"); private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE"); private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID; - private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = new SimpleString(AMQ_PREFIX + "GROUP_SEQUENCE"); + private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE; private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID"); private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION"); private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID"); @@ -616,7 +616,7 @@ public final class OpenWireMessageConverter { Integer groupSequence = (Integer) coreMessage.getObjectProperty(AMQ_MSG_GROUP_SEQUENCE); if (groupSequence == null) { - groupSequence = -1; + groupSequence = 0; } amqMsg.setGroupSequence(groupSequence); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index cad835aa2e..43bed889ac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1613,6 +1613,10 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void problemAddingConfigReloadCallback(String propertyName, @Cause Exception e); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222278, value = "Unable to extract GroupSequence from message", format = Message.Format.MESSAGE_FORMAT) + void unableToExtractGroupSequence(@Cause Throwable e); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 747db5c6ac..292cfc1e52 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2531,8 +2531,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { removeMessageReference(holder, ref); - if (groupID != null && groupConsumer == null && redistributor == null) { - groups.put(groupID, consumer); + if (redistributor == null) { + handleMessageGroup(ref, consumer, groupConsumer, groupID); } handled++; @@ -2635,6 +2635,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + private int extractGroupSequence(MessageReference ref) { + if (internalQueue) { + return 0; + } else { + try { + // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever + return ref.getMessage().getGroupSequence(); + } catch (Throwable e) { + ActiveMQServerLogger.LOGGER.unableToExtractGroupSequence(e); + return 0; + } + } + } + protected void refRemoved(MessageReference ref) { queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate()); pendingMetrics.decrementMetrics(ref); @@ -3110,8 +3124,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { - if (groupID != null && groupConsumer == null && redistributor == null) { - groups.put(groupID, consumer); + + if (redistributor == null) { + handleMessageGroup(ref, consumer, groupConsumer, groupID); } messagesAdded.incrementAndGet(); @@ -3130,6 +3145,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + private void handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) { + if (groupID != null) { + if (extractGroupSequence(ref) == -1) { + groups.remove(groupID); + } + if (groupConsumer == null) { + groups.put(groupID, consumer); + } + } + } + private void proceedDeliver(Consumer consumer, MessageReference reference) { try { consumer.proceedDeliver(reference); diff --git a/docs/user-manual/en/message-grouping.md b/docs/user-manual/en/message-grouping.md index 050832747f..1c4e7efbd7 100644 --- a/docs/user-manual/en/message-grouping.md +++ b/docs/user-manual/en/message-grouping.md @@ -81,6 +81,23 @@ java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialCont connectionFactory.myConnectionFactory=tcp://localhost:61616?groupID=Group-0 ``` + +#### Closing a Message Group +You generally don't need to close a message group, you just keep using it. + +However if you really do want to close a group you can add a negative sequence number. + +Example: +```java +Mesasge message = session.createTextMessage("hey"); +message.setStringProperty("JMSXGroupID", "Group-0"); +message.setIntProperty("JMSXGroupSeq", -1); +... +producer.send(message); +``` + +This then closes the message group so if another message is sent in the future with the same message group ID it will be reassigned to a new consumer. + ## Example See the [Message Group Example](examples.md#message-group) which shows how diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java index 628c814e8d..b0bb8a6315 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java @@ -21,12 +21,15 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; - +import javax.jms.TextMessage; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,25 +41,160 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport { private static final int ITERATIONS = 10; private static final int MESSAGE_COUNT = 10; private static final int MESSAGE_SIZE = 10 * 1024; - private static final int RECEIVE_TIMEOUT = 3000; + private static final int RECEIVE_TIMEOUT = 1000; private static final String JMSX_GROUP_ID = "JmsGroupsTest"; + private ConnectionSupplier AMQPConnection = () -> createConnection(); + private ConnectionSupplier CoreConnection = () -> createCoreConnection(); + private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection(); + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + @Test(timeout = 60000) - public void testGroupSeqIsNeverLost() throws Exception { + public void testMessageGroupsAMQPProducerAMQPConsumer() throws Exception { + testMessageGroups(AMQPConnection, AMQPConnection); + } + + @Test(timeout = 60000) + public void testMessageGroupsCoreProducerCoreConsumer() throws Exception { + testMessageGroups(CoreConnection, CoreConnection); + } + + @Test(timeout = 60000) + public void testMessageGroupsCoreProducerAMQPConsumer() throws Exception { + testMessageGroups(CoreConnection, AMQPConnection); + } + + @Test(timeout = 60000) + public void testMessageGroupsAMQPProducerCoreConsumer() throws Exception { + testMessageGroups(AMQPConnection, CoreConnection); + } + + @Test(timeout = 60000) + public void testMessageGroupsOpenWireProducerOpenWireConsumer() throws Exception { + testMessageGroups(OpenWireConnection, OpenWireConnection); + } + + @Test(timeout = 60000) + public void testMessageGroupsCoreProducerOpenWireConsumer() throws Exception { + testMessageGroups(CoreConnection, OpenWireConnection); + } + + @Test(timeout = 60000) + public void testMessageGroupsOpenWireProducerCoreConsumer() throws Exception { + testMessageGroups(OpenWireConnection, CoreConnection); + } + + @Test(timeout = 60000) + public void testMessageGroupsAMQPProducerOpenWireConsumer() throws Exception { + testMessageGroups(AMQPConnection, OpenWireConnection); + } + + @Test(timeout = 60000) + public void testMessageGroupsOpenWireProducerAMQPConsumer() throws Exception { + testMessageGroups(OpenWireConnection, AMQPConnection); + } + + + public void testMessageGroups(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { + testGroupSeqIsNeverLost(producerConnectionSupplier, consumerConnectionSupplier); + testGroupSeqCloseGroup(producerConnectionSupplier, consumerConnectionSupplier); + } + + + public void testGroupSeqCloseGroup(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { + final QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(getQueueName())); + + try (Connection producerConnection = producerConnectionSupplier.createConnection(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(producerSession.createQueue(getQueueName())); + + Connection consumerConnection = producerConnectionSupplier.createConnection(); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = consumerSession.createConsumer(consumerSession.createQueue(getQueueName())); + MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(getQueueName())); + MessageConsumer consumer3 = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()))) { + + producerConnection.start(); + consumerConnection.start(); + + //Ensure group and close group, ensuring group is closed + sendAndConsumeAndThenCloseGroup(producerSession, producer, consumer1, consumer2, consumer3, queueBinding); + + //Ensure round robin on group to consumer assignment (consumer2 now), then close group again + sendAndConsumeAndThenCloseGroup(producerSession, producer, consumer2, consumer3, consumer1, queueBinding); + + //Ensure round robin on group to consumer assignment (consumer3 now), then close group again + sendAndConsumeAndThenCloseGroup(producerSession, producer, consumer3, consumer1, consumer1, queueBinding); + + + } + } + + private void sendAndConsumeAndThenCloseGroup(Session producerSession, MessageProducer producer, MessageConsumer expectedGroupConsumer, MessageConsumer consumerA, MessageConsumer consumerB, QueueBinding queueBinding) throws JMSException { + + for (int j = 1; j <= MESSAGE_COUNT; j++) { + TextMessage message = producerSession.createTextMessage(); + message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID); + message.setIntProperty("JMSXGroupSeq", j); + message.setText("Message" + j); + + producer.send(message); + } + + //Group should have been reset and next consumer chosen, as such all msgs should now go to the second consumer (round robin'd) + for (int j = 1; j <= MESSAGE_COUNT; j++) { + TextMessage tm = (TextMessage) expectedGroupConsumer.receive(RECEIVE_TIMEOUT); + assertNotNull(tm); + assertEquals(JMSX_GROUP_ID, tm.getStringProperty("JMSXGroupID")); + assertEquals(j, tm.getIntProperty("JMSXGroupSeq")); + assertEquals("Message" + j, tm.getText()); + + assertNull(consumerA.receiveNoWait()); + assertNull(consumerB.receiveNoWait()); + } + + assertEquals(1, queueBinding.getQueue().getGroupCount()); + + TextMessage message = producerSession.createTextMessage(); + message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID); + //Close Group using -1 JMSXGroupSeq + message.setIntProperty("JMSXGroupSeq", -1); + message.setText("Message" + " group close"); + + producer.send(message); + + TextMessage receivedGroupCloseMessage = (TextMessage) expectedGroupConsumer.receive(RECEIVE_TIMEOUT); + assertNotNull(receivedGroupCloseMessage); + assertEquals(JMSX_GROUP_ID, receivedGroupCloseMessage.getStringProperty("JMSXGroupID")); + assertEquals(-1, receivedGroupCloseMessage.getIntProperty("JMSXGroupSeq")); + assertEquals("group close should goto the existing group consumer", "Message" + " group close", receivedGroupCloseMessage.getText()); + + assertNull(consumerA.receiveNoWait()); + assertNull(consumerB.receiveNoWait()); + + assertEquals(0, queueBinding.getQueue().getGroupCount()); + + } + + + public void testGroupSeqIsNeverLost(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { AtomicInteger sequenceCounter = new AtomicInteger(); + AtomicInteger consumedSequenceCounter = new AtomicInteger(); for (int i = 0; i < ITERATIONS; ++i) { - Connection connection = createConnection(); - try { - sendMessagesToBroker(connection, MESSAGE_COUNT, sequenceCounter); - readMessagesOnBroker(connection, MESSAGE_COUNT); - } finally { - connection.close(); + try (Connection producerConnection = producerConnectionSupplier.createConnection(); + Connection consumerConnection = producerConnectionSupplier.createConnection()) { + sendMessagesToBroker(producerConnection, MESSAGE_COUNT, sequenceCounter); + readMessagesOnBroker(consumerConnection, MESSAGE_COUNT, consumedSequenceCounter); } } } - protected void readMessagesOnBroker(Connection connection, int count) throws Exception { + protected void readMessagesOnBroker(Connection connection, int count, AtomicInteger sequence) throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(getQueueName()); MessageConsumer consumer = session.createConsumer(queue); @@ -66,9 +204,10 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport { assertNotNull(message); LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName()); String gid = message.getStringProperty("JMSXGroupID"); - String seq = message.getStringProperty("JMSXGroupSeq"); + int seq = message.getIntProperty("JMSXGroupSeq"); LOG.debug("Message assigned JMSXGroupID := {}", gid); LOG.debug("Message assigned JMSXGroupSeq := {}", seq); + assertEquals("Sequence order should match", sequence.incrementAndGet(), seq); } session.close();