This commit is contained in:
Clebert Suconic 2018-09-26 14:48:45 -04:00
commit 10ecb358cb
7 changed files with 256 additions and 99 deletions

View File

@ -16,15 +16,17 @@
*/
package org.apache.activemq.artemis.utils.collections;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@ -57,7 +59,7 @@ public class TypedProperties {
private Map<SimpleString, PropertyValue> properties;
private volatile int size;
private int size;
private boolean internalProperties;
@ -67,11 +69,11 @@ public class TypedProperties {
/**
* Return the number of properties
* */
public int size() {
return properties.size();
public synchronized int size() {
return properties == null ? 0 : properties.size();
}
public int getMemoryOffset() {
public synchronized int getMemoryOffset() {
// The estimate is basically the encode size + 2 object references for each entry in the map
// Note we don't include the attributes or anything else since they already included in the memory estimate
// of the ServerMessage
@ -86,75 +88,60 @@ public class TypedProperties {
}
}
public boolean hasInternalProperties() {
public synchronized boolean hasInternalProperties() {
return internalProperties;
}
public void putBooleanProperty(final SimpleString key, final boolean value) {
checkCreateProperties();
doPutValue(key, BooleanValue.of(value));
}
public void putByteProperty(final SimpleString key, final byte value) {
checkCreateProperties();
doPutValue(key, ByteValue.valueOf(value));
}
public void putBytesProperty(final SimpleString key, final byte[] value) {
checkCreateProperties();
doPutValue(key, value == null ? NullValue.INSTANCE : new BytesValue(value));
}
public void putShortProperty(final SimpleString key, final short value) {
checkCreateProperties();
doPutValue(key, new ShortValue(value));
}
public void putIntProperty(final SimpleString key, final int value) {
checkCreateProperties();
doPutValue(key, new IntValue(value));
}
public void putLongProperty(final SimpleString key, final long value) {
checkCreateProperties();
doPutValue(key, new LongValue(value));
}
public void putFloatProperty(final SimpleString key, final float value) {
checkCreateProperties();
doPutValue(key, new FloatValue(value));
}
public void putDoubleProperty(final SimpleString key, final double value) {
checkCreateProperties();
doPutValue(key, new DoubleValue(value));
}
public void putSimpleStringProperty(final SimpleString key, final SimpleString value) {
checkCreateProperties();
doPutValue(key, value == null ? NullValue.INSTANCE : new StringValue(value));
}
public void putNullValue(final SimpleString key) {
checkCreateProperties();
doPutValue(key, NullValue.INSTANCE);
}
public void putCharProperty(final SimpleString key, final char value) {
checkCreateProperties();
doPutValue(key, new CharValue(value));
}
public void putTypedProperties(final TypedProperties otherProps) {
if (otherProps == null || otherProps.properties == null) {
if (otherProps == null || otherProps == this || otherProps.properties == null) {
return;
}
checkCreateProperties();
Set<Entry<SimpleString, PropertyValue>> otherEntries = otherProps.properties.entrySet();
for (Entry<SimpleString, PropertyValue> otherEntry : otherEntries) {
doPutValue(otherEntry.getKey(), otherEntry.getValue());
}
otherProps.forEachInternal(this::doPutValue);
}
public Object getProperty(final SimpleString key) {
@ -315,29 +302,46 @@ public class TypedProperties {
return doRemoveProperty(key);
}
public boolean containsProperty(final SimpleString key) {
if (size == 0) {
public synchronized boolean containsProperty(final SimpleString key) {
if (properties == null) {
return false;
} else {
return properties.containsKey(key);
}
}
public Set<SimpleString> getPropertyNames() {
if (size == 0) {
public synchronized Set<SimpleString> getPropertyNames() {
if (properties == null) {
return Collections.emptySet();
} else {
return properties.keySet();
return new HashSet<>(properties.keySet());
}
}
public synchronized void forEachKey(Consumer<SimpleString> action) {
if (properties != null) {
properties.keySet().forEach(action::accept);
}
}
public synchronized void forEach(BiConsumer<SimpleString, Object> action) {
if (properties != null) {
properties.forEach((k, v) -> action.accept(k, v.getValue()));
}
}
private synchronized void forEachInternal(BiConsumer<SimpleString, PropertyValue> action) {
if (properties != null) {
properties.forEach(action::accept);
}
}
public synchronized void decode(final ByteBuf buffer,
final TypedPropertiesDecoderPools keyValuePools) {
byte b = buffer.readByte();
if (b == DataConstants.NULL) {
properties = null;
size = 0;
} else {
int numHeaders = buffer.readInt();
@ -416,12 +420,13 @@ public class TypedProperties {
}
}
public synchronized void decode(final ByteBuf buffer) {
public void decode(final ByteBuf buffer) {
decode(buffer, null);
}
public synchronized void encode(final ByteBuf buffer) {
if (properties == null) {
if (properties == null || size == 0) {
buffer.writeByte(DataConstants.NULL);
} else {
buffer.writeByte(DataConstants.NOT_NULL);
@ -438,26 +443,26 @@ public class TypedProperties {
}
}
public int getEncodeSize() {
if (properties == null) {
public synchronized int getEncodeSize() {
if (properties == null || size == 0) {
return DataConstants.SIZE_BYTE;
} else {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size;
}
}
public void clear() {
public synchronized void clear() {
if (properties != null) {
properties.clear();
}
size = 0;
}
@Override
public String toString() {
public synchronized String toString() {
StringBuilder sb = new StringBuilder("TypedProperties[");
if (properties != null) {
Iterator<Entry<SimpleString, PropertyValue>> iter = properties.entrySet().iterator();
while (iter.hasNext()) {
@ -505,17 +510,15 @@ public class TypedProperties {
// Private ------------------------------------------------------------------------------------
private void checkCreateProperties() {
if (properties == null) {
properties = new HashMap<>();
}
}
private synchronized void doPutValue(final SimpleString key, final PropertyValue value) {
if (key.startsWith(AMQ_PROPNAME)) {
internalProperties = true;
}
if (properties == null) {
properties = new HashMap<>();
}
PropertyValue oldValue = properties.put(key, value);
if (oldValue != null) {
size += value.encodeSize() - oldValue.encodeSize();
@ -530,23 +533,20 @@ public class TypedProperties {
}
PropertyValue val = properties.remove(key);
if (val == null) {
return null;
} else {
size -= SimpleString.sizeofString(key) + val.encodeSize();
return val.getValue();
}
}
private synchronized Object doGetProperty(final Object key) {
if (size == 0) {
if (properties == null) {
return null;
}
PropertyValue val = properties.get(key);
if (val == null) {
return null;
} else {
@ -1003,21 +1003,41 @@ public class TypedProperties {
}
}
public boolean isEmpty() {
return properties.isEmpty();
public synchronized boolean isEmpty() {
if (properties == null) {
return true;
} else {
return properties.isEmpty();
}
}
public Map<String, Object> getMap() {
Map<String, Object> m = new HashMap<>();
for (Entry<SimpleString, PropertyValue> entry : properties.entrySet()) {
Object val = entry.getValue().getValue();
if (val instanceof SimpleString) {
m.put(entry.getKey().toString(), ((SimpleString) val).toString());
} else {
m.put(entry.getKey().toString(), val);
public synchronized Set<String> getMapNames() {
if (properties == null) {
return Collections.emptySet();
} else {
Set<String> names = new HashSet<>(properties.size());
for (SimpleString name : properties.keySet()) {
names.add(name.toString());
}
return names;
}
}
public synchronized Map<String, Object> getMap() {
if (properties == null) {
return Collections.emptyMap();
} else {
Map<String, Object> m = new HashMap<>(properties.size());
for (Entry<SimpleString, PropertyValue> entry : properties.entrySet()) {
Object val = entry.getValue().getValue();
if (val instanceof SimpleString) {
m.put(entry.getKey().toString(), ((SimpleString) val).toString());
} else {
m.put(entry.getKey().toString(), val);
}
}
return m;
}
return m;
}
/**

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
public class TypedPropertiesConcurrencyTest {
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
private SimpleString key = SimpleString.toSimpleString("key");
@Test
public void testClearAndToString() throws Exception {
TypedProperties props = new TypedProperties();
ExecutorService executorService = Executors.newFixedThreadPool(1000);
AtomicBoolean hasError = new AtomicBoolean();
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10000; i++) {
int g = i;
executorService.submit(() -> {
try {
countDownLatch.await();
for (int h = 0; h < 100; h++) {
props.putSimpleStringProperty(SimpleString.toSimpleString("S" + h), SimpleString.toSimpleString("hello"));
}
props.clear();
} catch (ConcurrentModificationException t) {
hasError.set(true);
t.printStackTrace();
} catch (InterruptedException e) {
}
});
}
for (int i = 0; i < 10; i++) {
executorService.submit( () -> {
try {
countDownLatch.await();
for (int k = 0; k < 1000; k++) {
props.toString();
}
} catch (ConcurrentModificationException t) {
hasError.set(true);
t.printStackTrace();
} catch (InterruptedException e) {
}
});
}
countDownLatch.countDown();
Thread.sleep(1000);
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
executorService.shutdown();
Assert.assertFalse(hasError.get());
}
@Test
public void testGetPropertyNamesClearAndToString() throws Exception {
TypedProperties props = new TypedProperties();
ExecutorService executorService = Executors.newFixedThreadPool(1000);
AtomicBoolean hasError = new AtomicBoolean();
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10000; i++) {
int g = i;
executorService.submit(() -> {
try {
countDownLatch.await();
for (int h = 0; h < 100; h++) {
props.putSimpleStringProperty(SimpleString.toSimpleString("S" + h), SimpleString.toSimpleString("hello"));
}
props.getPropertyNames().clear();
} catch (UnsupportedOperationException uoe) {
//Catch this as this would be acceptable, as the set is meant to be like an enumeration so a user should not modify and should expect an implementation to protect itself..
} catch (ConcurrentModificationException t) {
hasError.set(true);
t.printStackTrace();
} catch (InterruptedException e) {
}
});
}
for (int i = 0; i < 10; i++) {
executorService.submit( () -> {
try {
countDownLatch.await();
for (int k = 0; k < 1000; k++) {
props.toString();
}
} catch (ConcurrentModificationException t) {
hasError.set(true);
t.printStackTrace();
} catch (InterruptedException e) {
}
});
}
countDownLatch.countDown();
Thread.sleep(1000);
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
executorService.shutdown();
Assert.assertFalse(hasError.get());
}
@Test
public void testEncodedSizeAfterClearIsSameAsNewTypedProperties() throws Exception {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(SimpleString.toSimpleString("helllllloooooo"), SimpleString.toSimpleString("raaaaaaaaaaaaaaaaaaaaaaaa"));
props.clear();
assertEquals(new TypedProperties().getEncodeSize(), props.getEncodeSize());
}
@Test
public void testMemoryOffsetAfterClearIsSameAsNewTypedProperties() throws Exception {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(SimpleString.toSimpleString("helllllloooooo"), SimpleString.toSimpleString("raaaaaaaaaaaaaaaaaaaaaaaa"));
props.clear();
assertEquals(new TypedProperties().getMemoryOffset(), props.getMemoryOffset());
}
}

View File

@ -31,7 +31,6 @@ import javax.jms.TextMessage;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -110,9 +109,13 @@ public final class ActiveMQJMSProducer implements JMSProducer {
* @throws JMSException
*/
private void setProperties(Message message) throws JMSException {
for (SimpleString name : properties.getPropertyNames()) {
message.setObjectProperty(name.toString(), properties.getProperty(name));
}
properties.forEach((k, v) -> {
try {
message.setObjectProperty(k.toString(), v);
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
});
}
@Override
@ -511,13 +514,7 @@ public final class ActiveMQJMSProducer implements JMSProducer {
@Override
public Set<String> getPropertyNames() {
try {
Set<SimpleString> simplePropNames = properties.getPropertyNames();
Set<String> propNames = new HashSet<>(simplePropNames.size());
for (SimpleString str : simplePropNames) {
propNames.add(str.toString());
}
return propNames;
return properties.getMapNames();
} catch (ActiveMQPropertyConversionException ce) {
throw new MessageFormatRuntimeException(ce.getMessage());
} catch (RuntimeException e) {

View File

@ -21,8 +21,6 @@ import javax.jms.MapMessage;
import javax.jms.MessageFormatException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
@ -301,14 +299,7 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
@Override
public Enumeration getMapNames() throws JMSException {
Set<SimpleString> simplePropNames = map.getPropertyNames();
Set<String> propNames = new HashSet<>(simplePropNames.size());
for (SimpleString str : simplePropNames) {
propNames.add(str.toString());
}
return Collections.enumeration(propNames);
return Collections.enumeration(map.getMapNames());
}
@Override

View File

@ -60,7 +60,6 @@ import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
@ -298,12 +297,11 @@ public class AmqpCoreConverter {
private static ServerJMSMessage processExtraProperties(ServerJMSMessage jms, TypedProperties properties) {
if (properties != null) {
for (SimpleString str : properties.getPropertyNames()) {
if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
continue;
properties.forEach((k, v) -> {
if (!k.equals(AMQPMessage.ADDRESS_PROPERTY)) {
jms.getInnerMessage().putObjectProperty(k, v);
}
jms.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
}
});
}
return jms;

View File

@ -21,8 +21,6 @@ import javax.jms.MapMessage;
import javax.jms.MessageFormatException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
@ -251,14 +249,7 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe
@Override
public Enumeration getMapNames() throws JMSException {
Set<SimpleString> simplePropNames = map.getPropertyNames();
Set<String> propNames = new HashSet<>(simplePropNames.size());
for (SimpleString str : simplePropNames) {
propNames.add(str.toString());
}
return Collections.enumeration(propNames);
return Collections.enumeration(map.getMapNames());
}
@Override

View File

@ -679,9 +679,7 @@ public class ManagementServiceImpl implements ManagementService {
if (notification.getProperties() != null) {
TypedProperties props = notification.getProperties();
for (SimpleString name : notification.getProperties().getPropertyNames()) {
notificationMessage.putObjectProperty(name, props.getProperty(name));
}
props.forEach(notificationMessage::putObjectProperty);
}
notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));