This commit is contained in:
Michael Andre Pearce 2019-01-24 08:31:11 +00:00
commit 7ea7b85f1a
10 changed files with 194 additions and 222 deletions

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.utils.collections; package org.apache.activemq.artemis.utils.collections;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -27,6 +26,9 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@ -55,15 +57,19 @@ import static org.apache.activemq.artemis.utils.DataConstants.STRING;
*/ */
public class TypedProperties { public class TypedProperties {
private static final SimpleString AMQ_PROPNAME = new SimpleString("_AMQ_");
private Map<SimpleString, PropertyValue> properties; private Map<SimpleString, PropertyValue> properties;
private int size; private int size;
private final Predicate<SimpleString> internalPropertyPredicate;
private boolean internalProperties; private boolean internalProperties;
public TypedProperties() { public TypedProperties() {
this.internalPropertyPredicate = null;
}
public TypedProperties(Predicate<SimpleString> internalPropertyPredicate) {
this.internalPropertyPredicate = internalPropertyPredicate;
} }
/** /**
@ -85,13 +91,11 @@ public class TypedProperties {
synchronized (other) { synchronized (other) {
properties = other.properties == null ? null : new HashMap<>(other.properties); properties = other.properties == null ? null : new HashMap<>(other.properties);
size = other.size; size = other.size;
internalPropertyPredicate = other.internalPropertyPredicate;
internalProperties = other.internalProperties;
} }
} }
public synchronized boolean hasInternalProperties() {
return internalProperties;
}
public void putBooleanProperty(final SimpleString key, final boolean value) { public void putBooleanProperty(final SimpleString key, final boolean value) {
doPutValue(key, BooleanValue.of(value)); doPutValue(key, BooleanValue.of(value));
} }
@ -318,6 +322,38 @@ public class TypedProperties {
} }
} }
public synchronized boolean clearInternalProperties() {
return internalProperties && removeInternalProperties();
}
private synchronized boolean removeInternalProperties() {
if (internalPropertyPredicate == null) {
return false;
}
if (properties == null) {
return false;
}
if (properties.isEmpty()) {
return false;
}
int removedBytes = 0;
boolean removed = false;
final Iterator<Entry<SimpleString, PropertyValue>> keyNameIterator = properties.entrySet().iterator();
while (keyNameIterator.hasNext()) {
final Entry<SimpleString, PropertyValue> entry = keyNameIterator.next();
final SimpleString propertyName = entry.getKey();
if (internalPropertyPredicate.test(propertyName)) {
final PropertyValue propertyValue = entry.getValue();
removedBytes += propertyName.sizeof() + propertyValue.encodeSize();
keyNameIterator.remove();
removed = true;
}
}
internalProperties = false;
size -= removedBytes;
return removed;
}
public synchronized void forEachKey(Consumer<SimpleString> action) { public synchronized void forEachKey(Consumer<SimpleString> action) {
if (properties != null) { if (properties != null) {
properties.keySet().forEach(action::accept); properties.keySet().forEach(action::accept);
@ -511,7 +547,7 @@ public class TypedProperties {
// Private ------------------------------------------------------------------------------------ // Private ------------------------------------------------------------------------------------
private synchronized void doPutValue(final SimpleString key, final PropertyValue value) { private synchronized void doPutValue(final SimpleString key, final PropertyValue value) {
if (key.startsWith(AMQ_PROPNAME)) { if (!internalProperties && internalPropertyPredicate != null && internalPropertyPredicate.test(key)) {
internalProperties = true; internalProperties = true;
} }
@ -541,7 +577,7 @@ public class TypedProperties {
} }
} }
private synchronized Object doGetProperty(final Object key) { private synchronized Object doGetProperty(final SimpleString key) {
if (properties == null) { if (properties == null) {
return null; return null;
} }

View File

@ -226,6 +226,30 @@ public class TypedPropertiesTest {
TypedPropertiesTest.assertEqualsTypeProperties(emptyProps, decodedProps); TypedPropertiesTest.assertEqualsTypeProperties(emptyProps, decodedProps);
} }
private static final SimpleString PROP_NAME = SimpleString.toSimpleString("TEST_PROP");
@Test
public void testCannotClearInternalPropertiesIfEmpty() {
TypedProperties properties = new TypedProperties();
Assert.assertFalse(properties.clearInternalProperties());
}
@Test
public void testClearInternalPropertiesIfAny() {
TypedProperties properties = new TypedProperties(PROP_NAME::equals);
properties.putBooleanProperty(PROP_NAME, RandomUtil.randomBoolean());
Assert.assertTrue(properties.clearInternalProperties());
Assert.assertFalse(properties.containsProperty(PROP_NAME));
}
@Test
public void testCannotClearInternalPropertiesTwiceIfAny() {
TypedProperties properties = new TypedProperties(PROP_NAME::equals);
properties.putBooleanProperty(PROP_NAME, RandomUtil.randomBoolean());
Assert.assertTrue(properties.clearInternalProperties());
Assert.assertFalse(properties.clearInternalProperties());
}
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
props = new TypedProperties(); props = new TypedProperties();

View File

@ -176,7 +176,7 @@ public interface Message {
/** The message will contain another message persisted through {@link org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil}*/ /** The message will contain another message persisted through {@link org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil}*/
byte EMBEDDED_TYPE = 7; byte EMBEDDED_TYPE = 7;
default void cleanupInternalProperties() { default void clearInternalProperties() {
// only on core // only on core
} }

View File

@ -31,7 +31,6 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
/** /**
* A ClientMessageImpl * A ClientMessageImpl
@ -115,11 +114,6 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
this(type, durable, expiration, timestamp, priority, initialMessageBufferSize, null); this(type, durable, expiration, timestamp, priority, initialMessageBufferSize, null);
} }
@Override
public TypedProperties getProperties() {
return this.checkProperties();
}
@Override @Override
public void onReceipt(final ClientConsumerInternal consumer) { public void onReceipt(final ClientConsumerInternal consumer) {
this.consumer = consumer; this.consumer = consumer;

View File

@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.message.impl;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import java.util.function.Predicate;
import java.util.zip.DataFormatException; import java.util.zip.DataFormatException;
import java.util.zip.Inflater; import java.util.zip.Inflater;
@ -52,6 +52,11 @@ import org.jboss.logging.Logger;
* consumers */ * consumers */
public class CoreMessage extends RefCountMessage implements ICoreMessage { public class CoreMessage extends RefCountMessage implements ICoreMessage {
// We use properties to establish routing context on clustering.
// However if the client resends the message after receiving, it needs to be removed, so we mark these internal
private static final Predicate<SimpleString> INTERNAL_PROPERTY_NAMES_PREDICATE =
name -> (name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) ||
(name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS));
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE; public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
private volatile int memoryEstimate = -1; private volatile int memoryEstimate = -1;
@ -121,26 +126,10 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
} }
@Override @Override
public void cleanupInternalProperties() { public void clearInternalProperties() {
if (properties.hasInternalProperties()) { final TypedProperties properties = this.properties;
LinkedList<SimpleString> valuesToRemove = null; if (properties != null && properties.clearInternalProperties()) {
messageChanged();
for (SimpleString name : getPropertyNames()) {
// We use properties to establish routing context on clustering.
// However if the client resends the message after receiving, it needs to be removed
if ((name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) || (name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS))) {
if (valuesToRemove == null) {
valuesToRemove = new LinkedList<>();
}
valuesToRemove.add(name);
}
}
if (valuesToRemove != null) {
for (SimpleString removal : valuesToRemove) {
this.removeProperty(removal);
}
}
} }
} }
@ -187,8 +176,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage setReplyTo(SimpleString address) { public CoreMessage setReplyTo(SimpleString address) {
if (address == null) { if (address == null) {
checkProperties(); getProperties().removeProperty(MessageUtil.REPLYTO_HEADER_NAME);
properties.removeProperty(MessageUtil.REPLYTO_HEADER_NAME);
} else { } else {
putStringProperty(MessageUtil.REPLYTO_HEADER_NAME, address); putStringProperty(MessageUtil.REPLYTO_HEADER_NAME, address);
} }
@ -343,8 +331,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public Long getScheduledDeliveryTime() { public Long getScheduledDeliveryTime() {
checkProperties(); Object property = getProperties().getProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
Object property = getObjectProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
if (property != null && property instanceof Number) { if (property != null && property instanceof Number) {
return ((Number) property).longValue(); return ((Number) property).longValue();
@ -355,9 +342,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public CoreMessage setScheduledDeliveryTime(Long time) { public CoreMessage setScheduledDeliveryTime(Long time) {
checkProperties();
if (time == null || time == 0) { if (time == null || time == 0) {
removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); getProperties().removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
} else { } else {
putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time); putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
} }
@ -375,7 +361,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public ActiveMQBuffer getBodyBuffer() { public ActiveMQBuffer getBodyBuffer() {
// if using the writable buffer, we must parse properties // if using the writable buffer, we must parse properties
checkProperties(); getProperties();
internalWritableBuffer(); internalWritableBuffer();
@ -405,14 +391,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return endOfBodyPosition; return endOfBodyPosition;
} }
public TypedProperties getTypedProperties() {
return checkProperties();
}
@Override @Override
public void messageChanged() { public void messageChanged() {
//a volatile store is a costly operation: better to check if is necessary
if (validBuffer) {
validBuffer = false; validBuffer = false;
} }
}
protected CoreMessage(CoreMessage other) { protected CoreMessage(CoreMessage other) {
this(other, other.properties); this(other, other.properties);
@ -465,13 +450,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
priority = msg.getPriority(); priority = msg.getPriority();
if (msg instanceof CoreMessage) { if (msg instanceof CoreMessage) {
properties = ((CoreMessage) msg).getTypedProperties(); properties = ((CoreMessage) msg).getProperties();
} }
} }
@Override @Override
public Message copy() { public Message copy() {
checkProperties(); getProperties();
checkEncode(); checkEncode();
return new CoreMessage(this); return new CoreMessage(this);
} }
@ -525,7 +510,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public CoreMessage setValidatedUserID(String validatedUserID) { public CoreMessage setValidatedUserID(String validatedUserID) {
putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID, getPropertyValuesPool())); putStringProperty(Message.HDR_VALIDATED_USER, value(validatedUserID));
return this; return this;
} }
@ -579,36 +564,45 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
/** /**
* I am keeping this synchronized as the decode of the Properties is lazy * I am keeping this synchronized as the decode of the Properties is lazy
*/ */
protected TypedProperties checkProperties() { public final TypedProperties getProperties() {
TypedProperties properties = this.properties;
if (properties == null) {
properties = getOrInitializeTypedProperties();
}
return properties;
}
private synchronized TypedProperties getOrInitializeTypedProperties() {
try { try {
TypedProperties properties = this.properties;
if (properties == null) { if (properties == null) {
synchronized (this) { properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
if (properties == null) {
TypedProperties properties = new TypedProperties();
if (buffer != null && propertiesLocation >= 0) { if (buffer != null && propertiesLocation >= 0) {
final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
} }
this.properties = properties; this.properties = properties;
} }
return properties;
} catch (Throwable e) {
throw onCheckPropertiesError(e);
} }
} }
return this.properties; private RuntimeException onCheckPropertiesError(Throwable e) {
} catch (Throwable e) {
// This is not an expected error, hence no specific logger created // This is not an expected error, hence no specific logger created
logger.warn("Could not decode properties for CoreMessage[messageID=" + messageID + ",durable=" + durable + ",userID=" + userID + ",priority=" + priority + logger.warn("Could not decode properties for CoreMessage[messageID=" + messageID + ",durable=" + durable + ",userID=" + userID + ",priority=" + priority +
", timestamp=" + timestamp + ",expiration=" + expiration + ",address=" + address + ", propertiesLocation=" + propertiesLocation, e); ", timestamp=" + timestamp + ",expiration=" + expiration + ",address=" + address + ", propertiesLocation=" + propertiesLocation, e);
final ByteBuf buffer = this.buffer;
if (buffer != null) { if (buffer != null) {
ByteBuf duplicatebuffer = buffer.duplicate(); //risky: a racy modification to buffer indexes could break this duplicate operation
final ByteBuf duplicatebuffer = buffer.duplicate();
duplicatebuffer.readerIndex(0); duplicatebuffer.readerIndex(0);
logger.warn("Failed message has messageID=" + messageID + " and the following buffer:\n" + ByteBufUtil.prettyHexDump(duplicatebuffer)); logger.warn("Failed message has messageID=" + messageID + " and the following buffer:\n" + ByteBufUtil.prettyHexDump(duplicatebuffer));
} else { } else {
logger.warn("Failed message has messageID=" + messageID + " and the buffer was null"); logger.warn("Failed message has messageID=" + messageID + " and the buffer was null");
} }
throw new RuntimeException(e.getMessage(), e); return new RuntimeException(e.getMessage(), e);
}
} }
@Override @Override
@ -685,14 +679,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
properties = null; properties = null;
propertiesLocation = buffer.readerIndex(); propertiesLocation = buffer.readerIndex();
} else { } else {
properties = new TypedProperties(); properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
} }
} }
public synchronized CoreMessage encode() { public synchronized CoreMessage encode() {
checkProperties(); getProperties();
if (writableBuffer != null) { if (writableBuffer != null) {
// The message encode takes into consideration the PacketImpl which is not part of this encoding // The message encode takes into consideration the PacketImpl which is not part of this encoding
@ -716,7 +710,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
} }
public void encodeHeadersAndProperties(final ByteBuf buffer) { public void encodeHeadersAndProperties(final ByteBuf buffer) {
checkProperties(); final TypedProperties properties = getProperties();
messageIDPosition = buffer.writerIndex(); messageIDPosition = buffer.writerIndex();
buffer.writeLong(messageID); buffer.writeLong(messageID);
SimpleString.writeNullableSimpleString(buffer, address); SimpleString.writeNullableSimpleString(buffer, address);
@ -745,7 +739,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
DataConstants./* Expiration */SIZE_LONG + DataConstants./* Expiration */SIZE_LONG +
DataConstants./* Timestamp */SIZE_LONG + DataConstants./* Timestamp */SIZE_LONG +
DataConstants./* Priority */SIZE_BYTE + DataConstants./* Priority */SIZE_BYTE +
/* PropertySize and Properties */checkProperties().getEncodeSize(); /* PropertySize and Properties */getProperties().getEncodeSize();
} }
@Override @Override
@ -819,294 +813,230 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public CoreMessage putBooleanProperty(final String key, final boolean value) { public CoreMessage putBooleanProperty(final String key, final boolean value) {
messageChanged(); return putBooleanProperty(key(key), value);
checkProperties();
properties.putBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
} }
@Override @Override
public CoreMessage putBooleanProperty(final SimpleString key, final boolean value) { public CoreMessage putBooleanProperty(final SimpleString key, final boolean value) {
messageChanged(); messageChanged();
checkProperties(); getProperties().putBooleanProperty(key, value);
properties.putBooleanProperty(key, value);
return this; return this;
} }
@Override @Override
public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException { public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties(); return getProperties().getBooleanProperty(key);
return properties.getBooleanProperty(key);
} }
@Override @Override
public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException { public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties(); return getBooleanProperty(key(key));
return properties.getBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
public CoreMessage putByteProperty(final SimpleString key, final byte value) { public CoreMessage putByteProperty(final SimpleString key, final byte value) {
messageChanged(); messageChanged();
checkProperties(); getProperties().putByteProperty(key, value);
properties.putByteProperty(key, value);
return this; return this;
} }
@Override @Override
public CoreMessage putByteProperty(final String key, final byte value) { public CoreMessage putByteProperty(final String key, final byte value) {
messageChanged(); return putByteProperty(key(key), value);
checkProperties();
properties.putByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
} }
@Override @Override
public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException { public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties(); return getProperties().getByteProperty(key);
return properties.getByteProperty(key);
} }
@Override @Override
public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException { public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
return getByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); return getByteProperty(key(key));
} }
@Override @Override
public CoreMessage putBytesProperty(final SimpleString key, final byte[] value) { public CoreMessage putBytesProperty(final SimpleString key, final byte[] value) {
messageChanged(); messageChanged();
checkProperties(); getProperties().putBytesProperty(key, value);
properties.putBytesProperty(key, value);
return this; return this;
} }
@Override @Override
public CoreMessage putBytesProperty(final String key, final byte[] value) { public CoreMessage putBytesProperty(final String key, final byte[] value) {
messageChanged(); return putBytesProperty(key(key), value);
checkProperties();
properties.putBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
} }
@Override @Override
public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException { public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties(); return getProperties().getBytesProperty(key);
return properties.getBytesProperty(key);
} }
@Override @Override
public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException { public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
return getBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); return getBytesProperty(key(key));
} }
@Override @Override
public CoreMessage putCharProperty(SimpleString key, char value) { public CoreMessage putCharProperty(SimpleString key, char value) {
messageChanged(); messageChanged();
checkProperties(); getProperties().putCharProperty(key, value);
properties.putCharProperty(key, value);
return this; return this;
} }
@Override @Override
public CoreMessage putCharProperty(String key, char value) { public CoreMessage putCharProperty(String key, char value) {
messageChanged(); return putCharProperty(key(key), value);
checkProperties();
properties.putCharProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
} }
@Override @Override
public CoreMessage putShortProperty(final SimpleString key, final short value) { public CoreMessage putShortProperty(final SimpleString key, final short value) {
messageChanged(); messageChanged();
checkProperties(); getProperties().putShortProperty(key, value);
properties.putShortProperty(key, value);
return this; return this;
} }
@Override @Override
public CoreMessage putShortProperty(final String key, final short value) { public CoreMessage putShortProperty(final String key, final short value) {
messageChanged(); return putShortProperty(key(key), value);
checkProperties();
properties.putShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
} }
@Override @Override
public CoreMessage putIntProperty(final SimpleString key, final int value) { public CoreMessage putIntProperty(final SimpleString key, final int value) {
messageChanged(); messageChanged();
checkProperties(); getProperties().putIntProperty(key, value);
properties.putIntProperty(key, value);
return this; return this;
} }
@Override @Override
public CoreMessage putIntProperty(final String key, final int value) { public CoreMessage putIntProperty(final String key, final int value) {
messageChanged(); return putIntProperty(key(key), value);
checkProperties();
properties.putIntProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
} }
@Override @Override
public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException { public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties(); return getProperties().getIntProperty(key);
return properties.getIntProperty(key);
} }
@Override @Override
public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException { public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException {
return getIntProperty(SimpleString.toSimpleString(key)); return getIntProperty(key(key));
} }
@Override @Override
public CoreMessage putLongProperty(final SimpleString key, final long value) { public CoreMessage putLongProperty(final SimpleString key, final long value) {
messageChanged(); messageChanged();
checkProperties(); getProperties().putLongProperty(key, value);
properties.putLongProperty(key, value);
return this; return this;
} }
@Override @Override
public CoreMessage putLongProperty(final String key, final long value) { public CoreMessage putLongProperty(final String key, final long value) {
messageChanged(); return putLongProperty(key(key), value);
checkProperties();
properties.putLongProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
} }
@Override @Override
public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException { public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties(); return getProperties().getLongProperty(key);
return properties.getLongProperty(key);
} }
@Override @Override
public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException { public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties(); return getLongProperty(key(key));
return getLongProperty(SimpleString.toSimpleString(key));
} }
@Override @Override
public CoreMessage putFloatProperty(final SimpleString key, final float value) { public CoreMessage putFloatProperty(final SimpleString key, final float value) {
messageChanged(); messageChanged();
checkProperties(); getProperties().putFloatProperty(key, value);
properties.putFloatProperty(key, value);
return this; return this;
} }
@Override @Override
public CoreMessage putFloatProperty(final String key, final float value) { public CoreMessage putFloatProperty(final String key, final float value) {
messageChanged(); return putFloatProperty(key(key), value);
checkProperties();
properties.putFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
} }
@Override @Override
public CoreMessage putDoubleProperty(final SimpleString key, final double value) { public CoreMessage putDoubleProperty(final SimpleString key, final double value) {
messageChanged(); messageChanged();
checkProperties(); getProperties().putDoubleProperty(key, value);
properties.putDoubleProperty(key, value);
return this; return this;
} }
@Override @Override
public CoreMessage putDoubleProperty(final String key, final double value) { public CoreMessage putDoubleProperty(final String key, final double value) {
messageChanged(); return putDoubleProperty(key(key), value);
checkProperties();
properties.putDoubleProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
} }
@Override @Override
public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException { public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties(); return getProperties().getDoubleProperty(key);
return properties.getDoubleProperty(key);
} }
@Override @Override
public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException { public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties(); return getDoubleProperty(key(key));
return getDoubleProperty(SimpleString.toSimpleString(key));
} }
@Override @Override
public CoreMessage putStringProperty(final SimpleString key, final SimpleString value) { public CoreMessage putStringProperty(final SimpleString key, final SimpleString value) {
messageChanged(); messageChanged();
checkProperties(); getProperties().putSimpleStringProperty(key, value);
properties.putSimpleStringProperty(key, value);
return this; return this;
} }
@Override @Override
public CoreMessage putStringProperty(final SimpleString key, final String value) { public CoreMessage putStringProperty(final SimpleString key, final String value) {
messageChanged(); return putStringProperty(key, value(value));
checkProperties();
properties.putSimpleStringProperty(key, SimpleString.toSimpleString(value, getPropertyValuesPool()));
return this;
} }
@Override @Override
public CoreMessage putStringProperty(final String key, final String value) { public CoreMessage putStringProperty(final String key, final String value) {
messageChanged(); return putStringProperty(key(key), value(value));
checkProperties();
properties.putSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), SimpleString.toSimpleString(value, getPropertyValuesPool()));
return this;
} }
@Override @Override
public CoreMessage putObjectProperty(final SimpleString key, public CoreMessage putObjectProperty(final SimpleString key,
final Object value) throws ActiveMQPropertyConversionException { final Object value) throws ActiveMQPropertyConversionException {
checkProperties();
messageChanged(); messageChanged();
TypedProperties.setObjectProperty(key, value, properties); TypedProperties.setObjectProperty(key, value, getProperties());
return this; return this;
} }
@Override @Override
public Object getObjectProperty(final String key) { public Object getObjectProperty(final String key) {
checkProperties(); return getObjectProperty(key(key));
return getObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
public Object getObjectProperty(final SimpleString key) { public Object getObjectProperty(final SimpleString key) {
checkProperties(); return getProperties().getProperty(key);
return properties.getProperty(key);
} }
@Override @Override
public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException { public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
messageChanged(); return putObjectProperty(key(key), value);
putObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this;
} }
@Override @Override
public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException { public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties(); return getProperties().getShortProperty(key);
return properties.getShortProperty(key);
} }
@Override @Override
public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException { public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties(); return getShortProperty(key(key));
return properties.getShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException { public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties(); return getProperties().getFloatProperty(key);
return properties.getFloatProperty(key);
} }
@Override @Override
public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException { public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties(); return getFloatProperty(key(key));
return properties.getFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
@ -1122,25 +1052,22 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public String getStringProperty(final String key) throws ActiveMQPropertyConversionException { public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
return getStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); return getStringProperty(key(key));
} }
@Override @Override
public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException { public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties(); return getProperties().getSimpleStringProperty(key);
return properties.getSimpleStringProperty(key);
} }
@Override @Override
public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException { public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties(); return getSimpleStringProperty(key(key));
return properties.getSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
public Object removeProperty(final SimpleString key) { public Object removeProperty(final SimpleString key) {
checkProperties(); Object oldValue = getProperties().removeProperty(key);
Object oldValue = properties.removeProperty(key);
if (oldValue != null) { if (oldValue != null) {
messageChanged(); messageChanged();
} }
@ -1149,31 +1076,22 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public Object removeProperty(final String key) { public Object removeProperty(final String key) {
messageChanged(); return removeProperty(key(key));
checkProperties();
Object oldValue = properties.removeProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
if (oldValue != null) {
messageChanged();
}
return oldValue;
} }
@Override @Override
public boolean containsProperty(final SimpleString key) { public boolean containsProperty(final SimpleString key) {
checkProperties(); return getProperties().containsProperty(key);
return properties.containsProperty(key);
} }
@Override @Override
public boolean containsProperty(final String key) { public boolean containsProperty(final String key) {
checkProperties(); return containsProperty(key(key));
return properties.containsProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
public Set<SimpleString> getPropertyNames() { public Set<SimpleString> getPropertyNames() {
checkProperties(); return getProperties().getPropertyNames();
return properties.getPropertyNames();
} }
@Override @Override
@ -1244,7 +1162,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public String toString() { public String toString() {
try { try {
checkProperties(); final TypedProperties properties = getProperties();
return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + properties + "]@" + System.identityHashCode(this); ", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + properties + "]@" + System.identityHashCode(this);
@ -1262,6 +1180,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
} }
} }
private SimpleString key(String key) {
return SimpleString.toSimpleString(key, getPropertyKeysPool());
}
private SimpleString value(String value) {
return SimpleString.toSimpleString(value, getPropertyValuesPool());
}
private SimpleString.StringSimpleStringPool getPropertyKeysPool() { private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool(); return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
} }

View File

@ -692,7 +692,7 @@ public class MessageInternalImpl implements MessageInternal {
@Override @Override
public TypedProperties getTypedProperties() { public TypedProperties getTypedProperties() {
return new TypedProperties(message.getTypedProperties()); return new TypedProperties(message.getProperties());
} }
@Override @Override

View File

@ -363,7 +363,7 @@ public class PacketImpl implements Packet {
size = buffer.readerIndex(); size = buffer.readerIndex();
} }
protected ByteBuf copyMessageBuffer(ByteBuf buffer, int skipBytes) { protected static ByteBuf copyMessageBuffer(ByteBuf buffer, int skipBytes) {
ByteBuf newNettyBuffer = Unpooled.buffer(buffer.capacity() - PACKET_HEADERS_SIZE - skipBytes); ByteBuf newNettyBuffer = Unpooled.buffer(buffer.capacity() - PACKET_HEADERS_SIZE - skipBytes);

View File

@ -881,7 +881,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return RoutingStatus.DUPLICATED_ID; return RoutingStatus.DUPLICATED_ID;
} }
message.cleanupInternalProperties(); message.clearInternalProperties();
Bindings bindings = addressManager.getBindingsForRoutingAddress(address); Bindings bindings = addressManager.getBindingsForRoutingAddress(address);

View File

@ -47,9 +47,6 @@ public interface ServerMessage extends MessageInternal {
PagingStore getPagingStore(); PagingStore getPagingStore();
// Is there any _AMQ_ property being used
boolean hasInternalProperties();
boolean storeIsPaging(); boolean storeIsPaging();
void encodeMessageIDToBuffer(); void encodeMessageIDToBuffer();

View File

@ -166,11 +166,6 @@ public class ServerMessageImpl extends MessageInternalImpl implements ServerMess
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean hasInternalProperties() {
return message.getTypedProperties().hasInternalProperties();
}
@Override @Override
public boolean storeIsPaging() { public boolean storeIsPaging() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();