ARTEMIS-2170 Optimized CoreMessage clearInternalProperties
Ensure only iterate properties, if internal property is set.
This commit is contained in:
parent
6446d01a15
commit
cf65912bcc
|
@ -23,7 +23,6 @@ import java.util.HashSet;
|
|||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -62,7 +61,15 @@ public class TypedProperties {
|
|||
|
||||
private int size;
|
||||
|
||||
private final Predicate<SimpleString> internalPropertyPredicate;
|
||||
private boolean internalProperties;
|
||||
|
||||
public TypedProperties() {
|
||||
this.internalPropertyPredicate = null;
|
||||
}
|
||||
|
||||
public TypedProperties(Predicate<SimpleString> internalPropertyPredicate) {
|
||||
this.internalPropertyPredicate = internalPropertyPredicate;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -84,6 +91,8 @@ public class TypedProperties {
|
|||
synchronized (other) {
|
||||
properties = other.properties == null ? null : new HashMap<>(other.properties);
|
||||
size = other.size;
|
||||
internalPropertyPredicate = other.internalPropertyPredicate;
|
||||
internalProperties = other.internalProperties;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -313,8 +322,14 @@ public class TypedProperties {
|
|||
}
|
||||
}
|
||||
|
||||
public synchronized boolean removeProperty(Predicate<SimpleString> propertyNamePredicate) {
|
||||
Objects.requireNonNull(propertyNamePredicate, "propertyNamePredicate cannot be null");
|
||||
public synchronized boolean clearInternalProperties() {
|
||||
return internalProperties && removeInternalProperties();
|
||||
}
|
||||
|
||||
private synchronized boolean removeInternalProperties() {
|
||||
if (internalPropertyPredicate == null) {
|
||||
return false;
|
||||
}
|
||||
if (properties == null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -323,16 +338,18 @@ public class TypedProperties {
|
|||
}
|
||||
int removedBytes = 0;
|
||||
boolean removed = false;
|
||||
for (Iterator<Entry<SimpleString, PropertyValue>> keyNameIterator = properties.entrySet().iterator(); keyNameIterator.hasNext(); ) {
|
||||
final Iterator<Entry<SimpleString, PropertyValue>> keyNameIterator = properties.entrySet().iterator();
|
||||
while (keyNameIterator.hasNext()) {
|
||||
final Entry<SimpleString, PropertyValue> entry = keyNameIterator.next();
|
||||
final SimpleString propertyName = entry.getKey();
|
||||
if (propertyNamePredicate.test(propertyName)) {
|
||||
if (internalPropertyPredicate.test(propertyName)) {
|
||||
final PropertyValue propertyValue = entry.getValue();
|
||||
removedBytes += propertyName.sizeof() + propertyValue.encodeSize();
|
||||
keyNameIterator.remove();
|
||||
removed = true;
|
||||
}
|
||||
}
|
||||
internalProperties = false;
|
||||
size -= removedBytes;
|
||||
return removed;
|
||||
}
|
||||
|
@ -530,6 +547,10 @@ public class TypedProperties {
|
|||
// Private ------------------------------------------------------------------------------------
|
||||
|
||||
private synchronized void doPutValue(final SimpleString key, final PropertyValue value) {
|
||||
if (!internalProperties && internalPropertyPredicate != null && internalPropertyPredicate.test(key)) {
|
||||
internalProperties = true;
|
||||
}
|
||||
|
||||
if (properties == null) {
|
||||
properties = new HashMap<>();
|
||||
}
|
||||
|
@ -556,7 +577,7 @@ public class TypedProperties {
|
|||
}
|
||||
}
|
||||
|
||||
private synchronized Object doGetProperty(final Object key) {
|
||||
private synchronized Object doGetProperty(final SimpleString key) {
|
||||
if (properties == null) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -229,26 +229,27 @@ public class TypedPropertiesTest {
|
|||
private static final SimpleString PROP_NAME = SimpleString.toSimpleString("TEST_PROP");
|
||||
|
||||
@Test
|
||||
public void testRemovePropertyIfEmpty() {
|
||||
public void testCannotClearInternalPropertiesIfEmpty() {
|
||||
TypedProperties properties = new TypedProperties();
|
||||
Assert.assertFalse(properties.removeProperty(PROP_NAME::equals));
|
||||
Assert.assertFalse(properties.clearInternalProperties());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemovePropertyWithoutMatch() {
|
||||
TypedProperties properties = new TypedProperties();
|
||||
properties.putBooleanProperty(RandomUtil.randomSimpleString(), RandomUtil.randomBoolean());
|
||||
Assert.assertFalse(properties.removeProperty(PROP_NAME::equals));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemovePropertyWithMatch() {
|
||||
TypedProperties properties = new TypedProperties();
|
||||
properties.putBooleanProperty(PROP_NAME, true);
|
||||
Assert.assertTrue(properties.removeProperty(PROP_NAME::equals));
|
||||
public void testClearInternalPropertiesIfAny() {
|
||||
TypedProperties properties = new TypedProperties(PROP_NAME::equals);
|
||||
properties.putBooleanProperty(PROP_NAME, RandomUtil.randomBoolean());
|
||||
Assert.assertTrue(properties.clearInternalProperties());
|
||||
Assert.assertFalse(properties.containsProperty(PROP_NAME));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCannotClearInternalPropertiesTwiceIfAny() {
|
||||
TypedProperties properties = new TypedProperties(PROP_NAME::equals);
|
||||
properties.putBooleanProperty(PROP_NAME, RandomUtil.randomBoolean());
|
||||
Assert.assertTrue(properties.clearInternalProperties());
|
||||
Assert.assertFalse(properties.clearInternalProperties());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
props = new TypedProperties();
|
||||
|
|
|
@ -176,7 +176,7 @@ public interface Message {
|
|||
/** The message will contain another message persisted through {@link org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil}*/
|
||||
byte EMBEDDED_TYPE = 7;
|
||||
|
||||
default void cleanupInternalProperties() {
|
||||
default void clearInternalProperties() {
|
||||
// only on core
|
||||
}
|
||||
|
||||
|
|
|
@ -53,8 +53,8 @@ import org.jboss.logging.Logger;
|
|||
public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
||||
|
||||
// We use properties to establish routing context on clustering.
|
||||
// However if the client resends the message after receiving, it needs to be removed
|
||||
private static final Predicate<SimpleString> INTERNAL_PROPERTY_NAMES_CLEANUP_FILTER =
|
||||
// However if the client resends the message after receiving, it needs to be removed, so we mark these internal
|
||||
private static final Predicate<SimpleString> INTERNAL_PROPERTY_NAMES_PREDICATE =
|
||||
name -> (name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) ||
|
||||
(name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS));
|
||||
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
|
||||
|
@ -126,9 +126,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void cleanupInternalProperties() {
|
||||
public void clearInternalProperties() {
|
||||
final TypedProperties properties = this.properties;
|
||||
if (properties != null && properties.removeProperty(INTERNAL_PROPERTY_NAMES_CLEANUP_FILTER)) {
|
||||
if (properties != null && properties.clearInternalProperties()) {
|
||||
messageChanged();
|
||||
}
|
||||
}
|
||||
|
@ -565,21 +565,18 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
* I am keeping this synchronized as the decode of the Properties is lazy
|
||||
*/
|
||||
public final TypedProperties getProperties() {
|
||||
try {
|
||||
TypedProperties properties = this.properties;
|
||||
if (properties == null) {
|
||||
properties = getOrInitializeTypedProperties();
|
||||
}
|
||||
return properties;
|
||||
} catch (Throwable e) {
|
||||
throw onCheckPropertiesError(e);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized TypedProperties getOrInitializeTypedProperties() {
|
||||
try {
|
||||
TypedProperties properties = this.properties;
|
||||
if (properties == null) {
|
||||
properties = new TypedProperties();
|
||||
properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
|
||||
if (buffer != null && propertiesLocation >= 0) {
|
||||
final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
|
||||
properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
|
||||
|
@ -587,6 +584,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
this.properties = properties;
|
||||
}
|
||||
return properties;
|
||||
} catch (Throwable e) {
|
||||
throw onCheckPropertiesError(e);
|
||||
}
|
||||
}
|
||||
|
||||
private RuntimeException onCheckPropertiesError(Throwable e) {
|
||||
|
@ -679,7 +679,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
properties = null;
|
||||
propertiesLocation = buffer.readerIndex();
|
||||
} else {
|
||||
properties = new TypedProperties();
|
||||
properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
|
||||
properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -881,7 +881,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
return RoutingStatus.DUPLICATED_ID;
|
||||
}
|
||||
|
||||
message.cleanupInternalProperties();
|
||||
message.clearInternalProperties();
|
||||
|
||||
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
|
||||
|
||||
|
|
Loading…
Reference in New Issue