This commit is contained in:
Clebert Suconic 2019-04-17 16:25:43 -04:00
commit 8477dd95ee
7 changed files with 142 additions and 33 deletions

View File

@ -523,7 +523,7 @@ public final class ActiveMQDefaultConfiguration {
public static final boolean DEFAULT_GROUP_REBALANCE = false; public static final boolean DEFAULT_GROUP_REBALANCE = false;
public static final SimpleString DEFAULT_GROUP_FIRST_KEY = SimpleString.toSimpleString("JMSXFirstInGroupID"); public static final SimpleString DEFAULT_GROUP_FIRST_KEY = null;
public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST; public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST;

View File

@ -3256,7 +3256,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (exclusive) { if (exclusive) {
if (groupConsumer == null) { if (groupConsumer == null) {
exclusiveConsumer = consumer; exclusiveConsumer = consumer;
return new GroupFirstMessageReference(groupFirstKey, ref); if (groupFirstKey != null) {
return new GroupFirstMessageReference(groupFirstKey, ref);
}
} }
consumers.repeat(); consumers.repeat();
} else if (groupID != null) { } else if (groupID != null) {
@ -3265,7 +3267,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
consumers.repeat(); consumers.repeat();
} else if (groupConsumer == null) { } else if (groupConsumer == null) {
groups.put(groupID, consumer); groups.put(groupID, consumer);
return new GroupFirstMessageReference(groupFirstKey, ref); if (groupFirstKey != null) {
return new GroupFirstMessageReference(groupFirstKey, ref);
}
} else { } else {
consumers.repeat(); consumers.repeat();
} }

View File

@ -3056,7 +3056,7 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="default-group-first-key" type="xsd:string" default="JMSXFirstInGroupID" maxOccurs="1" minOccurs="0"> <xsd:element name="default-group-first-key" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
key used to mark a message is first in a group for a consumer key used to mark a message is first in a group for a consumer

View File

@ -3056,7 +3056,7 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="default-group-first-key" type="xsd:string" default="JMSXFirstInGroupID" maxOccurs="1" minOccurs="0"> <xsd:element name="default-group-first-key" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
key used to mark a message is first in a group for a consumer key used to mark a message is first in a group for a consumer

View File

@ -98,6 +98,44 @@ producer.send(message);
This then closes the message group so if another message is sent in the future with the same message group ID it will be reassigned to a new consumer. This then closes the message group so if another message is sent in the future with the same message group ID it will be reassigned to a new consumer.
#### Notifying Consumer of Group Ownership change
ActiveMQ supports putting a boolean header, set on the first message sent to a consumer for a particular message group.
To enable this, you must set a header key that the broker will use to set the flag.
In the examples we use `JMSXGroupFirstForConsumer` but it can be any header key value you want.
By setting `group-first-key` to `JMSXGroupFirstForConsumer` at the queue level, every time a new group is assigned a consumer the header `JMSXGroupFirstForConsumer` will be set to true on the first message.
```xml
<address name="foo.bar">
<multicast>
<queue name="orders1" group-first-key="JMSXGroupFirstForConsumer"/>
</multicast>
</address>
```
Or on auto-create when using the JMS Client by using address parameters when
creating the destination used by the consumer.
```java
Queue queue = session.createQueue("my.destination.name?group-first-key=JMSXGroupFirstForConsumer");
Topic topic = session.createTopic("my.destination.name?group-first-key=JMSXGroupFirstForConsumer");
```
Also the default for all queues under and address can be defaulted using the
`address-setting` configuration:
```xml
<address-setting match="my.address">
<default-group-first-key>JMSXGroupFirstForConsumer</default-group-first-key>
</address-setting>
```
By default this is null, and therefor OFF.
#### Rebalancing Message Groups #### Rebalancing Message Groups
Sometimes after new consumers are added you can find that if you have long lived groups, that they have no groups assigned, and thus are not being utilised, this is because the long lived groups will already be assigned to existing consumers. Sometimes after new consumers are added you can find that if you have long lived groups, that they have no groups assigned, and thus are not being utilised, this is because the long lived groups will already be assigned to existing consumers.

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
@ -30,6 +31,9 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -53,6 +57,23 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport {
return "AMQP,OPENWIRE,CORE"; return "AMQP,OPENWIRE,CORE";
} }
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
super.configureAddressPolicy(server);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setAutoCreateQueues(isAutoCreateQueues());
addressSettings.setAutoCreateAddresses(isAutoCreateAddresses());
addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
addressSettings.setExpiryAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
addressSettings.setDefaultGroupFirstKey(SimpleString.toSimpleString("JMSXFirstInGroupID"));
server.getConfiguration().getAddressesSettings().put("GroupFirst.#", addressSettings);
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testMessageGroupsAMQPProducerAMQPConsumer() throws Exception { public void testMessageGroupsAMQPProducerAMQPConsumer() throws Exception {
testMessageGroups(AMQPConnection, AMQPConnection); testMessageGroups(AMQPConnection, AMQPConnection);
@ -102,6 +123,8 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport {
public void testMessageGroups(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { public void testMessageGroups(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
testGroupSeqIsNeverLost(producerConnectionSupplier, consumerConnectionSupplier); testGroupSeqIsNeverLost(producerConnectionSupplier, consumerConnectionSupplier);
testGroupSeqCloseGroup(producerConnectionSupplier, consumerConnectionSupplier); testGroupSeqCloseGroup(producerConnectionSupplier, consumerConnectionSupplier);
testGroupFirst(producerConnectionSupplier, consumerConnectionSupplier);
testGroupFirstDefaultOff(producerConnectionSupplier, consumerConnectionSupplier);
} }
@ -184,19 +207,72 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport {
public void testGroupSeqIsNeverLost(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception { public void testGroupSeqIsNeverLost(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
AtomicInteger sequenceCounter = new AtomicInteger(); AtomicInteger sequenceCounter = new AtomicInteger();
AtomicInteger consumedSequenceCounter = new AtomicInteger(); AtomicInteger consumedSequenceCounter = new AtomicInteger();
String queueName = getQueueName();
for (int i = 0; i < ITERATIONS; ++i) { for (int i = 0; i < ITERATIONS; ++i) {
try (Connection producerConnection = producerConnectionSupplier.createConnection(); try (Connection producerConnection = producerConnectionSupplier.createConnection();
Connection consumerConnection = producerConnectionSupplier.createConnection()) { Connection consumerConnection = consumerConnectionSupplier.createConnection()) {
sendMessagesToBroker(producerConnection, MESSAGE_COUNT, sequenceCounter); sendMessagesToBroker(queueName, producerConnection, MESSAGE_COUNT, sequenceCounter);
readMessagesOnBroker(consumerConnection, MESSAGE_COUNT, consumedSequenceCounter); readMessagesOnBroker(queueName, consumerConnection, MESSAGE_COUNT, consumedSequenceCounter, null);
} }
} }
} }
protected void readMessagesOnBroker(Connection connection, int count, AtomicInteger sequence) throws Exception { public void testGroupFirst(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
AtomicInteger sequenceCounter = new AtomicInteger();
AtomicInteger consumedSequenceCounter = new AtomicInteger();
//Use a queue that IS pre-fixed with GroupFirst so should full under Group First address settings
String queueName = "GroupFirst." + getQueueName();
for (int i = 0; i < ITERATIONS; ++i) {
try (Connection producerConnection = producerConnectionSupplier.createConnection();
Connection consumerConnection = consumerConnectionSupplier.createConnection()) {
sendMessagesToBroker(queueName, producerConnection, MESSAGE_COUNT, sequenceCounter);
readMessagesOnBroker(queueName, consumerConnection, MESSAGE_COUNT, consumedSequenceCounter, this::groupFirstCheck);
}
}
}
private void groupFirstCheck(int i, Message message) {
try {
if (i == 0) {
assertTrue("Message should be marked with first in Group", message.getBooleanProperty("JMSXFirstInGroupID"));
} else {
assertFalse("Message should NOT be marked with first in Group", message.propertyExists("JMSXFirstInGroupID"));
}
} catch (JMSException e) {
fail(e.getMessage());
}
}
public void testGroupFirstDefaultOff(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
AtomicInteger sequenceCounter = new AtomicInteger();
AtomicInteger consumedSequenceCounter = new AtomicInteger();
//Use a queue that IS NOT pre-fixed with GroupFirst so should full under default address settings.
String queueName = getQueueName();
for (int i = 0; i < ITERATIONS; ++i) {
try (Connection producerConnection = producerConnectionSupplier.createConnection();
Connection consumerConnection = consumerConnectionSupplier.createConnection()) {
sendMessagesToBroker(queueName, producerConnection, MESSAGE_COUNT, sequenceCounter);
readMessagesOnBroker(queueName, consumerConnection, MESSAGE_COUNT, consumedSequenceCounter, this::groupFirstOffCheck);
}
}
}
private void groupFirstOffCheck(int i, Message message) {
try {
assertFalse("Message should NOT be marked with first in Group", message.propertyExists("JMSXFirstInGroupID"));
} catch (JMSException e) {
fail(e.getMessage());
}
}
protected void readMessagesOnBroker(String queueName, Connection connection, int count, AtomicInteger sequence, BiConsumer<Integer, Message> additionalCheck) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName()); Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < MESSAGE_COUNT; ++i) { for (int i = 0; i < MESSAGE_COUNT; ++i) {
@ -208,19 +284,19 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport {
LOG.debug("Message assigned JMSXGroupID := {}", gid); LOG.debug("Message assigned JMSXGroupID := {}", gid);
LOG.debug("Message assigned JMSXGroupSeq := {}", seq); LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
assertEquals("Sequence order should match", sequence.incrementAndGet(), seq); assertEquals("Sequence order should match", sequence.incrementAndGet(), seq);
if (i == 0) { if (additionalCheck != null) {
assertTrue("Message should be marked with first in Group", message.getBooleanProperty("JMSXFirstInGroupID")); additionalCheck.accept(i, message);
} else {
assertFalse("Message should NOT be marked with first in Group", message.propertyExists("JMSXFirstInGroupID"));
} }
} }
session.close(); session.close();
} }
protected void sendMessagesToBroker(Connection connection, int count, AtomicInteger sequence) throws Exception {
protected void sendMessagesToBroker(String queueName, Connection connection, int count, AtomicInteger sequence) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName()); Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
byte[] buffer = new byte[MESSAGE_SIZE]; byte[] buffer = new byte[MESSAGE_SIZE];

View File

@ -30,7 +30,6 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.util.UUID; import java.util.UUID;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -521,17 +520,10 @@ public class GroupingTest extends JMSTestBase {
ctx.close(); ctx.close();
} }
@Test
public void testDefaultGroupFirstKey() throws Exception {
testGroupFirstKey(null);
}
@Test @Test
public void testCustomGroupFirstKey() throws Exception { public void testGroupFirstKey() throws Exception {
testGroupFirstKey("my-custom-key"); String customFirstGroupKey = "my-custom-key";
}
private void testGroupFirstKey(String customFirstGroupKey) throws Exception {
ConnectionFactory fact = getCF(); ConnectionFactory fact = getCF();
Assume.assumeFalse("only makes sense withOUT auto-group", ((ActiveMQConnectionFactory) fact).isAutoGroup()); Assume.assumeFalse("only makes sense withOUT auto-group", ((ActiveMQConnectionFactory) fact).isAutoGroup());
Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null); Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null);
@ -570,7 +562,6 @@ public class GroupingTest extends JMSTestBase {
ctx.commit(); ctx.commit();
String firstGroupKey = customFirstGroupKey == null ? ActiveMQDefaultConfiguration.getDefaultGroupFirstKey().toString() : customFirstGroupKey;
//First set of msgs should go to the first consumer only //First set of msgs should go to the first consumer only
for (int j = 0; j < 10; j++) { for (int j = 0; j < 10; j++) {
TextMessage tm = (TextMessage) consumer1.receive(10000); TextMessage tm = (TextMessage) consumer1.receive(10000);
@ -579,9 +570,9 @@ public class GroupingTest extends JMSTestBase {
assertEquals("Message" + j, tm.getText()); assertEquals("Message" + j, tm.getText());
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID1); assertEquals(tm.getStringProperty("JMSXGroupID"), groupID1);
if (j == 0) { if (j == 0) {
assertTrue(tm.getBooleanProperty(firstGroupKey)); assertTrue(tm.getBooleanProperty(customFirstGroupKey));
} else { } else {
assertFalse(tm.getBooleanProperty(firstGroupKey)); assertFalse(tm.propertyExists(customFirstGroupKey));
} }
} }
@ -598,9 +589,9 @@ public class GroupingTest extends JMSTestBase {
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID2); assertEquals(tm.getStringProperty("JMSXGroupID"), groupID2);
if (j == 10) { if (j == 10) {
assertTrue(tm.getBooleanProperty(firstGroupKey)); assertTrue(tm.getBooleanProperty(customFirstGroupKey));
} else { } else {
assertFalse(tm.getBooleanProperty(firstGroupKey)); assertFalse(tm.propertyExists(customFirstGroupKey));
} }
} }
@ -617,9 +608,9 @@ public class GroupingTest extends JMSTestBase {
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID3); assertEquals(tm.getStringProperty("JMSXGroupID"), groupID3);
if (j == 20) { if (j == 20) {
assertTrue(tm.getBooleanProperty(firstGroupKey)); assertTrue(tm.getBooleanProperty(customFirstGroupKey));
} else { } else {
assertFalse(tm.getBooleanProperty(firstGroupKey)); assertFalse(tm.propertyExists(customFirstGroupKey));
} }
} }
ctx.commit(); ctx.commit();