|
|
|
@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.message.impl;
|
|
|
|
|
|
|
|
|
|
import java.io.InputStream;
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
|
|
|
import java.util.LinkedList;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.function.Predicate;
|
|
|
|
|
import java.util.zip.DataFormatException;
|
|
|
|
|
import java.util.zip.Inflater;
|
|
|
|
|
|
|
|
|
@ -52,6 +52,11 @@ import org.jboss.logging.Logger;
|
|
|
|
|
* consumers */
|
|
|
|
|
public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
|
|
|
|
|
// We use properties to establish routing context on clustering.
|
|
|
|
|
// However if the client resends the message after receiving, it needs to be removed
|
|
|
|
|
private static final Predicate<SimpleString> INTERNAL_PROPERTY_NAMES_CLEANUP_FILTER =
|
|
|
|
|
name -> (name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) ||
|
|
|
|
|
(name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS));
|
|
|
|
|
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
|
|
|
|
|
|
|
|
|
|
private volatile int memoryEstimate = -1;
|
|
|
|
@ -122,25 +127,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void cleanupInternalProperties() {
|
|
|
|
|
if (properties.hasInternalProperties()) {
|
|
|
|
|
LinkedList<SimpleString> valuesToRemove = null;
|
|
|
|
|
|
|
|
|
|
for (SimpleString name : getPropertyNames()) {
|
|
|
|
|
// We use properties to establish routing context on clustering.
|
|
|
|
|
// However if the client resends the message after receiving, it needs to be removed
|
|
|
|
|
if ((name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) || (name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS))) {
|
|
|
|
|
if (valuesToRemove == null) {
|
|
|
|
|
valuesToRemove = new LinkedList<>();
|
|
|
|
|
}
|
|
|
|
|
valuesToRemove.add(name);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (valuesToRemove != null) {
|
|
|
|
|
for (SimpleString removal : valuesToRemove) {
|
|
|
|
|
this.removeProperty(removal);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
final TypedProperties properties = this.properties;
|
|
|
|
|
if (properties != null && properties.removeProperty(INTERNAL_PROPERTY_NAMES_CLEANUP_FILTER)) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -187,8 +176,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
public CoreMessage setReplyTo(SimpleString address) {
|
|
|
|
|
|
|
|
|
|
if (address == null) {
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.removeProperty(MessageUtil.REPLYTO_HEADER_NAME);
|
|
|
|
|
getProperties().removeProperty(MessageUtil.REPLYTO_HEADER_NAME);
|
|
|
|
|
} else {
|
|
|
|
|
putStringProperty(MessageUtil.REPLYTO_HEADER_NAME, address);
|
|
|
|
|
}
|
|
|
|
@ -343,8 +331,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Long getScheduledDeliveryTime() {
|
|
|
|
|
checkProperties();
|
|
|
|
|
Object property = getObjectProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
|
|
|
|
|
Object property = getProperties().getProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
|
|
|
|
|
|
|
|
|
|
if (property != null && property instanceof Number) {
|
|
|
|
|
return ((Number) property).longValue();
|
|
|
|
@ -355,9 +342,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage setScheduledDeliveryTime(Long time) {
|
|
|
|
|
checkProperties();
|
|
|
|
|
if (time == null || time == 0) {
|
|
|
|
|
removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
|
|
|
|
|
getProperties().removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
|
|
|
|
|
} else {
|
|
|
|
|
putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
|
|
|
|
|
}
|
|
|
|
@ -375,7 +361,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
@Override
|
|
|
|
|
public ActiveMQBuffer getBodyBuffer() {
|
|
|
|
|
// if using the writable buffer, we must parse properties
|
|
|
|
|
checkProperties();
|
|
|
|
|
getProperties();
|
|
|
|
|
|
|
|
|
|
internalWritableBuffer();
|
|
|
|
|
|
|
|
|
@ -405,13 +391,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
return endOfBodyPosition;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public TypedProperties getTypedProperties() {
|
|
|
|
|
return checkProperties();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void messageChanged() {
|
|
|
|
|
validBuffer = false;
|
|
|
|
|
//a volatile store is a costly operation: better to check if is necessary
|
|
|
|
|
if (validBuffer) {
|
|
|
|
|
validBuffer = false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected CoreMessage(CoreMessage other) {
|
|
|
|
@ -465,13 +450,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
priority = msg.getPriority();
|
|
|
|
|
|
|
|
|
|
if (msg instanceof CoreMessage) {
|
|
|
|
|
properties = ((CoreMessage) msg).getTypedProperties();
|
|
|
|
|
properties = ((CoreMessage) msg).getProperties();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Message copy() {
|
|
|
|
|
checkProperties();
|
|
|
|
|
getProperties();
|
|
|
|
|
checkEncode();
|
|
|
|
|
return new CoreMessage(this);
|
|
|
|
|
}
|
|
|
|
@ -525,7 +510,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage setValidatedUserID(String validatedUserID) {
|
|
|
|
|
putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID, getPropertyValuesPool()));
|
|
|
|
|
putStringProperty(Message.HDR_VALIDATED_USER, value(validatedUserID));
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -579,38 +564,47 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
/**
|
|
|
|
|
* I am keeping this synchronized as the decode of the Properties is lazy
|
|
|
|
|
*/
|
|
|
|
|
protected TypedProperties checkProperties() {
|
|
|
|
|
public final TypedProperties getProperties() {
|
|
|
|
|
try {
|
|
|
|
|
TypedProperties properties = this.properties;
|
|
|
|
|
if (properties == null) {
|
|
|
|
|
synchronized (this) {
|
|
|
|
|
if (properties == null) {
|
|
|
|
|
TypedProperties properties = new TypedProperties();
|
|
|
|
|
if (buffer != null && propertiesLocation >= 0) {
|
|
|
|
|
final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
|
|
|
|
|
properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
|
|
|
|
|
}
|
|
|
|
|
this.properties = properties;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
properties = getOrInitializeTypedProperties();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return this.properties;
|
|
|
|
|
return properties;
|
|
|
|
|
} catch (Throwable e) {
|
|
|
|
|
// This is not an expected error, hence no specific logger created
|
|
|
|
|
logger.warn("Could not decode properties for CoreMessage[messageID=" + messageID + ",durable=" + durable + ",userID=" + userID + ",priority=" + priority +
|
|
|
|
|
", timestamp=" + timestamp + ",expiration=" + expiration + ",address=" + address + ", propertiesLocation=" + propertiesLocation, e);
|
|
|
|
|
if (buffer != null) {
|
|
|
|
|
ByteBuf duplicatebuffer = buffer.duplicate();
|
|
|
|
|
duplicatebuffer.readerIndex(0);
|
|
|
|
|
logger.warn("Failed message has messageID=" + messageID + " and the following buffer:\n" + ByteBufUtil.prettyHexDump(duplicatebuffer));
|
|
|
|
|
} else {
|
|
|
|
|
logger.warn("Failed message has messageID=" + messageID + " and the buffer was null");
|
|
|
|
|
}
|
|
|
|
|
throw new RuntimeException(e.getMessage(), e);
|
|
|
|
|
|
|
|
|
|
throw onCheckPropertiesError(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private synchronized TypedProperties getOrInitializeTypedProperties() {
|
|
|
|
|
TypedProperties properties = this.properties;
|
|
|
|
|
if (properties == null) {
|
|
|
|
|
properties = new TypedProperties();
|
|
|
|
|
if (buffer != null && propertiesLocation >= 0) {
|
|
|
|
|
final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
|
|
|
|
|
properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
|
|
|
|
|
}
|
|
|
|
|
this.properties = properties;
|
|
|
|
|
}
|
|
|
|
|
return properties;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RuntimeException onCheckPropertiesError(Throwable e) {
|
|
|
|
|
// This is not an expected error, hence no specific logger created
|
|
|
|
|
logger.warn("Could not decode properties for CoreMessage[messageID=" + messageID + ",durable=" + durable + ",userID=" + userID + ",priority=" + priority +
|
|
|
|
|
", timestamp=" + timestamp + ",expiration=" + expiration + ",address=" + address + ", propertiesLocation=" + propertiesLocation, e);
|
|
|
|
|
final ByteBuf buffer = this.buffer;
|
|
|
|
|
if (buffer != null) {
|
|
|
|
|
//risky: a racy modification to buffer indexes could break this duplicate operation
|
|
|
|
|
final ByteBuf duplicatebuffer = buffer.duplicate();
|
|
|
|
|
duplicatebuffer.readerIndex(0);
|
|
|
|
|
logger.warn("Failed message has messageID=" + messageID + " and the following buffer:\n" + ByteBufUtil.prettyHexDump(duplicatebuffer));
|
|
|
|
|
} else {
|
|
|
|
|
logger.warn("Failed message has messageID=" + messageID + " and the buffer was null");
|
|
|
|
|
}
|
|
|
|
|
return new RuntimeException(e.getMessage(), e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public int getMemoryEstimate() {
|
|
|
|
|
if (memoryEstimate == -1) {
|
|
|
|
@ -692,7 +686,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
|
|
|
|
|
public synchronized CoreMessage encode() {
|
|
|
|
|
|
|
|
|
|
checkProperties();
|
|
|
|
|
getProperties();
|
|
|
|
|
|
|
|
|
|
if (writableBuffer != null) {
|
|
|
|
|
// The message encode takes into consideration the PacketImpl which is not part of this encoding
|
|
|
|
@ -716,7 +710,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void encodeHeadersAndProperties(final ByteBuf buffer) {
|
|
|
|
|
checkProperties();
|
|
|
|
|
final TypedProperties properties = getProperties();
|
|
|
|
|
messageIDPosition = buffer.writerIndex();
|
|
|
|
|
buffer.writeLong(messageID);
|
|
|
|
|
SimpleString.writeNullableSimpleString(buffer, address);
|
|
|
|
@ -745,7 +739,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
DataConstants./* Expiration */SIZE_LONG +
|
|
|
|
|
DataConstants./* Timestamp */SIZE_LONG +
|
|
|
|
|
DataConstants./* Priority */SIZE_BYTE +
|
|
|
|
|
/* PropertySize and Properties */checkProperties().getEncodeSize();
|
|
|
|
|
/* PropertySize and Properties */getProperties().getEncodeSize();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -819,294 +813,230 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putBooleanProperty(final String key, final boolean value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
|
|
|
|
|
return this;
|
|
|
|
|
return putBooleanProperty(key(key), value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putBooleanProperty(final SimpleString key, final boolean value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putBooleanProperty(key, value);
|
|
|
|
|
getProperties().putBooleanProperty(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getBooleanProperty(key);
|
|
|
|
|
return getProperties().getBooleanProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
|
|
|
|
|
return getBooleanProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putByteProperty(final SimpleString key, final byte value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putByteProperty(key, value);
|
|
|
|
|
getProperties().putByteProperty(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putByteProperty(final String key, final byte value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
|
|
|
|
|
|
|
|
|
|
return this;
|
|
|
|
|
return putByteProperty(key(key), value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getByteProperty(key);
|
|
|
|
|
return getProperties().getByteProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
return getByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
|
|
|
|
|
return getByteProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putBytesProperty(final SimpleString key, final byte[] value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putBytesProperty(key, value);
|
|
|
|
|
|
|
|
|
|
getProperties().putBytesProperty(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putBytesProperty(final String key, final byte[] value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
|
|
|
|
|
return this;
|
|
|
|
|
return putBytesProperty(key(key), value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getBytesProperty(key);
|
|
|
|
|
return getProperties().getBytesProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
return getBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
|
|
|
|
|
return getBytesProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putCharProperty(SimpleString key, char value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putCharProperty(key, value);
|
|
|
|
|
getProperties().putCharProperty(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putCharProperty(String key, char value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putCharProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
|
|
|
|
|
return this;
|
|
|
|
|
return putCharProperty(key(key), value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putShortProperty(final SimpleString key, final short value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putShortProperty(key, value);
|
|
|
|
|
getProperties().putShortProperty(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putShortProperty(final String key, final short value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
|
|
|
|
|
return this;
|
|
|
|
|
return putShortProperty(key(key), value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putIntProperty(final SimpleString key, final int value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putIntProperty(key, value);
|
|
|
|
|
getProperties().putIntProperty(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putIntProperty(final String key, final int value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putIntProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
|
|
|
|
|
return this;
|
|
|
|
|
return putIntProperty(key(key), value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getIntProperty(key);
|
|
|
|
|
return getProperties().getIntProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
return getIntProperty(SimpleString.toSimpleString(key));
|
|
|
|
|
return getIntProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putLongProperty(final SimpleString key, final long value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putLongProperty(key, value);
|
|
|
|
|
getProperties().putLongProperty(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putLongProperty(final String key, final long value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putLongProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
|
|
|
|
|
return this;
|
|
|
|
|
return putLongProperty(key(key), value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getLongProperty(key);
|
|
|
|
|
return getProperties().getLongProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return getLongProperty(SimpleString.toSimpleString(key));
|
|
|
|
|
return getLongProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putFloatProperty(final SimpleString key, final float value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putFloatProperty(key, value);
|
|
|
|
|
getProperties().putFloatProperty(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putFloatProperty(final String key, final float value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
|
|
|
|
|
return this;
|
|
|
|
|
return putFloatProperty(key(key), value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putDoubleProperty(final SimpleString key, final double value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putDoubleProperty(key, value);
|
|
|
|
|
getProperties().putDoubleProperty(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putDoubleProperty(final String key, final double value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putDoubleProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
|
|
|
|
|
return this;
|
|
|
|
|
return putDoubleProperty(key(key), value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getDoubleProperty(key);
|
|
|
|
|
return getProperties().getDoubleProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return getDoubleProperty(SimpleString.toSimpleString(key));
|
|
|
|
|
return getDoubleProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putStringProperty(final SimpleString key, final SimpleString value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putSimpleStringProperty(key, value);
|
|
|
|
|
getProperties().putSimpleStringProperty(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putStringProperty(final SimpleString key, final String value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putSimpleStringProperty(key, SimpleString.toSimpleString(value, getPropertyValuesPool()));
|
|
|
|
|
return this;
|
|
|
|
|
return putStringProperty(key, value(value));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putStringProperty(final String key, final String value) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
properties.putSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), SimpleString.toSimpleString(value, getPropertyValuesPool()));
|
|
|
|
|
return this;
|
|
|
|
|
return putStringProperty(key(key), value(value));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putObjectProperty(final SimpleString key,
|
|
|
|
|
final Object value) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
messageChanged();
|
|
|
|
|
TypedProperties.setObjectProperty(key, value, properties);
|
|
|
|
|
TypedProperties.setObjectProperty(key, value, getProperties());
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Object getObjectProperty(final String key) {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return getObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
|
|
|
|
|
return getObjectProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Object getObjectProperty(final SimpleString key) {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getProperty(key);
|
|
|
|
|
return getProperties().getProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
|
|
|
|
|
messageChanged();
|
|
|
|
|
putObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
|
|
|
|
|
return this;
|
|
|
|
|
return putObjectProperty(key(key), value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getShortProperty(key);
|
|
|
|
|
return getProperties().getShortProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
|
|
|
|
|
return getShortProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getFloatProperty(key);
|
|
|
|
|
return getProperties().getFloatProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
|
|
|
|
|
return getFloatProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -1122,25 +1052,22 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
return getStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
|
|
|
|
|
return getStringProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getSimpleStringProperty(key);
|
|
|
|
|
return getProperties().getSimpleStringProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
|
|
|
|
|
return getSimpleStringProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Object removeProperty(final SimpleString key) {
|
|
|
|
|
checkProperties();
|
|
|
|
|
Object oldValue = properties.removeProperty(key);
|
|
|
|
|
Object oldValue = getProperties().removeProperty(key);
|
|
|
|
|
if (oldValue != null) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
}
|
|
|
|
@ -1149,31 +1076,22 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Object removeProperty(final String key) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
checkProperties();
|
|
|
|
|
Object oldValue = properties.removeProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
|
|
|
|
|
if (oldValue != null) {
|
|
|
|
|
messageChanged();
|
|
|
|
|
}
|
|
|
|
|
return oldValue;
|
|
|
|
|
return removeProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean containsProperty(final SimpleString key) {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.containsProperty(key);
|
|
|
|
|
return getProperties().containsProperty(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean containsProperty(final String key) {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.containsProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
|
|
|
|
|
return containsProperty(key(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Set<SimpleString> getPropertyNames() {
|
|
|
|
|
checkProperties();
|
|
|
|
|
return properties.getPropertyNames();
|
|
|
|
|
return getProperties().getPropertyNames();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -1244,7 +1162,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
@Override
|
|
|
|
|
public String toString() {
|
|
|
|
|
try {
|
|
|
|
|
checkProperties();
|
|
|
|
|
final TypedProperties properties = getProperties();
|
|
|
|
|
return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
|
|
|
|
|
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
|
|
|
|
|
", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + properties + "]@" + System.identityHashCode(this);
|
|
|
|
@ -1262,6 +1180,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private SimpleString key(String key) {
|
|
|
|
|
return SimpleString.toSimpleString(key, getPropertyKeysPool());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private SimpleString value(String value) {
|
|
|
|
|
return SimpleString.toSimpleString(value, getPropertyValuesPool());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
|
|
|
|
|
return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
|
|
|
|
|
}
|
|
|
|
|