From 98028cdecc6bd31c86a7d6decfed1961e46be7b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Wed, 10 Jan 2018 08:48:14 +0000 Subject: [PATCH] ARTEMIS-1586 Refactor to make more generic * Move byte util code into ByteUtil * Re-use the new equals method in SimpleString * Apply same pools/interners to client decode * Create String to SimpleString pools/interners for property access via String keys (producer and consumer benefits) * Lazy init the pools on withing the get methods of CoreMessageObjectPools to get the specific pool, to avoid having this scattered every where. * reduce SimpleString creation in conversion to/from core message methods with JMS wrapper. * reduce SimpleString creation in conversion to/from Core in OpenWire, AMQP, MQTT. --- .../artemis/api/core/SimpleString.java | 237 ++++++++---------- ...Interner.java => AbstractByteBufPool.java} | 21 +- .../activemq/artemis/utils/AbstractPool.java | 89 +++++++ .../activemq/artemis/utils/ByteUtil.java | 122 +++++++++ .../utils/collections/TypedProperties.java | 146 +++++++---- .../activemq/artemis/api/core/Message.java | 6 + .../core/client/impl/ClientMessageImpl.java | 23 +- .../core/client/impl/ClientSessionImpl.java | 5 +- .../core/message/impl/CoreMessage.java | 107 ++++---- .../message/impl/CoreMessageObjectPools.java | 55 ++++ .../core/protocol/ClientPacketDecoder.java | 11 +- .../impl/ActiveMQClientProtocolManager.java | 2 +- .../activemq/artemis/reader/MessageUtil.java | 10 +- .../artemis/message/CoreMessageTest.java | 2 +- .../jms/client/ActiveMQDestination.java | 20 +- .../artemis/jms/client/ActiveMQMessage.java | 55 ++-- .../artemis/jms/client/ActiveMQQueue.java | 8 +- .../artemis/jms/client/ActiveMQSession.java | 6 +- .../jms/client/ActiveMQStreamMessage.java | 2 +- .../artemis/jms/client/ActiveMQTopic.java | 8 +- .../protocol/amqp/broker/AMQPMessage.java | 74 ++++-- .../amqp/broker/AMQPSessionCallback.java | 105 ++++---- .../amqp/converter/AMQPConverter.java | 5 +- .../amqp/converter/AMQPMessageSupport.java | 49 ++-- .../amqp/converter/AmqpCoreConverter.java | 37 +-- .../proton/ProtonServerReceiverContext.java | 16 +- .../proton/ProtonServerSenderContext.java | 54 ++-- .../JMSMappingOutboundTransformerTest.java | 4 +- .../core/protocol/mqtt/MQTTSession.java | 8 +- .../protocol/mqtt/MQTTSessionCallback.java | 2 +- .../artemis/core/protocol/mqtt/MQTTUtil.java | 19 +- .../protocol/openwire/OpenWireConnection.java | 2 +- .../openwire/OpenWireMessageConverter.java | 8 +- .../protocol/openwire/OpenwireMessage.java | 11 + .../protocol/openwire/amq/AMQConsumer.java | 2 +- .../protocol/openwire/amq/AMQSession.java | 11 +- .../core/protocol/stomp/StompSession.java | 2 +- .../ra/inflow/ActiveMQMessageHandler.java | 2 +- .../core/protocol/ServerPacketDecoder.java | 25 +- .../core/impl/CoreSessionCallback.java | 11 +- .../core/server/impl/ServerConsumerImpl.java | 2 +- .../spi/core/protocol/MessageConverter.java | 3 +- .../spi/core/protocol/SessionCallback.java | 2 +- .../impl/ScheduledDeliveryHandlerTest.java | 11 + .../integration/client/AcknowledgeTest.java | 11 + .../integration/client/HangConsumerTest.java | 2 +- .../persistence/XmlImportExportTest.java | 2 +- 47 files changed, 910 insertions(+), 505 deletions(-) rename artemis-commons/src/main/java/org/apache/activemq/artemis/utils/{AbstractInterner.java => AbstractByteBufPool.java} (91%) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java index e24e245f81..dbf74680b6 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java @@ -21,8 +21,9 @@ import java.util.ArrayList; import java.util.List; import io.netty.buffer.ByteBuf; -import io.netty.util.internal.PlatformDependent; -import org.apache.activemq.artemis.utils.AbstractInterner; +import org.apache.activemq.artemis.utils.AbstractByteBufPool; +import org.apache.activemq.artemis.utils.AbstractPool; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; /** @@ -33,129 +34,6 @@ import org.apache.activemq.artemis.utils.DataConstants; */ public final class SimpleString implements CharSequence, Serializable, Comparable { - public static final class Interner extends AbstractInterner { - - private final int maxLength; - - public Interner(final int capacity, final int maxCharsLength) { - super(capacity); - this.maxLength = maxCharsLength; - } - - @Override - protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) { - return SimpleString.isEqual(entry, byteBuf, offset, length); - } - - @Override - protected boolean canIntern(final ByteBuf byteBuf, final int length) { - assert length % 2 == 0 : "length must be a multiple of 2"; - final int expectedStringLength = length >> 1; - return expectedStringLength <= maxLength; - } - - @Override - protected SimpleString create(final ByteBuf byteBuf, final int length) { - return readSimpleString(byteBuf, length); - } - } - - /** - * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s}, - * {@code false} otherwise. - *

- * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the - * length field. - */ - public static boolean isEqual(final SimpleString s, final ByteBuf bytes, final int offset, final int length) { - if (s == null) { - return false; - } - final byte[] chars = s.getData(); - if (chars.length != length) - return false; - if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) { - if ((offset + length) > bytes.writerIndex()) { - throw new IndexOutOfBoundsException(); - } - if (bytes.hasArray()) { - return batchOnHeapIsEqual(chars, bytes.array(), bytes.arrayOffset() + offset, length); - } else if (bytes.hasMemoryAddress()) { - return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, length); - } - } - return byteBufIsEqual(chars, bytes, offset, length); - } - - private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf bytes, final int offset, final int length) { - for (int i = 0; i < length; i++) - if (chars[i] != bytes.getByte(offset + i)) - return false; - return true; - } - - private static boolean batchOnHeapIsEqual(final byte[] chars, - final byte[] array, - final int arrayOffset, - final int length) { - final int longCount = length >>> 3; - final int bytesCount = length & 7; - int bytesIndex = arrayOffset; - int charsIndex = 0; - for (int i = 0; i < longCount; i++) { - final long charsLong = PlatformDependent.getLong(chars, charsIndex); - final long bytesLong = PlatformDependent.getLong(array, bytesIndex); - if (charsLong != bytesLong) { - return false; - - } - bytesIndex += 8; - charsIndex += 8; - } - for (int i = 0; i < bytesCount; i++) { - final byte charsByte = PlatformDependent.getByte(chars, charsIndex); - final byte bytesByte = PlatformDependent.getByte(array, bytesIndex); - if (charsByte != bytesByte) { - return false; - - } - bytesIndex++; - charsIndex++; - } - return true; - } - - private static boolean batchOffHeapIsEqual(final byte[] chars, - final long address, - final int offset, - final int length) { - final int longCount = length >>> 3; - final int bytesCount = length & 7; - long bytesAddress = address + offset; - int charsIndex = 0; - for (int i = 0; i < longCount; i++) { - final long charsLong = PlatformDependent.getLong(chars, charsIndex); - final long bytesLong = PlatformDependent.getLong(bytesAddress); - if (charsLong != bytesLong) { - return false; - - } - bytesAddress += 8; - charsIndex += 8; - } - for (int i = 0; i < bytesCount; i++) { - final byte charsByte = PlatformDependent.getByte(chars, charsIndex); - final byte bytesByte = PlatformDependent.getByte(bytesAddress); - if (charsByte != bytesByte) { - return false; - - } - bytesAddress++; - charsIndex++; - } - return true; - } - private static final long serialVersionUID = 4204223851422244307L; // Attributes @@ -185,6 +63,13 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl return new SimpleString(string); } + public static SimpleString toSimpleString(final String string, StringSimpleStringPool pool) { + if (pool == null) { + return toSimpleString(string); + } + return pool.getOrCreate(string); + } + // Constructors // ---------------------------------------------------------------------- @@ -236,6 +121,10 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl data[1] = high; } + public boolean isEmpty() { + return data.length == 0; + } + // CharSequence implementation // --------------------------------------------------------------------------- @@ -267,11 +156,26 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl return readSimpleString(buffer); } + public static SimpleString readNullableSimpleString(ByteBuf buffer, ByteBufSimpleStringPool pool) { + int b = buffer.readByte(); + if (b == DataConstants.NULL) { + return null; + } + return readSimpleString(buffer, pool); + } + public static SimpleString readSimpleString(ByteBuf buffer) { int len = buffer.readInt(); return readSimpleString(buffer, len); } + public static SimpleString readSimpleString(ByteBuf buffer, ByteBufSimpleStringPool pool) { + if (pool == null) { + return readSimpleString(buffer); + } + return pool.getOrCreate(buffer); + } + public static SimpleString readSimpleString(final ByteBuf buffer, final int length) { byte[] data = new byte[length]; buffer.readBytes(data); @@ -381,22 +285,23 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl if (other instanceof SimpleString) { SimpleString s = (SimpleString) other; - if (data.length != s.data.length) { - return false; - } - - for (int i = 0; i < data.length; i++) { - if (data[i] != s.data[i]) { - return false; - } - } - - return true; + return ByteUtil.equals(data, s.data); } else { return false; } } + /** + * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s}, + * {@code false} otherwise. + *

+ * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the + * length field. + */ + public boolean equals(final ByteBuf byteBuf, final int offset, final int length) { + return ByteUtil.equals(data, byteBuf, offset, length); + } + @Override public int hashCode() { if (hash == 0) { @@ -575,4 +480,64 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl dst[d++] = (char) (low | high); } } + + public static final class ByteBufSimpleStringPool extends AbstractByteBufPool { + + private static final int UUID_LENGTH = 36; + + private final int maxLength; + + public ByteBufSimpleStringPool() { + this.maxLength = UUID_LENGTH; + } + + public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) { + super(capacity); + this.maxLength = maxCharsLength; + } + + @Override + protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) { + if (entry == null) { + return false; + } + return entry.equals(byteBuf, offset, length); + } + + @Override + protected boolean canPool(final ByteBuf byteBuf, final int length) { + assert length % 2 == 0 : "length must be a multiple of 2"; + final int expectedStringLength = length >> 1; + return expectedStringLength <= maxLength; + } + + @Override + protected SimpleString create(final ByteBuf byteBuf, final int length) { + return readSimpleString(byteBuf, length); + } + } + + public static final class StringSimpleStringPool extends AbstractPool { + + public StringSimpleStringPool() { + super(); + } + + public StringSimpleStringPool(final int capacity) { + super(capacity); + } + + @Override + protected SimpleString create(String value) { + return toSimpleString(value); + } + + @Override + protected boolean isEqual(SimpleString entry, String value) { + if (entry == null) { + return false; + } + return entry.toString().equals(value); + } + } } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java similarity index 91% rename from artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java rename to artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java index 7e1fe404a5..87c1b6ff76 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractByteBufPool.java @@ -28,13 +28,19 @@ import io.netty.util.internal.PlatformDependent; * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie * the same entry could be allocated multiple times by concurrent calls. */ -public abstract class AbstractInterner { +public abstract class AbstractByteBufPool { + + public static final int DEFAULT_POOL_CAPACITY = 32; private final T[] entries; private final int mask; private final int shift; - public AbstractInterner(final int capacity) { + public AbstractByteBufPool() { + this(DEFAULT_POOL_CAPACITY); + } + + public AbstractByteBufPool(final int capacity) { entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)]; mask = entries.length - 1; //log2 of entries.length @@ -105,10 +111,10 @@ public abstract class AbstractInterner { } /** - * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be interned, + * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be pooled, * {@code false} otherwise. */ - protected abstract boolean canIntern(ByteBuf byteBuf, int length); + protected abstract boolean canPool(ByteBuf byteBuf, int length); /** * Create a new entry. @@ -122,12 +128,13 @@ public abstract class AbstractInterner { protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length); /** - * Returns and interned entry if possible, a new one otherwise. + * Returns a pooled entry if possible, a new one otherwise. *

* The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it. */ - public final T intern(final ByteBuf byteBuf, final int length) { - if (!canIntern(byteBuf, length)) { + public final T getOrCreate(final ByteBuf byteBuf) { + final int length = byteBuf.readInt(); + if (!canPool(byteBuf, length)) { return create(byteBuf, length); } else { if (!byteBuf.isReadable(length)) { diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java new file mode 100644 index 0000000000..cc42e8f747 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractPool.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.util.internal.MathUtil; + +/** + * Thread-safe {@code } interner. + *

+ * Differently from {@link String#intern()} it contains a fixed amount of entries and + * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie + * the same entry could be allocated multiple times by concurrent calls. + */ +public abstract class AbstractPool { + + public static final int DEFAULT_POOL_CAPACITY = 32; + + private final O[] entries; + private final int mask; + private final int shift; + + public AbstractPool() { + this(DEFAULT_POOL_CAPACITY); + } + + public AbstractPool(final int capacity) { + entries = (O[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)]; + mask = entries.length - 1; + //log2 of entries.length + shift = 31 - Integer.numberOfLeadingZeros(entries.length); + } + + /** + * Create a new entry. + */ + protected abstract O create(I value); + + /** + * Returns {@code true} if the {@code entry} content is equal to {@code value}; + */ + protected abstract boolean isEqual(O entry, I value); + + protected int hashCode(I value) { + return value.hashCode(); + } + + /** + * Returns and interned entry if possible, a new one otherwise. + *

+ * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it. + */ + public final O getOrCreate(final I value) { + if (value == null) { + return null; + } + final int hashCode = hashCode(value); + //fast % operation with power of 2 entries.length + final int firstIndex = hashCode & mask; + final O firstEntry = entries[firstIndex]; + if (isEqual(firstEntry, value)) { + return firstEntry; + } + final int secondIndex = (hashCode >> shift) & mask; + final O secondEntry = entries[secondIndex]; + if (isEqual(secondEntry, value)) { + return secondEntry; + } + final O internedEntry = create(value); + final int entryIndex = firstEntry == null ? firstIndex : secondIndex; + entries[entryIndex] = internedEntry; + return internedEntry; + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index e70891dda5..8835797ab5 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -22,6 +22,7 @@ import java.util.regex.Pattern; import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; @@ -207,4 +208,125 @@ public class ByteUtil { throw ActiveMQUtilBundle.BUNDLE.failedToParseLong(text); } } + + public static boolean equals(final byte[] left, final byte[] right) { + return equals(left, right, 0, right.length); + } + + public static boolean equals(final byte[] left, + final byte[] right, + final int rightOffset, + final int rightLength) { + if (left == right) + return true; + if (left == null || right == null) + return false; + if (left.length != rightLength) + return false; + if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) { + return equalsUnsafe(left, right, rightOffset, rightLength); + } else { + return equalsSafe(left, right, rightOffset, rightLength); + } + } + + private static boolean equalsSafe(byte[] left, byte[] right, int rightOffset, int rightLength) { + for (int i = 0; i < rightLength; i++) + if (left[i] != right[rightOffset + i]) + return false; + return true; + } + + private static boolean equalsUnsafe(final byte[] left, + final byte[] right, + final int rightOffset, + final int rightLength) { + final int longCount = rightLength >>> 3; + final int bytesCount = rightLength & 7; + int bytesIndex = rightOffset; + int charsIndex = 0; + for (int i = 0; i < longCount; i++) { + final long charsLong = PlatformDependent.getLong(left, charsIndex); + final long bytesLong = PlatformDependent.getLong(right, bytesIndex); + if (charsLong != bytesLong) { + return false; + } + bytesIndex += 8; + charsIndex += 8; + } + for (int i = 0; i < bytesCount; i++) { + final byte charsByte = PlatformDependent.getByte(left, charsIndex); + final byte bytesByte = PlatformDependent.getByte(right, bytesIndex); + if (charsByte != bytesByte) { + return false; + } + bytesIndex++; + charsIndex++; + } + return true; + } + + + /** + * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s}, + * {@code false} otherwise. + *

+ * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the + * length field. + */ + public static boolean equals(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) { + if (bytes.length != length) + return false; + if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) { + if ((offset + length) > byteBuf.writerIndex()) { + throw new IndexOutOfBoundsException(); + } + if (byteBuf.hasArray()) { + return equals(bytes, byteBuf.array(), byteBuf.arrayOffset() + offset, length); + } else if (byteBuf.hasMemoryAddress()) { + return equalsOffHeap(bytes, byteBuf.memoryAddress(), offset, length); + } + } + return equalsOnHeap(bytes, byteBuf, offset, length); + } + + private static boolean equalsOnHeap(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) { + if (bytes.length != length) + return false; + for (int i = 0; i < length; i++) + if (bytes[i] != byteBuf.getByte(offset + i)) + return false; + return true; + } + + private static boolean equalsOffHeap(final byte[] bytes, + final long address, + final int offset, + final int length) { + final int longCount = length >>> 3; + final int bytesCount = length & 7; + long bytesAddress = address + offset; + int charsIndex = 0; + for (int i = 0; i < longCount; i++) { + final long charsLong = PlatformDependent.getLong(bytes, charsIndex); + final long bytesLong = PlatformDependent.getLong(bytesAddress); + if (charsLong != bytesLong) { + return false; + + } + bytesAddress += 8; + charsIndex += 8; + } + for (int i = 0; i < bytesCount; i++) { + final byte charsByte = PlatformDependent.getByte(bytes, charsIndex); + final byte bytesByte = PlatformDependent.getByte(bytesAddress); + if (charsByte != bytesByte) { + return false; + + } + bytesAddress++; + charsIndex++; + } + return true; + } } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java index a3e4876bdb..56beb768e5 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java @@ -28,7 +28,7 @@ import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; -import org.apache.activemq.artemis.utils.AbstractInterner; +import org.apache.activemq.artemis.utils.AbstractByteBufPool; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; @@ -332,8 +332,7 @@ public class TypedProperties { } public synchronized void decode(final ByteBuf buffer, - final SimpleString.Interner keyInterner, - final StringValue.Interner valueInterner) { + final TypedPropertiesDecoderPools keyValuePools) { byte b = buffer.readByte(); if (b == DataConstants.NULL) { @@ -346,15 +345,7 @@ public class TypedProperties { size = 0; for (int i = 0; i < numHeaders; i++) { - final SimpleString key; - int len = buffer.readInt(); - if (keyInterner != null) { - key = keyInterner.intern(buffer, len); - } else { - byte[] data = new byte[len]; - buffer.readBytes(data); - key = new SimpleString(data); - } + final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool()); byte type = buffer.readByte(); @@ -412,12 +403,7 @@ public class TypedProperties { break; } case STRING: { - if (valueInterner != null) { - final int length = buffer.readInt(); - val = valueInterner.intern(buffer, length); - } else { - val = new StringValue(buffer); - } + val = StringValue.readStringValue(buffer, keyValuePools == null ? null : keyValuePools.getPropertyValuesPool()); doPutValue(key, val); break; } @@ -430,7 +416,7 @@ public class TypedProperties { } public synchronized void decode(final ByteBuf buffer) { - decode(buffer, null, null); + decode(buffer, null); } public synchronized void encode(final ByteBuf buffer) { @@ -901,44 +887,18 @@ public class TypedProperties { public static final class StringValue extends PropertyValue { - public static final class Interner extends AbstractInterner { - - private final int maxLength; - - public Interner(final int capacity, final int maxCharsLength) { - super(capacity); - this.maxLength = maxCharsLength; - } - - @Override - protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) { - if (entry == null) { - return false; - } - return SimpleString.isEqual(entry.val, byteBuf, offset, length); - } - - @Override - protected boolean canIntern(final ByteBuf byteBuf, final int length) { - assert length % 2 == 0 : "length must be a multiple of 2"; - final int expectedStringLength = length >> 1; - return expectedStringLength <= maxLength; - } - - @Override - protected StringValue create(final ByteBuf byteBuf, final int length) { - return new StringValue(SimpleString.readSimpleString(byteBuf, length)); - } - } - final SimpleString val; private StringValue(final SimpleString val) { this.val = val; } - private StringValue(final ByteBuf buffer) { - val = SimpleString.readSimpleString(buffer); + static StringValue readStringValue(final ByteBuf byteBuf, ByteBufStringValuePool pool) { + if (pool == null) { + return new StringValue(SimpleString.readSimpleString(byteBuf)); + } else { + return pool.getOrCreate(byteBuf); + } } @Override @@ -956,6 +916,90 @@ public class TypedProperties { public int encodeSize() { return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val); } + + public static final class ByteBufStringValuePool extends AbstractByteBufPool { + + private static final int UUID_LENGTH = 36; + + private final int maxLength; + + public ByteBufStringValuePool() { + this.maxLength = UUID_LENGTH; + } + + public ByteBufStringValuePool(final int capacity, final int maxCharsLength) { + super(capacity); + this.maxLength = maxCharsLength; + } + + @Override + protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) { + if (entry == null || entry.val == null) { + return false; + } + return entry.val.equals(byteBuf, offset, length); + } + + @Override + protected boolean canPool(final ByteBuf byteBuf, final int length) { + assert length % 2 == 0 : "length must be a multiple of 2"; + final int expectedStringLength = length >> 1; + return expectedStringLength <= maxLength; + } + + @Override + protected StringValue create(final ByteBuf byteBuf, final int length) { + return new StringValue(SimpleString.readSimpleString(byteBuf, length)); + } + } + } + + public static class TypedPropertiesDecoderPools { + + private SimpleString.ByteBufSimpleStringPool propertyKeysPool; + private TypedProperties.StringValue.ByteBufStringValuePool propertyValuesPool; + + public TypedPropertiesDecoderPools() { + this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(); + this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(); + } + + public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) { + this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength); + this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength); + } + + public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() { + return propertyKeysPool; + } + + public TypedProperties.StringValue.ByteBufStringValuePool getPropertyValuesPool() { + return propertyValuesPool; + } + } + + public static class TypedPropertiesStringSimpleStringPools { + + private SimpleString.StringSimpleStringPool propertyKeysPool; + private SimpleString.StringSimpleStringPool propertyValuesPool; + + public TypedPropertiesStringSimpleStringPools() { + this.propertyKeysPool = new SimpleString.StringSimpleStringPool(); + this.propertyValuesPool = new SimpleString.StringSimpleStringPool(); + } + + public TypedPropertiesStringSimpleStringPools(int keyPoolCapacity, int valuePoolCapacity) { + this.propertyKeysPool = new SimpleString.StringSimpleStringPool(keyPoolCapacity); + this.propertyValuesPool = new SimpleString.StringSimpleStringPool(valuePoolCapacity); + } + + public SimpleString.StringSimpleStringPool getPropertyKeysPool() { + return propertyKeysPool; + } + + public SimpleString.StringSimpleStringPool getPropertyValuesPool() { + return propertyValuesPool; + } } public boolean isEmpty() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index ddb8a3b7c4..d24cd9545e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; /** @@ -587,6 +588,8 @@ public interface Message { Message putStringProperty(SimpleString key, SimpleString value); + Message putStringProperty(SimpleString key, String value); + /** * Returns the size of the encoded message. */ @@ -649,6 +652,9 @@ public interface Message { /** This should make you convert your message into Core format. */ ICoreMessage toCore(); + /** This should make you convert your message into Core format. */ + ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools); + int getMemoryEstimate(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java index 91fb6caeda..8068aa9438 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -59,6 +60,10 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter public ClientMessageImpl() { } + public ClientMessageImpl(CoreMessageObjectPools coreMessageObjectPools) { + super(coreMessageObjectPools); + } + protected ClientMessageImpl(ClientMessageImpl other) { super(other); } @@ -96,11 +101,22 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter final long expiration, final long timestamp, final byte priority, - final int initialMessageBufferSize) { + final int initialMessageBufferSize, + final CoreMessageObjectPools coreMessageObjectPools) { + super(coreMessageObjectPools); this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable). setPriority(priority).initBuffer(initialMessageBufferSize); } + public ClientMessageImpl(final byte type, + final boolean durable, + final long expiration, + final long timestamp, + final byte priority, + final int initialMessageBufferSize) { + this(type, durable, expiration, timestamp, priority, initialMessageBufferSize, null); + } + @Override public TypedProperties getProperties() { return this.checkProperties(); @@ -285,6 +301,11 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter return (ClientMessageImpl) super.putStringProperty(key, value); } + @Override + public ClientMessageImpl putStringProperty(final SimpleString key, final String value) { + return (ClientMessageImpl) super.putStringProperty(key, value); + } + @Override public ClientMessageImpl putObjectProperty(final SimpleString key, final Object value) throws ActiveMQPropertyConversionException { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 61784ad5e7..b5f8a1b67d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -148,6 +149,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private final Executor closeExecutor; + private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); + ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory, final String name, final String username, @@ -869,7 +872,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final long expiration, final long timestamp, final byte priority) { - return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize); + return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize, coreMessageObjectPools); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 4ebf97ed25..888b78556b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -93,18 +93,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { protected volatile TypedProperties properties; - private final SimpleString.Interner keysInterner; - private final TypedProperties.StringValue.Interner valuesInterner; + private final CoreMessageObjectPools coreMessageObjectPools; - public CoreMessage(final SimpleString.Interner keysInterner, - final TypedProperties.StringValue.Interner valuesInterner) { - this.keysInterner = keysInterner; - this.valuesInterner = valuesInterner; + public CoreMessage(final CoreMessageObjectPools coreMessageObjectPools) { + this.coreMessageObjectPools = coreMessageObjectPools; } public CoreMessage() { - this.keysInterner = null; - this.valuesInterner = null; + this.coreMessageObjectPools = null; } /** On core there's no delivery annotation */ @@ -326,10 +322,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } public CoreMessage(long id, int bufferSize) { + this(id, bufferSize, null); + } + + public CoreMessage(long id, int bufferSize, CoreMessageObjectPools coreMessageObjectPools) { this.initBuffer(bufferSize); this.setMessageID(id); - this.keysInterner = null; - this.valuesInterner = null; + this.coreMessageObjectPools = coreMessageObjectPools; } protected CoreMessage(CoreMessage other, TypedProperties copyProperties) { @@ -343,8 +342,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { this.timestamp = other.timestamp; this.priority = other.priority; this.userID = other.userID; - this.keysInterner = other.keysInterner; - this.valuesInterner = other.valuesInterner; + this.coreMessageObjectPools = other.coreMessageObjectPools; if (copyProperties != null) { this.properties = new TypedProperties(copyProperties); } @@ -424,7 +422,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage setValidatedUserID(String validatedUserID) { - putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID)); + putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID, getPropertyValuesPool())); return this; } @@ -479,7 +477,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { TypedProperties properties = new TypedProperties(); if (buffer != null && propertiesLocation >= 0) { final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); - properties.decode(byteBuf, keysInterner, valuesInterner); + properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); } this.properties = properties; } @@ -543,17 +541,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) { messageIDPosition = buffer.readerIndex(); messageID = buffer.readLong(); - int b = buffer.readByte(); - if (b != DataConstants.NULL) { - final int length = buffer.readInt(); - if (keysInterner != null) { - address = keysInterner.intern(buffer, length); - } else { - address = SimpleString.readSimpleString(buffer, length); - } - } else { - address = null; - } + + address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool()); if (buffer.readByte() == DataConstants.NOT_NULL) { byte[] bytes = new byte[16]; buffer.readBytes(bytes); @@ -571,7 +560,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { propertiesLocation = buffer.readerIndex(); } else { properties = new TypedProperties(); - properties.decode(buffer, keysInterner, valuesInterner); + properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools()); } } @@ -671,7 +660,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage setAddress(String address) { messageChanged(); - this.address = SimpleString.toSimpleString(address); + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); return this; } @@ -703,7 +692,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putBooleanProperty(final String key, final boolean value) { messageChanged(); checkProperties(); - properties.putBooleanProperty(new SimpleString(key), value); + properties.putBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -724,7 +713,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException { checkProperties(); - return properties.getBooleanProperty(new SimpleString(key)); + return properties.getBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -739,7 +728,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putByteProperty(final String key, final byte value) { messageChanged(); checkProperties(); - properties.putByteProperty(new SimpleString(key), value); + properties.putByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -752,7 +741,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException { - return getByteProperty(SimpleString.toSimpleString(key)); + return getByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -768,7 +757,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putBytesProperty(final String key, final byte[] value) { messageChanged(); checkProperties(); - properties.putBytesProperty(new SimpleString(key), value); + properties.putBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -780,7 +769,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException { - return getBytesProperty(new SimpleString(key)); + return getBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -795,7 +784,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putCharProperty(String key, char value) { messageChanged(); checkProperties(); - properties.putCharProperty(new SimpleString(key), value); + properties.putCharProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -811,7 +800,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putShortProperty(final String key, final short value) { messageChanged(); checkProperties(); - properties.putShortProperty(new SimpleString(key), value); + properties.putShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -827,7 +816,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putIntProperty(final String key, final int value) { messageChanged(); checkProperties(); - properties.putIntProperty(new SimpleString(key), value); + properties.putIntProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -854,7 +843,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putLongProperty(final String key, final long value) { messageChanged(); checkProperties(); - properties.putLongProperty(new SimpleString(key), value); + properties.putLongProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -882,7 +871,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putFloatProperty(final String key, final float value) { messageChanged(); checkProperties(); - properties.putFloatProperty(new SimpleString(key), value); + properties.putFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -898,7 +887,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage putDoubleProperty(final String key, final double value) { messageChanged(); checkProperties(); - properties.putDoubleProperty(new SimpleString(key), value); + properties.putDoubleProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -923,11 +912,20 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return this; } + @Override + public CoreMessage putStringProperty(final SimpleString key, final String value) { + messageChanged(); + checkProperties(); + properties.putSimpleStringProperty(key, SimpleString.toSimpleString(value, getPropertyValuesPool())); + return this; + } + + @Override public CoreMessage putStringProperty(final String key, final String value) { messageChanged(); checkProperties(); - properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value)); + properties.putSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), SimpleString.toSimpleString(value, getPropertyValuesPool())); return this; } @@ -943,7 +941,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Object getObjectProperty(final String key) { checkProperties(); - return getObjectProperty(SimpleString.toSimpleString(key)); + return getObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -955,7 +953,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException { messageChanged(); - putObjectProperty(new SimpleString(key), value); + putObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value); return this; } @@ -968,7 +966,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException { checkProperties(); - return properties.getShortProperty(new SimpleString(key)); + return properties.getShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -980,7 +978,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException { checkProperties(); - return properties.getFloatProperty(new SimpleString(key)); + return properties.getFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -996,7 +994,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public String getStringProperty(final String key) throws ActiveMQPropertyConversionException { - return getStringProperty(new SimpleString(key)); + return getStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -1008,7 +1006,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException { checkProperties(); - return properties.getSimpleStringProperty(new SimpleString(key)); + return properties.getSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -1025,7 +1023,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public Object removeProperty(final String key) { messageChanged(); checkProperties(); - Object oldValue = properties.removeProperty(new SimpleString(key)); + Object oldValue = properties.removeProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); if (oldValue != null) { messageChanged(); } @@ -1041,7 +1039,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public boolean containsProperty(final String key) { checkProperties(); - return properties.containsProperty(new SimpleString(key)); + return properties.containsProperty(SimpleString.toSimpleString(key, getPropertyKeysPool())); } @Override @@ -1115,6 +1113,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return this; } + @Override + public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { + return this; + } + @Override public String toString() { try { @@ -1135,4 +1138,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return new java.util.Date(timestamp).toString(); } } + + private SimpleString.StringSimpleStringPool getPropertyKeysPool() { + return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool(); + } + + private SimpleString.StringSimpleStringPool getPropertyValuesPool() { + return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool(); + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java new file mode 100644 index 0000000000..d4e3ed1659 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessageObjectPools.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.message.impl; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.utils.collections.TypedProperties; + +public class CoreMessageObjectPools { + + private Supplier addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new); + private Supplier propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new); + + private Supplier groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new); + private Supplier addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new); + private Supplier propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new); + + public CoreMessageObjectPools() { + } + + public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() { + return addressDecoderPool.get(); + } + + public SimpleString.StringSimpleStringPool getAddressStringSimpleStringPool() { + return addressStringSimpleStringPool.get(); + } + + public SimpleString.StringSimpleStringPool getGroupIdStringSimpleStringPool() { + return groupIdStringSimpleStringPool.get(); + } + + public TypedProperties.TypedPropertiesDecoderPools getPropertiesDecoderPools() { + return propertiesDecoderPools.get(); + } + + public TypedProperties.TypedPropertiesStringSimpleStringPools getPropertiesStringSimpleStringPools() { + return propertiesStringSimpleStringPools.get(); + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java index 787e4997c6..ad8c7a9cb5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder; @@ -32,11 +33,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES public class ClientPacketDecoder extends PacketDecoder { private static final long serialVersionUID = 6952614096979334582L; - public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder(); - - protected ClientPacketDecoder() { - - } + protected final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); @Override public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) { @@ -56,9 +53,9 @@ public class ClientPacketDecoder extends PacketDecoder { switch (packetType) { case SESS_RECEIVE_MSG: { if (connection.isVersionBeforeAddressChange()) { - packet = new SessionReceiveMessage_1X(new ClientMessageImpl()); + packet = new SessionReceiveMessage_1X(new ClientMessageImpl(coreMessageObjectPools)); } else { - packet = new SessionReceiveMessage(new ClientMessageImpl()); + packet = new SessionReceiveMessage(new ClientMessageImpl(coreMessageObjectPools)); } break; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index f0005ff9ce..c58a0bde6b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -511,7 +511,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } protected PacketDecoder createPacketDecoder() { - return ClientPacketDecoder.INSTANCE; + return new ClientPacketDecoder(); } private void forceReturnChannel1(ActiveMQException cause) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java index 2660f96991..e8f5920491 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java @@ -109,12 +109,18 @@ public class MessageUtil { return message.getSimpleStringProperty(REPLYTO_HEADER_NAME); } - public static void setJMSReplyTo(Message message, final SimpleString dest) { - + public static void setJMSReplyTo(Message message, final String dest) { if (dest == null) { message.removeProperty(REPLYTO_HEADER_NAME); } else { + message.putStringProperty(REPLYTO_HEADER_NAME, dest); + } + } + public static void setJMSReplyTo(Message message, final SimpleString dest) { + if (dest == null) { + message.removeProperty(REPLYTO_HEADER_NAME); + } else { message.putStringProperty(REPLYTO_HEADER_NAME, dest); } } diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java index ec94011fd1..310b4ed160 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java @@ -337,7 +337,7 @@ public class CoreMessageTest { public String generate(String body) throws Exception { - ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024); + ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024, null); TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body)); message.setAddress(ADDRESS); diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java index 626dd4d1a5..7750564f26 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java @@ -99,26 +99,28 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se } } - public static String createQueueNameForSubscription(final boolean isDurable, + public static SimpleString createQueueNameForSubscription(final boolean isDurable, final String clientID, final String subscriptionName) { + final String queueName; if (clientID != null) { if (isDurable) { - return ActiveMQDestination.escape(clientID) + SEPARATOR + + queueName = ActiveMQDestination.escape(clientID) + SEPARATOR + ActiveMQDestination.escape(subscriptionName); } else { - return "nonDurable" + SEPARATOR + + queueName = "nonDurable" + SEPARATOR + ActiveMQDestination.escape(clientID) + SEPARATOR + ActiveMQDestination.escape(subscriptionName); } } else { if (isDurable) { - return ActiveMQDestination.escape(subscriptionName); + queueName = ActiveMQDestination.escape(subscriptionName); } else { - return "nonDurable" + SEPARATOR + + queueName = "nonDurable" + SEPARATOR + ActiveMQDestination.escape(subscriptionName); } } + return SimpleString.toSimpleString(queueName); } public static String createQueueNameForSharedSubscription(final boolean isDurable, @@ -192,10 +194,18 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se return new ActiveMQQueue(address); } + public static ActiveMQQueue createQueue(final SimpleString address) { + return new ActiveMQQueue(address); + } + public static ActiveMQTopic createTopic(final String address) { return new ActiveMQTopic(address); } + public static ActiveMQTopic createTopic(final SimpleString address) { + return new ActiveMQTopic(address); + } + public static ActiveMQTemporaryQueue createTemporaryQueue(final String address, final ActiveMQSession session) { return new ActiveMQTemporaryQueue(address, session); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index 896a8ed869..6e28c0e460 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -372,7 +372,7 @@ public class ActiveMQMessage implements javax.jms.Message { public void setJMSReplyTo(final Destination dest) throws JMSException { if (dest == null) { - MessageUtil.setJMSReplyTo(message, null); + MessageUtil.setJMSReplyTo(message, (String) null); replyTo = null; } else { if (dest instanceof ActiveMQDestination == false) { @@ -391,7 +391,7 @@ public class ActiveMQMessage implements javax.jms.Message { } ActiveMQDestination jbd = (ActiveMQDestination) dest; - MessageUtil.setJMSReplyTo(message, SimpleString.toSimpleString(prefix + jbd.getAddress())); + MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress()); replyTo = jbd; } @@ -401,14 +401,15 @@ public class ActiveMQMessage implements javax.jms.Message { public Destination getJMSDestination() throws JMSException { if (dest == null) { SimpleString address = message.getAddressSimpleString(); - String prefix = ""; - if (RoutingType.ANYCAST.equals(message.getRoutingType())) { - prefix = QUEUE_QUALIFIED_PREFIX; + if (address == null) { + dest = null; + } else if (RoutingType.ANYCAST.equals(message.getRoutingType())) { + dest = ActiveMQDestination.createQueue(address); } else if (RoutingType.MULTICAST.equals(message.getRoutingType())) { - prefix = TOPIC_QUALIFIED_PREFIX; + dest = ActiveMQDestination.createTopic(address); + } else { + dest = ActiveMQDestination.fromPrefixedName(address.toString()); } - - dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString()); } return dest; @@ -513,7 +514,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public boolean getBooleanProperty(final String name) throws JMSException { try { - return message.getBooleanProperty(new SimpleString(name)); + return message.getBooleanProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -522,7 +523,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public byte getByteProperty(final String name) throws JMSException { try { - return message.getByteProperty(new SimpleString(name)); + return message.getByteProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -531,7 +532,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public short getShortProperty(final String name) throws JMSException { try { - return message.getShortProperty(new SimpleString(name)); + return message.getShortProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -544,7 +545,7 @@ public class ActiveMQMessage implements javax.jms.Message { } try { - return message.getIntProperty(new SimpleString(name)); + return message.getIntProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -557,7 +558,7 @@ public class ActiveMQMessage implements javax.jms.Message { } try { - return message.getLongProperty(new SimpleString(name)); + return message.getLongProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -566,7 +567,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public float getFloatProperty(final String name) throws JMSException { try { - return message.getFloatProperty(new SimpleString(name)); + return message.getFloatProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -575,7 +576,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public double getDoubleProperty(final String name) throws JMSException { try { - return message.getDoubleProperty(new SimpleString(name)); + return message.getDoubleProperty(name); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -593,7 +594,7 @@ public class ActiveMQMessage implements javax.jms.Message { } else if (MessageUtil.JMSXUSERID.equals(name)) { return message.getValidatedUserID(); } else { - return message.getStringProperty(new SimpleString(name)); + return message.getStringProperty(name); } } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); @@ -608,7 +609,7 @@ public class ActiveMQMessage implements javax.jms.Message { Object val = message.getObjectProperty(name); if (val instanceof SimpleString) { - val = ((SimpleString) val).toString(); + val = val.toString(); } return val; } @@ -622,43 +623,43 @@ public class ActiveMQMessage implements javax.jms.Message { public void setBooleanProperty(final String name, final boolean value) throws JMSException { checkProperty(name); - message.putBooleanProperty(new SimpleString(name), value); + message.putBooleanProperty(name, value); } @Override public void setByteProperty(final String name, final byte value) throws JMSException { checkProperty(name); - message.putByteProperty(new SimpleString(name), value); + message.putByteProperty(name, value); } @Override public void setShortProperty(final String name, final short value) throws JMSException { checkProperty(name); - message.putShortProperty(new SimpleString(name), value); + message.putShortProperty(name, value); } @Override public void setIntProperty(final String name, final int value) throws JMSException { checkProperty(name); - message.putIntProperty(new SimpleString(name), value); + message.putIntProperty(name, value); } @Override public void setLongProperty(final String name, final long value) throws JMSException { checkProperty(name); - message.putLongProperty(new SimpleString(name), value); + message.putLongProperty(name, value); } @Override public void setFloatProperty(final String name, final float value) throws JMSException { checkProperty(name); - message.putFloatProperty(new SimpleString(name), value); + message.putFloatProperty(name, value); } @Override public void setDoubleProperty(final String name, final double value) throws JMSException { checkProperty(name); - message.putDoubleProperty(new SimpleString(name), value); + message.putDoubleProperty(name, value); } @Override @@ -670,7 +671,7 @@ public class ActiveMQMessage implements javax.jms.Message { } else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) { return; } else { - message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value)); + message.putStringProperty(name, value); } } @@ -703,7 +704,7 @@ public class ActiveMQMessage implements javax.jms.Message { } try { - message.putObjectProperty(new SimpleString(name), value); + message.putObjectProperty(name, value); } catch (ActiveMQPropertyConversionException e) { throw new MessageFormatException(e.getMessage()); } @@ -964,7 +965,7 @@ public class ActiveMQMessage implements javax.jms.Message { boolean result = false; if (jmsPropertyName.equals(name)) { - message.putStringProperty(corePropertyName, SimpleString.toSimpleString(value.toString())); + message.putStringProperty(corePropertyName, value.toString()); result = true; } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java index 2deefa974b..ff4ee0fbce 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueue.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client; import javax.jms.Queue; +import org.apache.activemq.artemis.api.core.SimpleString; + /** * ActiveMQ Artemis implementation of a JMS Queue. *
@@ -34,13 +36,17 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue { // Constructors -------------------------------------------------- public ActiveMQQueue() { - this(null); + this((SimpleString) null); } public ActiveMQQueue(final String address) { super(address, TYPE.QUEUE, null); } + public ActiveMQQueue(final SimpleString address) { + super(address, TYPE.QUEUE, null); + } + public ActiveMQQueue(final String address, boolean temporary) { super(address, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, null); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 374a985e26..cf2ec59d3d 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -627,7 +627,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); } - queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName)); + queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName); if (durability == ConsumerDurability.DURABLE) { try { @@ -750,7 +750,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); } - queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName)); + queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName); QueueQuery subResponse = session.queueQuery(queueName); @@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new IllegalStateException("Cannot unsubscribe using a QueueSession"); } - SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name)); + SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name); try { QueueQuery response = session.queueQuery(queueName); diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java index 2762a9c8e7..1c70c5bde2 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQStreamMessage.java @@ -73,7 +73,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre // For testing only public ActiveMQStreamMessage() { - message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500); + message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500, null); } // Public -------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java index e22e67b65b..4dbefec660 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQTopic.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client; import javax.jms.Topic; +import org.apache.activemq.artemis.api.core.SimpleString; + /** * ActiveMQ Artemis implementation of a JMS Topic. *
@@ -33,13 +35,17 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic { // Constructors -------------------------------------------------- public ActiveMQTopic() { - this(null); + this((SimpleString) null); } public ActiveMQTopic(final String address) { this(address, false); } + public ActiveMQTopic(final SimpleString address) { + super(address, TYPE.TOPIC, null); + } + public ActiveMQTopic(final String address, boolean temporary) { super(address, TYPE.TOPIC, null); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 2bdd88a543..cdab41224f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper; @@ -70,7 +71,7 @@ public class AMQPMessage extends RefCountMessage { boolean bufferValid; Boolean durable; long messageID; - String address; + SimpleString address; MessageImpl protonMessage; private volatile int memoryEstimate = -1; private long expiration = 0; @@ -90,6 +91,7 @@ public class AMQPMessage extends RefCountMessage { private ApplicationProperties applicationProperties; private long scheduledTime = -1; private String connectionID; + private final CoreMessageObjectPools coreMessageObjectPools; Set rejectedConsumers; @@ -98,9 +100,14 @@ public class AMQPMessage extends RefCountMessage { private volatile TypedProperties extraProperties; public AMQPMessage(long messageFormat, byte[] data) { + this(messageFormat, data, null); + } + + public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) { this.data = Unpooled.wrappedBuffer(data); this.messageFormat = messageFormat; this.bufferValid = true; + this.coreMessageObjectPools = coreMessageObjectPools; parseHeaders(); } @@ -108,12 +115,14 @@ public class AMQPMessage extends RefCountMessage { public AMQPMessage(long messageFormat) { this.messageFormat = messageFormat; this.bufferValid = false; + this.coreMessageObjectPools = null; } public AMQPMessage(long messageFormat, Message message) { this.messageFormat = messageFormat; this.protonMessage = (MessageImpl) message; this.bufferValid = false; + this.coreMessageObjectPools = null; } public AMQPMessage(Message message) { @@ -301,7 +310,7 @@ public class AMQPMessage extends RefCountMessage { parseHeaders(); if (_properties != null && _properties.getGroupId() != null) { - return SimpleString.toSimpleString(_properties.getGroupId()); + return SimpleString.toSimpleString(_properties.getGroupId(), coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool()); } else { return null; } @@ -588,36 +597,33 @@ public class AMQPMessage extends RefCountMessage { @Override public String getAddress() { - if (address == null) { - Properties properties = getProtonMessage().getProperties(); - if (properties != null) { - return properties.getTo(); - } else { - return null; - } - } else { - return address; - } + SimpleString addressSimpleString = getAddressSimpleString(); + return addressSimpleString == null ? null : addressSimpleString.toString(); } @Override public AMQPMessage setAddress(String address) { - this.address = address; + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); return this; } @Override public AMQPMessage setAddress(SimpleString address) { - if (address != null) { - return setAddress(address.toString()); - } else { - return setAddress((String) null); - } + this.address = address; + return this; } @Override public SimpleString getAddressSimpleString() { - return SimpleString.toSimpleString(getAddress()); + if (address == null) { + Properties properties = getProtonMessage().getProperties(); + if (properties != null) { + setAddress(properties.getTo()); + } else { + return null; + } + } + return address; } @Override @@ -977,7 +983,7 @@ public class AMQPMessage extends RefCountMessage { if (applicationProperties != null) getProtonMessage().setApplicationProperties(applicationProperties); if (_properties != null) { if (address != null) { - _properties.setTo(address); + _properties.setTo(address.toString()); } getProtonMessage().setProperties(this._properties); } @@ -987,7 +993,7 @@ public class AMQPMessage extends RefCountMessage { @Override public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { - return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key)); + return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key), getPropertyValuesPool()); } @Override @@ -1065,11 +1071,16 @@ public class AMQPMessage extends RefCountMessage { return putStringProperty(key.toString(), value.toString()); } + @Override + public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, String value) { + return putStringProperty(key.toString(), value); + } + @Override public Set getPropertyNames() { HashSet values = new HashSet<>(); for (Object k : getApplicationPropertiesMap().keySet()) { - values.add(SimpleString.toSimpleString(k.toString())); + values.add(SimpleString.toSimpleString(k.toString(), getPropertyKeysPool())); } return values; } @@ -1084,17 +1095,22 @@ public class AMQPMessage extends RefCountMessage { } @Override - public ICoreMessage toCore() { + public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { try { - return AMQPConverter.getInstance().toCore(this); + return AMQPConverter.getInstance().toCore(this, coreMessageObjectPools); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } } + @Override + public ICoreMessage toCore() { + return toCore(null); + } + @Override public SimpleString getLastValueProperty() { - return getSimpleStringProperty(HDR_LAST_VALUE_NAME.toString()); + return getSimpleStringProperty(HDR_LAST_VALUE_NAME); } @Override @@ -1155,4 +1171,12 @@ public class AMQPMessage extends RefCountMessage { ", address=" + getAddress() + "]"; } + + private SimpleString.StringSimpleStringPool getPropertyKeysPool() { + return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool(); + } + + private SimpleString.StringSimpleStringPool getPropertyValuesPool() { + return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool(); + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 19348f4f6a..7134d3bf9c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -101,6 +102,7 @@ public class AMQPSessionCallback implements SessionCallback { private final AtomicBoolean draining = new AtomicBoolean(false); + private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); private final AddressQueryCache addressQueryCache = new AddressQueryCache<>(); @@ -210,14 +212,14 @@ public class AMQPSessionCallback implements SessionCallback { } public Object createSender(ProtonServerSenderContext protonSender, - String queue, + SimpleString queue, String filter, boolean browserOnly) throws Exception { long consumerID = consumerIDGenerator.generateID(); filter = SelectorTranslator.convertToActiveMQFilterString(filter); - ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly, false, null); + ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), browserOnly, false, null); // AMQP handles its own flow control for when it's started consumer.setStarted(true); @@ -233,48 +235,48 @@ public class AMQPSessionCallback implements SessionCallback { serverConsumer.receiveCredits(-1); } - public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false); + public void createTemporaryQueue(SimpleString queueName, RoutingType routingType) throws Exception { + serverSession.createQueue(queueName, queueName, routingType, null, true, false); } - public void createTemporaryQueue(String address, - String queueName, + public void createTemporaryQueue(SimpleString address, + SimpleString queueName, RoutingType routingType, - String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false); + SimpleString filter) throws Exception { + serverSession.createQueue(address, queueName, routingType, filter, true, false); } - public void createUnsharedDurableQueue(String address, + public void createUnsharedDurableQueue(SimpleString address, RoutingType routingType, - String queueName, - String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false); + SimpleString queueName, + SimpleString filter) throws Exception { + serverSession.createQueue(address, queueName, routingType, filter, false, true, 1, false, false); } - public void createSharedDurableQueue(String address, + public void createSharedDurableQueue(SimpleString address, RoutingType routingType, - String queueName, - String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false); + SimpleString queueName, + SimpleString filter) throws Exception { + serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false); } - public void createSharedVolatileQueue(String address, + public void createSharedVolatileQueue(SimpleString address, RoutingType routingType, - String queueName, - String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true); + SimpleString queueName, + SimpleString filter) throws Exception { + serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true); } - public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception { - QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); + public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception { + QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(queueName); if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) { try { - serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true, true); + serverSession.createQueue(queueName, queueName, routingType, null, false, true, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } - queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); + queueQueryResult = serverSession.executeQueueQuery(queueName); } // if auto-create we will return whatever type was used before @@ -287,32 +289,31 @@ public class AMQPSessionCallback implements SessionCallback { - public boolean bindingQuery(String address, RoutingType routingType) throws Exception { + public boolean bindingQuery(SimpleString address, RoutingType routingType) throws Exception { BindingQueryResult bindingQueryResult = bindingQueryCache.getResult(address); if (bindingQueryResult != null) { return bindingQueryResult.isExists(); } - SimpleString simpleAddress = SimpleString.toSimpleString(address); - bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); + bindingQueryResult = serverSession.executeBindingQuery(address); if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) { try { - serverSession.createAddress(simpleAddress, routingType, true); + serverSession.createAddress(address, routingType, true); } catch (ActiveMQAddressExistsException e) { // The address may have been created by another thread in the mean time. Catch and do nothing. } - bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); + bindingQueryResult = serverSession.executeBindingQuery(address); } else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) { - QueueQueryResult queueBinding = serverSession.executeQueueQuery(simpleAddress); + QueueQueryResult queueBinding = serverSession.executeQueueQuery(address); if (!queueBinding.isExists()) { try { - serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true); + serverSession.createQueue(address, address, routingType, null, false, true, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } } - bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); + bindingQueryResult = serverSession.executeBindingQuery(address); } bindingQueryCache.setResult(address, bindingQueryResult); @@ -320,7 +321,7 @@ public class AMQPSessionCallback implements SessionCallback { } - public AddressQueryResult addressQuery(String addressName, + public AddressQueryResult addressQuery(SimpleString addressName, RoutingType routingType, boolean autoCreate) throws Exception { @@ -329,15 +330,15 @@ public class AMQPSessionCallback implements SessionCallback { return addressQueryResult; } - addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); + addressQueryResult = serverSession.executeAddressQuery(addressName); if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) { try { - serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true); + serverSession.createAddress(addressName, routingType, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } - addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); + addressQueryResult = serverSession.executeAddressQuery(addressName); } addressQueryCache.setResult(addressName, addressQueryResult); @@ -438,15 +439,15 @@ public class AMQPSessionCallback implements SessionCallback { final Transaction transaction, final Receiver receiver, final Delivery delivery, - String address, + SimpleString address, int messageFormat, byte[] data) throws Exception { - AMQPMessage message = new AMQPMessage(messageFormat, data); + AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools); if (address != null) { - message.setAddress(new SimpleString(address)); + message.setAddress(address); } else { // Anonymous relay must set a To value - address = message.getAddress(); + address = message.getAddressSimpleString(); if (address == null) { rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer"); return; @@ -552,7 +553,7 @@ public class AMQPSessionCallback implements SessionCallback { }); } - public void offerProducerCredit(final String address, + public void offerProducerCredit(final SimpleString address, final int credits, final int threshold, final Receiver receiver) { @@ -567,7 +568,7 @@ public class AMQPSessionCallback implements SessionCallback { connection.flush(); return; } - final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address)); + final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); store.checkMemory(new Runnable() { @Override public void run() { @@ -587,8 +588,8 @@ public class AMQPSessionCallback implements SessionCallback { } } - public void deleteQueue(String queueName) throws Exception { - manager.getServer().destroyQueue(new SimpleString(queueName)); + public void deleteQueue(SimpleString queueName) throws Exception { + manager.getServer().destroyQueue(queueName); } public void resetContext(OperationContext oldContext) { @@ -657,7 +658,7 @@ public class AMQPSessionCallback implements SessionCallback { } @Override - public void disconnect(ServerConsumer consumer, String queueName) { + public void disconnect(ServerConsumer consumer, SimpleString queueName) { ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName); connection.lock(); try { @@ -703,12 +704,12 @@ public class AMQPSessionCallback implements SessionCallback { return serverSession.getAddress(address); } - public void removeTemporaryQueue(String address) throws Exception { - serverSession.deleteQueue(SimpleString.toSimpleString(address)); + public void removeTemporaryQueue(SimpleString address) throws Exception { + serverSession.deleteQueue(address); } - public RoutingType getDefaultRoutingType(String address) { - return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultAddressRoutingType(); + public RoutingType getDefaultRoutingType(SimpleString address) { + return manager.getServer().getAddressSettingsRepository().getMatch(address.toString()).getDefaultAddressRoutingType(); } public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception { @@ -733,10 +734,10 @@ public class AMQPSessionCallback implements SessionCallback { class AddressQueryCache { - String address; + SimpleString address; T result; - public synchronized T getResult(String parameterAddress) { + public synchronized T getResult(SimpleString parameterAddress) { if (address != null && address.equals(parameterAddress)) { return result; } else { @@ -746,7 +747,7 @@ public class AMQPSessionCallback implements SessionCallback { } } - public synchronized void setResult(String parameterAddress, T result) { + public synchronized void setResult(SimpleString parameterAddress, T result) { this.address = parameterAddress; this.result = result; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java index 724474b877..e67fc67fbf 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.converter; import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; @@ -38,7 +39,7 @@ public class AMQPConverter implements MessageConverter { } @Override - public ICoreMessage toCore(AMQPMessage messageSource) throws Exception { - return AmqpCoreConverter.toCore(messageSource); + public ICoreMessage toCore(AMQPMessage messageSource, CoreMessageObjectPools coreMessageObjectPools) throws Exception { + return AmqpCoreConverter.toCore(messageSource, coreMessageObjectPools); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java index da2f4e0f96..1bac1e5544 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; @@ -242,56 +243,56 @@ public final class AMQPMessageSupport { return null; } - public static ServerJMSBytesMessage createBytesMessage(long id) { - return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE)); + public static ServerJMSBytesMessage createBytesMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE, coreMessageObjectPools)); } - public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException { - ServerJMSBytesMessage message = createBytesMessage(id); + public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException { + ServerJMSBytesMessage message = createBytesMessage(id, coreMessageObjectPools); message.writeBytes(array, arrayOffset, length); return message; } - public static ServerJMSStreamMessage createStreamMessage(long id) { - return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE)); + public static ServerJMSStreamMessage createStreamMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE, coreMessageObjectPools)); } - public static ServerJMSMessage createMessage(long id) { - return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE)); + public static ServerJMSMessage createMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE, coreMessageObjectPools)); } - public static ServerJMSTextMessage createTextMessage(long id) { - return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE)); + public static ServerJMSTextMessage createTextMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE, coreMessageObjectPools)); } - public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException { - ServerJMSTextMessage message = createTextMessage(id); + public static ServerJMSTextMessage createTextMessage(long id, String text, CoreMessageObjectPools coreMessageObjectPools) throws JMSException { + ServerJMSTextMessage message = createTextMessage(id, coreMessageObjectPools); message.setText(text); return message; } - public static ServerJMSObjectMessage createObjectMessage(long id) { - return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE)); + public static ServerJMSObjectMessage createObjectMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE, coreMessageObjectPools)); } - public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException { - ServerJMSObjectMessage message = createObjectMessage(id); + public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm, CoreMessageObjectPools coreMessageObjectPools) throws JMSException { + ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools); message.setSerializedForm(serializedForm); return message; } - public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException { - ServerJMSObjectMessage message = createObjectMessage(id); + public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException { + ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools); message.setSerializedForm(new Binary(array, offset, length)); return message; } - public static ServerJMSMapMessage createMapMessage(long id) { - return new ServerJMSMapMessage(newMessage(id, MAP_TYPE)); + public static ServerJMSMapMessage createMapMessage(long id, CoreMessageObjectPools coreMessageObjectPools) { + return new ServerJMSMapMessage(newMessage(id, MAP_TYPE, coreMessageObjectPools)); } - public static ServerJMSMapMessage createMapMessage(long id, Map content) throws JMSException { - ServerJMSMapMessage message = createMapMessage(id); + public static ServerJMSMapMessage createMapMessage(long id, Map content, CoreMessageObjectPools coreMessageObjectPools) throws JMSException { + ServerJMSMapMessage message = createMapMessage(id, coreMessageObjectPools); final Set> set = content.entrySet(); for (Map.Entry entry : set) { Object value = entry.getValue(); @@ -304,8 +305,8 @@ public final class AMQPMessageSupport { return message; } - private static CoreMessage newMessage(long id, byte messageType) { - CoreMessage message = new CoreMessage(id, 512); + private static CoreMessage newMessage(long id, byte messageType, CoreMessageObjectPools coreMessageObjectPools) { + CoreMessage message = new CoreMessage(id, 512, coreMessageObjectPools); message.setType(messageType); // ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); return message; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index fbaf0efa16..80969f6655 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -52,6 +52,7 @@ import javax.jms.JMSException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; @@ -89,31 +90,31 @@ import io.netty.buffer.PooledByteBufAllocator; public class AmqpCoreConverter { @SuppressWarnings("unchecked") - public static ICoreMessage toCore(AMQPMessage message) throws Exception { + public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception { Section body = message.getProtonMessage().getBody(); ServerJMSMessage result; if (body == null) { if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) { - result = createObjectMessage(message.getMessageID()); + result = createObjectMessage(message.getMessageID(), coreMessageObjectPools); } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) { - result = createBytesMessage(message.getMessageID()); + result = createBytesMessage(message.getMessageID(), coreMessageObjectPools); } else { Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType()); if (charset != null) { - result = createTextMessage(message.getMessageID()); + result = createTextMessage(message.getMessageID(), coreMessageObjectPools); } else { - result = createMessage(message.getMessageID()); + result = createMessage(message.getMessageID(), coreMessageObjectPools); } } } else if (body instanceof Data) { Binary payload = ((Data) body).getValue(); if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) { - result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) { - result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); } else { Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType()); if (StandardCharsets.UTF_8.equals(charset)) { @@ -121,18 +122,18 @@ public class AmqpCoreConverter { try { CharBuffer chars = charset.newDecoder().decode(buf); - result = createTextMessage(message.getMessageID(), String.valueOf(chars)); + result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools); } catch (CharacterCodingException e) { - result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); } } else { - result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); } } } else if (body instanceof AmqpSequence) { AmqpSequence sequence = (AmqpSequence) body; - ServerJMSStreamMessage m = createStreamMessage(message.getMessageID()); + ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools); for (Object item : sequence.getValue()) { m.writeObject(item); } @@ -141,31 +142,31 @@ public class AmqpCoreConverter { } else if (body instanceof AmqpValue) { Object value = ((AmqpValue) body).getValue(); if (value == null || value instanceof String) { - result = createTextMessage(message.getMessageID(), (String) value); + result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools); } else if (value instanceof Binary) { Binary payload = (Binary) value; if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) { - result = createObjectMessage(message.getMessageID(), payload); + result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools); } else { - result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); } } else if (value instanceof List) { - ServerJMSStreamMessage m = createStreamMessage(message.getMessageID()); + ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools); for (Object item : (List) value) { m.writeObject(item); } result = m; } else if (value instanceof Map) { - result = createMapMessage(message.getMessageID(), (Map) value); + result = createMapMessage(message.getMessageID(), (Map) value, coreMessageObjectPools); } else { ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); try { TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf)); TLSEncode.getEncoder().writeObject(body); - result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex()); + result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools); } finally { buf.release(); TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null); @@ -186,7 +187,7 @@ public class AmqpCoreConverter { result.getInnerMessage().setReplyTo(message.getReplyTo()); result.getInnerMessage().setDurable(message.isDurable()); result.getInnerMessage().setPriority(message.getPriority()); - result.getInnerMessage().setAddress(message.getAddress()); + result.getInnerMessage().setAddress(message.getAddressSimpleString()); result.encode(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 3e1c0feeb8..3c35d763c4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -54,7 +54,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements protected final Receiver receiver; - protected String address; + protected SimpleString address; protected final AMQPSessionCallback sessionSPI; @@ -102,7 +102,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (target.getDynamic()) { // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and // will be deleted on closing of the session - address = sessionSPI.tempQueueName(); + address = SimpleString.toSimpleString(sessionSPI.tempQueueName()); defRoutingType = getRoutingType(target.getCapabilities(), address); try { @@ -113,12 +113,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH; - target.setAddress(address); + target.setAddress(address.toString()); } else { // the target will have an address unless the remote is requesting an anonymous // relay in which case the address in the incoming message's to field will be // matched on receive of the message. - address = target.getAddress(); + address = SimpleString.toSimpleString(target.getAddress()); if (address != null && !address.isEmpty()) { defRoutingType = getRoutingType(target.getCapabilities(), address); @@ -134,7 +134,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } try { - sessionSPI.check(SimpleString.toSimpleString(address), CheckType.SEND, new SecurityAuth() { + sessionSPI.check(address, CheckType.SEND, new SecurityAuth() { @Override public String getUsername() { String username = null; @@ -181,12 +181,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements flow(amqpCredits, minCreditRefresh); } - public RoutingType getRoutingType(Receiver receiver, String address) { + public RoutingType getRoutingType(Receiver receiver, SimpleString address) { org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); return target != null ? getRoutingType(target.getCapabilities(), address) : getRoutingType((Symbol[]) null, address); } - private RoutingType getRoutingType(Symbol[] symbols, String address) { + private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) { if (symbols != null) { for (Symbol symbol : symbols) { if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { @@ -264,7 +264,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { try { - sessionSPI.removeTemporaryQueue(target.getAddress()); + sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(target.getAddress())); } catch (Exception e) { //ignore on close, its temp anyway and will be removed later } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index fbaae8ab42..1823168f34 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -102,7 +102,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private boolean shared = false; private boolean global = false; private boolean isVolatile = false; - private String tempQueueName; + private SimpleString tempQueueName; public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, @@ -157,7 +157,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr super.initialise(); Source source = (Source) sender.getRemoteSource(); - String queue = null; + SimpleString queue = null; String selector = null; final Map supportedFilters = new HashMap<>(); @@ -199,7 +199,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // the lifetime policy and capabilities of the new subscription. if (result.isExists()) { source = new org.apache.qpid.proton.amqp.messaging.Source(); - source.setAddress(queue); + source.setAddress(queue.toString()); source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); source.setDistributionMode(COPY); @@ -240,7 +240,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else if (source.getDynamic()) { // if dynamic we have to create the node (queue) and set the address on the target, the // node is temporary and will be deleted on closing of the session - queue = java.util.UUID.randomUUID().toString(); + queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString()); tempQueueName = queue; try { sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST); @@ -248,7 +248,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } - source.setAddress(queue); + source.setAddress(queue.toString()); } else { SimpleString addressToUse; SimpleString queueNameToUse = null; @@ -269,7 +269,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr multicast = hasCapabilities(TOPIC, source); AddressQueryResult addressQueryResult = null; try { - addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true); + addressQueryResult = sessionSPI.addressQuery(addressToUse, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true); } catch (ActiveMQSecurityException e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage()); } catch (ActiveMQAMQPException e) { @@ -294,7 +294,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // if not we look up the address AddressQueryResult addressQueryResult = null; try { - addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true); + addressQueryResult = sessionSPI.addressQuery(addressToUse, defaultRoutingType, true); } catch (ActiveMQSecurityException e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage()); } catch (ActiveMQAMQPException e) { @@ -333,6 +333,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST); + SimpleString simpleStringSelector = SimpleString.toSimpleString(selector); //if the address specifies a broker configured queue then we always use this, treat it as a queue if (queue != null) { @@ -345,24 +346,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr String pubId = sender.getName(); queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false); QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false); - if (result.isExists()) { // If a client reattaches to a durable subscription with a different no-local // filter value, selector or address then we must recreate the queue (JMS semantics). - if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { + if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { if (result.getConsumerCount() == 0) { sessionSPI.deleteQueue(queue); - sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); } else { throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist"); } } } else { if (shared) { - sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); } else { - sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); } } } else { @@ -371,15 +371,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (shared && sender.getName() != null) { queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile); try { - sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector); } catch (ActiveMQQueueExistsException e) { //this is ok, just means its shared } } else { - queue = java.util.UUID.randomUUID().toString(); + queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString()); tempQueueName = queue; try { - sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector); + sessionSPI.createTemporaryQueue(addressToUse, queue, RoutingType.MULTICAST, simpleStringSelector); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } @@ -387,18 +387,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else { if (queueNameToUse != null) { - SimpleString matchingAnycastQueue = SimpleString.toSimpleString(getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST)); + SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST); if (matchingAnycastQueue != null) { - queue = matchingAnycastQueue.toString(); + queue = matchingAnycastQueue; } else { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); } } else { SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST); if (matchingAnycastQueue != null) { - queue = matchingAnycastQueue.toString(); + queue = matchingAnycastQueue; } else { - queue = addressToUse.toString(); + queue = addressToUse; } } @@ -437,16 +437,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - private String getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception { + private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception { if (queueName != null) { - QueueQueryResult result = sessionSPI.queueQuery(queueName.toString(), routingType, false); + QueueQueryResult result = sessionSPI.queueQuery(queueName, routingType, false); if (!result.isExists()) { throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist"); } else { if (!result.getAddress().equals(address)) { throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'"); } - return sessionSPI.getMatchingQueue(address, queueName, routingType).toString(); + return sessionSPI.getMatchingQueue(address, queueName, routingType); } } return null; @@ -495,7 +495,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (remoteLinkClose) { Source source = (Source) sender.getSource(); if (source != null && source.getAddress() != null && multicast) { - String queueName = source.getAddress(); + SimpleString queueName = SimpleString.toSimpleString(source.getAddress()); QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false); if (result.isExists() && source.getDynamic()) { sessionSPI.deleteQueue(queueName); @@ -508,7 +508,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (pubId.contains("|")) { pubId = pubId.split("\\|")[0]; } - String queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile); + SimpleString queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile); result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false); //only delete if it isn't volatile and has no consumers if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) { @@ -518,7 +518,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { try { - sessionSPI.removeTemporaryQueue(source.getAddress()); + sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(source.getAddress())); } catch (Exception e) { //ignore on close, its temp anyway and will be removed later } @@ -760,7 +760,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return false; } - private static String createQueueName(boolean useCoreSubscriptionNaming, + private static SimpleString createQueueName(boolean useCoreSubscriptionNaming, String clientId, String pubId, boolean shared, @@ -784,7 +784,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr queue += ":global"; } } - return queue; + return SimpleString.toSimpleString(queue); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java index d06464f2cd..ccafd37b04 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java @@ -620,7 +620,7 @@ public class JMSMappingOutboundTransformerTest { } private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) { - ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0); + ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0, null); if (compression) { // TODO @@ -647,7 +647,7 @@ public class JMSMappingOutboundTransformerTest { } private ServerJMSTextMessage createTextMessage(String text, boolean compression) { - ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0); + ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0, null); if (compression) { // TODO diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index 73dbeaa4b5..da10f47db1 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -22,6 +22,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.core.config.WildcardConfiguration; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -56,11 +57,12 @@ public class MQTTSession { private MQTTProtocolManager protocolManager; - private boolean isClean; private WildcardConfiguration wildcardConfiguration; + private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); + public MQTTSession(MQTTProtocolHandler protocolHandler, MQTTConnection connection, MQTTProtocolManager protocolManager, @@ -195,4 +197,8 @@ public class MQTTSession { public void setWildcardConfiguration(WildcardConfiguration wildcardConfiguration) { this.wildcardConfiguration = wildcardConfiguration; } + + public CoreMessageObjectPools getCoreMessageObjectPools() { + return coreMessageObjectPools; + } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 21b1f2b369..39e2ba94ed 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -78,7 +78,7 @@ public class MQTTSessionCallback implements SessionCallback { } @Override - public void disconnect(ServerConsumer consumer, String queueName) { + public void disconnect(ServerConsumer consumer, SimpleString queueName) { try { consumer.removeItself(); } catch (Exception e) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 2cb1f7e2ab..2667f81f45 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -64,13 +64,13 @@ public class MQTTUtil { public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain."; - public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level"; + public static final SimpleString MQTT_QOS_LEVEL_KEY = SimpleString.toSimpleString("mqtt.qos.level"); - public static final String MQTT_MESSAGE_ID_KEY = "mqtt.message.id"; + public static final SimpleString MQTT_MESSAGE_ID_KEY = SimpleString.toSimpleString("mqtt.message.id"); - public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type"; + public static final SimpleString MQTT_MESSAGE_TYPE_KEY = SimpleString.toSimpleString("mqtt.message.type"); - public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain"); + public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = SimpleString.toSimpleString("mqtt.message.retain"); public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; @@ -113,10 +113,10 @@ public class MQTTUtil { int qos) { long id = session.getServer().getStorageManager().generateID(); - CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE); + CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE, session.getCoreMessageObjectPools()); message.setAddress(address); message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain); - message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos); + message.putIntProperty(MQTT_QOS_LEVEL_KEY, qos); message.setType(Message.BYTES_TYPE); return message; } @@ -127,7 +127,8 @@ public class MQTTUtil { int qos, ByteBuf payload) { String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration()); - ICoreMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); + SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool()); + ICoreMessage message = createServerMessage(session, address, retain, qos); message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes()); return message; @@ -135,8 +136,8 @@ public class MQTTUtil { public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) { Message message = createServerMessage(session, address, false, 1); - message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId); - message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value()); + message.putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId); + message.putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY, MqttMessageType.PUBREL.value()); return message; } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 9923953aed..86a95db500 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1121,7 +1121,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception { - SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); + SimpleString subQueueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()); server.destroyQueue(subQueueName); return null; diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 6af9997bd7..83ff6d6bea 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.MessageReference; @@ -108,10 +109,11 @@ public class OpenWireMessageConverter implements MessageConverter { - ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception; + ICoreMessage toCore(ProtocolMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception; ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index ae1612f292..c4a2dbe5a0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -81,7 +81,7 @@ public interface SessionCallback { void closed(); - void disconnect(ServerConsumer consumerId, String queueName); + void disconnect(ServerConsumer consumerId, SimpleString queueName); boolean isWritable(ReadyListener callback, Object protocolContext); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 27071901bb..5cfac12bf0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.Persister; @@ -334,6 +335,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { @Override public CoreMessage toCore() { + return toCore(null); + } + + @Override + public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { return null; } @@ -590,6 +596,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { return null; } + @Override + public Message putStringProperty(SimpleString key, String value) { + return null; + } + @Override public Message putStringProperty(String key, String value) { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index d7c9855701..078c397803 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -386,6 +387,11 @@ public class AcknowledgeTest extends ActiveMQTestBase { @Override public ICoreMessage toCore() { + return toCore(null); + } + + @Override + public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { return null; } @@ -647,6 +653,11 @@ public class AcknowledgeTest extends ActiveMQTestBase { return null; } + @Override + public Message putStringProperty(SimpleString key, String value) { + return null; + } + @Override public Message putStringProperty(String key, String value) { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 2f254804cd..dc57a12aa2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -585,7 +585,7 @@ public class HangConsumerTest extends ActiveMQTestBase { } @Override - public void disconnect(ServerConsumer consumerId, String queueName) { + public void disconnect(ServerConsumer consumerId, SimpleString queueName) { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java index 790ed82ad5..aaf29b0a1a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java @@ -128,7 +128,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { msg.putStringProperty("myNonAsciiStringProperty", international.toString()); msg.putStringProperty("mySpecialCharacters", special); msg.putStringProperty(new SimpleString("mySimpleStringProperty"), new SimpleString("mySimpleStringPropertyValue_" + i)); - msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), null); + msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), (SimpleString) null); producer.send(msg); }