From 18bcd21c3ee91f86ff3013a57765be3ae2358e93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Wed, 17 Apr 2019 11:27:14 +0100 Subject: [PATCH] ARTEMIS-2306 Make group first off by default, unless configured --- .../config/ActiveMQDefaultConfiguration.java | 2 +- .../artemis/core/server/impl/QueueImpl.java | 8 +- .../schema/artemis-configuration.xsd | 2 +- .../test/resources/artemis-configuration.xsd | 2 +- docs/user-manual/en/message-grouping.md | 38 +++++++ .../amqp/JMSMessageGroupsTest.java | 98 ++++++++++++++++--- .../integration/jms/client/GroupingTest.java | 25 ++--- 7 files changed, 142 insertions(+), 33 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 433d3c9f7e..8547414abb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -523,7 +523,7 @@ public final class ActiveMQDefaultConfiguration { public static final boolean DEFAULT_GROUP_REBALANCE = false; - public static final SimpleString DEFAULT_GROUP_FIRST_KEY = SimpleString.toSimpleString("JMSXFirstInGroupID"); + public static final SimpleString DEFAULT_GROUP_FIRST_KEY = null; public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 5c241433f2..56bae5422f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3256,7 +3256,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (exclusive) { if (groupConsumer == null) { exclusiveConsumer = consumer; - return new GroupFirstMessageReference(groupFirstKey, ref); + if (groupFirstKey != null) { + return new GroupFirstMessageReference(groupFirstKey, ref); + } } consumers.repeat(); } else if (groupID != null) { @@ -3265,7 +3267,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { consumers.repeat(); } else if (groupConsumer == null) { groups.put(groupID, consumer); - return new GroupFirstMessageReference(groupFirstKey, ref); + if (groupFirstKey != null) { + return new GroupFirstMessageReference(groupFirstKey, ref); + } } else { consumers.repeat(); } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index d1484d4726..957f8b4e6a 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -3056,7 +3056,7 @@ - + key used to mark a message is first in a group for a consumer diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index db5141dffb..1dbfb74ff7 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -3056,7 +3056,7 @@ - + key used to mark a message is first in a group for a consumer diff --git a/docs/user-manual/en/message-grouping.md b/docs/user-manual/en/message-grouping.md index b53c3a689a..405b86878f 100644 --- a/docs/user-manual/en/message-grouping.md +++ b/docs/user-manual/en/message-grouping.md @@ -98,6 +98,44 @@ producer.send(message); This then closes the message group so if another message is sent in the future with the same message group ID it will be reassigned to a new consumer. +#### Notifying Consumer of Group Ownership change + +ActiveMQ supports putting a boolean header, set on the first message sent to a consumer for a particular message group. + +To enable this, you must set a header key that the broker will use to set the flag. + +In the examples we use `JMSXGroupFirstForConsumer` but it can be any header key value you want. + + +By setting `group-first-key` to `JMSXGroupFirstForConsumer` at the queue level, every time a new group is assigned a consumer the header `JMSXGroupFirstForConsumer` will be set to true on the first message. + +```xml +
+ + + +
+``` + +Or on auto-create when using the JMS Client by using address parameters when +creating the destination used by the consumer. + +```java +Queue queue = session.createQueue("my.destination.name?group-first-key=JMSXGroupFirstForConsumer"); +Topic topic = session.createTopic("my.destination.name?group-first-key=JMSXGroupFirstForConsumer"); +``` + +Also the default for all queues under and address can be defaulted using the +`address-setting` configuration: + +```xml + + JMSXGroupFirstForConsumer + +``` + +By default this is null, and therefor OFF. + #### Rebalancing Message Groups Sometimes after new consumers are added you can find that if you have long lived groups, that they have no groups assigned, and thus are not being utilised, this is because the long lived groups will already be assigned to existing consumers. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java index e23c788ae5..5f7d22e4ea 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; @@ -30,6 +31,9 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +57,23 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport { return "AMQP,OPENWIRE,CORE"; } + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + super.configureAddressPolicy(server); + + AddressSettings addressSettings = new AddressSettings(); + + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + addressSettings.setAutoCreateQueues(isAutoCreateQueues()); + addressSettings.setAutoCreateAddresses(isAutoCreateAddresses()); + addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress())); + addressSettings.setExpiryAddress(SimpleString.toSimpleString(getDeadLetterAddress())); + addressSettings.setDefaultGroupFirstKey(SimpleString.toSimpleString("JMSXFirstInGroupID")); + + + server.getConfiguration().getAddressesSettings().put("GroupFirst.#", addressSettings); + } + @Test(timeout = 60000) public void testMessageGroupsAMQPProducerAMQPConsumer() throws Exception { testMessageGroups(AMQPConnection, AMQPConnection); @@ -102,6 +123,8 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport { public void testMessageGroups(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { testGroupSeqIsNeverLost(producerConnectionSupplier, consumerConnectionSupplier); testGroupSeqCloseGroup(producerConnectionSupplier, consumerConnectionSupplier); + testGroupFirst(producerConnectionSupplier, consumerConnectionSupplier); + testGroupFirstDefaultOff(producerConnectionSupplier, consumerConnectionSupplier); } @@ -184,19 +207,72 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport { public void testGroupSeqIsNeverLost(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { AtomicInteger sequenceCounter = new AtomicInteger(); AtomicInteger consumedSequenceCounter = new AtomicInteger(); + String queueName = getQueueName(); for (int i = 0; i < ITERATIONS; ++i) { try (Connection producerConnection = producerConnectionSupplier.createConnection(); - Connection consumerConnection = producerConnectionSupplier.createConnection()) { - sendMessagesToBroker(producerConnection, MESSAGE_COUNT, sequenceCounter); - readMessagesOnBroker(consumerConnection, MESSAGE_COUNT, consumedSequenceCounter); + Connection consumerConnection = consumerConnectionSupplier.createConnection()) { + sendMessagesToBroker(queueName, producerConnection, MESSAGE_COUNT, sequenceCounter); + readMessagesOnBroker(queueName, consumerConnection, MESSAGE_COUNT, consumedSequenceCounter, null); } } } - protected void readMessagesOnBroker(Connection connection, int count, AtomicInteger sequence) throws Exception { + public void testGroupFirst(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { + AtomicInteger sequenceCounter = new AtomicInteger(); + AtomicInteger consumedSequenceCounter = new AtomicInteger(); + //Use a queue that IS pre-fixed with GroupFirst so should full under Group First address settings + String queueName = "GroupFirst." + getQueueName(); + + for (int i = 0; i < ITERATIONS; ++i) { + try (Connection producerConnection = producerConnectionSupplier.createConnection(); + Connection consumerConnection = consumerConnectionSupplier.createConnection()) { + sendMessagesToBroker(queueName, producerConnection, MESSAGE_COUNT, sequenceCounter); + readMessagesOnBroker(queueName, consumerConnection, MESSAGE_COUNT, consumedSequenceCounter, this::groupFirstCheck); + } + } + } + + private void groupFirstCheck(int i, Message message) { + try { + if (i == 0) { + assertTrue("Message should be marked with first in Group", message.getBooleanProperty("JMSXFirstInGroupID")); + } else { + assertFalse("Message should NOT be marked with first in Group", message.propertyExists("JMSXFirstInGroupID")); + } + } catch (JMSException e) { + fail(e.getMessage()); + } + } + + public void testGroupFirstDefaultOff(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { + AtomicInteger sequenceCounter = new AtomicInteger(); + AtomicInteger consumedSequenceCounter = new AtomicInteger(); + //Use a queue that IS NOT pre-fixed with GroupFirst so should full under default address settings. + String queueName = getQueueName(); + + for (int i = 0; i < ITERATIONS; ++i) { + try (Connection producerConnection = producerConnectionSupplier.createConnection(); + Connection consumerConnection = consumerConnectionSupplier.createConnection()) { + sendMessagesToBroker(queueName, producerConnection, MESSAGE_COUNT, sequenceCounter); + readMessagesOnBroker(queueName, consumerConnection, MESSAGE_COUNT, consumedSequenceCounter, this::groupFirstOffCheck); + } + } + } + + private void groupFirstOffCheck(int i, Message message) { + try { + assertFalse("Message should NOT be marked with first in Group", message.propertyExists("JMSXFirstInGroupID")); + } catch (JMSException e) { + fail(e.getMessage()); + } + } + + + + protected void readMessagesOnBroker(String queueName, Connection connection, int count, AtomicInteger sequence, BiConsumer additionalCheck) throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(getQueueName()); + Queue queue = session.createQueue(queueName); MessageConsumer consumer = session.createConsumer(queue); for (int i = 0; i < MESSAGE_COUNT; ++i) { @@ -208,19 +284,19 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport { LOG.debug("Message assigned JMSXGroupID := {}", gid); LOG.debug("Message assigned JMSXGroupSeq := {}", seq); assertEquals("Sequence order should match", sequence.incrementAndGet(), seq); - if (i == 0) { - assertTrue("Message should be marked with first in Group", message.getBooleanProperty("JMSXFirstInGroupID")); - } else { - assertFalse("Message should NOT be marked with first in Group", message.propertyExists("JMSXFirstInGroupID")); + if (additionalCheck != null) { + additionalCheck.accept(i, message); } } session.close(); } - protected void sendMessagesToBroker(Connection connection, int count, AtomicInteger sequence) throws Exception { + + + protected void sendMessagesToBroker(String queueName, Connection connection, int count, AtomicInteger sequence) throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(getQueueName()); + Queue queue = session.createQueue(queueName); MessageProducer producer = session.createProducer(queue); byte[] buffer = new byte[MESSAGE_SIZE]; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java index 2dc3bcc2ce..ba8cd95304 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java @@ -30,7 +30,6 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.util.UUID; -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -521,17 +520,10 @@ public class GroupingTest extends JMSTestBase { ctx.close(); } - @Test - public void testDefaultGroupFirstKey() throws Exception { - testGroupFirstKey(null); - } @Test - public void testCustomGroupFirstKey() throws Exception { - testGroupFirstKey("my-custom-key"); - } - - private void testGroupFirstKey(String customFirstGroupKey) throws Exception { + public void testGroupFirstKey() throws Exception { + String customFirstGroupKey = "my-custom-key"; ConnectionFactory fact = getCF(); Assume.assumeFalse("only makes sense withOUT auto-group", ((ActiveMQConnectionFactory) fact).isAutoGroup()); Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null); @@ -570,7 +562,6 @@ public class GroupingTest extends JMSTestBase { ctx.commit(); - String firstGroupKey = customFirstGroupKey == null ? ActiveMQDefaultConfiguration.getDefaultGroupFirstKey().toString() : customFirstGroupKey; //First set of msgs should go to the first consumer only for (int j = 0; j < 10; j++) { TextMessage tm = (TextMessage) consumer1.receive(10000); @@ -579,9 +570,9 @@ public class GroupingTest extends JMSTestBase { assertEquals("Message" + j, tm.getText()); assertEquals(tm.getStringProperty("JMSXGroupID"), groupID1); if (j == 0) { - assertTrue(tm.getBooleanProperty(firstGroupKey)); + assertTrue(tm.getBooleanProperty(customFirstGroupKey)); } else { - assertFalse(tm.getBooleanProperty(firstGroupKey)); + assertFalse(tm.propertyExists(customFirstGroupKey)); } } @@ -598,9 +589,9 @@ public class GroupingTest extends JMSTestBase { assertEquals(tm.getStringProperty("JMSXGroupID"), groupID2); if (j == 10) { - assertTrue(tm.getBooleanProperty(firstGroupKey)); + assertTrue(tm.getBooleanProperty(customFirstGroupKey)); } else { - assertFalse(tm.getBooleanProperty(firstGroupKey)); + assertFalse(tm.propertyExists(customFirstGroupKey)); } } @@ -617,9 +608,9 @@ public class GroupingTest extends JMSTestBase { assertEquals(tm.getStringProperty("JMSXGroupID"), groupID3); if (j == 20) { - assertTrue(tm.getBooleanProperty(firstGroupKey)); + assertTrue(tm.getBooleanProperty(customFirstGroupKey)); } else { - assertFalse(tm.getBooleanProperty(firstGroupKey)); + assertFalse(tm.propertyExists(customFirstGroupKey)); } } ctx.commit();