ARTEMIS-2142 Refactor of Patchfix ServerJMSMessage

Refactor ServerJMSMessage so it correctly transposes all JMSX headers.
Push common JMSX mappings for JMS to Message Interface mappings into MessageUtil to avoid duplication in ActiveMQMessage and ServerJMSMessage
This commit is contained in:
Michael André Pearce 2018-11-09 15:27:33 +00:00 committed by Clebert Suconic
parent a20d7a0339
commit 5ac87609e7
8 changed files with 161 additions and 139 deletions

View File

@ -250,10 +250,22 @@ public interface Message {
return null;
}
default Message setGroupID(SimpleString groupID) {
return this;
}
default Message setGroupID(String groupID) {
return this;
}
default int getGroupSequence() {
return 0;
}
default Message setGroupSequence(int sequence) {
return this;
}
SimpleString getReplyTo();
Message setReplyTo(SimpleString address);

View File

@ -289,11 +289,26 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this.getSimpleStringProperty(Message.HDR_GROUP_ID);
}
@Override
public CoreMessage setGroupID(SimpleString groupId) {
return this.putStringProperty(Message.HDR_GROUP_ID, groupId);
}
@Override
public CoreMessage setGroupID(String groupId) {
return this.setGroupID(SimpleString.toSimpleString(groupId, coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool()));
}
@Override
public int getGroupSequence() {
return containsProperty(Message.HDR_GROUP_SEQUENCE) ? getIntProperty(Message.HDR_GROUP_SEQUENCE) : 0;
}
@Override
public CoreMessage setGroupSequence(int sequence) {
return this.putIntProperty(Message.HDR_GROUP_SEQUENCE, sequence);
}
/**
* @param sendBuffer
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.reader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
@ -175,4 +176,103 @@ public class MessageUtil {
(MessageUtil.JMSXGROUPSEQ.equals(name) && message.containsProperty(Message.HDR_GROUP_SEQUENCE)) ||
(MessageUtil.JMSXUSERID.equals(name) && message.containsProperty(Message.HDR_VALIDATED_USER));
}
public static String getStringProperty(final Message message, final String name) {
if (MessageUtil.JMSXGROUPID.equals(name)) {
return Objects.toString(message.getGroupID(), null);
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
return Integer.toString(message.getGroupSequence());
} else if (MessageUtil.JMSXUSERID.equals(name)) {
return message.getValidatedUserID();
} else {
return message.getStringProperty(name);
}
}
public static Object getObjectProperty(final Message message, final String name) {
final Object val;
if (MessageUtil.JMSXGROUPID.equals(name)) {
val = message.getGroupID();
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
val = message.getGroupSequence();
} else if (MessageUtil.JMSXUSERID.equals(name)) {
val = message.getValidatedUserID();
} else {
val = message.getObjectProperty(name);
}
if (val instanceof SimpleString) {
return val.toString();
}
return val;
}
public static long getLongProperty(final Message message, final String name) {
if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
return message.getGroupSequence();
} else {
return message.getLongProperty(name);
}
}
public static int getIntProperty(final Message message, final String name) {
if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
return message.getGroupSequence();
} else {
return message.getIntProperty(name);
}
}
public static void setIntProperty(final Message message, final String name, final int value) {
if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
message.setGroupSequence(value);
} else {
message.putIntProperty(name, value);
}
}
public static void setLongProperty(final Message message, final String name, final long value) {
if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
message.setGroupSequence((int) value);
} else {
message.putLongProperty(name, value);
}
}
public static void setStringProperty(final Message message, final String name, final String value) {
if (MessageUtil.JMSXGROUPID.equals(name)) {
message.setGroupID(value);
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
message.setGroupSequence(getInteger(value));
} else if (MessageUtil.JMSXUSERID.equals(name)) {
message.setValidatedUserID(value);
} else {
message.putStringProperty(name, value);
}
}
public static void setObjectProperty(final Message message, final String name, final Object value) {
if (MessageUtil.JMSXGROUPID.equals(name)) {
message.setGroupID(value == null ? null : value.toString());
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
message.setGroupSequence(getInteger(value));
} else if (MessageUtil.JMSXUSERID.equals(name)) {
message.setValidatedUserID(value == null ? null : value.toString());
} else {
message.putObjectProperty(name, value);
}
}
private static int getInteger(final Object value) {
Objects.requireNonNull(value);
final int integer;
if (value instanceof Integer) {
integer = (Integer) value;
} else if (value instanceof Number) {
integer = ((Number) value).intValue();
} else {
integer = Integer.parseInt(value.toString());
}
return integer;
}
}

View File

@ -21,7 +21,6 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
@ -565,10 +564,8 @@ public class ActiveMQMessage implements javax.jms.Message {
try {
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
return message.getDeliveryCount();
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
return message.getGroupSequence();
} else {
return message.getIntProperty(name);
return MessageUtil.getIntProperty(message, name);
}
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
@ -580,10 +577,8 @@ public class ActiveMQMessage implements javax.jms.Message {
try {
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
return message.getDeliveryCount();
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
return message.getGroupSequence();
} else {
return message.getLongProperty(name);
return MessageUtil.getLongProperty(message, name);
}
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
@ -613,17 +608,8 @@ public class ActiveMQMessage implements javax.jms.Message {
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
return String.valueOf(message.getDeliveryCount());
}
try {
if (MessageUtil.JMSXGROUPID.equals(name)) {
return Objects.toString(message.getGroupID(), null);
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
return Integer.toString(message.getGroupSequence());
} else if (MessageUtil.JMSXUSERID.equals(name)) {
return message.getValidatedUserID();
} else {
return message.getStringProperty(name);
}
return MessageUtil.getStringProperty(message, name);
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
@ -631,22 +617,10 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public Object getObjectProperty(final String name) throws JMSException {
final Object val;
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
val = message.getDeliveryCount();
} else if (MessageUtil.JMSXGROUPID.equals(name)) {
val = message.getGroupID();
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
val = message.getGroupSequence();
} else if (MessageUtil.JMSXUSERID.equals(name)) {
val = message.getValidatedUserID();
} else {
val = message.getObjectProperty(name);
return message.getDeliveryCount();
}
if (val instanceof SimpleString) {
return val.toString();
}
return val;
return MessageUtil.getObjectProperty(message, name);
}
@Override
@ -676,19 +650,13 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public void setIntProperty(final String name, final int value) throws JMSException {
checkProperty(name);
if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
return;
}
message.putIntProperty(name, value);
MessageUtil.setIntProperty(message, name, value);
}
@Override
public void setLongProperty(final String name, final long value) throws JMSException {
checkProperty(name);
if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
return;
}
message.putLongProperty(name, value);
MessageUtil.setLongProperty(message, name, value);
}
@Override
@ -706,29 +674,12 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public void setStringProperty(final String name, final String value) throws JMSException {
checkProperty(name);
if (handleCoreStringProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
return;
} else if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
return;
} else if (handleCoreStringProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
return;
} else {
message.putStringProperty(name, value);
}
MessageUtil.setStringProperty(message, name, value);
}
@Override
public void setObjectProperty(final String name, final Object value) throws JMSException {
if (handleCoreStringProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
return;
}
if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
return;
}
if (handleCoreStringProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
return;
}
if (ActiveMQJMSConstants.JMS_ACTIVEMQ_OUTPUT_STREAM.equals(name)) {
setOutputStream((OutputStream) value);
@ -749,7 +700,7 @@ public class ActiveMQMessage implements javax.jms.Message {
checkProperty(name);
try {
message.putObjectProperty(name, value);
MessageUtil.setObjectProperty(message, name, value);
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
@ -1003,57 +954,5 @@ public class ActiveMQMessage implements javax.jms.Message {
}
}
private boolean handleCoreIntegerProperty(final String name,
final Object value,
String jmsPropertyName,
SimpleString corePropertyName) {
if (jmsPropertyName.equals(name)) {
return handleCoreIntegerProperty(name, getInteger(value), jmsPropertyName, corePropertyName);
}
return false;
}
private boolean handleCoreIntegerProperty(final String name,
final int value,
String jmsPropertyName,
SimpleString corePropertyName) {
boolean result = false;
if (jmsPropertyName.equals(name)) {
message.putIntProperty(corePropertyName, value);
result = true;
}
return result;
}
private static int getInteger(final Object value) {
Objects.requireNonNull(value);
final int integer;
if (value instanceof Integer) {
integer = (Integer) value;
} else if (value instanceof Number) {
integer = ((Number) value).intValue();
} else {
integer = Integer.parseInt(value.toString());
}
return integer;
}
private boolean handleCoreStringProperty(final String name,
final Object value,
String jmsPropertyName,
SimpleString corePropertyName) {
boolean result = false;
if (jmsPropertyName.equals(name)) {
message.putStringProperty(corePropertyName, value == null ? null : value.toString());
result = true;
}
return result;
}
// Inner classes -------------------------------------------------
}

View File

@ -929,7 +929,7 @@ public class AMQPMessage extends RefCountMessage {
@Override
public org.apache.activemq.artemis.api.core.Message setUserID(Object userID) {
return null;
return this;
}
@Override
@ -1219,6 +1219,8 @@ public class AMQPMessage extends RefCountMessage {
return getConnectionID();
} else if (key.equals(MessageUtil.JMSXGROUPID)) {
return getGroupID();
} else if (key.equals(MessageUtil.JMSXGROUPSEQ)) {
return getGroupSequence();
} else if (key.equals(MessageUtil.JMSXUSERID)) {
return getAMQPUserID();
} else if (key.equals(MessageUtil.CORRELATIONID_HEADER_NAME.toString())) {

View File

@ -209,15 +209,17 @@ public class CoreAmqpConverter {
if (key.startsWith("JMSX")) {
if (key.equals("JMSXUserID")) {
String value = message.getStringProperty(key);
properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8)));
if (value != null) {
properties.setUserId(Binary.create(StandardCharsets.UTF_8.encode(value)));
}
continue;
} else if (key.equals("JMSXGroupID")) {
String value = message.getStringProperty(key);
properties.setGroupId(value);
continue;
} else if (key.equals("JMSXGroupSeq")) {
UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
properties.setGroupSequence(value);
int value = message.getIntProperty(key);
properties.setGroupSequence(UnsignedInteger.valueOf(value));
continue;
}
} else if (key.startsWith(JMS_AMQP_PREFIX)) {
@ -281,10 +283,14 @@ public class CoreAmqpConverter {
footerMap.put(name, message.getObjectProperty(key));
continue;
}
} else if (key.equals("_AMQ_GROUP_ID")) {
} else if (key.equals(Message.HDR_GROUP_ID)) {
String value = message.getStringProperty(key);
properties.setGroupId(value);
continue;
} else if (key.equals(Message.HDR_GROUP_SEQUENCE)) {
int value = message.getIntProperty(key);
properties.setGroupSequence(UnsignedInteger.valueOf(value));
continue;
} else if (key.equals(NATIVE_MESSAGE_ID)) {
// skip..internal use only
continue;

View File

@ -266,16 +266,12 @@ public class ServerJMSMessage implements Message {
@Override
public final int getIntProperty(String name) throws JMSException {
if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
return message.getGroupSequence();
} else {
return message.getIntProperty(name);
}
return MessageUtil.getIntProperty(message, name);
}
@Override
public final long getLongProperty(String name) throws JMSException {
return message.getLongProperty(name);
return MessageUtil.getLongProperty(message, name);
}
@Override
@ -290,16 +286,12 @@ public class ServerJMSMessage implements Message {
@Override
public final String getStringProperty(String name) throws JMSException {
return message.getStringProperty(name);
return MessageUtil.getStringProperty(message, name);
}
@Override
public final Object getObjectProperty(String name) throws JMSException {
Object val = message.getObjectProperty(name);
if (val instanceof SimpleString) {
val = ((SimpleString) val).toString();
}
return val;
return MessageUtil.getObjectProperty(message, name);
}
@Override
@ -324,12 +316,12 @@ public class ServerJMSMessage implements Message {
@Override
public final void setIntProperty(String name, int value) throws JMSException {
message.putIntProperty(name, value);
MessageUtil.setIntProperty(message, name, value);
}
@Override
public final void setLongProperty(String name, long value) throws JMSException {
message.putLongProperty(name, value);
MessageUtil.setLongProperty(message, name, value);
}
@Override
@ -344,12 +336,12 @@ public class ServerJMSMessage implements Message {
@Override
public final void setStringProperty(String name, String value) throws JMSException {
message.putStringProperty(name, value);
MessageUtil.setStringProperty(message, name, value);
}
@Override
public final void setObjectProperty(String name, Object value) throws JMSException {
message.putObjectProperty(name, value);
MessageUtil.setObjectProperty(message, name, value);
}
@Override

View File

@ -173,9 +173,9 @@ public final class OpenWireMessageConverter {
}
final String groupId = messageSend.getGroupID();
if (groupId != null) {
coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, coreMessageObjectPools.getGroupIdStringSimpleStringPool().getOrCreate(groupId));
coreMessage.setGroupID(groupId);
}
coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
coreMessage.setGroupSequence(messageSend.getGroupSequence());
final MessageId messageId = messageSend.getMessageId();
@ -614,11 +614,7 @@ public final class OpenWireMessageConverter {
amqMsg.setGroupID(groupId);
}
Integer groupSequence = (Integer) coreMessage.getObjectProperty(AMQ_MSG_GROUP_SEQUENCE);
if (groupSequence == null) {
groupSequence = 0;
}
amqMsg.setGroupSequence(groupSequence);
amqMsg.setGroupSequence(coreMessage.getGroupSequence());
final MessageId mid;
final byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);