ARTEMIS-2142 Support JMSXGroupSeq -1 to close/reset group.
Add test cases Add GroupSequence to Message Interface Implement Support closing/reset group in queue impl Update Documentation (copy from activemq5) Change/Fix OpenWireMessageConverter to use default of 0 if not set, for OpenWire as per documentation http://activemq.apache.org/activemq-message-properties.html
This commit is contained in:
parent
8bf549b7c6
commit
f30ca44c82
|
@ -113,6 +113,8 @@ public interface Message {
|
||||||
*/
|
*/
|
||||||
SimpleString HDR_GROUP_ID = new SimpleString("_AMQ_GROUP_ID");
|
SimpleString HDR_GROUP_ID = new SimpleString("_AMQ_GROUP_ID");
|
||||||
|
|
||||||
|
SimpleString HDR_GROUP_SEQUENCE = new SimpleString("_AMQ_GROUP_SEQUENCE");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* to determine if the Large Message was compressed.
|
* to determine if the Large Message was compressed.
|
||||||
*/
|
*/
|
||||||
|
@ -248,6 +250,10 @@ public interface Message {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default int getGroupSequence() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SimpleString getReplyTo();
|
SimpleString getReplyTo();
|
||||||
|
|
||||||
Message setReplyTo(SimpleString address);
|
Message setReplyTo(SimpleString address);
|
||||||
|
|
|
@ -289,6 +289,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
||||||
return this.getSimpleStringProperty(Message.HDR_GROUP_ID);
|
return this.getSimpleStringProperty(Message.HDR_GROUP_ID);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getGroupSequence() {
|
||||||
|
final Integer integer = this.getIntProperty(Message.HDR_GROUP_SEQUENCE);
|
||||||
|
return integer == null ? 0 : integer;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param sendBuffer
|
* @param sendBuffer
|
||||||
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
|
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
|
||||||
|
|
|
@ -20,7 +20,6 @@ import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
|
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
@ -49,6 +48,8 @@ public class MessageUtil {
|
||||||
|
|
||||||
public static final String JMSXGROUPID = "JMSXGroupID";
|
public static final String JMSXGROUPID = "JMSXGroupID";
|
||||||
|
|
||||||
|
public static final String JMSXGROUPSEQ = "JMSXGroupSeq";
|
||||||
|
|
||||||
public static final String JMSXUSERID = "JMSXUserID";
|
public static final String JMSXUSERID = "JMSXUserID";
|
||||||
|
|
||||||
public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__AMQ_CID");
|
public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__AMQ_CID");
|
||||||
|
@ -154,6 +155,8 @@ public class MessageUtil {
|
||||||
for (SimpleString propName : message.getPropertyNames()) {
|
for (SimpleString propName : message.getPropertyNames()) {
|
||||||
if (propName.equals(Message.HDR_GROUP_ID)) {
|
if (propName.equals(Message.HDR_GROUP_ID)) {
|
||||||
set.add(MessageUtil.JMSXGROUPID);
|
set.add(MessageUtil.JMSXGROUPID);
|
||||||
|
} else if (propName.equals(Message.HDR_GROUP_SEQUENCE)) {
|
||||||
|
set.add(MessageUtil.JMSXGROUPSEQ);
|
||||||
} else if (propName.equals(Message.HDR_VALIDATED_USER)) {
|
} else if (propName.equals(Message.HDR_VALIDATED_USER)) {
|
||||||
set.add(MessageUtil.JMSXUSERID);
|
set.add(MessageUtil.JMSXUSERID);
|
||||||
} else if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE) && !propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
|
} else if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE) && !propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
|
||||||
|
@ -169,6 +172,7 @@ public class MessageUtil {
|
||||||
public static boolean propertyExists(Message message, String name) {
|
public static boolean propertyExists(Message message, String name) {
|
||||||
return message.containsProperty(new SimpleString(name)) || name.equals(MessageUtil.JMSXDELIVERYCOUNT) ||
|
return message.containsProperty(new SimpleString(name)) || name.equals(MessageUtil.JMSXDELIVERYCOUNT) ||
|
||||||
(MessageUtil.JMSXGROUPID.equals(name) && message.containsProperty(Message.HDR_GROUP_ID)) ||
|
(MessageUtil.JMSXGROUPID.equals(name) && message.containsProperty(Message.HDR_GROUP_ID)) ||
|
||||||
|
(MessageUtil.JMSXGROUPSEQ.equals(name) && message.containsProperty(Message.HDR_GROUP_SEQUENCE)) ||
|
||||||
(MessageUtil.JMSXUSERID.equals(name) && message.containsProperty(Message.HDR_VALIDATED_USER));
|
(MessageUtil.JMSXUSERID.equals(name) && message.containsProperty(Message.HDR_VALIDATED_USER));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.jms.client;
|
package org.apache.activemq.artemis.jms.client;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.IllegalStateException;
|
import javax.jms.IllegalStateException;
|
||||||
|
@ -30,11 +36,6 @@ import javax.management.openmbean.CompositeDataSupport;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.ObjectInputStream;
|
import java.io.ObjectInputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Enumeration;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
@ -561,12 +562,14 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getIntProperty(final String name) throws JMSException {
|
public int getIntProperty(final String name) throws JMSException {
|
||||||
|
try {
|
||||||
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
|
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
|
||||||
return message.getDeliveryCount();
|
return message.getDeliveryCount();
|
||||||
}
|
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
|
||||||
|
return message.getGroupSequence();
|
||||||
try {
|
} else {
|
||||||
return message.getIntProperty(name);
|
return message.getIntProperty(name);
|
||||||
|
}
|
||||||
} catch (ActiveMQPropertyConversionException e) {
|
} catch (ActiveMQPropertyConversionException e) {
|
||||||
throw new MessageFormatException(e.getMessage());
|
throw new MessageFormatException(e.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -574,12 +577,14 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getLongProperty(final String name) throws JMSException {
|
public long getLongProperty(final String name) throws JMSException {
|
||||||
|
try {
|
||||||
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
|
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
|
||||||
return message.getDeliveryCount();
|
return message.getDeliveryCount();
|
||||||
}
|
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
|
||||||
|
return message.getGroupSequence();
|
||||||
try {
|
} else {
|
||||||
return message.getLongProperty(name);
|
return message.getLongProperty(name);
|
||||||
|
}
|
||||||
} catch (ActiveMQPropertyConversionException e) {
|
} catch (ActiveMQPropertyConversionException e) {
|
||||||
throw new MessageFormatException(e.getMessage());
|
throw new MessageFormatException(e.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -611,7 +616,9 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (MessageUtil.JMSXGROUPID.equals(name)) {
|
if (MessageUtil.JMSXGROUPID.equals(name)) {
|
||||||
return message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID);
|
return Objects.toString(message.getGroupID(), null);
|
||||||
|
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
|
||||||
|
return Integer.toString(message.getGroupSequence());
|
||||||
} else if (MessageUtil.JMSXUSERID.equals(name)) {
|
} else if (MessageUtil.JMSXUSERID.equals(name)) {
|
||||||
return message.getValidatedUserID();
|
return message.getValidatedUserID();
|
||||||
} else {
|
} else {
|
||||||
|
@ -624,13 +631,20 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object getObjectProperty(final String name) throws JMSException {
|
public Object getObjectProperty(final String name) throws JMSException {
|
||||||
|
final Object val;
|
||||||
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
|
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
|
||||||
return String.valueOf(message.getDeliveryCount());
|
val = message.getDeliveryCount();
|
||||||
|
} else if (MessageUtil.JMSXGROUPID.equals(name)) {
|
||||||
|
val = message.getGroupID();
|
||||||
|
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
|
||||||
|
val = message.getGroupSequence();
|
||||||
|
} else if (MessageUtil.JMSXUSERID.equals(name)) {
|
||||||
|
val = message.getValidatedUserID();
|
||||||
|
} else {
|
||||||
|
val = message.getObjectProperty(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
Object val = message.getObjectProperty(name);
|
|
||||||
if (val instanceof SimpleString) {
|
if (val instanceof SimpleString) {
|
||||||
val = val.toString();
|
return val.toString();
|
||||||
}
|
}
|
||||||
return val;
|
return val;
|
||||||
}
|
}
|
||||||
|
@ -662,12 +676,18 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
@Override
|
@Override
|
||||||
public void setIntProperty(final String name, final int value) throws JMSException {
|
public void setIntProperty(final String name, final int value) throws JMSException {
|
||||||
checkProperty(name);
|
checkProperty(name);
|
||||||
|
if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
message.putIntProperty(name, value);
|
message.putIntProperty(name, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setLongProperty(final String name, final long value) throws JMSException {
|
public void setLongProperty(final String name, final long value) throws JMSException {
|
||||||
checkProperty(name);
|
checkProperty(name);
|
||||||
|
if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
message.putLongProperty(name, value);
|
message.putLongProperty(name, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -687,9 +707,11 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
public void setStringProperty(final String name, final String value) throws JMSException {
|
public void setStringProperty(final String name, final String value) throws JMSException {
|
||||||
checkProperty(name);
|
checkProperty(name);
|
||||||
|
|
||||||
if (handleCoreProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
|
if (handleCoreStringProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
|
||||||
return;
|
return;
|
||||||
} else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
|
} else if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
|
||||||
|
return;
|
||||||
|
} else if (handleCoreStringProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
message.putStringProperty(name, value);
|
message.putStringProperty(name, value);
|
||||||
|
@ -698,11 +720,13 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setObjectProperty(final String name, final Object value) throws JMSException {
|
public void setObjectProperty(final String name, final Object value) throws JMSException {
|
||||||
if (handleCoreProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
|
if (handleCoreStringProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
|
||||||
if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
|
return;
|
||||||
|
}
|
||||||
|
if (handleCoreStringProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -716,14 +740,14 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
checkProperty(name);
|
|
||||||
|
|
||||||
if (ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM.equals(name)) {
|
if (ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM.equals(name)) {
|
||||||
setInputStream((InputStream) value);
|
setInputStream((InputStream) value);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
checkProperty(name);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
message.putObjectProperty(name, value);
|
message.putObjectProperty(name, value);
|
||||||
} catch (ActiveMQPropertyConversionException e) {
|
} catch (ActiveMQPropertyConversionException e) {
|
||||||
|
@ -979,7 +1003,44 @@ public class ActiveMQMessage implements javax.jms.Message {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean handleCoreProperty(final String name,
|
private boolean handleCoreIntegerProperty(final String name,
|
||||||
|
final Object value,
|
||||||
|
String jmsPropertyName,
|
||||||
|
SimpleString corePropertyName) {
|
||||||
|
if (jmsPropertyName.equals(name)) {
|
||||||
|
return handleCoreIntegerProperty(name, getInteger(value), jmsPropertyName, corePropertyName);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean handleCoreIntegerProperty(final String name,
|
||||||
|
final int value,
|
||||||
|
String jmsPropertyName,
|
||||||
|
SimpleString corePropertyName) {
|
||||||
|
boolean result = false;
|
||||||
|
|
||||||
|
if (jmsPropertyName.equals(name)) {
|
||||||
|
message.putIntProperty(corePropertyName, value);
|
||||||
|
result = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getInteger(final Object value) {
|
||||||
|
Objects.requireNonNull(value);
|
||||||
|
final int integer;
|
||||||
|
if (value instanceof Integer) {
|
||||||
|
integer = (Integer) value;
|
||||||
|
} else if (value instanceof Number) {
|
||||||
|
integer = ((Number) value).intValue();
|
||||||
|
} else {
|
||||||
|
integer = Integer.parseInt(value.toString());
|
||||||
|
}
|
||||||
|
return integer;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean handleCoreStringProperty(final String name,
|
||||||
final Object value,
|
final Object value,
|
||||||
String jmsPropertyName,
|
String jmsPropertyName,
|
||||||
SimpleString corePropertyName) {
|
SimpleString corePropertyName) {
|
||||||
|
|
|
@ -1099,6 +1099,17 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getGroupSequence() {
|
||||||
|
ensureMessageDataScanned();
|
||||||
|
|
||||||
|
if (properties != null && properties.getGroupSequence() != null) {
|
||||||
|
return properties.getGroupSequence().intValue();
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long getScheduledDeliveryTime() {
|
public Long getScheduledDeliveryTime() {
|
||||||
if (scheduledTime < 0) {
|
if (scheduledTime < 0) {
|
||||||
|
|
|
@ -83,7 +83,7 @@ public final class OpenWireMessageConverter {
|
||||||
private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
|
private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
|
||||||
private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
|
private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
|
||||||
private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
|
private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
|
||||||
private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = new SimpleString(AMQ_PREFIX + "GROUP_SEQUENCE");
|
private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE;
|
||||||
private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
|
private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
|
||||||
private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
|
private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
|
||||||
private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
|
private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
|
||||||
|
@ -616,7 +616,7 @@ public final class OpenWireMessageConverter {
|
||||||
|
|
||||||
Integer groupSequence = (Integer) coreMessage.getObjectProperty(AMQ_MSG_GROUP_SEQUENCE);
|
Integer groupSequence = (Integer) coreMessage.getObjectProperty(AMQ_MSG_GROUP_SEQUENCE);
|
||||||
if (groupSequence == null) {
|
if (groupSequence == null) {
|
||||||
groupSequence = -1;
|
groupSequence = 0;
|
||||||
}
|
}
|
||||||
amqMsg.setGroupSequence(groupSequence);
|
amqMsg.setGroupSequence(groupSequence);
|
||||||
|
|
||||||
|
|
|
@ -1613,6 +1613,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
||||||
format = Message.Format.MESSAGE_FORMAT)
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
void problemAddingConfigReloadCallback(String propertyName, @Cause Exception e);
|
void problemAddingConfigReloadCallback(String propertyName, @Cause Exception e);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
|
@Message(id = 222278, value = "Unable to extract GroupSequence from message", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void unableToExtractGroupSequence(@Cause Throwable e);
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.ERROR)
|
@LogMessage(level = Logger.Level.ERROR)
|
||||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void initializationError(@Cause Throwable e);
|
void initializationError(@Cause Throwable e);
|
||||||
|
|
|
@ -2531,8 +2531,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
removeMessageReference(holder, ref);
|
removeMessageReference(holder, ref);
|
||||||
|
|
||||||
if (groupID != null && groupConsumer == null && redistributor == null) {
|
if (redistributor == null) {
|
||||||
groups.put(groupID, consumer);
|
handleMessageGroup(ref, consumer, groupConsumer, groupID);
|
||||||
}
|
}
|
||||||
|
|
||||||
handled++;
|
handled++;
|
||||||
|
@ -2635,6 +2635,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int extractGroupSequence(MessageReference ref) {
|
||||||
|
if (internalQueue) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
// But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
|
||||||
|
return ref.getMessage().getGroupSequence();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.unableToExtractGroupSequence(e);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void refRemoved(MessageReference ref) {
|
protected void refRemoved(MessageReference ref) {
|
||||||
queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
|
queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
|
||||||
pendingMetrics.decrementMetrics(ref);
|
pendingMetrics.decrementMetrics(ref);
|
||||||
|
@ -3110,8 +3124,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
HandleStatus status = handle(ref, consumer);
|
HandleStatus status = handle(ref, consumer);
|
||||||
|
|
||||||
if (status == HandleStatus.HANDLED) {
|
if (status == HandleStatus.HANDLED) {
|
||||||
if (groupID != null && groupConsumer == null && redistributor == null) {
|
|
||||||
groups.put(groupID, consumer);
|
if (redistributor == null) {
|
||||||
|
handleMessageGroup(ref, consumer, groupConsumer, groupID);
|
||||||
}
|
}
|
||||||
|
|
||||||
messagesAdded.incrementAndGet();
|
messagesAdded.incrementAndGet();
|
||||||
|
@ -3130,6 +3145,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
|
||||||
|
if (groupID != null) {
|
||||||
|
if (extractGroupSequence(ref) == -1) {
|
||||||
|
groups.remove(groupID);
|
||||||
|
}
|
||||||
|
if (groupConsumer == null) {
|
||||||
|
groups.put(groupID, consumer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void proceedDeliver(Consumer consumer, MessageReference reference) {
|
private void proceedDeliver(Consumer consumer, MessageReference reference) {
|
||||||
try {
|
try {
|
||||||
consumer.proceedDeliver(reference);
|
consumer.proceedDeliver(reference);
|
||||||
|
|
|
@ -81,6 +81,23 @@ java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialCont
|
||||||
connectionFactory.myConnectionFactory=tcp://localhost:61616?groupID=Group-0
|
connectionFactory.myConnectionFactory=tcp://localhost:61616?groupID=Group-0
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
|
#### Closing a Message Group
|
||||||
|
You generally don't need to close a message group, you just keep using it.
|
||||||
|
|
||||||
|
However if you really do want to close a group you can add a negative sequence number.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
```java
|
||||||
|
Mesasge message = session.createTextMessage("<foo>hey</foo>");
|
||||||
|
message.setStringProperty("JMSXGroupID", "Group-0");
|
||||||
|
message.setIntProperty("JMSXGroupSeq", -1);
|
||||||
|
...
|
||||||
|
producer.send(message);
|
||||||
|
```
|
||||||
|
|
||||||
|
This then closes the message group so if another message is sent in the future with the same message group ID it will be reassigned to a new consumer.
|
||||||
|
|
||||||
## Example
|
## Example
|
||||||
|
|
||||||
See the [Message Group Example](examples.md#message-group) which shows how
|
See the [Message Group Example](examples.md#message-group) which shows how
|
||||||
|
|
|
@ -21,12 +21,15 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -38,25 +41,160 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport {
|
||||||
private static final int ITERATIONS = 10;
|
private static final int ITERATIONS = 10;
|
||||||
private static final int MESSAGE_COUNT = 10;
|
private static final int MESSAGE_COUNT = 10;
|
||||||
private static final int MESSAGE_SIZE = 10 * 1024;
|
private static final int MESSAGE_SIZE = 10 * 1024;
|
||||||
private static final int RECEIVE_TIMEOUT = 3000;
|
private static final int RECEIVE_TIMEOUT = 1000;
|
||||||
private static final String JMSX_GROUP_ID = "JmsGroupsTest";
|
private static final String JMSX_GROUP_ID = "JmsGroupsTest";
|
||||||
|
|
||||||
|
private ConnectionSupplier AMQPConnection = () -> createConnection();
|
||||||
|
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
|
||||||
|
private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getConfiguredProtocols() {
|
||||||
|
return "AMQP,OPENWIRE,CORE";
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testGroupSeqIsNeverLost() throws Exception {
|
public void testMessageGroupsAMQPProducerAMQPConsumer() throws Exception {
|
||||||
|
testMessageGroups(AMQPConnection, AMQPConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageGroupsCoreProducerCoreConsumer() throws Exception {
|
||||||
|
testMessageGroups(CoreConnection, CoreConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageGroupsCoreProducerAMQPConsumer() throws Exception {
|
||||||
|
testMessageGroups(CoreConnection, AMQPConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageGroupsAMQPProducerCoreConsumer() throws Exception {
|
||||||
|
testMessageGroups(AMQPConnection, CoreConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageGroupsOpenWireProducerOpenWireConsumer() throws Exception {
|
||||||
|
testMessageGroups(OpenWireConnection, OpenWireConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageGroupsCoreProducerOpenWireConsumer() throws Exception {
|
||||||
|
testMessageGroups(CoreConnection, OpenWireConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageGroupsOpenWireProducerCoreConsumer() throws Exception {
|
||||||
|
testMessageGroups(OpenWireConnection, CoreConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageGroupsAMQPProducerOpenWireConsumer() throws Exception {
|
||||||
|
testMessageGroups(AMQPConnection, OpenWireConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageGroupsOpenWireProducerAMQPConsumer() throws Exception {
|
||||||
|
testMessageGroups(OpenWireConnection, AMQPConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testMessageGroups(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
|
||||||
|
testGroupSeqIsNeverLost(producerConnectionSupplier, consumerConnectionSupplier);
|
||||||
|
testGroupSeqCloseGroup(producerConnectionSupplier, consumerConnectionSupplier);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testGroupSeqCloseGroup(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
|
||||||
|
final QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(getQueueName()));
|
||||||
|
|
||||||
|
try (Connection producerConnection = producerConnectionSupplier.createConnection();
|
||||||
|
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = producerSession.createProducer(producerSession.createQueue(getQueueName()));
|
||||||
|
|
||||||
|
Connection consumerConnection = producerConnectionSupplier.createConnection();
|
||||||
|
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer1 = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
|
||||||
|
MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
|
||||||
|
MessageConsumer consumer3 = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()))) {
|
||||||
|
|
||||||
|
producerConnection.start();
|
||||||
|
consumerConnection.start();
|
||||||
|
|
||||||
|
//Ensure group and close group, ensuring group is closed
|
||||||
|
sendAndConsumeAndThenCloseGroup(producerSession, producer, consumer1, consumer2, consumer3, queueBinding);
|
||||||
|
|
||||||
|
//Ensure round robin on group to consumer assignment (consumer2 now), then close group again
|
||||||
|
sendAndConsumeAndThenCloseGroup(producerSession, producer, consumer2, consumer3, consumer1, queueBinding);
|
||||||
|
|
||||||
|
//Ensure round robin on group to consumer assignment (consumer3 now), then close group again
|
||||||
|
sendAndConsumeAndThenCloseGroup(producerSession, producer, consumer3, consumer1, consumer1, queueBinding);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendAndConsumeAndThenCloseGroup(Session producerSession, MessageProducer producer, MessageConsumer expectedGroupConsumer, MessageConsumer consumerA, MessageConsumer consumerB, QueueBinding queueBinding) throws JMSException {
|
||||||
|
|
||||||
|
for (int j = 1; j <= MESSAGE_COUNT; j++) {
|
||||||
|
TextMessage message = producerSession.createTextMessage();
|
||||||
|
message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
|
||||||
|
message.setIntProperty("JMSXGroupSeq", j);
|
||||||
|
message.setText("Message" + j);
|
||||||
|
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Group should have been reset and next consumer chosen, as such all msgs should now go to the second consumer (round robin'd)
|
||||||
|
for (int j = 1; j <= MESSAGE_COUNT; j++) {
|
||||||
|
TextMessage tm = (TextMessage) expectedGroupConsumer.receive(RECEIVE_TIMEOUT);
|
||||||
|
assertNotNull(tm);
|
||||||
|
assertEquals(JMSX_GROUP_ID, tm.getStringProperty("JMSXGroupID"));
|
||||||
|
assertEquals(j, tm.getIntProperty("JMSXGroupSeq"));
|
||||||
|
assertEquals("Message" + j, tm.getText());
|
||||||
|
|
||||||
|
assertNull(consumerA.receiveNoWait());
|
||||||
|
assertNull(consumerB.receiveNoWait());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(1, queueBinding.getQueue().getGroupCount());
|
||||||
|
|
||||||
|
TextMessage message = producerSession.createTextMessage();
|
||||||
|
message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
|
||||||
|
//Close Group using -1 JMSXGroupSeq
|
||||||
|
message.setIntProperty("JMSXGroupSeq", -1);
|
||||||
|
message.setText("Message" + " group close");
|
||||||
|
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
TextMessage receivedGroupCloseMessage = (TextMessage) expectedGroupConsumer.receive(RECEIVE_TIMEOUT);
|
||||||
|
assertNotNull(receivedGroupCloseMessage);
|
||||||
|
assertEquals(JMSX_GROUP_ID, receivedGroupCloseMessage.getStringProperty("JMSXGroupID"));
|
||||||
|
assertEquals(-1, receivedGroupCloseMessage.getIntProperty("JMSXGroupSeq"));
|
||||||
|
assertEquals("group close should goto the existing group consumer", "Message" + " group close", receivedGroupCloseMessage.getText());
|
||||||
|
|
||||||
|
assertNull(consumerA.receiveNoWait());
|
||||||
|
assertNull(consumerB.receiveNoWait());
|
||||||
|
|
||||||
|
assertEquals(0, queueBinding.getQueue().getGroupCount());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testGroupSeqIsNeverLost(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
|
||||||
AtomicInteger sequenceCounter = new AtomicInteger();
|
AtomicInteger sequenceCounter = new AtomicInteger();
|
||||||
|
AtomicInteger consumedSequenceCounter = new AtomicInteger();
|
||||||
|
|
||||||
for (int i = 0; i < ITERATIONS; ++i) {
|
for (int i = 0; i < ITERATIONS; ++i) {
|
||||||
Connection connection = createConnection();
|
try (Connection producerConnection = producerConnectionSupplier.createConnection();
|
||||||
try {
|
Connection consumerConnection = producerConnectionSupplier.createConnection()) {
|
||||||
sendMessagesToBroker(connection, MESSAGE_COUNT, sequenceCounter);
|
sendMessagesToBroker(producerConnection, MESSAGE_COUNT, sequenceCounter);
|
||||||
readMessagesOnBroker(connection, MESSAGE_COUNT);
|
readMessagesOnBroker(consumerConnection, MESSAGE_COUNT, consumedSequenceCounter);
|
||||||
} finally {
|
|
||||||
connection.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void readMessagesOnBroker(Connection connection, int count) throws Exception {
|
protected void readMessagesOnBroker(Connection connection, int count, AtomicInteger sequence) throws Exception {
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
Queue queue = session.createQueue(getQueueName());
|
Queue queue = session.createQueue(getQueueName());
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
@ -66,9 +204,10 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport {
|
||||||
assertNotNull(message);
|
assertNotNull(message);
|
||||||
LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName());
|
LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName());
|
||||||
String gid = message.getStringProperty("JMSXGroupID");
|
String gid = message.getStringProperty("JMSXGroupID");
|
||||||
String seq = message.getStringProperty("JMSXGroupSeq");
|
int seq = message.getIntProperty("JMSXGroupSeq");
|
||||||
LOG.debug("Message assigned JMSXGroupID := {}", gid);
|
LOG.debug("Message assigned JMSXGroupID := {}", gid);
|
||||||
LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
|
LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
|
||||||
|
assertEquals("Sequence order should match", sequence.incrementAndGet(), seq);
|
||||||
}
|
}
|
||||||
|
|
||||||
session.close();
|
session.close();
|
||||||
|
|
Loading…
Reference in New Issue