diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java index 4c9f1f05a6..c6b513d190 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -62,7 +61,15 @@ public class TypedProperties { private int size; + private final Predicate internalPropertyPredicate; + private boolean internalProperties; + public TypedProperties() { + this.internalPropertyPredicate = null; + } + + public TypedProperties(Predicate internalPropertyPredicate) { + this.internalPropertyPredicate = internalPropertyPredicate; } /** @@ -84,6 +91,8 @@ public class TypedProperties { synchronized (other) { properties = other.properties == null ? null : new HashMap<>(other.properties); size = other.size; + internalPropertyPredicate = other.internalPropertyPredicate; + internalProperties = other.internalProperties; } } @@ -313,8 +322,14 @@ public class TypedProperties { } } - public synchronized boolean removeProperty(Predicate propertyNamePredicate) { - Objects.requireNonNull(propertyNamePredicate, "propertyNamePredicate cannot be null"); + public synchronized boolean clearInternalProperties() { + return internalProperties && removeInternalProperties(); + } + + private synchronized boolean removeInternalProperties() { + if (internalPropertyPredicate == null) { + return false; + } if (properties == null) { return false; } @@ -323,16 +338,18 @@ public class TypedProperties { } int removedBytes = 0; boolean removed = false; - for (Iterator> keyNameIterator = properties.entrySet().iterator(); keyNameIterator.hasNext(); ) { + final Iterator> keyNameIterator = properties.entrySet().iterator(); + while (keyNameIterator.hasNext()) { final Entry entry = keyNameIterator.next(); final SimpleString propertyName = entry.getKey(); - if (propertyNamePredicate.test(propertyName)) { + if (internalPropertyPredicate.test(propertyName)) { final PropertyValue propertyValue = entry.getValue(); removedBytes += propertyName.sizeof() + propertyValue.encodeSize(); keyNameIterator.remove(); removed = true; } } + internalProperties = false; size -= removedBytes; return removed; } @@ -530,6 +547,10 @@ public class TypedProperties { // Private ------------------------------------------------------------------------------------ private synchronized void doPutValue(final SimpleString key, final PropertyValue value) { + if (!internalProperties && internalPropertyPredicate != null && internalPropertyPredicate.test(key)) { + internalProperties = true; + } + if (properties == null) { properties = new HashMap<>(); } @@ -556,7 +577,7 @@ public class TypedProperties { } } - private synchronized Object doGetProperty(final Object key) { + private synchronized Object doGetProperty(final SimpleString key) { if (properties == null) { return null; } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java index c04f4ccc8c..e876f00085 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java @@ -229,26 +229,27 @@ public class TypedPropertiesTest { private static final SimpleString PROP_NAME = SimpleString.toSimpleString("TEST_PROP"); @Test - public void testRemovePropertyIfEmpty() { + public void testCannotClearInternalPropertiesIfEmpty() { TypedProperties properties = new TypedProperties(); - Assert.assertFalse(properties.removeProperty(PROP_NAME::equals)); + Assert.assertFalse(properties.clearInternalProperties()); } @Test - public void testRemovePropertyWithoutMatch() { - TypedProperties properties = new TypedProperties(); - properties.putBooleanProperty(RandomUtil.randomSimpleString(), RandomUtil.randomBoolean()); - Assert.assertFalse(properties.removeProperty(PROP_NAME::equals)); - } - - @Test - public void testRemovePropertyWithMatch() { - TypedProperties properties = new TypedProperties(); - properties.putBooleanProperty(PROP_NAME, true); - Assert.assertTrue(properties.removeProperty(PROP_NAME::equals)); + public void testClearInternalPropertiesIfAny() { + TypedProperties properties = new TypedProperties(PROP_NAME::equals); + properties.putBooleanProperty(PROP_NAME, RandomUtil.randomBoolean()); + Assert.assertTrue(properties.clearInternalProperties()); Assert.assertFalse(properties.containsProperty(PROP_NAME)); } + @Test + public void testCannotClearInternalPropertiesTwiceIfAny() { + TypedProperties properties = new TypedProperties(PROP_NAME::equals); + properties.putBooleanProperty(PROP_NAME, RandomUtil.randomBoolean()); + Assert.assertTrue(properties.clearInternalProperties()); + Assert.assertFalse(properties.clearInternalProperties()); + } + @Before public void setUp() throws Exception { props = new TypedProperties(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 5752e7b957..98e7f80a16 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -176,7 +176,7 @@ public interface Message { /** The message will contain another message persisted through {@link org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil}*/ byte EMBEDDED_TYPE = 7; - default void cleanupInternalProperties() { + default void clearInternalProperties() { // only on core } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 5f86343071..9fdd05b7b5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -53,8 +53,8 @@ import org.jboss.logging.Logger; public class CoreMessage extends RefCountMessage implements ICoreMessage { // We use properties to establish routing context on clustering. - // However if the client resends the message after receiving, it needs to be removed - private static final Predicate INTERNAL_PROPERTY_NAMES_CLEANUP_FILTER = + // However if the client resends the message after receiving, it needs to be removed, so we mark these internal + private static final Predicate INTERNAL_PROPERTY_NAMES_PREDICATE = name -> (name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) || (name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS)); public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE; @@ -126,9 +126,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } @Override - public void cleanupInternalProperties() { + public void clearInternalProperties() { final TypedProperties properties = this.properties; - if (properties != null && properties.removeProperty(INTERNAL_PROPERTY_NAMES_CLEANUP_FILTER)) { + if (properties != null && properties.clearInternalProperties()) { messageChanged(); } } @@ -565,10 +565,23 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { * I am keeping this synchronized as the decode of the Properties is lazy */ public final TypedProperties getProperties() { + TypedProperties properties = this.properties; + if (properties == null) { + properties = getOrInitializeTypedProperties(); + } + return properties; + } + + private synchronized TypedProperties getOrInitializeTypedProperties() { try { TypedProperties properties = this.properties; if (properties == null) { - properties = getOrInitializeTypedProperties(); + properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE); + if (buffer != null && propertiesLocation >= 0) { + final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); + properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); + } + this.properties = properties; } return properties; } catch (Throwable e) { @@ -576,19 +589,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } } - private synchronized TypedProperties getOrInitializeTypedProperties() { - TypedProperties properties = this.properties; - if (properties == null) { - properties = new TypedProperties(); - if (buffer != null && propertiesLocation >= 0) { - final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); - properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); - } - this.properties = properties; - } - return properties; - } - private RuntimeException onCheckPropertiesError(Throwable e) { // This is not an expected error, hence no specific logger created logger.warn("Could not decode properties for CoreMessage[messageID=" + messageID + ",durable=" + durable + ",userID=" + userID + ",priority=" + priority + @@ -679,7 +679,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { properties = null; propertiesLocation = buffer.readerIndex(); } else { - properties = new TypedProperties(); + properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE); properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index f78af72d38..605e43ee95 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -881,7 +881,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return RoutingStatus.DUPLICATED_ID; } - message.cleanupInternalProperties(); + message.clearInternalProperties(); Bindings bindings = addressManager.getBindingsForRoutingAddress(address);