From 6446d01a151a6e73b359c356b44fcf447961bd5a Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Sun, 11 Nov 2018 10:44:29 +0100 Subject: [PATCH 1/2] ARTEMIS-2170 Optimized CoreMessage check and cleanup methods Any checkProperties(); pattern has been replaced by an atomic checkProperties(). to help both performance and consistency. The cleanup is now performed into CoreTypedProperties both for performance reasons (avoid lock/unlock many times) and consistency, given that the operation is now atomic. --- .../utils/collections/TypedProperties.java | 41 ++- .../artemis/utils/TypedPropertiesTest.java | 23 ++ .../core/client/impl/ClientMessageImpl.java | 6 - .../core/message/impl/CoreMessage.java | 304 +++++++----------- .../message/impl/MessageInternalImpl.java | 2 +- .../core/protocol/core/impl/PacketImpl.java | 2 +- .../artemis/core/server/ServerMessage.java | 3 - .../server/transformer/ServerMessageImpl.java | 5 - 8 files changed, 168 insertions(+), 218 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java index 4cf4805824..4c9f1f05a6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.utils.collections; -import io.netty.buffer.ByteBuf; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; @@ -24,9 +23,13 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Predicate; + +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; @@ -55,14 +58,10 @@ import static org.apache.activemq.artemis.utils.DataConstants.STRING; */ public class TypedProperties { - private static final SimpleString AMQ_PROPNAME = new SimpleString("_AMQ_"); - private Map properties; private int size; - private boolean internalProperties; - public TypedProperties() { } @@ -88,10 +87,6 @@ public class TypedProperties { } } - public synchronized boolean hasInternalProperties() { - return internalProperties; - } - public void putBooleanProperty(final SimpleString key, final boolean value) { doPutValue(key, BooleanValue.of(value)); } @@ -318,6 +313,30 @@ public class TypedProperties { } } + public synchronized boolean removeProperty(Predicate propertyNamePredicate) { + Objects.requireNonNull(propertyNamePredicate, "propertyNamePredicate cannot be null"); + if (properties == null) { + return false; + } + if (properties.isEmpty()) { + return false; + } + int removedBytes = 0; + boolean removed = false; + for (Iterator> keyNameIterator = properties.entrySet().iterator(); keyNameIterator.hasNext(); ) { + final Entry entry = keyNameIterator.next(); + final SimpleString propertyName = entry.getKey(); + if (propertyNamePredicate.test(propertyName)) { + final PropertyValue propertyValue = entry.getValue(); + removedBytes += propertyName.sizeof() + propertyValue.encodeSize(); + keyNameIterator.remove(); + removed = true; + } + } + size -= removedBytes; + return removed; + } + public synchronized void forEachKey(Consumer action) { if (properties != null) { properties.keySet().forEach(action::accept); @@ -511,10 +530,6 @@ public class TypedProperties { // Private ------------------------------------------------------------------------------------ private synchronized void doPutValue(final SimpleString key, final PropertyValue value) { - if (key.startsWith(AMQ_PROPNAME)) { - internalProperties = true; - } - if (properties == null) { properties = new HashMap<>(); } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java index efc51c6125..c04f4ccc8c 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java @@ -226,6 +226,29 @@ public class TypedPropertiesTest { TypedPropertiesTest.assertEqualsTypeProperties(emptyProps, decodedProps); } + private static final SimpleString PROP_NAME = SimpleString.toSimpleString("TEST_PROP"); + + @Test + public void testRemovePropertyIfEmpty() { + TypedProperties properties = new TypedProperties(); + Assert.assertFalse(properties.removeProperty(PROP_NAME::equals)); + } + + @Test + public void testRemovePropertyWithoutMatch() { + TypedProperties properties = new TypedProperties(); + properties.putBooleanProperty(RandomUtil.randomSimpleString(), RandomUtil.randomBoolean()); + Assert.assertFalse(properties.removeProperty(PROP_NAME::equals)); + } + + @Test + public void testRemovePropertyWithMatch() { + TypedProperties properties = new TypedProperties(); + properties.putBooleanProperty(PROP_NAME, true); + Assert.assertTrue(properties.removeProperty(PROP_NAME::equals)); + Assert.assertFalse(properties.containsProperty(PROP_NAME)); + } + @Before public void setUp() throws Exception { props = new TypedProperties(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java index 52ceb992dd..da104244d9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java @@ -31,7 +31,6 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.UUID; -import org.apache.activemq.artemis.utils.collections.TypedProperties; /** * A ClientMessageImpl @@ -115,11 +114,6 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter this(type, durable, expiration, timestamp, priority, initialMessageBufferSize, null); } - @Override - public TypedProperties getProperties() { - return this.checkProperties(); - } - @Override public void onReceipt(final ClientConsumerInternal consumer) { this.consumer = consumer; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index a53c4f2deb..5f86343071 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.message.impl; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.LinkedList; import java.util.Set; +import java.util.function.Predicate; import java.util.zip.DataFormatException; import java.util.zip.Inflater; @@ -52,6 +52,11 @@ import org.jboss.logging.Logger; * consumers */ public class CoreMessage extends RefCountMessage implements ICoreMessage { + // We use properties to establish routing context on clustering. + // However if the client resends the message after receiving, it needs to be removed + private static final Predicate INTERNAL_PROPERTY_NAMES_CLEANUP_FILTER = + name -> (name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) || + (name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS)); public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE; private volatile int memoryEstimate = -1; @@ -122,25 +127,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public void cleanupInternalProperties() { - if (properties.hasInternalProperties()) { - LinkedList valuesToRemove = null; - - for (SimpleString name : getPropertyNames()) { - // We use properties to establish routing context on clustering. - // However if the client resends the message after receiving, it needs to be removed - if ((name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) || (name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS))) { - if (valuesToRemove == null) { - valuesToRemove = new LinkedList<>(); - } - valuesToRemove.add(name); - } - } - - if (valuesToRemove != null) { - for (SimpleString removal : valuesToRemove) { - this.removeProperty(removal); - } - } + final TypedProperties properties = this.properties; + if (properties != null && properties.removeProperty(INTERNAL_PROPERTY_NAMES_CLEANUP_FILTER)) { + messageChanged(); } } @@ -187,8 +176,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage setReplyTo(SimpleString address) { if (address == null) { - checkProperties(); - properties.removeProperty(MessageUtil.REPLYTO_HEADER_NAME); + getProperties().removeProperty(MessageUtil.REPLYTO_HEADER_NAME); } else { putStringProperty(MessageUtil.REPLYTO_HEADER_NAME, address); } @@ -343,8 +331,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Long getScheduledDeliveryTime() { - checkProperties(); - Object property = getObjectProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); + Object property = getProperties().getProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); if (property != null && property instanceof Number) { return ((Number) property).longValue(); @@ -355,9 +342,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage setScheduledDeliveryTime(Long time) { - checkProperties(); if (time == null || time == 0) { - removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); + getProperties().removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); } else { putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time); } @@ -375,7 +361,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public ActiveMQBuffer getBodyBuffer() { // if using the writable buffer, we must parse properties - checkProperties(); + getProperties(); internalWritableBuffer(); @@ -405,13 +391,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return endOfBodyPosition; } - public TypedProperties getTypedProperties() { - return checkProperties(); - } - @Override public void messageChanged() { - validBuffer = false; + //a volatile store is a costly operation: better to check if is necessary + if (validBuffer) { + validBuffer = false; + } } protected CoreMessage(CoreMessage other) { @@ -465,13 +450,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { priority = msg.getPriority(); if (msg instanceof CoreMessage) { - properties = ((CoreMessage) msg).getTypedProperties(); + properties = ((CoreMessage) msg).getProperties(); } } @Override public Message copy() { - checkProperties(); + getProperties(); checkEncode(); return new CoreMessage(this); } @@ -525,7 +510,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage setValidatedUserID(String validatedUserID) { - putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID, getPropertyValuesPool())); + putStringProperty(Message.HDR_VALIDATED_USER, value(validatedUserID)); return this; } @@ -579,38 +564,47 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { /** * I am keeping this synchronized as the decode of the Properties is lazy */ - protected TypedProperties checkProperties() { + public final TypedProperties getProperties() { try { + TypedProperties properties = this.properties; if (properties == null) { - synchronized (this) { - if (properties == null) { - TypedProperties properties = new TypedProperties(); - if (buffer != null && propertiesLocation >= 0) { - final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); - properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); - } - this.properties = properties; - } - } + properties = getOrInitializeTypedProperties(); } - - return this.properties; + return properties; } catch (Throwable e) { - // This is not an expected error, hence no specific logger created - logger.warn("Could not decode properties for CoreMessage[messageID=" + messageID + ",durable=" + durable + ",userID=" + userID + ",priority=" + priority + - ", timestamp=" + timestamp + ",expiration=" + expiration + ",address=" + address + ", propertiesLocation=" + propertiesLocation, e); - if (buffer != null) { - ByteBuf duplicatebuffer = buffer.duplicate(); - duplicatebuffer.readerIndex(0); - logger.warn("Failed message has messageID=" + messageID + " and the following buffer:\n" + ByteBufUtil.prettyHexDump(duplicatebuffer)); - } else { - logger.warn("Failed message has messageID=" + messageID + " and the buffer was null"); - } - throw new RuntimeException(e.getMessage(), e); - + throw onCheckPropertiesError(e); } } + private synchronized TypedProperties getOrInitializeTypedProperties() { + TypedProperties properties = this.properties; + if (properties == null) { + properties = new TypedProperties(); + if (buffer != null && propertiesLocation >= 0) { + final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); + properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); + } + this.properties = properties; + } + return properties; + } + + private RuntimeException onCheckPropertiesError(Throwable e) { + // This is not an expected error, hence no specific logger created + logger.warn("Could not decode properties for CoreMessage[messageID=" + messageID + ",durable=" + durable + ",userID=" + userID + ",priority=" + priority + + ", timestamp=" + timestamp + ",expiration=" + expiration + ",address=" + address + ", propertiesLocation=" + propertiesLocation, e); + final ByteBuf buffer = this.buffer; + if (buffer != null) { + //risky: a racy modification to buffer indexes could break this duplicate operation + final ByteBuf duplicatebuffer = buffer.duplicate(); + duplicatebuffer.readerIndex(0); + logger.warn("Failed message has messageID=" + messageID + " and the following buffer:\n" + ByteBufUtil.prettyHexDump(duplicatebuffer)); + } else { + logger.warn("Failed message has messageID=" + messageID + " and the buffer was null"); + } + return new RuntimeException(e.getMessage(), e); + } + @Override public int getMemoryEstimate() { if (memoryEstimate == -1) { @@ -692,7 +686,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public synchronized CoreMessage encode() { - checkProperties(); + getProperties(); if (writableBuffer != null) { // The message encode takes into consideration the PacketImpl which is not part of this encoding @@ -716,7 +710,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } public void encodeHeadersAndProperties(final ByteBuf buffer) { - checkProperties(); + final TypedProperties properties = getProperties(); messageIDPosition = buffer.writerIndex(); buffer.writeLong(messageID); SimpleString.writeNullableSimpleString(buffer, address); @@ -745,7 +739,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { DataConstants./* Expiration */SIZE_LONG + DataConstants./* Timestamp */SIZE_LONG + DataConstants./* Priority */SIZE_BYTE + - /* PropertySize and Properties */checkProperties().getEncodeSize(); + /* PropertySize and Properties */getProperties().getEncodeSize(); } @Override @@ -819,294 +813,230 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage putBooleanProperty(final String key, final boolean value) { - messageChanged(); - checkProperties(); - properties.putBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); - return this; + return putBooleanProperty(key(key), value); } @Override public CoreMessage putBooleanProperty(final SimpleString key, final boolean value) { messageChanged(); - checkProperties(); - properties.putBooleanProperty(key, value); + getProperties().putBooleanProperty(key, value); return this; } @Override public Boolean getBooleanProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getBooleanProperty(key); + return getProperties().getBooleanProperty(key); } @Override public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); + return getBooleanProperty(key(key)); } @Override public CoreMessage putByteProperty(final SimpleString key, final byte value) { messageChanged(); - checkProperties(); - properties.putByteProperty(key, value); + getProperties().putByteProperty(key, value); return this; } @Override public CoreMessage putByteProperty(final String key, final byte value) { - messageChanged(); - checkProperties(); - properties.putByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); - - return this; + return putByteProperty(key(key), value); } @Override public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getByteProperty(key); + return getProperties().getByteProperty(key); } @Override public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException { - return getByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); + return getByteProperty(key(key)); } @Override public CoreMessage putBytesProperty(final SimpleString key, final byte[] value) { messageChanged(); - checkProperties(); - properties.putBytesProperty(key, value); - + getProperties().putBytesProperty(key, value); return this; } @Override public CoreMessage putBytesProperty(final String key, final byte[] value) { - messageChanged(); - checkProperties(); - properties.putBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); - return this; + return putBytesProperty(key(key), value); } @Override public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getBytesProperty(key); + return getProperties().getBytesProperty(key); } @Override public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException { - return getBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); + return getBytesProperty(key(key)); } @Override public CoreMessage putCharProperty(SimpleString key, char value) { messageChanged(); - checkProperties(); - properties.putCharProperty(key, value); + getProperties().putCharProperty(key, value); return this; } @Override public CoreMessage putCharProperty(String key, char value) { - messageChanged(); - checkProperties(); - properties.putCharProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); - return this; + return putCharProperty(key(key), value); } @Override public CoreMessage putShortProperty(final SimpleString key, final short value) { messageChanged(); - checkProperties(); - properties.putShortProperty(key, value); + getProperties().putShortProperty(key, value); return this; } @Override public CoreMessage putShortProperty(final String key, final short value) { - messageChanged(); - checkProperties(); - properties.putShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); - return this; + return putShortProperty(key(key), value); } @Override public CoreMessage putIntProperty(final SimpleString key, final int value) { messageChanged(); - checkProperties(); - properties.putIntProperty(key, value); + getProperties().putIntProperty(key, value); return this; } @Override public CoreMessage putIntProperty(final String key, final int value) { - messageChanged(); - checkProperties(); - properties.putIntProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); - return this; + return putIntProperty(key(key), value); } @Override public Integer getIntProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getIntProperty(key); + return getProperties().getIntProperty(key); } @Override public Integer getIntProperty(final String key) throws ActiveMQPropertyConversionException { - return getIntProperty(SimpleString.toSimpleString(key)); + return getIntProperty(key(key)); } @Override public CoreMessage putLongProperty(final SimpleString key, final long value) { messageChanged(); - checkProperties(); - properties.putLongProperty(key, value); + getProperties().putLongProperty(key, value); return this; } @Override public CoreMessage putLongProperty(final String key, final long value) { - messageChanged(); - checkProperties(); - properties.putLongProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); - return this; + return putLongProperty(key(key), value); } @Override public Long getLongProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getLongProperty(key); + return getProperties().getLongProperty(key); } @Override public Long getLongProperty(final String key) throws ActiveMQPropertyConversionException { - checkProperties(); - return getLongProperty(SimpleString.toSimpleString(key)); + return getLongProperty(key(key)); } @Override public CoreMessage putFloatProperty(final SimpleString key, final float value) { messageChanged(); - checkProperties(); - properties.putFloatProperty(key, value); + getProperties().putFloatProperty(key, value); return this; } @Override public CoreMessage putFloatProperty(final String key, final float value) { - messageChanged(); - checkProperties(); - properties.putFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); - return this; + return putFloatProperty(key(key), value); } @Override public CoreMessage putDoubleProperty(final SimpleString key, final double value) { messageChanged(); - checkProperties(); - properties.putDoubleProperty(key, value); + getProperties().putDoubleProperty(key, value); return this; } @Override public CoreMessage putDoubleProperty(final String key, final double value) { - messageChanged(); - checkProperties(); - properties.putDoubleProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); - return this; + return putDoubleProperty(key(key), value); } @Override public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getDoubleProperty(key); + return getProperties().getDoubleProperty(key); } @Override public Double getDoubleProperty(final String key) throws ActiveMQPropertyConversionException { - checkProperties(); - return getDoubleProperty(SimpleString.toSimpleString(key)); + return getDoubleProperty(key(key)); } @Override public CoreMessage putStringProperty(final SimpleString key, final SimpleString value) { messageChanged(); - checkProperties(); - properties.putSimpleStringProperty(key, value); + getProperties().putSimpleStringProperty(key, value); return this; } @Override public CoreMessage putStringProperty(final SimpleString key, final String value) { - messageChanged(); - checkProperties(); - properties.putSimpleStringProperty(key, SimpleString.toSimpleString(value, getPropertyValuesPool())); - return this; + return putStringProperty(key, value(value)); } - @Override public CoreMessage putStringProperty(final String key, final String value) { - messageChanged(); - checkProperties(); - properties.putSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), SimpleString.toSimpleString(value, getPropertyValuesPool())); - return this; + return putStringProperty(key(key), value(value)); } @Override public CoreMessage putObjectProperty(final SimpleString key, final Object value) throws ActiveMQPropertyConversionException { - checkProperties(); messageChanged(); - TypedProperties.setObjectProperty(key, value, properties); + TypedProperties.setObjectProperty(key, value, getProperties()); return this; } @Override public Object getObjectProperty(final String key) { - checkProperties(); - return getObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); + return getObjectProperty(key(key)); } @Override public Object getObjectProperty(final SimpleString key) { - checkProperties(); - return properties.getProperty(key); + return getProperties().getProperty(key); } @Override public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException { - messageChanged(); - putObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); - return this; + return putObjectProperty(key(key), value); } @Override public Short getShortProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getShortProperty(key); + return getProperties().getShortProperty(key); } @Override public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); + return getShortProperty(key(key)); } @Override public Float getFloatProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getFloatProperty(key); + return getProperties().getFloatProperty(key); } @Override public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); + return getFloatProperty(key(key)); } @Override @@ -1122,25 +1052,22 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public String getStringProperty(final String key) throws ActiveMQPropertyConversionException { - return getStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); + return getStringProperty(key(key)); } @Override public SimpleString getSimpleStringProperty(final SimpleString key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getSimpleStringProperty(key); + return getProperties().getSimpleStringProperty(key); } @Override public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException { - checkProperties(); - return properties.getSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); + return getSimpleStringProperty(key(key)); } @Override public Object removeProperty(final SimpleString key) { - checkProperties(); - Object oldValue = properties.removeProperty(key); + Object oldValue = getProperties().removeProperty(key); if (oldValue != null) { messageChanged(); } @@ -1149,31 +1076,22 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Object removeProperty(final String key) { - messageChanged(); - checkProperties(); - Object oldValue = properties.removeProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); - if (oldValue != null) { - messageChanged(); - } - return oldValue; + return removeProperty(key(key)); } @Override public boolean containsProperty(final SimpleString key) { - checkProperties(); - return properties.containsProperty(key); + return getProperties().containsProperty(key); } @Override public boolean containsProperty(final String key) { - checkProperties(); - return properties.containsProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); + return containsProperty(key(key)); } @Override public Set getPropertyNames() { - checkProperties(); - return properties.getPropertyNames(); + return getProperties().getPropertyNames(); } @Override @@ -1244,7 +1162,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public String toString() { try { - checkProperties(); + final TypedProperties properties = getProperties(); return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + ", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + properties + "]@" + System.identityHashCode(this); @@ -1262,6 +1180,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } } + private SimpleString key(String key) { + return SimpleString.toSimpleString(key, getPropertyKeysPool()); + } + + private SimpleString value(String value) { + return SimpleString.toSimpleString(value, getPropertyValuesPool()); + } + private SimpleString.StringSimpleStringPool getPropertyKeysPool() { return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java index 17cb828766..e1dcf97042 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java @@ -692,7 +692,7 @@ public class MessageInternalImpl implements MessageInternal { @Override public TypedProperties getTypedProperties() { - return new TypedProperties(message.getTypedProperties()); + return new TypedProperties(message.getProperties()); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 62bf423dfe..41c2dc495d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -363,7 +363,7 @@ public class PacketImpl implements Packet { size = buffer.readerIndex(); } - protected ByteBuf copyMessageBuffer(ByteBuf buffer, int skipBytes) { + protected static ByteBuf copyMessageBuffer(ByteBuf buffer, int skipBytes) { ByteBuf newNettyBuffer = Unpooled.buffer(buffer.capacity() - PACKET_HEADERS_SIZE - skipBytes); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java index 60e8035221..8e5665a5fe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java @@ -47,9 +47,6 @@ public interface ServerMessage extends MessageInternal { PagingStore getPagingStore(); - // Is there any _AMQ_ property being used - boolean hasInternalProperties(); - boolean storeIsPaging(); void encodeMessageIDToBuffer(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/transformer/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/transformer/ServerMessageImpl.java index fe8e9ec7e6..0e0220eb3d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/transformer/ServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/transformer/ServerMessageImpl.java @@ -166,11 +166,6 @@ public class ServerMessageImpl extends MessageInternalImpl implements ServerMess throw new UnsupportedOperationException(); } - @Override - public boolean hasInternalProperties() { - return message.getTypedProperties().hasInternalProperties(); - } - @Override public boolean storeIsPaging() { throw new UnsupportedOperationException(); From cf65912bccf59fe381b155011d957ea04f80be96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Mon, 21 Jan 2019 21:23:57 +0000 Subject: [PATCH 2/2] ARTEMIS-2170 Optimized CoreMessage clearInternalProperties Ensure only iterate properties, if internal property is set. --- .../utils/collections/TypedProperties.java | 33 +++++++++++++--- .../artemis/utils/TypedPropertiesTest.java | 27 ++++++------- .../activemq/artemis/api/core/Message.java | 2 +- .../core/message/impl/CoreMessage.java | 38 +++++++++---------- .../core/postoffice/impl/PostOfficeImpl.java | 2 +- 5 files changed, 62 insertions(+), 40 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java index 4c9f1f05a6..c6b513d190 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -62,7 +61,15 @@ public class TypedProperties { private int size; + private final Predicate internalPropertyPredicate; + private boolean internalProperties; + public TypedProperties() { + this.internalPropertyPredicate = null; + } + + public TypedProperties(Predicate internalPropertyPredicate) { + this.internalPropertyPredicate = internalPropertyPredicate; } /** @@ -84,6 +91,8 @@ public class TypedProperties { synchronized (other) { properties = other.properties == null ? null : new HashMap<>(other.properties); size = other.size; + internalPropertyPredicate = other.internalPropertyPredicate; + internalProperties = other.internalProperties; } } @@ -313,8 +322,14 @@ public class TypedProperties { } } - public synchronized boolean removeProperty(Predicate propertyNamePredicate) { - Objects.requireNonNull(propertyNamePredicate, "propertyNamePredicate cannot be null"); + public synchronized boolean clearInternalProperties() { + return internalProperties && removeInternalProperties(); + } + + private synchronized boolean removeInternalProperties() { + if (internalPropertyPredicate == null) { + return false; + } if (properties == null) { return false; } @@ -323,16 +338,18 @@ public class TypedProperties { } int removedBytes = 0; boolean removed = false; - for (Iterator> keyNameIterator = properties.entrySet().iterator(); keyNameIterator.hasNext(); ) { + final Iterator> keyNameIterator = properties.entrySet().iterator(); + while (keyNameIterator.hasNext()) { final Entry entry = keyNameIterator.next(); final SimpleString propertyName = entry.getKey(); - if (propertyNamePredicate.test(propertyName)) { + if (internalPropertyPredicate.test(propertyName)) { final PropertyValue propertyValue = entry.getValue(); removedBytes += propertyName.sizeof() + propertyValue.encodeSize(); keyNameIterator.remove(); removed = true; } } + internalProperties = false; size -= removedBytes; return removed; } @@ -530,6 +547,10 @@ public class TypedProperties { // Private ------------------------------------------------------------------------------------ private synchronized void doPutValue(final SimpleString key, final PropertyValue value) { + if (!internalProperties && internalPropertyPredicate != null && internalPropertyPredicate.test(key)) { + internalProperties = true; + } + if (properties == null) { properties = new HashMap<>(); } @@ -556,7 +577,7 @@ public class TypedProperties { } } - private synchronized Object doGetProperty(final Object key) { + private synchronized Object doGetProperty(final SimpleString key) { if (properties == null) { return null; } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java index c04f4ccc8c..e876f00085 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java @@ -229,26 +229,27 @@ public class TypedPropertiesTest { private static final SimpleString PROP_NAME = SimpleString.toSimpleString("TEST_PROP"); @Test - public void testRemovePropertyIfEmpty() { + public void testCannotClearInternalPropertiesIfEmpty() { TypedProperties properties = new TypedProperties(); - Assert.assertFalse(properties.removeProperty(PROP_NAME::equals)); + Assert.assertFalse(properties.clearInternalProperties()); } @Test - public void testRemovePropertyWithoutMatch() { - TypedProperties properties = new TypedProperties(); - properties.putBooleanProperty(RandomUtil.randomSimpleString(), RandomUtil.randomBoolean()); - Assert.assertFalse(properties.removeProperty(PROP_NAME::equals)); - } - - @Test - public void testRemovePropertyWithMatch() { - TypedProperties properties = new TypedProperties(); - properties.putBooleanProperty(PROP_NAME, true); - Assert.assertTrue(properties.removeProperty(PROP_NAME::equals)); + public void testClearInternalPropertiesIfAny() { + TypedProperties properties = new TypedProperties(PROP_NAME::equals); + properties.putBooleanProperty(PROP_NAME, RandomUtil.randomBoolean()); + Assert.assertTrue(properties.clearInternalProperties()); Assert.assertFalse(properties.containsProperty(PROP_NAME)); } + @Test + public void testCannotClearInternalPropertiesTwiceIfAny() { + TypedProperties properties = new TypedProperties(PROP_NAME::equals); + properties.putBooleanProperty(PROP_NAME, RandomUtil.randomBoolean()); + Assert.assertTrue(properties.clearInternalProperties()); + Assert.assertFalse(properties.clearInternalProperties()); + } + @Before public void setUp() throws Exception { props = new TypedProperties(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 5752e7b957..98e7f80a16 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -176,7 +176,7 @@ public interface Message { /** The message will contain another message persisted through {@link org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil}*/ byte EMBEDDED_TYPE = 7; - default void cleanupInternalProperties() { + default void clearInternalProperties() { // only on core } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 5f86343071..9fdd05b7b5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -53,8 +53,8 @@ import org.jboss.logging.Logger; public class CoreMessage extends RefCountMessage implements ICoreMessage { // We use properties to establish routing context on clustering. - // However if the client resends the message after receiving, it needs to be removed - private static final Predicate INTERNAL_PROPERTY_NAMES_CLEANUP_FILTER = + // However if the client resends the message after receiving, it needs to be removed, so we mark these internal + private static final Predicate INTERNAL_PROPERTY_NAMES_PREDICATE = name -> (name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) || (name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS)); public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE; @@ -126,9 +126,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } @Override - public void cleanupInternalProperties() { + public void clearInternalProperties() { final TypedProperties properties = this.properties; - if (properties != null && properties.removeProperty(INTERNAL_PROPERTY_NAMES_CLEANUP_FILTER)) { + if (properties != null && properties.clearInternalProperties()) { messageChanged(); } } @@ -565,10 +565,23 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { * I am keeping this synchronized as the decode of the Properties is lazy */ public final TypedProperties getProperties() { + TypedProperties properties = this.properties; + if (properties == null) { + properties = getOrInitializeTypedProperties(); + } + return properties; + } + + private synchronized TypedProperties getOrInitializeTypedProperties() { try { TypedProperties properties = this.properties; if (properties == null) { - properties = getOrInitializeTypedProperties(); + properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE); + if (buffer != null && propertiesLocation >= 0) { + final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); + properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); + } + this.properties = properties; } return properties; } catch (Throwable e) { @@ -576,19 +589,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } } - private synchronized TypedProperties getOrInitializeTypedProperties() { - TypedProperties properties = this.properties; - if (properties == null) { - properties = new TypedProperties(); - if (buffer != null && propertiesLocation >= 0) { - final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); - properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); - } - this.properties = properties; - } - return properties; - } - private RuntimeException onCheckPropertiesError(Throwable e) { // This is not an expected error, hence no specific logger created logger.warn("Could not decode properties for CoreMessage[messageID=" + messageID + ",durable=" + durable + ",userID=" + userID + ",priority=" + priority + @@ -679,7 +679,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { properties = null; propertiesLocation = buffer.readerIndex(); } else { - properties = new TypedProperties(); + properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE); properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index f78af72d38..605e43ee95 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -881,7 +881,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return RoutingStatus.DUPLICATED_ID; } - message.cleanupInternalProperties(); + message.clearInternalProperties(); Bindings bindings = addressManager.getBindingsForRoutingAddress(address);