ARTEMIS-1586 Refactor to make more generic

* Move byte util code into ByteUtil
* Re-use the new equals method in SimpleString
* Apply same pools/interners to client decode
* Create String to SimpleString pools/interners for property access via String keys (producer and consumer benefits)
* Lazy init the pools on withing the get methods of CoreMessageObjectPools to get the specific pool, to avoid having this scattered every where.
* reduce SimpleString creation in conversion to/from core message methods with JMS wrapper.
* reduce SimpleString creation in conversion to/from Core in OpenWire, AMQP, MQTT.
This commit is contained in:
Michael André Pearce 2018-01-10 08:48:14 +00:00 committed by Michael Pearce
parent 8d776eddfc
commit 98028cdecc
47 changed files with 910 additions and 505 deletions

View File

@ -21,8 +21,9 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.utils.AbstractByteBufPool;
import org.apache.activemq.artemis.utils.AbstractInterner; import org.apache.activemq.artemis.utils.AbstractPool;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
/** /**
@ -33,129 +34,6 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/ */
public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> { public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> {
public static final class Interner extends AbstractInterner<SimpleString> {
private final int maxLength;
public Interner(final int capacity, final int maxCharsLength) {
super(capacity);
this.maxLength = maxCharsLength;
}
@Override
protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) {
return SimpleString.isEqual(entry, byteBuf, offset, length);
}
@Override
protected boolean canIntern(final ByteBuf byteBuf, final int length) {
assert length % 2 == 0 : "length must be a multiple of 2";
final int expectedStringLength = length >> 1;
return expectedStringLength <= maxLength;
}
@Override
protected SimpleString create(final ByteBuf byteBuf, final int length) {
return readSimpleString(byteBuf, length);
}
}
/**
* Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
* {@code false} otherwise.
* <p>
* It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
* length field.
*/
public static boolean isEqual(final SimpleString s, final ByteBuf bytes, final int offset, final int length) {
if (s == null) {
return false;
}
final byte[] chars = s.getData();
if (chars.length != length)
return false;
if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
if ((offset + length) > bytes.writerIndex()) {
throw new IndexOutOfBoundsException();
}
if (bytes.hasArray()) {
return batchOnHeapIsEqual(chars, bytes.array(), bytes.arrayOffset() + offset, length);
} else if (bytes.hasMemoryAddress()) {
return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, length);
}
}
return byteBufIsEqual(chars, bytes, offset, length);
}
private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf bytes, final int offset, final int length) {
for (int i = 0; i < length; i++)
if (chars[i] != bytes.getByte(offset + i))
return false;
return true;
}
private static boolean batchOnHeapIsEqual(final byte[] chars,
final byte[] array,
final int arrayOffset,
final int length) {
final int longCount = length >>> 3;
final int bytesCount = length & 7;
int bytesIndex = arrayOffset;
int charsIndex = 0;
for (int i = 0; i < longCount; i++) {
final long charsLong = PlatformDependent.getLong(chars, charsIndex);
final long bytesLong = PlatformDependent.getLong(array, bytesIndex);
if (charsLong != bytesLong) {
return false;
}
bytesIndex += 8;
charsIndex += 8;
}
for (int i = 0; i < bytesCount; i++) {
final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
final byte bytesByte = PlatformDependent.getByte(array, bytesIndex);
if (charsByte != bytesByte) {
return false;
}
bytesIndex++;
charsIndex++;
}
return true;
}
private static boolean batchOffHeapIsEqual(final byte[] chars,
final long address,
final int offset,
final int length) {
final int longCount = length >>> 3;
final int bytesCount = length & 7;
long bytesAddress = address + offset;
int charsIndex = 0;
for (int i = 0; i < longCount; i++) {
final long charsLong = PlatformDependent.getLong(chars, charsIndex);
final long bytesLong = PlatformDependent.getLong(bytesAddress);
if (charsLong != bytesLong) {
return false;
}
bytesAddress += 8;
charsIndex += 8;
}
for (int i = 0; i < bytesCount; i++) {
final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
final byte bytesByte = PlatformDependent.getByte(bytesAddress);
if (charsByte != bytesByte) {
return false;
}
bytesAddress++;
charsIndex++;
}
return true;
}
private static final long serialVersionUID = 4204223851422244307L; private static final long serialVersionUID = 4204223851422244307L;
// Attributes // Attributes
@ -185,6 +63,13 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
return new SimpleString(string); return new SimpleString(string);
} }
public static SimpleString toSimpleString(final String string, StringSimpleStringPool pool) {
if (pool == null) {
return toSimpleString(string);
}
return pool.getOrCreate(string);
}
// Constructors // Constructors
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
@ -236,6 +121,10 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
data[1] = high; data[1] = high;
} }
public boolean isEmpty() {
return data.length == 0;
}
// CharSequence implementation // CharSequence implementation
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -267,11 +156,26 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
return readSimpleString(buffer); return readSimpleString(buffer);
} }
public static SimpleString readNullableSimpleString(ByteBuf buffer, ByteBufSimpleStringPool pool) {
int b = buffer.readByte();
if (b == DataConstants.NULL) {
return null;
}
return readSimpleString(buffer, pool);
}
public static SimpleString readSimpleString(ByteBuf buffer) { public static SimpleString readSimpleString(ByteBuf buffer) {
int len = buffer.readInt(); int len = buffer.readInt();
return readSimpleString(buffer, len); return readSimpleString(buffer, len);
} }
public static SimpleString readSimpleString(ByteBuf buffer, ByteBufSimpleStringPool pool) {
if (pool == null) {
return readSimpleString(buffer);
}
return pool.getOrCreate(buffer);
}
public static SimpleString readSimpleString(final ByteBuf buffer, final int length) { public static SimpleString readSimpleString(final ByteBuf buffer, final int length) {
byte[] data = new byte[length]; byte[] data = new byte[length];
buffer.readBytes(data); buffer.readBytes(data);
@ -381,22 +285,23 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
if (other instanceof SimpleString) { if (other instanceof SimpleString) {
SimpleString s = (SimpleString) other; SimpleString s = (SimpleString) other;
if (data.length != s.data.length) { return ByteUtil.equals(data, s.data);
return false;
}
for (int i = 0; i < data.length; i++) {
if (data[i] != s.data[i]) {
return false;
}
}
return true;
} else { } else {
return false; return false;
} }
} }
/**
* Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
* {@code false} otherwise.
* <p>
* It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
* length field.
*/
public boolean equals(final ByteBuf byteBuf, final int offset, final int length) {
return ByteUtil.equals(data, byteBuf, offset, length);
}
@Override @Override
public int hashCode() { public int hashCode() {
if (hash == 0) { if (hash == 0) {
@ -575,4 +480,64 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
dst[d++] = (char) (low | high); dst[d++] = (char) (low | high);
} }
} }
public static final class ByteBufSimpleStringPool extends AbstractByteBufPool<SimpleString> {
private static final int UUID_LENGTH = 36;
private final int maxLength;
public ByteBufSimpleStringPool() {
this.maxLength = UUID_LENGTH;
}
public ByteBufSimpleStringPool(final int capacity, final int maxCharsLength) {
super(capacity);
this.maxLength = maxCharsLength;
}
@Override
protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) {
if (entry == null) {
return false;
}
return entry.equals(byteBuf, offset, length);
}
@Override
protected boolean canPool(final ByteBuf byteBuf, final int length) {
assert length % 2 == 0 : "length must be a multiple of 2";
final int expectedStringLength = length >> 1;
return expectedStringLength <= maxLength;
}
@Override
protected SimpleString create(final ByteBuf byteBuf, final int length) {
return readSimpleString(byteBuf, length);
}
}
public static final class StringSimpleStringPool extends AbstractPool<String, SimpleString> {
public StringSimpleStringPool() {
super();
}
public StringSimpleStringPool(final int capacity) {
super(capacity);
}
@Override
protected SimpleString create(String value) {
return toSimpleString(value);
}
@Override
protected boolean isEqual(SimpleString entry, String value) {
if (entry == null) {
return false;
}
return entry.toString().equals(value);
}
}
} }

View File

@ -28,13 +28,19 @@ import io.netty.util.internal.PlatformDependent;
* when used by concurrent threads it doesn't ensure the uniqueness of the entries ie * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
* the same entry could be allocated multiple times by concurrent calls. * the same entry could be allocated multiple times by concurrent calls.
*/ */
public abstract class AbstractInterner<T> { public abstract class AbstractByteBufPool<T> {
public static final int DEFAULT_POOL_CAPACITY = 32;
private final T[] entries; private final T[] entries;
private final int mask; private final int mask;
private final int shift; private final int shift;
public AbstractInterner(final int capacity) { public AbstractByteBufPool() {
this(DEFAULT_POOL_CAPACITY);
}
public AbstractByteBufPool(final int capacity) {
entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)]; entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
mask = entries.length - 1; mask = entries.length - 1;
//log2 of entries.length //log2 of entries.length
@ -105,10 +111,10 @@ public abstract class AbstractInterner<T> {
} }
/** /**
* Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be interned, * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be pooled,
* {@code false} otherwise. * {@code false} otherwise.
*/ */
protected abstract boolean canIntern(ByteBuf byteBuf, int length); protected abstract boolean canPool(ByteBuf byteBuf, int length);
/** /**
* Create a new entry. * Create a new entry.
@ -122,12 +128,13 @@ public abstract class AbstractInterner<T> {
protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length); protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length);
/** /**
* Returns and interned entry if possible, a new one otherwise. * Returns a pooled entry if possible, a new one otherwise.
* <p> * <p>
* The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it. * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
*/ */
public final T intern(final ByteBuf byteBuf, final int length) { public final T getOrCreate(final ByteBuf byteBuf) {
if (!canIntern(byteBuf, length)) { final int length = byteBuf.readInt();
if (!canPool(byteBuf, length)) {
return create(byteBuf, length); return create(byteBuf, length);
} else { } else {
if (!byteBuf.isReadable(length)) { if (!byteBuf.isReadable(length)) {

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.MathUtil;
/**
* Thread-safe {@code <T>} interner.
* <p>
* Differently from {@link String#intern()} it contains a fixed amount of entries and
* when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
* the same entry could be allocated multiple times by concurrent calls.
*/
public abstract class AbstractPool<I, O> {
public static final int DEFAULT_POOL_CAPACITY = 32;
private final O[] entries;
private final int mask;
private final int shift;
public AbstractPool() {
this(DEFAULT_POOL_CAPACITY);
}
public AbstractPool(final int capacity) {
entries = (O[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
mask = entries.length - 1;
//log2 of entries.length
shift = 31 - Integer.numberOfLeadingZeros(entries.length);
}
/**
* Create a new entry.
*/
protected abstract O create(I value);
/**
* Returns {@code true} if the {@code entry} content is equal to {@code value};
*/
protected abstract boolean isEqual(O entry, I value);
protected int hashCode(I value) {
return value.hashCode();
}
/**
* Returns and interned entry if possible, a new one otherwise.
* <p>
* The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
*/
public final O getOrCreate(final I value) {
if (value == null) {
return null;
}
final int hashCode = hashCode(value);
//fast % operation with power of 2 entries.length
final int firstIndex = hashCode & mask;
final O firstEntry = entries[firstIndex];
if (isEqual(firstEntry, value)) {
return firstEntry;
}
final int secondIndex = (hashCode >> shift) & mask;
final O secondEntry = entries[secondIndex];
if (isEqual(secondEntry, value)) {
return secondEntry;
}
final O internedEntry = create(value);
final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
entries[entryIndex] = internedEntry;
return internedEntry;
}
}

View File

@ -22,6 +22,7 @@ import java.util.regex.Pattern;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
@ -207,4 +208,125 @@ public class ByteUtil {
throw ActiveMQUtilBundle.BUNDLE.failedToParseLong(text); throw ActiveMQUtilBundle.BUNDLE.failedToParseLong(text);
} }
} }
public static boolean equals(final byte[] left, final byte[] right) {
return equals(left, right, 0, right.length);
}
public static boolean equals(final byte[] left,
final byte[] right,
final int rightOffset,
final int rightLength) {
if (left == right)
return true;
if (left == null || right == null)
return false;
if (left.length != rightLength)
return false;
if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
return equalsUnsafe(left, right, rightOffset, rightLength);
} else {
return equalsSafe(left, right, rightOffset, rightLength);
}
}
private static boolean equalsSafe(byte[] left, byte[] right, int rightOffset, int rightLength) {
for (int i = 0; i < rightLength; i++)
if (left[i] != right[rightOffset + i])
return false;
return true;
}
private static boolean equalsUnsafe(final byte[] left,
final byte[] right,
final int rightOffset,
final int rightLength) {
final int longCount = rightLength >>> 3;
final int bytesCount = rightLength & 7;
int bytesIndex = rightOffset;
int charsIndex = 0;
for (int i = 0; i < longCount; i++) {
final long charsLong = PlatformDependent.getLong(left, charsIndex);
final long bytesLong = PlatformDependent.getLong(right, bytesIndex);
if (charsLong != bytesLong) {
return false;
}
bytesIndex += 8;
charsIndex += 8;
}
for (int i = 0; i < bytesCount; i++) {
final byte charsByte = PlatformDependent.getByte(left, charsIndex);
final byte bytesByte = PlatformDependent.getByte(right, bytesIndex);
if (charsByte != bytesByte) {
return false;
}
bytesIndex++;
charsIndex++;
}
return true;
}
/**
* Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
* {@code false} otherwise.
* <p>
* It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
* length field.
*/
public static boolean equals(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) {
if (bytes.length != length)
return false;
if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
if ((offset + length) > byteBuf.writerIndex()) {
throw new IndexOutOfBoundsException();
}
if (byteBuf.hasArray()) {
return equals(bytes, byteBuf.array(), byteBuf.arrayOffset() + offset, length);
} else if (byteBuf.hasMemoryAddress()) {
return equalsOffHeap(bytes, byteBuf.memoryAddress(), offset, length);
}
}
return equalsOnHeap(bytes, byteBuf, offset, length);
}
private static boolean equalsOnHeap(final byte[] bytes, final ByteBuf byteBuf, final int offset, final int length) {
if (bytes.length != length)
return false;
for (int i = 0; i < length; i++)
if (bytes[i] != byteBuf.getByte(offset + i))
return false;
return true;
}
private static boolean equalsOffHeap(final byte[] bytes,
final long address,
final int offset,
final int length) {
final int longCount = length >>> 3;
final int bytesCount = length & 7;
long bytesAddress = address + offset;
int charsIndex = 0;
for (int i = 0; i < longCount; i++) {
final long charsLong = PlatformDependent.getLong(bytes, charsIndex);
final long bytesLong = PlatformDependent.getLong(bytesAddress);
if (charsLong != bytesLong) {
return false;
}
bytesAddress += 8;
charsIndex += 8;
}
for (int i = 0; i < bytesCount; i++) {
final byte charsByte = PlatformDependent.getByte(bytes, charsIndex);
final byte bytesByte = PlatformDependent.getByte(bytesAddress);
if (charsByte != bytesByte) {
return false;
}
bytesAddress++;
charsIndex++;
}
return true;
}
} }

View File

@ -28,7 +28,7 @@ import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
import org.apache.activemq.artemis.utils.AbstractInterner; import org.apache.activemq.artemis.utils.AbstractByteBufPool;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
@ -332,8 +332,7 @@ public class TypedProperties {
} }
public synchronized void decode(final ByteBuf buffer, public synchronized void decode(final ByteBuf buffer,
final SimpleString.Interner keyInterner, final TypedPropertiesDecoderPools keyValuePools) {
final StringValue.Interner valueInterner) {
byte b = buffer.readByte(); byte b = buffer.readByte();
if (b == DataConstants.NULL) { if (b == DataConstants.NULL) {
@ -346,15 +345,7 @@ public class TypedProperties {
size = 0; size = 0;
for (int i = 0; i < numHeaders; i++) { for (int i = 0; i < numHeaders; i++) {
final SimpleString key; final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool());
int len = buffer.readInt();
if (keyInterner != null) {
key = keyInterner.intern(buffer, len);
} else {
byte[] data = new byte[len];
buffer.readBytes(data);
key = new SimpleString(data);
}
byte type = buffer.readByte(); byte type = buffer.readByte();
@ -412,12 +403,7 @@ public class TypedProperties {
break; break;
} }
case STRING: { case STRING: {
if (valueInterner != null) { val = StringValue.readStringValue(buffer, keyValuePools == null ? null : keyValuePools.getPropertyValuesPool());
final int length = buffer.readInt();
val = valueInterner.intern(buffer, length);
} else {
val = new StringValue(buffer);
}
doPutValue(key, val); doPutValue(key, val);
break; break;
} }
@ -430,7 +416,7 @@ public class TypedProperties {
} }
public synchronized void decode(final ByteBuf buffer) { public synchronized void decode(final ByteBuf buffer) {
decode(buffer, null, null); decode(buffer, null);
} }
public synchronized void encode(final ByteBuf buffer) { public synchronized void encode(final ByteBuf buffer) {
@ -901,44 +887,18 @@ public class TypedProperties {
public static final class StringValue extends PropertyValue { public static final class StringValue extends PropertyValue {
public static final class Interner extends AbstractInterner<StringValue> {
private final int maxLength;
public Interner(final int capacity, final int maxCharsLength) {
super(capacity);
this.maxLength = maxCharsLength;
}
@Override
protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) {
if (entry == null) {
return false;
}
return SimpleString.isEqual(entry.val, byteBuf, offset, length);
}
@Override
protected boolean canIntern(final ByteBuf byteBuf, final int length) {
assert length % 2 == 0 : "length must be a multiple of 2";
final int expectedStringLength = length >> 1;
return expectedStringLength <= maxLength;
}
@Override
protected StringValue create(final ByteBuf byteBuf, final int length) {
return new StringValue(SimpleString.readSimpleString(byteBuf, length));
}
}
final SimpleString val; final SimpleString val;
private StringValue(final SimpleString val) { private StringValue(final SimpleString val) {
this.val = val; this.val = val;
} }
private StringValue(final ByteBuf buffer) { static StringValue readStringValue(final ByteBuf byteBuf, ByteBufStringValuePool pool) {
val = SimpleString.readSimpleString(buffer); if (pool == null) {
return new StringValue(SimpleString.readSimpleString(byteBuf));
} else {
return pool.getOrCreate(byteBuf);
}
} }
@Override @Override
@ -956,6 +916,90 @@ public class TypedProperties {
public int encodeSize() { public int encodeSize() {
return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val); return DataConstants.SIZE_BYTE + SimpleString.sizeofString(val);
} }
public static final class ByteBufStringValuePool extends AbstractByteBufPool<StringValue> {
private static final int UUID_LENGTH = 36;
private final int maxLength;
public ByteBufStringValuePool() {
this.maxLength = UUID_LENGTH;
}
public ByteBufStringValuePool(final int capacity, final int maxCharsLength) {
super(capacity);
this.maxLength = maxCharsLength;
}
@Override
protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) {
if (entry == null || entry.val == null) {
return false;
}
return entry.val.equals(byteBuf, offset, length);
}
@Override
protected boolean canPool(final ByteBuf byteBuf, final int length) {
assert length % 2 == 0 : "length must be a multiple of 2";
final int expectedStringLength = length >> 1;
return expectedStringLength <= maxLength;
}
@Override
protected StringValue create(final ByteBuf byteBuf, final int length) {
return new StringValue(SimpleString.readSimpleString(byteBuf, length));
}
}
}
public static class TypedPropertiesDecoderPools {
private SimpleString.ByteBufSimpleStringPool propertyKeysPool;
private TypedProperties.StringValue.ByteBufStringValuePool propertyValuesPool;
public TypedPropertiesDecoderPools() {
this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool();
this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool();
}
public TypedPropertiesDecoderPools(int keyPoolCapacity, int valuePoolCapacity, int maxCharsLength) {
this.propertyKeysPool = new SimpleString.ByteBufSimpleStringPool(keyPoolCapacity, maxCharsLength);
this.propertyValuesPool = new TypedProperties.StringValue.ByteBufStringValuePool(valuePoolCapacity, maxCharsLength);
}
public SimpleString.ByteBufSimpleStringPool getPropertyKeysPool() {
return propertyKeysPool;
}
public TypedProperties.StringValue.ByteBufStringValuePool getPropertyValuesPool() {
return propertyValuesPool;
}
}
public static class TypedPropertiesStringSimpleStringPools {
private SimpleString.StringSimpleStringPool propertyKeysPool;
private SimpleString.StringSimpleStringPool propertyValuesPool;
public TypedPropertiesStringSimpleStringPools() {
this.propertyKeysPool = new SimpleString.StringSimpleStringPool();
this.propertyValuesPool = new SimpleString.StringSimpleStringPool();
}
public TypedPropertiesStringSimpleStringPools(int keyPoolCapacity, int valuePoolCapacity) {
this.propertyKeysPool = new SimpleString.StringSimpleStringPool(keyPoolCapacity);
this.propertyValuesPool = new SimpleString.StringSimpleStringPool(valuePoolCapacity);
}
public SimpleString.StringSimpleStringPool getPropertyKeysPool() {
return propertyKeysPool;
}
public SimpleString.StringSimpleStringPool getPropertyValuesPool() {
return propertyValuesPool;
}
} }
public boolean isEmpty() { public boolean isEmpty() {

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
/** /**
@ -587,6 +588,8 @@ public interface Message {
Message putStringProperty(SimpleString key, SimpleString value); Message putStringProperty(SimpleString key, SimpleString value);
Message putStringProperty(SimpleString key, String value);
/** /**
* Returns the size of the <em>encoded</em> message. * Returns the size of the <em>encoded</em> message.
*/ */
@ -649,6 +652,9 @@ public interface Message {
/** This should make you convert your message into Core format. */ /** This should make you convert your message into Core format. */
ICoreMessage toCore(); ICoreMessage toCore();
/** This should make you convert your message into Core format. */
ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools);
int getMemoryEstimate(); int getMemoryEstimate();

View File

@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
@ -59,6 +60,10 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
public ClientMessageImpl() { public ClientMessageImpl() {
} }
public ClientMessageImpl(CoreMessageObjectPools coreMessageObjectPools) {
super(coreMessageObjectPools);
}
protected ClientMessageImpl(ClientMessageImpl other) { protected ClientMessageImpl(ClientMessageImpl other) {
super(other); super(other);
} }
@ -96,11 +101,22 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
final long expiration, final long expiration,
final long timestamp, final long timestamp,
final byte priority, final byte priority,
final int initialMessageBufferSize) { final int initialMessageBufferSize,
final CoreMessageObjectPools coreMessageObjectPools) {
super(coreMessageObjectPools);
this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable). this.setType(type).setExpiration(expiration).setTimestamp(timestamp).setDurable(durable).
setPriority(priority).initBuffer(initialMessageBufferSize); setPriority(priority).initBuffer(initialMessageBufferSize);
} }
public ClientMessageImpl(final byte type,
final boolean durable,
final long expiration,
final long timestamp,
final byte priority,
final int initialMessageBufferSize) {
this(type, durable, expiration, timestamp, priority, initialMessageBufferSize, null);
}
@Override @Override
public TypedProperties getProperties() { public TypedProperties getProperties() {
return this.checkProperties(); return this.checkProperties();
@ -285,6 +301,11 @@ public class ClientMessageImpl extends CoreMessage implements ClientMessageInter
return (ClientMessageImpl) super.putStringProperty(key, value); return (ClientMessageImpl) super.putStringProperty(key, value);
} }
@Override
public ClientMessageImpl putStringProperty(final SimpleString key, final String value) {
return (ClientMessageImpl) super.putStringProperty(key, value);
}
@Override @Override
public ClientMessageImpl putObjectProperty(final SimpleString key, public ClientMessageImpl putObjectProperty(final SimpleString key,
final Object value) throws ActiveMQPropertyConversionException { final Object value) throws ActiveMQPropertyConversionException {

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -148,6 +149,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private final Executor closeExecutor; private final Executor closeExecutor;
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory, ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
final String name, final String name,
final String username, final String username,
@ -869,7 +872,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final long expiration, final long expiration,
final long timestamp, final long timestamp,
final byte priority) { final byte priority) {
return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize); return new ClientMessageImpl(type, durable, expiration, timestamp, priority, initialMessagePacketSize, coreMessageObjectPools);
} }
@Override @Override

View File

@ -93,18 +93,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
protected volatile TypedProperties properties; protected volatile TypedProperties properties;
private final SimpleString.Interner keysInterner; private final CoreMessageObjectPools coreMessageObjectPools;
private final TypedProperties.StringValue.Interner valuesInterner;
public CoreMessage(final SimpleString.Interner keysInterner, public CoreMessage(final CoreMessageObjectPools coreMessageObjectPools) {
final TypedProperties.StringValue.Interner valuesInterner) { this.coreMessageObjectPools = coreMessageObjectPools;
this.keysInterner = keysInterner;
this.valuesInterner = valuesInterner;
} }
public CoreMessage() { public CoreMessage() {
this.keysInterner = null; this.coreMessageObjectPools = null;
this.valuesInterner = null;
} }
/** On core there's no delivery annotation */ /** On core there's no delivery annotation */
@ -326,10 +322,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
} }
public CoreMessage(long id, int bufferSize) { public CoreMessage(long id, int bufferSize) {
this(id, bufferSize, null);
}
public CoreMessage(long id, int bufferSize, CoreMessageObjectPools coreMessageObjectPools) {
this.initBuffer(bufferSize); this.initBuffer(bufferSize);
this.setMessageID(id); this.setMessageID(id);
this.keysInterner = null; this.coreMessageObjectPools = coreMessageObjectPools;
this.valuesInterner = null;
} }
protected CoreMessage(CoreMessage other, TypedProperties copyProperties) { protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
@ -343,8 +342,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
this.timestamp = other.timestamp; this.timestamp = other.timestamp;
this.priority = other.priority; this.priority = other.priority;
this.userID = other.userID; this.userID = other.userID;
this.keysInterner = other.keysInterner; this.coreMessageObjectPools = other.coreMessageObjectPools;
this.valuesInterner = other.valuesInterner;
if (copyProperties != null) { if (copyProperties != null) {
this.properties = new TypedProperties(copyProperties); this.properties = new TypedProperties(copyProperties);
} }
@ -424,7 +422,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public CoreMessage setValidatedUserID(String validatedUserID) { public CoreMessage setValidatedUserID(String validatedUserID) {
putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID)); putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUserID, getPropertyValuesPool()));
return this; return this;
} }
@ -479,7 +477,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
TypedProperties properties = new TypedProperties(); TypedProperties properties = new TypedProperties();
if (buffer != null && propertiesLocation >= 0) { if (buffer != null && propertiesLocation >= 0) {
final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
properties.decode(byteBuf, keysInterner, valuesInterner); properties.decode(byteBuf, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
} }
this.properties = properties; this.properties = properties;
} }
@ -543,17 +541,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) { private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
messageIDPosition = buffer.readerIndex(); messageIDPosition = buffer.readerIndex();
messageID = buffer.readLong(); messageID = buffer.readLong();
int b = buffer.readByte();
if (b != DataConstants.NULL) { address = SimpleString.readNullableSimpleString(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressDecoderPool());
final int length = buffer.readInt();
if (keysInterner != null) {
address = keysInterner.intern(buffer, length);
} else {
address = SimpleString.readSimpleString(buffer, length);
}
} else {
address = null;
}
if (buffer.readByte() == DataConstants.NOT_NULL) { if (buffer.readByte() == DataConstants.NOT_NULL) {
byte[] bytes = new byte[16]; byte[] bytes = new byte[16];
buffer.readBytes(bytes); buffer.readBytes(bytes);
@ -571,7 +560,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
propertiesLocation = buffer.readerIndex(); propertiesLocation = buffer.readerIndex();
} else { } else {
properties = new TypedProperties(); properties = new TypedProperties();
properties.decode(buffer, keysInterner, valuesInterner); properties.decode(buffer, coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesDecoderPools());
} }
} }
@ -671,7 +660,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public CoreMessage setAddress(String address) { public CoreMessage setAddress(String address) {
messageChanged(); messageChanged();
this.address = SimpleString.toSimpleString(address); this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
return this; return this;
} }
@ -703,7 +692,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putBooleanProperty(final String key, final boolean value) { public CoreMessage putBooleanProperty(final String key, final boolean value) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
properties.putBooleanProperty(new SimpleString(key), value); properties.putBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this; return this;
} }
@ -724,7 +713,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException { public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties(); checkProperties();
return properties.getBooleanProperty(new SimpleString(key)); return properties.getBooleanProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
@ -739,7 +728,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putByteProperty(final String key, final byte value) { public CoreMessage putByteProperty(final String key, final byte value) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
properties.putByteProperty(new SimpleString(key), value); properties.putByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this; return this;
} }
@ -752,7 +741,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException { public Byte getByteProperty(final String key) throws ActiveMQPropertyConversionException {
return getByteProperty(SimpleString.toSimpleString(key)); return getByteProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
@ -768,7 +757,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putBytesProperty(final String key, final byte[] value) { public CoreMessage putBytesProperty(final String key, final byte[] value) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
properties.putBytesProperty(new SimpleString(key), value); properties.putBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this; return this;
} }
@ -780,7 +769,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException { public byte[] getBytesProperty(final String key) throws ActiveMQPropertyConversionException {
return getBytesProperty(new SimpleString(key)); return getBytesProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
@ -795,7 +784,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putCharProperty(String key, char value) { public CoreMessage putCharProperty(String key, char value) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
properties.putCharProperty(new SimpleString(key), value); properties.putCharProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this; return this;
} }
@ -811,7 +800,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putShortProperty(final String key, final short value) { public CoreMessage putShortProperty(final String key, final short value) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
properties.putShortProperty(new SimpleString(key), value); properties.putShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this; return this;
} }
@ -827,7 +816,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putIntProperty(final String key, final int value) { public CoreMessage putIntProperty(final String key, final int value) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
properties.putIntProperty(new SimpleString(key), value); properties.putIntProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this; return this;
} }
@ -854,7 +843,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putLongProperty(final String key, final long value) { public CoreMessage putLongProperty(final String key, final long value) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
properties.putLongProperty(new SimpleString(key), value); properties.putLongProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this; return this;
} }
@ -882,7 +871,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putFloatProperty(final String key, final float value) { public CoreMessage putFloatProperty(final String key, final float value) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
properties.putFloatProperty(new SimpleString(key), value); properties.putFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this; return this;
} }
@ -898,7 +887,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage putDoubleProperty(final String key, final double value) { public CoreMessage putDoubleProperty(final String key, final double value) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
properties.putDoubleProperty(new SimpleString(key), value); properties.putDoubleProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this; return this;
} }
@ -923,11 +912,20 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this; return this;
} }
@Override
public CoreMessage putStringProperty(final SimpleString key, final String value) {
messageChanged();
checkProperties();
properties.putSimpleStringProperty(key, SimpleString.toSimpleString(value, getPropertyValuesPool()));
return this;
}
@Override @Override
public CoreMessage putStringProperty(final String key, final String value) { public CoreMessage putStringProperty(final String key, final String value) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value)); properties.putSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), SimpleString.toSimpleString(value, getPropertyValuesPool()));
return this; return this;
} }
@ -943,7 +941,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public Object getObjectProperty(final String key) { public Object getObjectProperty(final String key) {
checkProperties(); checkProperties();
return getObjectProperty(SimpleString.toSimpleString(key)); return getObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
@ -955,7 +953,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException { public CoreMessage putObjectProperty(final String key, final Object value) throws ActiveMQPropertyConversionException {
messageChanged(); messageChanged();
putObjectProperty(new SimpleString(key), value); putObjectProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()), value);
return this; return this;
} }
@ -968,7 +966,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException { public Short getShortProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties(); checkProperties();
return properties.getShortProperty(new SimpleString(key)); return properties.getShortProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
@ -980,7 +978,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException { public Float getFloatProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties(); checkProperties();
return properties.getFloatProperty(new SimpleString(key)); return properties.getFloatProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
@ -996,7 +994,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public String getStringProperty(final String key) throws ActiveMQPropertyConversionException { public String getStringProperty(final String key) throws ActiveMQPropertyConversionException {
return getStringProperty(new SimpleString(key)); return getStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
@ -1008,7 +1006,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException { public SimpleString getSimpleStringProperty(final String key) throws ActiveMQPropertyConversionException {
checkProperties(); checkProperties();
return properties.getSimpleStringProperty(new SimpleString(key)); return properties.getSimpleStringProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
@ -1025,7 +1023,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public Object removeProperty(final String key) { public Object removeProperty(final String key) {
messageChanged(); messageChanged();
checkProperties(); checkProperties();
Object oldValue = properties.removeProperty(new SimpleString(key)); Object oldValue = properties.removeProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
if (oldValue != null) { if (oldValue != null) {
messageChanged(); messageChanged();
} }
@ -1041,7 +1039,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override @Override
public boolean containsProperty(final String key) { public boolean containsProperty(final String key) {
checkProperties(); checkProperties();
return properties.containsProperty(new SimpleString(key)); return properties.containsProperty(SimpleString.toSimpleString(key, getPropertyKeysPool()));
} }
@Override @Override
@ -1115,6 +1113,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this; return this;
} }
@Override
public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return this;
}
@Override @Override
public String toString() { public String toString() {
try { try {
@ -1135,4 +1138,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return new java.util.Date(timestamp).toString(); return new java.util.Date(timestamp).toString();
} }
} }
private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
}
private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
}
} }

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.message.impl;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
public class CoreMessageObjectPools {
private Supplier<SimpleString.ByteBufSimpleStringPool> addressDecoderPool = Suppliers.memoize(SimpleString.ByteBufSimpleStringPool::new);
private Supplier<TypedProperties.TypedPropertiesDecoderPools> propertiesDecoderPools = Suppliers.memoize(TypedProperties.TypedPropertiesDecoderPools::new);
private Supplier<SimpleString.StringSimpleStringPool> groupIdStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
private Supplier<SimpleString.StringSimpleStringPool> addressStringSimpleStringPool = Suppliers.memoize(SimpleString.StringSimpleStringPool::new);
private Supplier<TypedProperties.TypedPropertiesStringSimpleStringPools> propertiesStringSimpleStringPools = Suppliers.memoize(TypedProperties.TypedPropertiesStringSimpleStringPools::new);
public CoreMessageObjectPools() {
}
public SimpleString.ByteBufSimpleStringPool getAddressDecoderPool() {
return addressDecoderPool.get();
}
public SimpleString.StringSimpleStringPool getAddressStringSimpleStringPool() {
return addressStringSimpleStringPool.get();
}
public SimpleString.StringSimpleStringPool getGroupIdStringSimpleStringPool() {
return groupIdStringSimpleStringPool.get();
}
public TypedProperties.TypedPropertiesDecoderPools getPropertiesDecoderPools() {
return propertiesDecoderPools.get();
}
public TypedProperties.TypedPropertiesStringSimpleStringPools getPropertiesStringSimpleStringPools() {
return propertiesStringSimpleStringPools.get();
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl; import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder; import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
@ -32,11 +33,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
public class ClientPacketDecoder extends PacketDecoder { public class ClientPacketDecoder extends PacketDecoder {
private static final long serialVersionUID = 6952614096979334582L; private static final long serialVersionUID = 6952614096979334582L;
public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder(); protected final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
protected ClientPacketDecoder() {
}
@Override @Override
public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) { public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
@ -56,9 +53,9 @@ public class ClientPacketDecoder extends PacketDecoder {
switch (packetType) { switch (packetType) {
case SESS_RECEIVE_MSG: { case SESS_RECEIVE_MSG: {
if (connection.isVersionBeforeAddressChange()) { if (connection.isVersionBeforeAddressChange()) {
packet = new SessionReceiveMessage_1X(new ClientMessageImpl()); packet = new SessionReceiveMessage_1X(new ClientMessageImpl(coreMessageObjectPools));
} else { } else {
packet = new SessionReceiveMessage(new ClientMessageImpl()); packet = new SessionReceiveMessage(new ClientMessageImpl(coreMessageObjectPools));
} }
break; break;
} }

View File

@ -511,7 +511,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
} }
protected PacketDecoder createPacketDecoder() { protected PacketDecoder createPacketDecoder() {
return ClientPacketDecoder.INSTANCE; return new ClientPacketDecoder();
} }
private void forceReturnChannel1(ActiveMQException cause) { private void forceReturnChannel1(ActiveMQException cause) {

View File

@ -109,12 +109,18 @@ public class MessageUtil {
return message.getSimpleStringProperty(REPLYTO_HEADER_NAME); return message.getSimpleStringProperty(REPLYTO_HEADER_NAME);
} }
public static void setJMSReplyTo(Message message, final SimpleString dest) { public static void setJMSReplyTo(Message message, final String dest) {
if (dest == null) { if (dest == null) {
message.removeProperty(REPLYTO_HEADER_NAME); message.removeProperty(REPLYTO_HEADER_NAME);
} else { } else {
message.putStringProperty(REPLYTO_HEADER_NAME, dest);
}
}
public static void setJMSReplyTo(Message message, final SimpleString dest) {
if (dest == null) {
message.removeProperty(REPLYTO_HEADER_NAME);
} else {
message.putStringProperty(REPLYTO_HEADER_NAME, dest); message.putStringProperty(REPLYTO_HEADER_NAME, dest);
} }
} }

View File

@ -337,7 +337,7 @@ public class CoreMessageTest {
public String generate(String body) throws Exception { public String generate(String body) throws Exception {
ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024); ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024, null);
TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body)); TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body));
message.setAddress(ADDRESS); message.setAddress(ADDRESS);

View File

@ -99,26 +99,28 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
} }
} }
public static String createQueueNameForSubscription(final boolean isDurable, public static SimpleString createQueueNameForSubscription(final boolean isDurable,
final String clientID, final String clientID,
final String subscriptionName) { final String subscriptionName) {
final String queueName;
if (clientID != null) { if (clientID != null) {
if (isDurable) { if (isDurable) {
return ActiveMQDestination.escape(clientID) + SEPARATOR + queueName = ActiveMQDestination.escape(clientID) + SEPARATOR +
ActiveMQDestination.escape(subscriptionName); ActiveMQDestination.escape(subscriptionName);
} else { } else {
return "nonDurable" + SEPARATOR + queueName = "nonDurable" + SEPARATOR +
ActiveMQDestination.escape(clientID) + SEPARATOR + ActiveMQDestination.escape(clientID) + SEPARATOR +
ActiveMQDestination.escape(subscriptionName); ActiveMQDestination.escape(subscriptionName);
} }
} else { } else {
if (isDurable) { if (isDurable) {
return ActiveMQDestination.escape(subscriptionName); queueName = ActiveMQDestination.escape(subscriptionName);
} else { } else {
return "nonDurable" + SEPARATOR + queueName = "nonDurable" + SEPARATOR +
ActiveMQDestination.escape(subscriptionName); ActiveMQDestination.escape(subscriptionName);
} }
} }
return SimpleString.toSimpleString(queueName);
} }
public static String createQueueNameForSharedSubscription(final boolean isDurable, public static String createQueueNameForSharedSubscription(final boolean isDurable,
@ -192,10 +194,18 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
return new ActiveMQQueue(address); return new ActiveMQQueue(address);
} }
public static ActiveMQQueue createQueue(final SimpleString address) {
return new ActiveMQQueue(address);
}
public static ActiveMQTopic createTopic(final String address) { public static ActiveMQTopic createTopic(final String address) {
return new ActiveMQTopic(address); return new ActiveMQTopic(address);
} }
public static ActiveMQTopic createTopic(final SimpleString address) {
return new ActiveMQTopic(address);
}
public static ActiveMQTemporaryQueue createTemporaryQueue(final String address, final ActiveMQSession session) { public static ActiveMQTemporaryQueue createTemporaryQueue(final String address, final ActiveMQSession session) {
return new ActiveMQTemporaryQueue(address, session); return new ActiveMQTemporaryQueue(address, session);
} }

View File

@ -372,7 +372,7 @@ public class ActiveMQMessage implements javax.jms.Message {
public void setJMSReplyTo(final Destination dest) throws JMSException { public void setJMSReplyTo(final Destination dest) throws JMSException {
if (dest == null) { if (dest == null) {
MessageUtil.setJMSReplyTo(message, null); MessageUtil.setJMSReplyTo(message, (String) null);
replyTo = null; replyTo = null;
} else { } else {
if (dest instanceof ActiveMQDestination == false) { if (dest instanceof ActiveMQDestination == false) {
@ -391,7 +391,7 @@ public class ActiveMQMessage implements javax.jms.Message {
} }
ActiveMQDestination jbd = (ActiveMQDestination) dest; ActiveMQDestination jbd = (ActiveMQDestination) dest;
MessageUtil.setJMSReplyTo(message, SimpleString.toSimpleString(prefix + jbd.getAddress())); MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress());
replyTo = jbd; replyTo = jbd;
} }
@ -401,14 +401,15 @@ public class ActiveMQMessage implements javax.jms.Message {
public Destination getJMSDestination() throws JMSException { public Destination getJMSDestination() throws JMSException {
if (dest == null) { if (dest == null) {
SimpleString address = message.getAddressSimpleString(); SimpleString address = message.getAddressSimpleString();
String prefix = ""; if (address == null) {
if (RoutingType.ANYCAST.equals(message.getRoutingType())) { dest = null;
prefix = QUEUE_QUALIFIED_PREFIX; } else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
dest = ActiveMQDestination.createQueue(address);
} else if (RoutingType.MULTICAST.equals(message.getRoutingType())) { } else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
prefix = TOPIC_QUALIFIED_PREFIX; dest = ActiveMQDestination.createTopic(address);
} else {
dest = ActiveMQDestination.fromPrefixedName(address.toString());
} }
dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString());
} }
return dest; return dest;
@ -513,7 +514,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override @Override
public boolean getBooleanProperty(final String name) throws JMSException { public boolean getBooleanProperty(final String name) throws JMSException {
try { try {
return message.getBooleanProperty(new SimpleString(name)); return message.getBooleanProperty(name);
} catch (ActiveMQPropertyConversionException e) { } catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage()); throw new MessageFormatException(e.getMessage());
} }
@ -522,7 +523,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override @Override
public byte getByteProperty(final String name) throws JMSException { public byte getByteProperty(final String name) throws JMSException {
try { try {
return message.getByteProperty(new SimpleString(name)); return message.getByteProperty(name);
} catch (ActiveMQPropertyConversionException e) { } catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage()); throw new MessageFormatException(e.getMessage());
} }
@ -531,7 +532,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override @Override
public short getShortProperty(final String name) throws JMSException { public short getShortProperty(final String name) throws JMSException {
try { try {
return message.getShortProperty(new SimpleString(name)); return message.getShortProperty(name);
} catch (ActiveMQPropertyConversionException e) { } catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage()); throw new MessageFormatException(e.getMessage());
} }
@ -544,7 +545,7 @@ public class ActiveMQMessage implements javax.jms.Message {
} }
try { try {
return message.getIntProperty(new SimpleString(name)); return message.getIntProperty(name);
} catch (ActiveMQPropertyConversionException e) { } catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage()); throw new MessageFormatException(e.getMessage());
} }
@ -557,7 +558,7 @@ public class ActiveMQMessage implements javax.jms.Message {
} }
try { try {
return message.getLongProperty(new SimpleString(name)); return message.getLongProperty(name);
} catch (ActiveMQPropertyConversionException e) { } catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage()); throw new MessageFormatException(e.getMessage());
} }
@ -566,7 +567,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override @Override
public float getFloatProperty(final String name) throws JMSException { public float getFloatProperty(final String name) throws JMSException {
try { try {
return message.getFloatProperty(new SimpleString(name)); return message.getFloatProperty(name);
} catch (ActiveMQPropertyConversionException e) { } catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage()); throw new MessageFormatException(e.getMessage());
} }
@ -575,7 +576,7 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override @Override
public double getDoubleProperty(final String name) throws JMSException { public double getDoubleProperty(final String name) throws JMSException {
try { try {
return message.getDoubleProperty(new SimpleString(name)); return message.getDoubleProperty(name);
} catch (ActiveMQPropertyConversionException e) { } catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage()); throw new MessageFormatException(e.getMessage());
} }
@ -593,7 +594,7 @@ public class ActiveMQMessage implements javax.jms.Message {
} else if (MessageUtil.JMSXUSERID.equals(name)) { } else if (MessageUtil.JMSXUSERID.equals(name)) {
return message.getValidatedUserID(); return message.getValidatedUserID();
} else { } else {
return message.getStringProperty(new SimpleString(name)); return message.getStringProperty(name);
} }
} catch (ActiveMQPropertyConversionException e) { } catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage()); throw new MessageFormatException(e.getMessage());
@ -608,7 +609,7 @@ public class ActiveMQMessage implements javax.jms.Message {
Object val = message.getObjectProperty(name); Object val = message.getObjectProperty(name);
if (val instanceof SimpleString) { if (val instanceof SimpleString) {
val = ((SimpleString) val).toString(); val = val.toString();
} }
return val; return val;
} }
@ -622,43 +623,43 @@ public class ActiveMQMessage implements javax.jms.Message {
public void setBooleanProperty(final String name, final boolean value) throws JMSException { public void setBooleanProperty(final String name, final boolean value) throws JMSException {
checkProperty(name); checkProperty(name);
message.putBooleanProperty(new SimpleString(name), value); message.putBooleanProperty(name, value);
} }
@Override @Override
public void setByteProperty(final String name, final byte value) throws JMSException { public void setByteProperty(final String name, final byte value) throws JMSException {
checkProperty(name); checkProperty(name);
message.putByteProperty(new SimpleString(name), value); message.putByteProperty(name, value);
} }
@Override @Override
public void setShortProperty(final String name, final short value) throws JMSException { public void setShortProperty(final String name, final short value) throws JMSException {
checkProperty(name); checkProperty(name);
message.putShortProperty(new SimpleString(name), value); message.putShortProperty(name, value);
} }
@Override @Override
public void setIntProperty(final String name, final int value) throws JMSException { public void setIntProperty(final String name, final int value) throws JMSException {
checkProperty(name); checkProperty(name);
message.putIntProperty(new SimpleString(name), value); message.putIntProperty(name, value);
} }
@Override @Override
public void setLongProperty(final String name, final long value) throws JMSException { public void setLongProperty(final String name, final long value) throws JMSException {
checkProperty(name); checkProperty(name);
message.putLongProperty(new SimpleString(name), value); message.putLongProperty(name, value);
} }
@Override @Override
public void setFloatProperty(final String name, final float value) throws JMSException { public void setFloatProperty(final String name, final float value) throws JMSException {
checkProperty(name); checkProperty(name);
message.putFloatProperty(new SimpleString(name), value); message.putFloatProperty(name, value);
} }
@Override @Override
public void setDoubleProperty(final String name, final double value) throws JMSException { public void setDoubleProperty(final String name, final double value) throws JMSException {
checkProperty(name); checkProperty(name);
message.putDoubleProperty(new SimpleString(name), value); message.putDoubleProperty(name, value);
} }
@Override @Override
@ -670,7 +671,7 @@ public class ActiveMQMessage implements javax.jms.Message {
} else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) { } else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
return; return;
} else { } else {
message.putStringProperty(new SimpleString(name), SimpleString.toSimpleString(value)); message.putStringProperty(name, value);
} }
} }
@ -703,7 +704,7 @@ public class ActiveMQMessage implements javax.jms.Message {
} }
try { try {
message.putObjectProperty(new SimpleString(name), value); message.putObjectProperty(name, value);
} catch (ActiveMQPropertyConversionException e) { } catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage()); throw new MessageFormatException(e.getMessage());
} }
@ -964,7 +965,7 @@ public class ActiveMQMessage implements javax.jms.Message {
boolean result = false; boolean result = false;
if (jmsPropertyName.equals(name)) { if (jmsPropertyName.equals(name)) {
message.putStringProperty(corePropertyName, SimpleString.toSimpleString(value.toString())); message.putStringProperty(corePropertyName, value.toString());
result = true; result = true;
} }

View File

@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client;
import javax.jms.Queue; import javax.jms.Queue;
import org.apache.activemq.artemis.api.core.SimpleString;
/** /**
* ActiveMQ Artemis implementation of a JMS Queue. * ActiveMQ Artemis implementation of a JMS Queue.
* <br> * <br>
@ -34,13 +36,17 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
// Constructors -------------------------------------------------- // Constructors --------------------------------------------------
public ActiveMQQueue() { public ActiveMQQueue() {
this(null); this((SimpleString) null);
} }
public ActiveMQQueue(final String address) { public ActiveMQQueue(final String address) {
super(address, TYPE.QUEUE, null); super(address, TYPE.QUEUE, null);
} }
public ActiveMQQueue(final SimpleString address) {
super(address, TYPE.QUEUE, null);
}
public ActiveMQQueue(final String address, boolean temporary) { public ActiveMQQueue(final String address, boolean temporary) {
super(address, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, null); super(address, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, null);
} }

View File

@ -627,7 +627,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
} }
queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName)); queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName);
if (durability == ConsumerDurability.DURABLE) { if (durability == ConsumerDurability.DURABLE) {
try { try {
@ -750,7 +750,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
} }
queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName)); queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName);
QueueQuery subResponse = session.queueQuery(queueName); QueueQuery subResponse = session.queueQuery(queueName);
@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new IllegalStateException("Cannot unsubscribe using a QueueSession"); throw new IllegalStateException("Cannot unsubscribe using a QueueSession");
} }
SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name)); SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name);
try { try {
QueueQuery response = session.queueQuery(queueName); QueueQuery response = session.queueQuery(queueName);

View File

@ -73,7 +73,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
// For testing only // For testing only
public ActiveMQStreamMessage() { public ActiveMQStreamMessage() {
message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500); message = new ClientMessageImpl((byte) 0, false, 0, 0, (byte) 4, 1500, null);
} }
// Public -------------------------------------------------------- // Public --------------------------------------------------------

View File

@ -18,6 +18,8 @@ package org.apache.activemq.artemis.jms.client;
import javax.jms.Topic; import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.SimpleString;
/** /**
* ActiveMQ Artemis implementation of a JMS Topic. * ActiveMQ Artemis implementation of a JMS Topic.
* <br> * <br>
@ -33,13 +35,17 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
// Constructors -------------------------------------------------- // Constructors --------------------------------------------------
public ActiveMQTopic() { public ActiveMQTopic() {
this(null); this((SimpleString) null);
} }
public ActiveMQTopic(final String address) { public ActiveMQTopic(final String address) {
this(address, false); this(address, false);
} }
public ActiveMQTopic(final SimpleString address) {
super(address, TYPE.TOPIC, null);
}
public ActiveMQTopic(final String address, boolean temporary) { public ActiveMQTopic(final String address, boolean temporary) {
super(address, TYPE.TOPIC, null); super(address, TYPE.TOPIC, null);
} }

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
@ -70,7 +71,7 @@ public class AMQPMessage extends RefCountMessage {
boolean bufferValid; boolean bufferValid;
Boolean durable; Boolean durable;
long messageID; long messageID;
String address; SimpleString address;
MessageImpl protonMessage; MessageImpl protonMessage;
private volatile int memoryEstimate = -1; private volatile int memoryEstimate = -1;
private long expiration = 0; private long expiration = 0;
@ -90,6 +91,7 @@ public class AMQPMessage extends RefCountMessage {
private ApplicationProperties applicationProperties; private ApplicationProperties applicationProperties;
private long scheduledTime = -1; private long scheduledTime = -1;
private String connectionID; private String connectionID;
private final CoreMessageObjectPools coreMessageObjectPools;
Set<Object> rejectedConsumers; Set<Object> rejectedConsumers;
@ -98,9 +100,14 @@ public class AMQPMessage extends RefCountMessage {
private volatile TypedProperties extraProperties; private volatile TypedProperties extraProperties;
public AMQPMessage(long messageFormat, byte[] data) { public AMQPMessage(long messageFormat, byte[] data) {
this(messageFormat, data, null);
}
public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) {
this.data = Unpooled.wrappedBuffer(data); this.data = Unpooled.wrappedBuffer(data);
this.messageFormat = messageFormat; this.messageFormat = messageFormat;
this.bufferValid = true; this.bufferValid = true;
this.coreMessageObjectPools = coreMessageObjectPools;
parseHeaders(); parseHeaders();
} }
@ -108,12 +115,14 @@ public class AMQPMessage extends RefCountMessage {
public AMQPMessage(long messageFormat) { public AMQPMessage(long messageFormat) {
this.messageFormat = messageFormat; this.messageFormat = messageFormat;
this.bufferValid = false; this.bufferValid = false;
this.coreMessageObjectPools = null;
} }
public AMQPMessage(long messageFormat, Message message) { public AMQPMessage(long messageFormat, Message message) {
this.messageFormat = messageFormat; this.messageFormat = messageFormat;
this.protonMessage = (MessageImpl) message; this.protonMessage = (MessageImpl) message;
this.bufferValid = false; this.bufferValid = false;
this.coreMessageObjectPools = null;
} }
public AMQPMessage(Message message) { public AMQPMessage(Message message) {
@ -301,7 +310,7 @@ public class AMQPMessage extends RefCountMessage {
parseHeaders(); parseHeaders();
if (_properties != null && _properties.getGroupId() != null) { if (_properties != null && _properties.getGroupId() != null) {
return SimpleString.toSimpleString(_properties.getGroupId()); return SimpleString.toSimpleString(_properties.getGroupId(), coreMessageObjectPools == null ? null : coreMessageObjectPools.getGroupIdStringSimpleStringPool());
} else { } else {
return null; return null;
} }
@ -588,36 +597,33 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public String getAddress() { public String getAddress() {
if (address == null) { SimpleString addressSimpleString = getAddressSimpleString();
Properties properties = getProtonMessage().getProperties(); return addressSimpleString == null ? null : addressSimpleString.toString();
if (properties != null) {
return properties.getTo();
} else {
return null;
}
} else {
return address;
}
} }
@Override @Override
public AMQPMessage setAddress(String address) { public AMQPMessage setAddress(String address) {
this.address = address; this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
return this; return this;
} }
@Override @Override
public AMQPMessage setAddress(SimpleString address) { public AMQPMessage setAddress(SimpleString address) {
if (address != null) { this.address = address;
return setAddress(address.toString()); return this;
} else {
return setAddress((String) null);
}
} }
@Override @Override
public SimpleString getAddressSimpleString() { public SimpleString getAddressSimpleString() {
return SimpleString.toSimpleString(getAddress()); if (address == null) {
Properties properties = getProtonMessage().getProperties();
if (properties != null) {
setAddress(properties.getTo());
} else {
return null;
}
}
return address;
} }
@Override @Override
@ -977,7 +983,7 @@ public class AMQPMessage extends RefCountMessage {
if (applicationProperties != null) getProtonMessage().setApplicationProperties(applicationProperties); if (applicationProperties != null) getProtonMessage().setApplicationProperties(applicationProperties);
if (_properties != null) { if (_properties != null) {
if (address != null) { if (address != null) {
_properties.setTo(address); _properties.setTo(address.toString());
} }
getProtonMessage().setProperties(this._properties); getProtonMessage().setProperties(this._properties);
} }
@ -987,7 +993,7 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key)); return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key), getPropertyValuesPool());
} }
@Override @Override
@ -1065,11 +1071,16 @@ public class AMQPMessage extends RefCountMessage {
return putStringProperty(key.toString(), value.toString()); return putStringProperty(key.toString(), value.toString());
} }
@Override
public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, String value) {
return putStringProperty(key.toString(), value);
}
@Override @Override
public Set<SimpleString> getPropertyNames() { public Set<SimpleString> getPropertyNames() {
HashSet<SimpleString> values = new HashSet<>(); HashSet<SimpleString> values = new HashSet<>();
for (Object k : getApplicationPropertiesMap().keySet()) { for (Object k : getApplicationPropertiesMap().keySet()) {
values.add(SimpleString.toSimpleString(k.toString())); values.add(SimpleString.toSimpleString(k.toString(), getPropertyKeysPool()));
} }
return values; return values;
} }
@ -1084,17 +1095,22 @@ public class AMQPMessage extends RefCountMessage {
} }
@Override @Override
public ICoreMessage toCore() { public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
try { try {
return AMQPConverter.getInstance().toCore(this); return AMQPConverter.getInstance().toCore(this, coreMessageObjectPools);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e);
} }
} }
@Override
public ICoreMessage toCore() {
return toCore(null);
}
@Override @Override
public SimpleString getLastValueProperty() { public SimpleString getLastValueProperty() {
return getSimpleStringProperty(HDR_LAST_VALUE_NAME.toString()); return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
} }
@Override @Override
@ -1155,4 +1171,12 @@ public class AMQPMessage extends RefCountMessage {
", address=" + getAddress() + ", address=" + getAddress() +
"]"; "]";
} }
private SimpleString.StringSimpleStringPool getPropertyKeysPool() {
return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyKeysPool();
}
private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
}
} }

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -101,6 +102,7 @@ public class AMQPSessionCallback implements SessionCallback {
private final AtomicBoolean draining = new AtomicBoolean(false); private final AtomicBoolean draining = new AtomicBoolean(false);
private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>(); private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
@ -210,14 +212,14 @@ public class AMQPSessionCallback implements SessionCallback {
} }
public Object createSender(ProtonServerSenderContext protonSender, public Object createSender(ProtonServerSenderContext protonSender,
String queue, SimpleString queue,
String filter, String filter,
boolean browserOnly) throws Exception { boolean browserOnly) throws Exception {
long consumerID = consumerIDGenerator.generateID(); long consumerID = consumerIDGenerator.generateID();
filter = SelectorTranslator.convertToActiveMQFilterString(filter); filter = SelectorTranslator.convertToActiveMQFilterString(filter);
ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly, false, null); ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), browserOnly, false, null);
// AMQP handles its own flow control for when it's started // AMQP handles its own flow control for when it's started
consumer.setStarted(true); consumer.setStarted(true);
@ -233,48 +235,48 @@ public class AMQPSessionCallback implements SessionCallback {
serverConsumer.receiveCredits(-1); serverConsumer.receiveCredits(-1);
} }
public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception { public void createTemporaryQueue(SimpleString queueName, RoutingType routingType) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false); serverSession.createQueue(queueName, queueName, routingType, null, true, false);
} }
public void createTemporaryQueue(String address, public void createTemporaryQueue(SimpleString address,
String queueName, SimpleString queueName,
RoutingType routingType, RoutingType routingType,
String filter) throws Exception { SimpleString filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false); serverSession.createQueue(address, queueName, routingType, filter, true, false);
} }
public void createUnsharedDurableQueue(String address, public void createUnsharedDurableQueue(SimpleString address,
RoutingType routingType, RoutingType routingType,
String queueName, SimpleString queueName,
String filter) throws Exception { SimpleString filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false); serverSession.createQueue(address, queueName, routingType, filter, false, true, 1, false, false);
} }
public void createSharedDurableQueue(String address, public void createSharedDurableQueue(SimpleString address,
RoutingType routingType, RoutingType routingType,
String queueName, SimpleString queueName,
String filter) throws Exception { SimpleString filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false); serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false);
} }
public void createSharedVolatileQueue(String address, public void createSharedVolatileQueue(SimpleString address,
RoutingType routingType, RoutingType routingType,
String queueName, SimpleString queueName,
String filter) throws Exception { SimpleString filter) throws Exception {
serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true); serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true);
} }
public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception { public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception {
QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(queueName);
if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) { if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
try { try {
serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true, true); serverSession.createQueue(queueName, queueName, routingType, null, false, true, true);
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing. // The queue may have been created by another thread in the mean time. Catch and do nothing.
} }
queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); queueQueryResult = serverSession.executeQueueQuery(queueName);
} }
// if auto-create we will return whatever type was used before // if auto-create we will return whatever type was used before
@ -287,32 +289,31 @@ public class AMQPSessionCallback implements SessionCallback {
public boolean bindingQuery(String address, RoutingType routingType) throws Exception { public boolean bindingQuery(SimpleString address, RoutingType routingType) throws Exception {
BindingQueryResult bindingQueryResult = bindingQueryCache.getResult(address); BindingQueryResult bindingQueryResult = bindingQueryCache.getResult(address);
if (bindingQueryResult != null) { if (bindingQueryResult != null) {
return bindingQueryResult.isExists(); return bindingQueryResult.isExists();
} }
SimpleString simpleAddress = SimpleString.toSimpleString(address); bindingQueryResult = serverSession.executeBindingQuery(address);
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) { if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) {
try { try {
serverSession.createAddress(simpleAddress, routingType, true); serverSession.createAddress(address, routingType, true);
} catch (ActiveMQAddressExistsException e) { } catch (ActiveMQAddressExistsException e) {
// The address may have been created by another thread in the mean time. Catch and do nothing. // The address may have been created by another thread in the mean time. Catch and do nothing.
} }
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); bindingQueryResult = serverSession.executeBindingQuery(address);
} else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) { } else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) {
QueueQueryResult queueBinding = serverSession.executeQueueQuery(simpleAddress); QueueQueryResult queueBinding = serverSession.executeQueueQuery(address);
if (!queueBinding.isExists()) { if (!queueBinding.isExists()) {
try { try {
serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true, true); serverSession.createQueue(address, address, routingType, null, false, true, true);
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing. // The queue may have been created by another thread in the mean time. Catch and do nothing.
} }
} }
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); bindingQueryResult = serverSession.executeBindingQuery(address);
} }
bindingQueryCache.setResult(address, bindingQueryResult); bindingQueryCache.setResult(address, bindingQueryResult);
@ -320,7 +321,7 @@ public class AMQPSessionCallback implements SessionCallback {
} }
public AddressQueryResult addressQuery(String addressName, public AddressQueryResult addressQuery(SimpleString addressName,
RoutingType routingType, RoutingType routingType,
boolean autoCreate) throws Exception { boolean autoCreate) throws Exception {
@ -329,15 +330,15 @@ public class AMQPSessionCallback implements SessionCallback {
return addressQueryResult; return addressQueryResult;
} }
addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); addressQueryResult = serverSession.executeAddressQuery(addressName);
if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) { if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
try { try {
serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true); serverSession.createAddress(addressName, routingType, true);
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing. // The queue may have been created by another thread in the mean time. Catch and do nothing.
} }
addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); addressQueryResult = serverSession.executeAddressQuery(addressName);
} }
addressQueryCache.setResult(addressName, addressQueryResult); addressQueryCache.setResult(addressName, addressQueryResult);
@ -438,15 +439,15 @@ public class AMQPSessionCallback implements SessionCallback {
final Transaction transaction, final Transaction transaction,
final Receiver receiver, final Receiver receiver,
final Delivery delivery, final Delivery delivery,
String address, SimpleString address,
int messageFormat, int messageFormat,
byte[] data) throws Exception { byte[] data) throws Exception {
AMQPMessage message = new AMQPMessage(messageFormat, data); AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools);
if (address != null) { if (address != null) {
message.setAddress(new SimpleString(address)); message.setAddress(address);
} else { } else {
// Anonymous relay must set a To value // Anonymous relay must set a To value
address = message.getAddress(); address = message.getAddressSimpleString();
if (address == null) { if (address == null) {
rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer"); rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
return; return;
@ -552,7 +553,7 @@ public class AMQPSessionCallback implements SessionCallback {
}); });
} }
public void offerProducerCredit(final String address, public void offerProducerCredit(final SimpleString address,
final int credits, final int credits,
final int threshold, final int threshold,
final Receiver receiver) { final Receiver receiver) {
@ -567,7 +568,7 @@ public class AMQPSessionCallback implements SessionCallback {
connection.flush(); connection.flush();
return; return;
} }
final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address)); final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
store.checkMemory(new Runnable() { store.checkMemory(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -587,8 +588,8 @@ public class AMQPSessionCallback implements SessionCallback {
} }
} }
public void deleteQueue(String queueName) throws Exception { public void deleteQueue(SimpleString queueName) throws Exception {
manager.getServer().destroyQueue(new SimpleString(queueName)); manager.getServer().destroyQueue(queueName);
} }
public void resetContext(OperationContext oldContext) { public void resetContext(OperationContext oldContext) {
@ -657,7 +658,7 @@ public class AMQPSessionCallback implements SessionCallback {
} }
@Override @Override
public void disconnect(ServerConsumer consumer, String queueName) { public void disconnect(ServerConsumer consumer, SimpleString queueName) {
ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName); ErrorCondition ec = new ErrorCondition(AmqpSupport.RESOURCE_DELETED, "Queue was deleted: " + queueName);
connection.lock(); connection.lock();
try { try {
@ -703,12 +704,12 @@ public class AMQPSessionCallback implements SessionCallback {
return serverSession.getAddress(address); return serverSession.getAddress(address);
} }
public void removeTemporaryQueue(String address) throws Exception { public void removeTemporaryQueue(SimpleString address) throws Exception {
serverSession.deleteQueue(SimpleString.toSimpleString(address)); serverSession.deleteQueue(address);
} }
public RoutingType getDefaultRoutingType(String address) { public RoutingType getDefaultRoutingType(SimpleString address) {
return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultAddressRoutingType(); return manager.getServer().getAddressSettingsRepository().getMatch(address.toString()).getDefaultAddressRoutingType();
} }
public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception { public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception {
@ -733,10 +734,10 @@ public class AMQPSessionCallback implements SessionCallback {
class AddressQueryCache<T> { class AddressQueryCache<T> {
String address; SimpleString address;
T result; T result;
public synchronized T getResult(String parameterAddress) { public synchronized T getResult(SimpleString parameterAddress) {
if (address != null && address.equals(parameterAddress)) { if (address != null && address.equals(parameterAddress)) {
return result; return result;
} else { } else {
@ -746,7 +747,7 @@ public class AMQPSessionCallback implements SessionCallback {
} }
} }
public synchronized void setResult(String parameterAddress, T result) { public synchronized void setResult(SimpleString parameterAddress, T result) {
this.address = parameterAddress; this.address = parameterAddress;
this.result = result; this.result = result;
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.converter; package org.apache.activemq.artemis.protocol.amqp.converter;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
@ -38,7 +39,7 @@ public class AMQPConverter implements MessageConverter<AMQPMessage> {
} }
@Override @Override
public ICoreMessage toCore(AMQPMessage messageSource) throws Exception { public ICoreMessage toCore(AMQPMessage messageSource, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
return AmqpCoreConverter.toCore(messageSource); return AmqpCoreConverter.toCore(messageSource, coreMessageObjectPools);
} }
} }

View File

@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@ -242,56 +243,56 @@ public final class AMQPMessageSupport {
return null; return null;
} }
public static ServerJMSBytesMessage createBytesMessage(long id) { public static ServerJMSBytesMessage createBytesMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE)); return new ServerJMSBytesMessage(newMessage(id, BYTES_TYPE, coreMessageObjectPools));
} }
public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length) throws JMSException { public static ServerJMSBytesMessage createBytesMessage(long id, byte[] array, int arrayOffset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
ServerJMSBytesMessage message = createBytesMessage(id); ServerJMSBytesMessage message = createBytesMessage(id, coreMessageObjectPools);
message.writeBytes(array, arrayOffset, length); message.writeBytes(array, arrayOffset, length);
return message; return message;
} }
public static ServerJMSStreamMessage createStreamMessage(long id) { public static ServerJMSStreamMessage createStreamMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE)); return new ServerJMSStreamMessage(newMessage(id, STREAM_TYPE, coreMessageObjectPools));
} }
public static ServerJMSMessage createMessage(long id) { public static ServerJMSMessage createMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE)); return new ServerJMSMessage(newMessage(id, DEFAULT_TYPE, coreMessageObjectPools));
} }
public static ServerJMSTextMessage createTextMessage(long id) { public static ServerJMSTextMessage createTextMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE)); return new ServerJMSTextMessage(newMessage(id, TEXT_TYPE, coreMessageObjectPools));
} }
public static ServerJMSTextMessage createTextMessage(long id, String text) throws JMSException { public static ServerJMSTextMessage createTextMessage(long id, String text, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
ServerJMSTextMessage message = createTextMessage(id); ServerJMSTextMessage message = createTextMessage(id, coreMessageObjectPools);
message.setText(text); message.setText(text);
return message; return message;
} }
public static ServerJMSObjectMessage createObjectMessage(long id) { public static ServerJMSObjectMessage createObjectMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE)); return new ServerJMSObjectMessage(newMessage(id, OBJECT_TYPE, coreMessageObjectPools));
} }
public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm) throws JMSException { public static ServerJMSMessage createObjectMessage(long id, Binary serializedForm, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
ServerJMSObjectMessage message = createObjectMessage(id); ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools);
message.setSerializedForm(serializedForm); message.setSerializedForm(serializedForm);
return message; return message;
} }
public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length) throws JMSException { public static ServerJMSMessage createObjectMessage(long id, byte[] array, int offset, int length, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
ServerJMSObjectMessage message = createObjectMessage(id); ServerJMSObjectMessage message = createObjectMessage(id, coreMessageObjectPools);
message.setSerializedForm(new Binary(array, offset, length)); message.setSerializedForm(new Binary(array, offset, length));
return message; return message;
} }
public static ServerJMSMapMessage createMapMessage(long id) { public static ServerJMSMapMessage createMapMessage(long id, CoreMessageObjectPools coreMessageObjectPools) {
return new ServerJMSMapMessage(newMessage(id, MAP_TYPE)); return new ServerJMSMapMessage(newMessage(id, MAP_TYPE, coreMessageObjectPools));
} }
public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content) throws JMSException { public static ServerJMSMapMessage createMapMessage(long id, Map<String, Object> content, CoreMessageObjectPools coreMessageObjectPools) throws JMSException {
ServerJMSMapMessage message = createMapMessage(id); ServerJMSMapMessage message = createMapMessage(id, coreMessageObjectPools);
final Set<Map.Entry<String, Object>> set = content.entrySet(); final Set<Map.Entry<String, Object>> set = content.entrySet();
for (Map.Entry<String, Object> entry : set) { for (Map.Entry<String, Object> entry : set) {
Object value = entry.getValue(); Object value = entry.getValue();
@ -304,8 +305,8 @@ public final class AMQPMessageSupport {
return message; return message;
} }
private static CoreMessage newMessage(long id, byte messageType) { private static CoreMessage newMessage(long id, byte messageType, CoreMessageObjectPools coreMessageObjectPools) {
CoreMessage message = new CoreMessage(id, 512); CoreMessage message = new CoreMessage(id, 512, coreMessageObjectPools);
message.setType(messageType); message.setType(messageType);
// ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); // ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
return message; return message;

View File

@ -52,6 +52,7 @@ import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
@ -89,31 +90,31 @@ import io.netty.buffer.PooledByteBufAllocator;
public class AmqpCoreConverter { public class AmqpCoreConverter {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static ICoreMessage toCore(AMQPMessage message) throws Exception { public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
Section body = message.getProtonMessage().getBody(); Section body = message.getProtonMessage().getBody();
ServerJMSMessage result; ServerJMSMessage result;
if (body == null) { if (body == null) {
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) { if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID()); result = createObjectMessage(message.getMessageID(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) { } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
result = createBytesMessage(message.getMessageID()); result = createBytesMessage(message.getMessageID(), coreMessageObjectPools);
} else { } else {
Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType()); Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
if (charset != null) { if (charset != null) {
result = createTextMessage(message.getMessageID()); result = createTextMessage(message.getMessageID(), coreMessageObjectPools);
} else { } else {
result = createMessage(message.getMessageID()); result = createMessage(message.getMessageID(), coreMessageObjectPools);
} }
} }
} else if (body instanceof Data) { } else if (body instanceof Data) {
Binary payload = ((Data) body).getValue(); Binary payload = ((Data) body).getValue();
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) { if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) { } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else { } else {
Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType()); Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
if (StandardCharsets.UTF_8.equals(charset)) { if (StandardCharsets.UTF_8.equals(charset)) {
@ -121,18 +122,18 @@ public class AmqpCoreConverter {
try { try {
CharBuffer chars = charset.newDecoder().decode(buf); CharBuffer chars = charset.newDecoder().decode(buf);
result = createTextMessage(message.getMessageID(), String.valueOf(chars)); result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools);
} catch (CharacterCodingException e) { } catch (CharacterCodingException e) {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} }
} else { } else {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} }
} }
} else if (body instanceof AmqpSequence) { } else if (body instanceof AmqpSequence) {
AmqpSequence sequence = (AmqpSequence) body; AmqpSequence sequence = (AmqpSequence) body;
ServerJMSStreamMessage m = createStreamMessage(message.getMessageID()); ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
for (Object item : sequence.getValue()) { for (Object item : sequence.getValue()) {
m.writeObject(item); m.writeObject(item);
} }
@ -141,31 +142,31 @@ public class AmqpCoreConverter {
} else if (body instanceof AmqpValue) { } else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue(); Object value = ((AmqpValue) body).getValue();
if (value == null || value instanceof String) { if (value == null || value instanceof String) {
result = createTextMessage(message.getMessageID(), (String) value); result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
} else if (value instanceof Binary) { } else if (value instanceof Binary) {
Binary payload = (Binary) value; Binary payload = (Binary) value;
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) { if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), payload); result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools);
} else { } else {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength()); result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} }
} else if (value instanceof List) { } else if (value instanceof List) {
ServerJMSStreamMessage m = createStreamMessage(message.getMessageID()); ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
for (Object item : (List<Object>) value) { for (Object item : (List<Object>) value) {
m.writeObject(item); m.writeObject(item);
} }
result = m; result = m;
} else if (value instanceof Map) { } else if (value instanceof Map) {
result = createMapMessage(message.getMessageID(), (Map<String, Object>) value); result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
} else { } else {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
try { try {
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf)); TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
TLSEncode.getEncoder().writeObject(body); TLSEncode.getEncoder().writeObject(body);
result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex()); result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
} finally { } finally {
buf.release(); buf.release();
TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null); TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
@ -186,7 +187,7 @@ public class AmqpCoreConverter {
result.getInnerMessage().setReplyTo(message.getReplyTo()); result.getInnerMessage().setReplyTo(message.getReplyTo());
result.getInnerMessage().setDurable(message.isDurable()); result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority()); result.getInnerMessage().setPriority(message.getPriority());
result.getInnerMessage().setAddress(message.getAddress()); result.getInnerMessage().setAddress(message.getAddressSimpleString());
result.encode(); result.encode();

View File

@ -54,7 +54,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
protected final Receiver receiver; protected final Receiver receiver;
protected String address; protected SimpleString address;
protected final AMQPSessionCallback sessionSPI; protected final AMQPSessionCallback sessionSPI;
@ -102,7 +102,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (target.getDynamic()) { if (target.getDynamic()) {
// if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and // if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session // will be deleted on closing of the session
address = sessionSPI.tempQueueName(); address = SimpleString.toSimpleString(sessionSPI.tempQueueName());
defRoutingType = getRoutingType(target.getCapabilities(), address); defRoutingType = getRoutingType(target.getCapabilities(), address);
try { try {
@ -113,12 +113,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
} }
expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH; expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
target.setAddress(address); target.setAddress(address.toString());
} else { } else {
// the target will have an address unless the remote is requesting an anonymous // the target will have an address unless the remote is requesting an anonymous
// relay in which case the address in the incoming message's to field will be // relay in which case the address in the incoming message's to field will be
// matched on receive of the message. // matched on receive of the message.
address = target.getAddress(); address = SimpleString.toSimpleString(target.getAddress());
if (address != null && !address.isEmpty()) { if (address != null && !address.isEmpty()) {
defRoutingType = getRoutingType(target.getCapabilities(), address); defRoutingType = getRoutingType(target.getCapabilities(), address);
@ -134,7 +134,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
} }
try { try {
sessionSPI.check(SimpleString.toSimpleString(address), CheckType.SEND, new SecurityAuth() { sessionSPI.check(address, CheckType.SEND, new SecurityAuth() {
@Override @Override
public String getUsername() { public String getUsername() {
String username = null; String username = null;
@ -181,12 +181,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
flow(amqpCredits, minCreditRefresh); flow(amqpCredits, minCreditRefresh);
} }
public RoutingType getRoutingType(Receiver receiver, String address) { public RoutingType getRoutingType(Receiver receiver, SimpleString address) {
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
return target != null ? getRoutingType(target.getCapabilities(), address) : getRoutingType((Symbol[]) null, address); return target != null ? getRoutingType(target.getCapabilities(), address) : getRoutingType((Symbol[]) null, address);
} }
private RoutingType getRoutingType(Symbol[] symbols, String address) { private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) {
if (symbols != null) { if (symbols != null) {
for (Symbol symbol : symbols) { for (Symbol symbol : symbols) {
if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) { if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
@ -264,7 +264,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
try { try {
sessionSPI.removeTemporaryQueue(target.getAddress()); sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(target.getAddress()));
} catch (Exception e) { } catch (Exception e) {
//ignore on close, its temp anyway and will be removed later //ignore on close, its temp anyway and will be removed later
} }

View File

@ -102,7 +102,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean shared = false; private boolean shared = false;
private boolean global = false; private boolean global = false;
private boolean isVolatile = false; private boolean isVolatile = false;
private String tempQueueName; private SimpleString tempQueueName;
public ProtonServerSenderContext(AMQPConnectionContext connection, public ProtonServerSenderContext(AMQPConnectionContext connection,
Sender sender, Sender sender,
@ -157,7 +157,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
super.initialise(); super.initialise();
Source source = (Source) sender.getRemoteSource(); Source source = (Source) sender.getRemoteSource();
String queue = null; SimpleString queue = null;
String selector = null; String selector = null;
final Map<Symbol, Object> supportedFilters = new HashMap<>(); final Map<Symbol, Object> supportedFilters = new HashMap<>();
@ -199,7 +199,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// the lifetime policy and capabilities of the new subscription. // the lifetime policy and capabilities of the new subscription.
if (result.isExists()) { if (result.isExists()) {
source = new org.apache.qpid.proton.amqp.messaging.Source(); source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(queue); source.setAddress(queue.toString());
source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDistributionMode(COPY); source.setDistributionMode(COPY);
@ -240,7 +240,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else if (source.getDynamic()) { } else if (source.getDynamic()) {
// if dynamic we have to create the node (queue) and set the address on the target, the // if dynamic we have to create the node (queue) and set the address on the target, the
// node is temporary and will be deleted on closing of the session // node is temporary and will be deleted on closing of the session
queue = java.util.UUID.randomUUID().toString(); queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
tempQueueName = queue; tempQueueName = queue;
try { try {
sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST); sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
@ -248,7 +248,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} catch (Exception e) { } catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
} }
source.setAddress(queue); source.setAddress(queue.toString());
} else { } else {
SimpleString addressToUse; SimpleString addressToUse;
SimpleString queueNameToUse = null; SimpleString queueNameToUse = null;
@ -269,7 +269,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
multicast = hasCapabilities(TOPIC, source); multicast = hasCapabilities(TOPIC, source);
AddressQueryResult addressQueryResult = null; AddressQueryResult addressQueryResult = null;
try { try {
addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true); addressQueryResult = sessionSPI.addressQuery(addressToUse, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
} catch (ActiveMQSecurityException e) { } catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
} catch (ActiveMQAMQPException e) { } catch (ActiveMQAMQPException e) {
@ -294,7 +294,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// if not we look up the address // if not we look up the address
AddressQueryResult addressQueryResult = null; AddressQueryResult addressQueryResult = null;
try { try {
addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true); addressQueryResult = sessionSPI.addressQuery(addressToUse, defaultRoutingType, true);
} catch (ActiveMQSecurityException e) { } catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
} catch (ActiveMQAMQPException e) { } catch (ActiveMQAMQPException e) {
@ -333,6 +333,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST); queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
SimpleString simpleStringSelector = SimpleString.toSimpleString(selector);
//if the address specifies a broker configured queue then we always use this, treat it as a queue //if the address specifies a broker configured queue then we always use this, treat it as a queue
if (queue != null) { if (queue != null) {
@ -345,24 +346,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
String pubId = sender.getName(); String pubId = sender.getName();
queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false); queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false); QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
if (result.isExists()) { if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local // If a client reattaches to a durable subscription with a different no-local
// filter value, selector or address then we must recreate the queue (JMS semantics). // filter value, selector or address then we must recreate the queue (JMS semantics).
if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
if (result.getConsumerCount() == 0) { if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue); sessionSPI.deleteQueue(queue);
sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
} else { } else {
throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist"); throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
} }
} }
} else { } else {
if (shared) { if (shared) {
sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
} else { } else {
sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
} }
} }
} else { } else {
@ -371,15 +371,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (shared && sender.getName() != null) { if (shared && sender.getName() != null) {
queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile); queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile);
try { try {
sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
//this is ok, just means its shared //this is ok, just means its shared
} }
} else { } else {
queue = java.util.UUID.randomUUID().toString(); queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
tempQueueName = queue; tempQueueName = queue;
try { try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector); sessionSPI.createTemporaryQueue(addressToUse, queue, RoutingType.MULTICAST, simpleStringSelector);
} catch (Exception e) { } catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
} }
@ -387,18 +387,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} else { } else {
if (queueNameToUse != null) { if (queueNameToUse != null) {
SimpleString matchingAnycastQueue = SimpleString.toSimpleString(getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST)); SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST);
if (matchingAnycastQueue != null) { if (matchingAnycastQueue != null) {
queue = matchingAnycastQueue.toString(); queue = matchingAnycastQueue;
} else { } else {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
} }
} else { } else {
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST); SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
if (matchingAnycastQueue != null) { if (matchingAnycastQueue != null) {
queue = matchingAnycastQueue.toString(); queue = matchingAnycastQueue;
} else { } else {
queue = addressToUse.toString(); queue = addressToUse;
} }
} }
@ -437,16 +437,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} }
private String getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception { private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
if (queueName != null) { if (queueName != null) {
QueueQueryResult result = sessionSPI.queueQuery(queueName.toString(), routingType, false); QueueQueryResult result = sessionSPI.queueQuery(queueName, routingType, false);
if (!result.isExists()) { if (!result.isExists()) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist"); throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
} else { } else {
if (!result.getAddress().equals(address)) { if (!result.getAddress().equals(address)) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'"); throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
} }
return sessionSPI.getMatchingQueue(address, queueName, routingType).toString(); return sessionSPI.getMatchingQueue(address, queueName, routingType);
} }
} }
return null; return null;
@ -495,7 +495,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (remoteLinkClose) { if (remoteLinkClose) {
Source source = (Source) sender.getSource(); Source source = (Source) sender.getSource();
if (source != null && source.getAddress() != null && multicast) { if (source != null && source.getAddress() != null && multicast) {
String queueName = source.getAddress(); SimpleString queueName = SimpleString.toSimpleString(source.getAddress());
QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false); QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false);
if (result.isExists() && source.getDynamic()) { if (result.isExists() && source.getDynamic()) {
sessionSPI.deleteQueue(queueName); sessionSPI.deleteQueue(queueName);
@ -508,7 +508,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (pubId.contains("|")) { if (pubId.contains("|")) {
pubId = pubId.split("\\|")[0]; pubId = pubId.split("\\|")[0];
} }
String queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile); SimpleString queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false); result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
//only delete if it isn't volatile and has no consumers //only delete if it isn't volatile and has no consumers
if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) { if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
@ -518,7 +518,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
try { try {
sessionSPI.removeTemporaryQueue(source.getAddress()); sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(source.getAddress()));
} catch (Exception e) { } catch (Exception e) {
//ignore on close, its temp anyway and will be removed later //ignore on close, its temp anyway and will be removed later
} }
@ -760,7 +760,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return false; return false;
} }
private static String createQueueName(boolean useCoreSubscriptionNaming, private static SimpleString createQueueName(boolean useCoreSubscriptionNaming,
String clientId, String clientId,
String pubId, String pubId,
boolean shared, boolean shared,
@ -784,7 +784,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
queue += ":global"; queue += ":global";
} }
} }
return queue; return SimpleString.toSimpleString(queue);
} }
} }

View File

@ -620,7 +620,7 @@ public class JMSMappingOutboundTransformerTest {
} }
private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) { private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) {
ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0); ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0, null);
if (compression) { if (compression) {
// TODO // TODO
@ -647,7 +647,7 @@ public class JMSMappingOutboundTransformerTest {
} }
private ServerJMSTextMessage createTextMessage(String text, boolean compression) { private ServerJMSTextMessage createTextMessage(String text, boolean compression) {
ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0); ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0, null);
if (compression) { if (compression) {
// TODO // TODO

View File

@ -22,6 +22,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@ -56,11 +57,12 @@ public class MQTTSession {
private MQTTProtocolManager protocolManager; private MQTTProtocolManager protocolManager;
private boolean isClean; private boolean isClean;
private WildcardConfiguration wildcardConfiguration; private WildcardConfiguration wildcardConfiguration;
private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
public MQTTSession(MQTTProtocolHandler protocolHandler, public MQTTSession(MQTTProtocolHandler protocolHandler,
MQTTConnection connection, MQTTConnection connection,
MQTTProtocolManager protocolManager, MQTTProtocolManager protocolManager,
@ -195,4 +197,8 @@ public class MQTTSession {
public void setWildcardConfiguration(WildcardConfiguration wildcardConfiguration) { public void setWildcardConfiguration(WildcardConfiguration wildcardConfiguration) {
this.wildcardConfiguration = wildcardConfiguration; this.wildcardConfiguration = wildcardConfiguration;
} }
public CoreMessageObjectPools getCoreMessageObjectPools() {
return coreMessageObjectPools;
}
} }

View File

@ -78,7 +78,7 @@ public class MQTTSessionCallback implements SessionCallback {
} }
@Override @Override
public void disconnect(ServerConsumer consumer, String queueName) { public void disconnect(ServerConsumer consumer, SimpleString queueName) {
try { try {
consumer.removeItself(); consumer.removeItself();
} catch (Exception e) { } catch (Exception e) {

View File

@ -64,13 +64,13 @@ public class MQTTUtil {
public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain."; public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level"; public static final SimpleString MQTT_QOS_LEVEL_KEY = SimpleString.toSimpleString("mqtt.qos.level");
public static final String MQTT_MESSAGE_ID_KEY = "mqtt.message.id"; public static final SimpleString MQTT_MESSAGE_ID_KEY = SimpleString.toSimpleString("mqtt.message.id");
public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type"; public static final SimpleString MQTT_MESSAGE_TYPE_KEY = SimpleString.toSimpleString("mqtt.message.type");
public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = new SimpleString("mqtt.message.retain"); public static final SimpleString MQTT_MESSAGE_RETAIN_KEY = SimpleString.toSimpleString("mqtt.message.retain");
public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; public static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
@ -113,10 +113,10 @@ public class MQTTUtil {
int qos) { int qos) {
long id = session.getServer().getStorageManager().generateID(); long id = session.getServer().getStorageManager().generateID();
CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE); CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE, session.getCoreMessageObjectPools());
message.setAddress(address); message.setAddress(address);
message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain); message.putBooleanProperty(MQTT_MESSAGE_RETAIN_KEY, retain);
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos); message.putIntProperty(MQTT_QOS_LEVEL_KEY, qos);
message.setType(Message.BYTES_TYPE); message.setType(Message.BYTES_TYPE);
return message; return message;
} }
@ -127,7 +127,8 @@ public class MQTTUtil {
int qos, int qos,
ByteBuf payload) { ByteBuf payload) {
String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration()); String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration());
ICoreMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
ICoreMessage message = createServerMessage(session, address, retain, qos);
message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes()); message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes());
return message; return message;
@ -135,8 +136,8 @@ public class MQTTUtil {
public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) { public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) {
Message message = createServerMessage(session, address, false, 1); Message message = createServerMessage(session, address, false, 1);
message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId); message.putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId);
message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value()); message.putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY, MqttMessageType.PUBREL.value());
return message; return message;
} }

View File

@ -1121,7 +1121,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override @Override
public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception { public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); SimpleString subQueueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName());
server.destroyQueue(subQueueName); server.destroyQueue(subQueueName);
return null; return null;

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -108,10 +109,11 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
} }
@Override @Override
public ICoreMessage toCore(OpenwireMessage pureMessage) throws Exception { public ICoreMessage toCore(OpenwireMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
return null; return null;
} }
// @Override // @Override
public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) { public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
// TODO: implement this // TODO: implement this
@ -119,10 +121,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
} }
// @Override // @Override
public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception { public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
Message messageSend = (Message) message; Message messageSend = (Message) message;
CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize()); CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
String type = messageSend.getType(); String type = messageSend.getType();
if (type != null) { if (type != null) {

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener; import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
// TODO: Implement this // TODO: Implement this
@ -441,6 +442,11 @@ public class OpenwireMessage implements Message {
return null; return null;
} }
@Override
public Message putStringProperty(SimpleString key, String value) {
return null;
}
@Override @Override
public int getEncodeSize() { public int getEncodeSize() {
return 0; return 0;
@ -478,6 +484,11 @@ public class OpenwireMessage implements Message {
@Override @Override
public ICoreMessage toCore() { public ICoreMessage toCore() {
return toCore(null);
}
@Override
public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return null; return null;
} }

View File

@ -154,7 +154,7 @@ public class AMQConsumer {
} }
addressInfo.setInternal(internalAddress); addressInfo.setInternal(internalAddress);
if (isDurable) { if (isDurable) {
queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName)); queueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName);
QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName); QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
if (result.isExists()) { if (result.isExists()) {
// Already exists // Already exists

View File

@ -27,6 +27,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
@ -82,6 +83,8 @@ public class AMQSession implements SessionCallback {
private final OpenWireProtocolManager protocolManager; private final OpenWireProtocolManager protocolManager;
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
public AMQSession(ConnectionInfo connInfo, public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo, SessionInfo sessInfo,
ActiveMQServer server, ActiveMQServer server,
@ -295,7 +298,7 @@ public class AMQSession implements SessionCallback {
} }
@Override @Override
public void disconnect(ServerConsumer consumerId, String queueName) { public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }
@ -315,7 +318,7 @@ public class AMQSession implements SessionCallback {
actualDestinations = new ActiveMQDestination[]{destination}; actualDestinations = new ActiveMQDestination[]{destination};
} }
org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend); org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId()); originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
@ -338,12 +341,12 @@ public class AMQSession implements SessionCallback {
for (int i = 0; i < actualDestinations.length; i++) { for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i]; ActiveMQDestination dest = actualDestinations[i];
SimpleString address = new SimpleString(dest.getPhysicalName()); SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy(); org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
coreMsg.setAddress(address); coreMsg.setAddress(address);
if (actualDestinations[i].isQueue()) { if (actualDestinations[i].isQueue()) {
checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary()); checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary());
coreMsg.setRoutingType(RoutingType.ANYCAST); coreMsg.setRoutingType(RoutingType.ANYCAST);
} else { } else {
coreMsg.setRoutingType(RoutingType.MULTICAST); coreMsg.setRoutingType(RoutingType.MULTICAST);

View File

@ -239,7 +239,7 @@ public class StompSession implements SessionCallback {
} }
@Override @Override
public void disconnect(ServerConsumer consumerId, String queueName) { public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
StompSubscription stompSubscription = subscriptions.remove(consumerId.getID()); StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());
if (stompSubscription != null) { if (stompSubscription != null) {
StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR); StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);

View File

@ -111,7 +111,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
// Create the message consumer // Create the message consumer
SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector); SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
if (activation.isTopic() && spec.isSubscriptionDurable()) { if (activation.isTopic() && spec.isSubscriptionDurable()) {
SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName())); SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName());
QueueQuery subResponse = session.queueQuery(queueName); QueueQuery subResponse = session.queueQuery(queueName);

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol; package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
@ -54,7 +53,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@ -85,34 +83,15 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
public class ServerPacketDecoder extends ClientPacketDecoder { public class ServerPacketDecoder extends ClientPacketDecoder {
private static final int UUID_LENGTH = 36;
private static final int DEFAULT_INTERNER_CAPACITY = 32;
private static final long serialVersionUID = 3348673114388400766L; private static final long serialVersionUID = 3348673114388400766L;
private SimpleString.Interner keysInterner;
private TypedProperties.StringValue.Interner valuesInterner;
public ServerPacketDecoder() {
this.keysInterner = null;
this.valuesInterner = null;
}
private void initializeInternersIfNeeded() {
if (this.keysInterner == null) {
this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
}
if (this.valuesInterner == null) {
this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
}
}
private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) { private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final SessionSendMessage sendMessage; final SessionSendMessage sendMessage;
initializeInternersIfNeeded();
if (connection.isVersionBeforeAddressChange()) { if (connection.isVersionBeforeAddressChange()) {
sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner)); sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools));
} else { } else {
sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner)); sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools));
} }
sendMessage.decode(in); sendMessage.decode(in);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler; import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
@ -48,6 +49,8 @@ public final class CoreSessionCallback implements SessionCallback {
private ServerSessionPacketHandler handler; private ServerSessionPacketHandler handler;
private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
public CoreSessionCallback(String name, public CoreSessionCallback(String name,
ProtocolManager protocolManager, ProtocolManager protocolManager,
Channel channel, Channel channel,
@ -115,9 +118,9 @@ public final class CoreSessionCallback implements SessionCallback {
Packet packet; Packet packet;
if (channel.getConnection().isVersionBeforeAddressChange()) { if (channel.getConnection().isVersionBeforeAddressChange()) {
packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(), deliveryCount); packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
} else { } else {
packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount); packet = new SessionReceiveMessage(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
} }
int size = 0; int size = 0;
@ -159,11 +162,11 @@ public final class CoreSessionCallback implements SessionCallback {
} }
@Override @Override
public void disconnect(ServerConsumer consumerId, String queueName) { public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) { if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) {
channel.send(new DisconnectConsumerMessage(consumerId.getID())); channel.send(new DisconnectConsumerMessage(consumerId.getID()));
} else { } else {
ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName); ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName.toString());
} }
} }

View File

@ -1045,7 +1045,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override @Override
public void disconnect() { public void disconnect() {
callback.disconnect(this, getQueue().getName().toString()); callback.disconnect(this, getQueue().getName());
} }
public float getRate() { public float getRate() {

View File

@ -18,10 +18,11 @@ package org.apache.activemq.artemis.spi.core.protocol;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
public interface MessageConverter<ProtocolMessage extends Message> { public interface MessageConverter<ProtocolMessage extends Message> {
ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception; ICoreMessage toCore(ProtocolMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception;
ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception; ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception;
} }

View File

@ -81,7 +81,7 @@ public interface SessionCallback {
void closed(); void closed();
void disconnect(ServerConsumer consumerId, String queueName); void disconnect(ServerConsumer consumerId, SimpleString queueName);
boolean isWritable(ReadyListener callback, Object protocolContext); boolean isWritable(ReadyListener callback, Object protocolContext);

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
@ -334,6 +335,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
@Override @Override
public CoreMessage toCore() { public CoreMessage toCore() {
return toCore(null);
}
@Override
public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return null; return null;
} }
@ -590,6 +596,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return null; return null;
} }
@Override
public Message putStringProperty(SimpleString key, String value) {
return null;
}
@Override @Override
public Message putStringProperty(String key, String value) { public Message putStringProperty(String key, String value) {
return null; return null;

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -386,6 +387,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
@Override @Override
public ICoreMessage toCore() { public ICoreMessage toCore() {
return toCore(null);
}
@Override
public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return null; return null;
} }
@ -647,6 +653,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
return null; return null;
} }
@Override
public Message putStringProperty(SimpleString key, String value) {
return null;
}
@Override @Override
public Message putStringProperty(String key, String value) { public Message putStringProperty(String key, String value) {
return null; return null;

View File

@ -585,7 +585,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
} }
@Override @Override
public void disconnect(ServerConsumer consumerId, String queueName) { public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
//To change body of implemented methods use File | Settings | File Templates. //To change body of implemented methods use File | Settings | File Templates.
} }

View File

@ -128,7 +128,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
msg.putStringProperty("myNonAsciiStringProperty", international.toString()); msg.putStringProperty("myNonAsciiStringProperty", international.toString());
msg.putStringProperty("mySpecialCharacters", special); msg.putStringProperty("mySpecialCharacters", special);
msg.putStringProperty(new SimpleString("mySimpleStringProperty"), new SimpleString("mySimpleStringPropertyValue_" + i)); msg.putStringProperty(new SimpleString("mySimpleStringProperty"), new SimpleString("mySimpleStringPropertyValue_" + i));
msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), null); msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), (SimpleString) null);
producer.send(msg); producer.send(msg);
} }