From 8e40b2d4f4f242271d3dfcda4f9b96d3f94cee1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Tue, 25 Sep 2018 22:57:12 +0100 Subject: [PATCH] ARTEMIS-2095 - Typed Properties ThreadSafety Add Concurrency Test to expose concurrency errors seen in logs. Add Fix to ensure TypedProperties to ensure threadsafety Add forEach and forEachKey to allow for provide a thread safe way of iterating through keys and values, without needing to duplicate the collection. Add getMapNames method to remove code duplication and to ensure thread safe --- .../utils/collections/TypedProperties.java | 138 ++++++++------- .../utils/TypedPropertiesConcurrencyTest.java | 162 ++++++++++++++++++ .../jms/client/ActiveMQJMSProducer.java | 19 +- .../jms/client/ActiveMQMapMessage.java | 11 +- .../amqp/converter/AmqpCoreConverter.java | 10 +- .../converter/jms/ServerJMSMapMessage.java | 11 +- .../impl/ManagementServiceImpl.java | 4 +- 7 files changed, 256 insertions(+), 99 deletions(-) create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesConcurrencyTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java index aa2d551834..4cf4805824 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java @@ -16,15 +16,17 @@ */ package org.apache.activemq.artemis.utils.collections; +import io.netty.buffer.ByteBuf; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; - -import io.netty.buffer.ByteBuf; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; @@ -57,7 +59,7 @@ public class TypedProperties { private Map properties; - private volatile int size; + private int size; private boolean internalProperties; @@ -67,11 +69,11 @@ public class TypedProperties { /** * Return the number of properties * */ - public int size() { - return properties.size(); + public synchronized int size() { + return properties == null ? 0 : properties.size(); } - public int getMemoryOffset() { + public synchronized int getMemoryOffset() { // The estimate is basically the encode size + 2 object references for each entry in the map // Note we don't include the attributes or anything else since they already included in the memory estimate // of the ServerMessage @@ -86,75 +88,60 @@ public class TypedProperties { } } - public boolean hasInternalProperties() { + public synchronized boolean hasInternalProperties() { return internalProperties; } public void putBooleanProperty(final SimpleString key, final boolean value) { - checkCreateProperties(); doPutValue(key, BooleanValue.of(value)); } public void putByteProperty(final SimpleString key, final byte value) { - checkCreateProperties(); doPutValue(key, ByteValue.valueOf(value)); } public void putBytesProperty(final SimpleString key, final byte[] value) { - checkCreateProperties(); doPutValue(key, value == null ? NullValue.INSTANCE : new BytesValue(value)); } public void putShortProperty(final SimpleString key, final short value) { - checkCreateProperties(); doPutValue(key, new ShortValue(value)); } public void putIntProperty(final SimpleString key, final int value) { - checkCreateProperties(); doPutValue(key, new IntValue(value)); } public void putLongProperty(final SimpleString key, final long value) { - checkCreateProperties(); doPutValue(key, new LongValue(value)); } public void putFloatProperty(final SimpleString key, final float value) { - checkCreateProperties(); doPutValue(key, new FloatValue(value)); } public void putDoubleProperty(final SimpleString key, final double value) { - checkCreateProperties(); doPutValue(key, new DoubleValue(value)); } public void putSimpleStringProperty(final SimpleString key, final SimpleString value) { - checkCreateProperties(); doPutValue(key, value == null ? NullValue.INSTANCE : new StringValue(value)); } public void putNullValue(final SimpleString key) { - checkCreateProperties(); doPutValue(key, NullValue.INSTANCE); } public void putCharProperty(final SimpleString key, final char value) { - checkCreateProperties(); doPutValue(key, new CharValue(value)); } public void putTypedProperties(final TypedProperties otherProps) { - if (otherProps == null || otherProps.properties == null) { + if (otherProps == null || otherProps == this || otherProps.properties == null) { return; } - checkCreateProperties(); - Set> otherEntries = otherProps.properties.entrySet(); - for (Entry otherEntry : otherEntries) { - doPutValue(otherEntry.getKey(), otherEntry.getValue()); - } + otherProps.forEachInternal(this::doPutValue); } public Object getProperty(final SimpleString key) { @@ -315,29 +302,46 @@ public class TypedProperties { return doRemoveProperty(key); } - public boolean containsProperty(final SimpleString key) { - if (size == 0) { + public synchronized boolean containsProperty(final SimpleString key) { + if (properties == null) { return false; } else { return properties.containsKey(key); } } - - public Set getPropertyNames() { - if (size == 0) { + public synchronized Set getPropertyNames() { + if (properties == null) { return Collections.emptySet(); } else { - return properties.keySet(); + return new HashSet<>(properties.keySet()); + } + } + + public synchronized void forEachKey(Consumer action) { + if (properties != null) { + properties.keySet().forEach(action::accept); + } + } + + public synchronized void forEach(BiConsumer action) { + if (properties != null) { + properties.forEach((k, v) -> action.accept(k, v.getValue())); + } + } + + private synchronized void forEachInternal(BiConsumer action) { + if (properties != null) { + properties.forEach(action::accept); } } public synchronized void decode(final ByteBuf buffer, final TypedPropertiesDecoderPools keyValuePools) { byte b = buffer.readByte(); - if (b == DataConstants.NULL) { properties = null; + size = 0; } else { int numHeaders = buffer.readInt(); @@ -416,12 +420,13 @@ public class TypedProperties { } } - public synchronized void decode(final ByteBuf buffer) { + public void decode(final ByteBuf buffer) { decode(buffer, null); } + public synchronized void encode(final ByteBuf buffer) { - if (properties == null) { + if (properties == null || size == 0) { buffer.writeByte(DataConstants.NULL); } else { buffer.writeByte(DataConstants.NOT_NULL); @@ -438,26 +443,26 @@ public class TypedProperties { } } - public int getEncodeSize() { - if (properties == null) { + public synchronized int getEncodeSize() { + if (properties == null || size == 0) { return DataConstants.SIZE_BYTE; } else { return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size; } } - public void clear() { + public synchronized void clear() { if (properties != null) { properties.clear(); } + size = 0; } @Override - public String toString() { + public synchronized String toString() { StringBuilder sb = new StringBuilder("TypedProperties["); if (properties != null) { - Iterator> iter = properties.entrySet().iterator(); while (iter.hasNext()) { @@ -505,17 +510,15 @@ public class TypedProperties { // Private ------------------------------------------------------------------------------------ - private void checkCreateProperties() { - if (properties == null) { - properties = new HashMap<>(); - } - } - private synchronized void doPutValue(final SimpleString key, final PropertyValue value) { if (key.startsWith(AMQ_PROPNAME)) { internalProperties = true; } + if (properties == null) { + properties = new HashMap<>(); + } + PropertyValue oldValue = properties.put(key, value); if (oldValue != null) { size += value.encodeSize() - oldValue.encodeSize(); @@ -530,23 +533,20 @@ public class TypedProperties { } PropertyValue val = properties.remove(key); - if (val == null) { return null; } else { size -= SimpleString.sizeofString(key) + val.encodeSize(); - return val.getValue(); } } private synchronized Object doGetProperty(final Object key) { - if (size == 0) { + if (properties == null) { return null; } PropertyValue val = properties.get(key); - if (val == null) { return null; } else { @@ -1003,21 +1003,41 @@ public class TypedProperties { } } - public boolean isEmpty() { - return properties.isEmpty(); + public synchronized boolean isEmpty() { + if (properties == null) { + return true; + } else { + return properties.isEmpty(); + } } - public Map getMap() { - Map m = new HashMap<>(); - for (Entry entry : properties.entrySet()) { - Object val = entry.getValue().getValue(); - if (val instanceof SimpleString) { - m.put(entry.getKey().toString(), ((SimpleString) val).toString()); - } else { - m.put(entry.getKey().toString(), val); + public synchronized Set getMapNames() { + if (properties == null) { + return Collections.emptySet(); + } else { + Set names = new HashSet<>(properties.size()); + for (SimpleString name : properties.keySet()) { + names.add(name.toString()); } + return names; + } + } + + public synchronized Map getMap() { + if (properties == null) { + return Collections.emptyMap(); + } else { + Map m = new HashMap<>(properties.size()); + for (Entry entry : properties.entrySet()) { + Object val = entry.getValue().getValue(); + if (val instanceof SimpleString) { + m.put(entry.getKey().toString(), ((SimpleString) val).toString()); + } else { + m.put(entry.getKey().toString(), val); + } + } + return m; } - return m; } /** diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesConcurrencyTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesConcurrencyTest.java new file mode 100644 index 0000000000..9e9e86ac71 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesConcurrencyTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + + +import java.util.ConcurrentModificationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.utils.collections.TypedProperties; +import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +public class TypedPropertiesConcurrencyTest { + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + + private SimpleString key = SimpleString.toSimpleString("key"); + + @Test + public void testClearAndToString() throws Exception { + TypedProperties props = new TypedProperties(); + + ExecutorService executorService = Executors.newFixedThreadPool(1000); + + AtomicBoolean hasError = new AtomicBoolean(); + CountDownLatch countDownLatch = new CountDownLatch(1); + for (int i = 0; i < 10000; i++) { + int g = i; + executorService.submit(() -> { + try { + countDownLatch.await(); + for (int h = 0; h < 100; h++) { + props.putSimpleStringProperty(SimpleString.toSimpleString("S" + h), SimpleString.toSimpleString("hello")); + } + props.clear(); + } catch (ConcurrentModificationException t) { + hasError.set(true); + t.printStackTrace(); + } catch (InterruptedException e) { + } + }); + } + for (int i = 0; i < 10; i++) { + executorService.submit( () -> { + try { + countDownLatch.await(); + for (int k = 0; k < 1000; k++) { + props.toString(); + } + } catch (ConcurrentModificationException t) { + hasError.set(true); + t.printStackTrace(); + } catch (InterruptedException e) { + } + + }); + } + + countDownLatch.countDown(); + Thread.sleep(1000); + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.SECONDS); + executorService.shutdown(); + Assert.assertFalse(hasError.get()); + } + + + @Test + public void testGetPropertyNamesClearAndToString() throws Exception { + TypedProperties props = new TypedProperties(); + + ExecutorService executorService = Executors.newFixedThreadPool(1000); + + AtomicBoolean hasError = new AtomicBoolean(); + CountDownLatch countDownLatch = new CountDownLatch(1); + for (int i = 0; i < 10000; i++) { + int g = i; + executorService.submit(() -> { + try { + countDownLatch.await(); + for (int h = 0; h < 100; h++) { + props.putSimpleStringProperty(SimpleString.toSimpleString("S" + h), SimpleString.toSimpleString("hello")); + } + props.getPropertyNames().clear(); + } catch (UnsupportedOperationException uoe) { + //Catch this as this would be acceptable, as the set is meant to be like an enumeration so a user should not modify and should expect an implementation to protect itself.. + } catch (ConcurrentModificationException t) { + hasError.set(true); + t.printStackTrace(); + } catch (InterruptedException e) { + } + }); + } + for (int i = 0; i < 10; i++) { + executorService.submit( () -> { + try { + countDownLatch.await(); + for (int k = 0; k < 1000; k++) { + props.toString(); + } + } catch (ConcurrentModificationException t) { + hasError.set(true); + t.printStackTrace(); + } catch (InterruptedException e) { + } + + }); + } + + countDownLatch.countDown(); + Thread.sleep(1000); + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.SECONDS); + executorService.shutdown(); + Assert.assertFalse(hasError.get()); + } + + + @Test + public void testEncodedSizeAfterClearIsSameAsNewTypedProperties() throws Exception { + TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(SimpleString.toSimpleString("helllllloooooo"), SimpleString.toSimpleString("raaaaaaaaaaaaaaaaaaaaaaaa")); + + props.clear(); + + assertEquals(new TypedProperties().getEncodeSize(), props.getEncodeSize()); + + } + + @Test + public void testMemoryOffsetAfterClearIsSameAsNewTypedProperties() throws Exception { + TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(SimpleString.toSimpleString("helllllloooooo"), SimpleString.toSimpleString("raaaaaaaaaaaaaaaaaaaaaaaa")); + + props.clear(); + + assertEquals(new TypedProperties().getMemoryOffset(), props.getMemoryOffset()); + + } +} diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSProducer.java index 965eefd361..e97a3c2102 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSProducer.java @@ -31,7 +31,6 @@ import javax.jms.TextMessage; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -110,9 +109,13 @@ public final class ActiveMQJMSProducer implements JMSProducer { * @throws JMSException */ private void setProperties(Message message) throws JMSException { - for (SimpleString name : properties.getPropertyNames()) { - message.setObjectProperty(name.toString(), properties.getProperty(name)); - } + properties.forEach((k, v) -> { + try { + message.setObjectProperty(k.toString(), v); + } catch (JMSException e) { + throw JmsExceptionUtils.convertToRuntimeException(e); + } + }); } @Override @@ -511,13 +514,7 @@ public final class ActiveMQJMSProducer implements JMSProducer { @Override public Set getPropertyNames() { try { - Set simplePropNames = properties.getPropertyNames(); - Set propNames = new HashSet<>(simplePropNames.size()); - - for (SimpleString str : simplePropNames) { - propNames.add(str.toString()); - } - return propNames; + return properties.getMapNames(); } catch (ActiveMQPropertyConversionException ce) { throw new MessageFormatRuntimeException(ce.getMessage()); } catch (RuntimeException e) { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java index 557b0b8256..e0249bff79 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMapMessage.java @@ -21,8 +21,6 @@ import javax.jms.MapMessage; import javax.jms.MessageFormatException; import java.util.Collections; import java.util.Enumeration; -import java.util.HashSet; -import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; @@ -301,14 +299,7 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage { @Override public Enumeration getMapNames() throws JMSException { - Set simplePropNames = map.getPropertyNames(); - Set propNames = new HashSet<>(simplePropNames.size()); - - for (SimpleString str : simplePropNames) { - propNames.add(str.toString()); - } - - return Collections.enumeration(propNames); + return Collections.enumeration(map.getMapNames()); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index e14768764e..1fcd9ab79c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -60,7 +60,6 @@ import javax.jms.DeliveryMode; import javax.jms.JMSException; import org.apache.activemq.artemis.api.core.ICoreMessage; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; @@ -298,12 +297,11 @@ public class AmqpCoreConverter { private static ServerJMSMessage processExtraProperties(ServerJMSMessage jms, TypedProperties properties) { if (properties != null) { - for (SimpleString str : properties.getPropertyNames()) { - if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) { - continue; + properties.forEach((k, v) -> { + if (!k.equals(AMQPMessage.ADDRESS_PROPERTY)) { + jms.getInnerMessage().putObjectProperty(k, v); } - jms.getInnerMessage().putObjectProperty(str, properties.getProperty(str)); - } + }); } return jms; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java index 588b10e292..9ee86af41a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java @@ -21,8 +21,6 @@ import javax.jms.MapMessage; import javax.jms.MessageFormatException; import java.util.Collections; import java.util.Enumeration; -import java.util.HashSet; -import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; @@ -251,14 +249,7 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe @Override public Enumeration getMapNames() throws JMSException { - Set simplePropNames = map.getPropertyNames(); - Set propNames = new HashSet<>(simplePropNames.size()); - - for (SimpleString str : simplePropNames) { - propNames.add(str.toString()); - } - - return Collections.enumeration(propNames); + return Collections.enumeration(map.getMapNames()); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index ad888d0077..ecb8d5522a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -679,9 +679,7 @@ public class ManagementServiceImpl implements ManagementService { if (notification.getProperties() != null) { TypedProperties props = notification.getProperties(); - for (SimpleString name : notification.getProperties().getPropertyNames()) { - notificationMessage.putObjectProperty(name, props.getProperty(name)); - } + props.forEach(notificationMessage::putObjectProperty); } notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));