ARTEMIS-2095 - Typed Properties ThreadSafety

Add Concurrency Test to expose concurrency errors seen in logs.
Add Fix to ensure TypedProperties to ensure threadsafety
Add forEach and forEachKey to allow for provide a thread safe way of iterating through keys and values, without needing to duplicate the collection.
Add getMapNames method to remove code duplication and to ensure thread safe
This commit is contained in:
Michael André Pearce 2018-09-25 22:57:12 +01:00
parent a5b19ad1e7
commit 8e40b2d4f4
7 changed files with 256 additions and 99 deletions

View File

@ -16,15 +16,17 @@
*/ */
package org.apache.activemq.artemis.utils.collections; package org.apache.activemq.artemis.utils.collections;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.function.BiConsumer;
import io.netty.buffer.ByteBuf; import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@ -57,7 +59,7 @@ public class TypedProperties {
private Map<SimpleString, PropertyValue> properties; private Map<SimpleString, PropertyValue> properties;
private volatile int size; private int size;
private boolean internalProperties; private boolean internalProperties;
@ -67,11 +69,11 @@ public class TypedProperties {
/** /**
* Return the number of properties * Return the number of properties
* */ * */
public int size() { public synchronized int size() {
return properties.size(); return properties == null ? 0 : properties.size();
} }
public int getMemoryOffset() { public synchronized int getMemoryOffset() {
// The estimate is basically the encode size + 2 object references for each entry in the map // The estimate is basically the encode size + 2 object references for each entry in the map
// Note we don't include the attributes or anything else since they already included in the memory estimate // Note we don't include the attributes or anything else since they already included in the memory estimate
// of the ServerMessage // of the ServerMessage
@ -86,75 +88,60 @@ public class TypedProperties {
} }
} }
public boolean hasInternalProperties() { public synchronized boolean hasInternalProperties() {
return internalProperties; return internalProperties;
} }
public void putBooleanProperty(final SimpleString key, final boolean value) { public void putBooleanProperty(final SimpleString key, final boolean value) {
checkCreateProperties();
doPutValue(key, BooleanValue.of(value)); doPutValue(key, BooleanValue.of(value));
} }
public void putByteProperty(final SimpleString key, final byte value) { public void putByteProperty(final SimpleString key, final byte value) {
checkCreateProperties();
doPutValue(key, ByteValue.valueOf(value)); doPutValue(key, ByteValue.valueOf(value));
} }
public void putBytesProperty(final SimpleString key, final byte[] value) { public void putBytesProperty(final SimpleString key, final byte[] value) {
checkCreateProperties();
doPutValue(key, value == null ? NullValue.INSTANCE : new BytesValue(value)); doPutValue(key, value == null ? NullValue.INSTANCE : new BytesValue(value));
} }
public void putShortProperty(final SimpleString key, final short value) { public void putShortProperty(final SimpleString key, final short value) {
checkCreateProperties();
doPutValue(key, new ShortValue(value)); doPutValue(key, new ShortValue(value));
} }
public void putIntProperty(final SimpleString key, final int value) { public void putIntProperty(final SimpleString key, final int value) {
checkCreateProperties();
doPutValue(key, new IntValue(value)); doPutValue(key, new IntValue(value));
} }
public void putLongProperty(final SimpleString key, final long value) { public void putLongProperty(final SimpleString key, final long value) {
checkCreateProperties();
doPutValue(key, new LongValue(value)); doPutValue(key, new LongValue(value));
} }
public void putFloatProperty(final SimpleString key, final float value) { public void putFloatProperty(final SimpleString key, final float value) {
checkCreateProperties();
doPutValue(key, new FloatValue(value)); doPutValue(key, new FloatValue(value));
} }
public void putDoubleProperty(final SimpleString key, final double value) { public void putDoubleProperty(final SimpleString key, final double value) {
checkCreateProperties();
doPutValue(key, new DoubleValue(value)); doPutValue(key, new DoubleValue(value));
} }
public void putSimpleStringProperty(final SimpleString key, final SimpleString value) { public void putSimpleStringProperty(final SimpleString key, final SimpleString value) {
checkCreateProperties();
doPutValue(key, value == null ? NullValue.INSTANCE : new StringValue(value)); doPutValue(key, value == null ? NullValue.INSTANCE : new StringValue(value));
} }
public void putNullValue(final SimpleString key) { public void putNullValue(final SimpleString key) {
checkCreateProperties();
doPutValue(key, NullValue.INSTANCE); doPutValue(key, NullValue.INSTANCE);
} }
public void putCharProperty(final SimpleString key, final char value) { public void putCharProperty(final SimpleString key, final char value) {
checkCreateProperties();
doPutValue(key, new CharValue(value)); doPutValue(key, new CharValue(value));
} }
public void putTypedProperties(final TypedProperties otherProps) { public void putTypedProperties(final TypedProperties otherProps) {
if (otherProps == null || otherProps.properties == null) { if (otherProps == null || otherProps == this || otherProps.properties == null) {
return; return;
} }
checkCreateProperties(); otherProps.forEachInternal(this::doPutValue);
Set<Entry<SimpleString, PropertyValue>> otherEntries = otherProps.properties.entrySet();
for (Entry<SimpleString, PropertyValue> otherEntry : otherEntries) {
doPutValue(otherEntry.getKey(), otherEntry.getValue());
}
} }
public Object getProperty(final SimpleString key) { public Object getProperty(final SimpleString key) {
@ -315,29 +302,46 @@ public class TypedProperties {
return doRemoveProperty(key); return doRemoveProperty(key);
} }
public boolean containsProperty(final SimpleString key) { public synchronized boolean containsProperty(final SimpleString key) {
if (size == 0) { if (properties == null) {
return false; return false;
} else { } else {
return properties.containsKey(key); return properties.containsKey(key);
} }
} }
public synchronized Set<SimpleString> getPropertyNames() {
public Set<SimpleString> getPropertyNames() { if (properties == null) {
if (size == 0) {
return Collections.emptySet(); return Collections.emptySet();
} else { } else {
return properties.keySet(); return new HashSet<>(properties.keySet());
}
}
public synchronized void forEachKey(Consumer<SimpleString> action) {
if (properties != null) {
properties.keySet().forEach(action::accept);
}
}
public synchronized void forEach(BiConsumer<SimpleString, Object> action) {
if (properties != null) {
properties.forEach((k, v) -> action.accept(k, v.getValue()));
}
}
private synchronized void forEachInternal(BiConsumer<SimpleString, PropertyValue> action) {
if (properties != null) {
properties.forEach(action::accept);
} }
} }
public synchronized void decode(final ByteBuf buffer, public synchronized void decode(final ByteBuf buffer,
final TypedPropertiesDecoderPools keyValuePools) { final TypedPropertiesDecoderPools keyValuePools) {
byte b = buffer.readByte(); byte b = buffer.readByte();
if (b == DataConstants.NULL) { if (b == DataConstants.NULL) {
properties = null; properties = null;
size = 0;
} else { } else {
int numHeaders = buffer.readInt(); int numHeaders = buffer.readInt();
@ -416,12 +420,13 @@ public class TypedProperties {
} }
} }
public synchronized void decode(final ByteBuf buffer) { public void decode(final ByteBuf buffer) {
decode(buffer, null); decode(buffer, null);
} }
public synchronized void encode(final ByteBuf buffer) { public synchronized void encode(final ByteBuf buffer) {
if (properties == null) { if (properties == null || size == 0) {
buffer.writeByte(DataConstants.NULL); buffer.writeByte(DataConstants.NULL);
} else { } else {
buffer.writeByte(DataConstants.NOT_NULL); buffer.writeByte(DataConstants.NOT_NULL);
@ -438,26 +443,26 @@ public class TypedProperties {
} }
} }
public int getEncodeSize() { public synchronized int getEncodeSize() {
if (properties == null) { if (properties == null || size == 0) {
return DataConstants.SIZE_BYTE; return DataConstants.SIZE_BYTE;
} else { } else {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size; return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size;
} }
} }
public void clear() { public synchronized void clear() {
if (properties != null) { if (properties != null) {
properties.clear(); properties.clear();
} }
size = 0;
} }
@Override @Override
public String toString() { public synchronized String toString() {
StringBuilder sb = new StringBuilder("TypedProperties["); StringBuilder sb = new StringBuilder("TypedProperties[");
if (properties != null) { if (properties != null) {
Iterator<Entry<SimpleString, PropertyValue>> iter = properties.entrySet().iterator(); Iterator<Entry<SimpleString, PropertyValue>> iter = properties.entrySet().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
@ -505,17 +510,15 @@ public class TypedProperties {
// Private ------------------------------------------------------------------------------------ // Private ------------------------------------------------------------------------------------
private void checkCreateProperties() {
if (properties == null) {
properties = new HashMap<>();
}
}
private synchronized void doPutValue(final SimpleString key, final PropertyValue value) { private synchronized void doPutValue(final SimpleString key, final PropertyValue value) {
if (key.startsWith(AMQ_PROPNAME)) { if (key.startsWith(AMQ_PROPNAME)) {
internalProperties = true; internalProperties = true;
} }
if (properties == null) {
properties = new HashMap<>();
}
PropertyValue oldValue = properties.put(key, value); PropertyValue oldValue = properties.put(key, value);
if (oldValue != null) { if (oldValue != null) {
size += value.encodeSize() - oldValue.encodeSize(); size += value.encodeSize() - oldValue.encodeSize();
@ -530,23 +533,20 @@ public class TypedProperties {
} }
PropertyValue val = properties.remove(key); PropertyValue val = properties.remove(key);
if (val == null) { if (val == null) {
return null; return null;
} else { } else {
size -= SimpleString.sizeofString(key) + val.encodeSize(); size -= SimpleString.sizeofString(key) + val.encodeSize();
return val.getValue(); return val.getValue();
} }
} }
private synchronized Object doGetProperty(final Object key) { private synchronized Object doGetProperty(final Object key) {
if (size == 0) { if (properties == null) {
return null; return null;
} }
PropertyValue val = properties.get(key); PropertyValue val = properties.get(key);
if (val == null) { if (val == null) {
return null; return null;
} else { } else {
@ -1003,21 +1003,41 @@ public class TypedProperties {
} }
} }
public boolean isEmpty() { public synchronized boolean isEmpty() {
return properties.isEmpty(); if (properties == null) {
return true;
} else {
return properties.isEmpty();
}
} }
public Map<String, Object> getMap() { public synchronized Set<String> getMapNames() {
Map<String, Object> m = new HashMap<>(); if (properties == null) {
for (Entry<SimpleString, PropertyValue> entry : properties.entrySet()) { return Collections.emptySet();
Object val = entry.getValue().getValue(); } else {
if (val instanceof SimpleString) { Set<String> names = new HashSet<>(properties.size());
m.put(entry.getKey().toString(), ((SimpleString) val).toString()); for (SimpleString name : properties.keySet()) {
} else { names.add(name.toString());
m.put(entry.getKey().toString(), val);
} }
return names;
}
}
public synchronized Map<String, Object> getMap() {
if (properties == null) {
return Collections.emptyMap();
} else {
Map<String, Object> m = new HashMap<>(properties.size());
for (Entry<SimpleString, PropertyValue> entry : properties.entrySet()) {
Object val = entry.getValue().getValue();
if (val instanceof SimpleString) {
m.put(entry.getKey().toString(), ((SimpleString) val).toString());
} else {
m.put(entry.getKey().toString(), val);
}
}
return m;
} }
return m;
} }
/** /**

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
public class TypedPropertiesConcurrencyTest {
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
private SimpleString key = SimpleString.toSimpleString("key");
@Test
public void testClearAndToString() throws Exception {
TypedProperties props = new TypedProperties();
ExecutorService executorService = Executors.newFixedThreadPool(1000);
AtomicBoolean hasError = new AtomicBoolean();
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10000; i++) {
int g = i;
executorService.submit(() -> {
try {
countDownLatch.await();
for (int h = 0; h < 100; h++) {
props.putSimpleStringProperty(SimpleString.toSimpleString("S" + h), SimpleString.toSimpleString("hello"));
}
props.clear();
} catch (ConcurrentModificationException t) {
hasError.set(true);
t.printStackTrace();
} catch (InterruptedException e) {
}
});
}
for (int i = 0; i < 10; i++) {
executorService.submit( () -> {
try {
countDownLatch.await();
for (int k = 0; k < 1000; k++) {
props.toString();
}
} catch (ConcurrentModificationException t) {
hasError.set(true);
t.printStackTrace();
} catch (InterruptedException e) {
}
});
}
countDownLatch.countDown();
Thread.sleep(1000);
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
executorService.shutdown();
Assert.assertFalse(hasError.get());
}
@Test
public void testGetPropertyNamesClearAndToString() throws Exception {
TypedProperties props = new TypedProperties();
ExecutorService executorService = Executors.newFixedThreadPool(1000);
AtomicBoolean hasError = new AtomicBoolean();
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10000; i++) {
int g = i;
executorService.submit(() -> {
try {
countDownLatch.await();
for (int h = 0; h < 100; h++) {
props.putSimpleStringProperty(SimpleString.toSimpleString("S" + h), SimpleString.toSimpleString("hello"));
}
props.getPropertyNames().clear();
} catch (UnsupportedOperationException uoe) {
//Catch this as this would be acceptable, as the set is meant to be like an enumeration so a user should not modify and should expect an implementation to protect itself..
} catch (ConcurrentModificationException t) {
hasError.set(true);
t.printStackTrace();
} catch (InterruptedException e) {
}
});
}
for (int i = 0; i < 10; i++) {
executorService.submit( () -> {
try {
countDownLatch.await();
for (int k = 0; k < 1000; k++) {
props.toString();
}
} catch (ConcurrentModificationException t) {
hasError.set(true);
t.printStackTrace();
} catch (InterruptedException e) {
}
});
}
countDownLatch.countDown();
Thread.sleep(1000);
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
executorService.shutdown();
Assert.assertFalse(hasError.get());
}
@Test
public void testEncodedSizeAfterClearIsSameAsNewTypedProperties() throws Exception {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(SimpleString.toSimpleString("helllllloooooo"), SimpleString.toSimpleString("raaaaaaaaaaaaaaaaaaaaaaaa"));
props.clear();
assertEquals(new TypedProperties().getEncodeSize(), props.getEncodeSize());
}
@Test
public void testMemoryOffsetAfterClearIsSameAsNewTypedProperties() throws Exception {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(SimpleString.toSimpleString("helllllloooooo"), SimpleString.toSimpleString("raaaaaaaaaaaaaaaaaaaaaaaa"));
props.clear();
assertEquals(new TypedProperties().getMemoryOffset(), props.getMemoryOffset());
}
}

View File

@ -31,7 +31,6 @@ import javax.jms.TextMessage;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -110,9 +109,13 @@ public final class ActiveMQJMSProducer implements JMSProducer {
* @throws JMSException * @throws JMSException
*/ */
private void setProperties(Message message) throws JMSException { private void setProperties(Message message) throws JMSException {
for (SimpleString name : properties.getPropertyNames()) { properties.forEach((k, v) -> {
message.setObjectProperty(name.toString(), properties.getProperty(name)); try {
} message.setObjectProperty(k.toString(), v);
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
});
} }
@Override @Override
@ -511,13 +514,7 @@ public final class ActiveMQJMSProducer implements JMSProducer {
@Override @Override
public Set<String> getPropertyNames() { public Set<String> getPropertyNames() {
try { try {
Set<SimpleString> simplePropNames = properties.getPropertyNames(); return properties.getMapNames();
Set<String> propNames = new HashSet<>(simplePropNames.size());
for (SimpleString str : simplePropNames) {
propNames.add(str.toString());
}
return propNames;
} catch (ActiveMQPropertyConversionException ce) { } catch (ActiveMQPropertyConversionException ce) {
throw new MessageFormatRuntimeException(ce.getMessage()); throw new MessageFormatRuntimeException(ce.getMessage());
} catch (RuntimeException e) { } catch (RuntimeException e) {

View File

@ -21,8 +21,6 @@ import javax.jms.MapMessage;
import javax.jms.MessageFormatException; import javax.jms.MessageFormatException;
import java.util.Collections; import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
@ -301,14 +299,7 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
@Override @Override
public Enumeration getMapNames() throws JMSException { public Enumeration getMapNames() throws JMSException {
Set<SimpleString> simplePropNames = map.getPropertyNames(); return Collections.enumeration(map.getMapNames());
Set<String> propNames = new HashSet<>(simplePropNames.size());
for (SimpleString str : simplePropNames) {
propNames.add(str.toString());
}
return Collections.enumeration(propNames);
} }
@Override @Override

View File

@ -60,7 +60,6 @@ import javax.jms.DeliveryMode;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
@ -298,12 +297,11 @@ public class AmqpCoreConverter {
private static ServerJMSMessage processExtraProperties(ServerJMSMessage jms, TypedProperties properties) { private static ServerJMSMessage processExtraProperties(ServerJMSMessage jms, TypedProperties properties) {
if (properties != null) { if (properties != null) {
for (SimpleString str : properties.getPropertyNames()) { properties.forEach((k, v) -> {
if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) { if (!k.equals(AMQPMessage.ADDRESS_PROPERTY)) {
continue; jms.getInnerMessage().putObjectProperty(k, v);
} }
jms.getInnerMessage().putObjectProperty(str, properties.getProperty(str)); });
}
} }
return jms; return jms;

View File

@ -21,8 +21,6 @@ import javax.jms.MapMessage;
import javax.jms.MessageFormatException; import javax.jms.MessageFormatException;
import java.util.Collections; import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
@ -251,14 +249,7 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe
@Override @Override
public Enumeration getMapNames() throws JMSException { public Enumeration getMapNames() throws JMSException {
Set<SimpleString> simplePropNames = map.getPropertyNames(); return Collections.enumeration(map.getMapNames());
Set<String> propNames = new HashSet<>(simplePropNames.size());
for (SimpleString str : simplePropNames) {
propNames.add(str.toString());
}
return Collections.enumeration(propNames);
} }
@Override @Override

View File

@ -679,9 +679,7 @@ public class ManagementServiceImpl implements ManagementService {
if (notification.getProperties() != null) { if (notification.getProperties() != null) {
TypedProperties props = notification.getProperties(); TypedProperties props = notification.getProperties();
for (SimpleString name : notification.getProperties().getPropertyNames()) { props.forEach(notificationMessage::putObjectProperty);
notificationMessage.putObjectProperty(name, props.getProperty(name));
}
} }
notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString())); notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));