From 8d776eddfcc12bfc73771c04e376583c9fa221e1 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 4 Jan 2018 15:22:05 +0100 Subject: [PATCH] ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol The commit contains: - a general purpose interner implementation - StringValue/SimpleString internrs specializations - TypedProperties keys/values string interning for SessionSendMessage decoding --- .../artemis/api/core/SimpleString.java | 138 ++++++++++++++- .../artemis/utils/AbstractInterner.java | 157 ++++++++++++++++++ .../utils/collections/TypedProperties.java | 60 ++++++- .../core/message/impl/CoreMessage.java | 36 +++- .../core/protocol/ClientPacketDecoder.java | 4 + .../impl/ActiveMQClientProtocolManager.java | 4 +- .../core/protocol/ServerPacketDecoder.java | 29 +++- .../core/impl/CoreProtocolManager.java | 2 +- ...iveMQServerSideProtocolManagerFactory.java | 4 +- 9 files changed, 404 insertions(+), 30 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java index 79909c7acc..e24e245f81 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.List; import io.netty.buffer.ByteBuf; +import io.netty.util.internal.PlatformDependent; +import org.apache.activemq.artemis.utils.AbstractInterner; import org.apache.activemq.artemis.utils.DataConstants; /** @@ -31,6 +33,129 @@ import org.apache.activemq.artemis.utils.DataConstants; */ public final class SimpleString implements CharSequence, Serializable, Comparable { + public static final class Interner extends AbstractInterner { + + private final int maxLength; + + public Interner(final int capacity, final int maxCharsLength) { + super(capacity); + this.maxLength = maxCharsLength; + } + + @Override + protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) { + return SimpleString.isEqual(entry, byteBuf, offset, length); + } + + @Override + protected boolean canIntern(final ByteBuf byteBuf, final int length) { + assert length % 2 == 0 : "length must be a multiple of 2"; + final int expectedStringLength = length >> 1; + return expectedStringLength <= maxLength; + } + + @Override + protected SimpleString create(final ByteBuf byteBuf, final int length) { + return readSimpleString(byteBuf, length); + } + } + + /** + * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s}, + * {@code false} otherwise. + *

+ * It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the + * length field. + */ + public static boolean isEqual(final SimpleString s, final ByteBuf bytes, final int offset, final int length) { + if (s == null) { + return false; + } + final byte[] chars = s.getData(); + if (chars.length != length) + return false; + if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) { + if ((offset + length) > bytes.writerIndex()) { + throw new IndexOutOfBoundsException(); + } + if (bytes.hasArray()) { + return batchOnHeapIsEqual(chars, bytes.array(), bytes.arrayOffset() + offset, length); + } else if (bytes.hasMemoryAddress()) { + return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, length); + } + } + return byteBufIsEqual(chars, bytes, offset, length); + } + + private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf bytes, final int offset, final int length) { + for (int i = 0; i < length; i++) + if (chars[i] != bytes.getByte(offset + i)) + return false; + return true; + } + + private static boolean batchOnHeapIsEqual(final byte[] chars, + final byte[] array, + final int arrayOffset, + final int length) { + final int longCount = length >>> 3; + final int bytesCount = length & 7; + int bytesIndex = arrayOffset; + int charsIndex = 0; + for (int i = 0; i < longCount; i++) { + final long charsLong = PlatformDependent.getLong(chars, charsIndex); + final long bytesLong = PlatformDependent.getLong(array, bytesIndex); + if (charsLong != bytesLong) { + return false; + + } + bytesIndex += 8; + charsIndex += 8; + } + for (int i = 0; i < bytesCount; i++) { + final byte charsByte = PlatformDependent.getByte(chars, charsIndex); + final byte bytesByte = PlatformDependent.getByte(array, bytesIndex); + if (charsByte != bytesByte) { + return false; + + } + bytesIndex++; + charsIndex++; + } + return true; + } + + private static boolean batchOffHeapIsEqual(final byte[] chars, + final long address, + final int offset, + final int length) { + final int longCount = length >>> 3; + final int bytesCount = length & 7; + long bytesAddress = address + offset; + int charsIndex = 0; + for (int i = 0; i < longCount; i++) { + final long charsLong = PlatformDependent.getLong(chars, charsIndex); + final long bytesLong = PlatformDependent.getLong(bytesAddress); + if (charsLong != bytesLong) { + return false; + + } + bytesAddress += 8; + charsIndex += 8; + } + for (int i = 0; i < bytesCount; i++) { + final byte charsByte = PlatformDependent.getByte(chars, charsIndex); + final byte bytesByte = PlatformDependent.getByte(bytesAddress); + if (charsByte != bytesByte) { + return false; + + } + bytesAddress++; + charsIndex++; + } + return true; + } + private static final long serialVersionUID = 4204223851422244307L; // Attributes @@ -134,7 +259,6 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl return subSeq(start, end); } - public static SimpleString readNullableSimpleString(ByteBuf buffer) { int b = buffer.readByte(); if (b == DataConstants.NULL) { @@ -143,13 +267,13 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl return readSimpleString(buffer); } - public static SimpleString readSimpleString(ByteBuf buffer) { int len = buffer.readInt(); - if (len > buffer.readableBytes()) { - throw new IndexOutOfBoundsException(); - } - byte[] data = new byte[len]; + return readSimpleString(buffer, len); + } + + public static SimpleString readSimpleString(final ByteBuf buffer, final int length) { + byte[] data = new byte[length]; buffer.readBytes(data); return new SimpleString(data); } @@ -169,8 +293,6 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl buffer.writeBytes(data); } - - public SimpleString subSeq(final int start, final int end) { int len = data.length >> 1; diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java new file mode 100644 index 0000000000..7e1fe404a5 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AbstractInterner.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.util.internal.MathUtil; +import io.netty.util.internal.PlatformDependent; + +/** + * Thread-safe {@code } interner. + *

+ * Differently from {@link String#intern()} it contains a fixed amount of entries and + * when used by concurrent threads it doesn't ensure the uniqueness of the entries ie + * the same entry could be allocated multiple times by concurrent calls. + */ +public abstract class AbstractInterner { + + private final T[] entries; + private final int mask; + private final int shift; + + public AbstractInterner(final int capacity) { + entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)]; + mask = entries.length - 1; + //log2 of entries.length + shift = 31 - Integer.numberOfLeadingZeros(entries.length); + } + + /** + * Batch hash code implementation that works at its best if {@code bytes} + * contains a {@link org.apache.activemq.artemis.api.core.SimpleString} encoded. + */ + private static int hashCode(final ByteBuf bytes, final int offset, final int length) { + if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) { + //if the platform allows it, the hash code could be computed without bounds checking + if (bytes.hasArray()) { + return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length); + } else if (bytes.hasMemoryAddress()) { + return offHeapHashCode(bytes.memoryAddress(), offset, length); + } + } + return byteBufHashCode(bytes, offset, length); + } + + private static int onHeapHashCode(final byte[] bytes, final int offset, final int length) { + final int intCount = length >>> 1; + final int byteCount = length & 1; + int hashCode = 1; + int arrayIndex = offset; + for (int i = 0; i < intCount; i++) { + hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex); + arrayIndex += 2; + } + for (int i = 0; i < byteCount; i++) { + hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++); + } + return hashCode; + } + + private static int offHeapHashCode(final long address, final int offset, final int length) { + final int intCount = length >>> 1; + final int byteCount = length & 1; + int hashCode = 1; + int arrayIndex = offset; + for (int i = 0; i < intCount; i++) { + hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex); + arrayIndex += 2; + } + for (int i = 0; i < byteCount; i++) { + hashCode = 31 * hashCode + PlatformDependent.getByte(address + arrayIndex++); + } + return hashCode; + } + + private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int length) { + final int intCount = length >>> 1; + final int byteCount = length & 1; + int hashCode = 1; + int arrayIndex = offset; + for (int i = 0; i < intCount; i++) { + final short shortLE = byteBuf.getShortLE(arrayIndex); + final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(shortLE) : shortLE; + hashCode = 31 * hashCode + nativeShort; + arrayIndex += 2; + } + for (int i = 0; i < byteCount; i++) { + hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++); + } + return hashCode; + } + + /** + * Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be interned, + * {@code false} otherwise. + */ + protected abstract boolean canIntern(ByteBuf byteBuf, int length); + + /** + * Create a new entry. + */ + protected abstract T create(ByteBuf byteBuf, int length); + + /** + * Returns {@code true} if the {@code entry} content is the same of {@code byteBuf} at the specified {@code offset} + * and {@code length} {@code false} otherwise. + */ + protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length); + + /** + * Returns and interned entry if possible, a new one otherwise. + *

+ * The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it. + */ + public final T intern(final ByteBuf byteBuf, final int length) { + if (!canIntern(byteBuf, length)) { + return create(byteBuf, length); + } else { + if (!byteBuf.isReadable(length)) { + throw new IndexOutOfBoundsException(); + } + final int bytesOffset = byteBuf.readerIndex(); + final int hashCode = hashCode(byteBuf, bytesOffset, length); + //fast % operation with power of 2 entries.length + final int firstIndex = hashCode & mask; + final T firstEntry = entries[firstIndex]; + if (isEqual(firstEntry, byteBuf, bytesOffset, length)) { + byteBuf.skipBytes(length); + return firstEntry; + } + final int secondIndex = (hashCode >> shift) & mask; + final T secondEntry = entries[secondIndex]; + if (isEqual(secondEntry, byteBuf, bytesOffset, length)) { + byteBuf.skipBytes(length); + return secondEntry; + } + final T internedEntry = create(byteBuf, length); + final int entryIndex = firstEntry == null ? firstIndex : secondIndex; + entries[entryIndex] = internedEntry; + return internedEntry; + } + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java index b17156e520..a3e4876bdb 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java @@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.logs.ActiveMQUtilBundle; +import org.apache.activemq.artemis.utils.AbstractInterner; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; @@ -93,6 +94,7 @@ public class TypedProperties { } public void putByteProperty(final SimpleString key, final byte value) { + checkCreateProperties(); checkCreateProperties(); doPutValue(key, ByteValue.valueOf(value)); } @@ -329,7 +331,9 @@ public class TypedProperties { } } - public synchronized void decode(final ByteBuf buffer) { + public synchronized void decode(final ByteBuf buffer, + final SimpleString.Interner keyInterner, + final StringValue.Interner valueInterner) { byte b = buffer.readByte(); if (b == DataConstants.NULL) { @@ -342,10 +346,15 @@ public class TypedProperties { size = 0; for (int i = 0; i < numHeaders; i++) { + final SimpleString key; int len = buffer.readInt(); - byte[] data = new byte[len]; - buffer.readBytes(data); - SimpleString key = new SimpleString(data); + if (keyInterner != null) { + key = keyInterner.intern(buffer, len); + } else { + byte[] data = new byte[len]; + buffer.readBytes(data); + key = new SimpleString(data); + } byte type = buffer.readByte(); @@ -403,7 +412,12 @@ public class TypedProperties { break; } case STRING: { - val = new StringValue(buffer); + if (valueInterner != null) { + final int length = buffer.readInt(); + val = valueInterner.intern(buffer, length); + } else { + val = new StringValue(buffer); + } doPutValue(key, val); break; } @@ -415,6 +429,10 @@ public class TypedProperties { } } + public synchronized void decode(final ByteBuf buffer) { + decode(buffer, null, null); + } + public synchronized void encode(final ByteBuf buffer) { if (properties == null) { buffer.writeByte(DataConstants.NULL); @@ -881,7 +899,37 @@ public class TypedProperties { } } - private static final class StringValue extends PropertyValue { + public static final class StringValue extends PropertyValue { + + public static final class Interner extends AbstractInterner { + + private final int maxLength; + + public Interner(final int capacity, final int maxCharsLength) { + super(capacity); + this.maxLength = maxCharsLength; + } + + @Override + protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) { + if (entry == null) { + return false; + } + return SimpleString.isEqual(entry.val, byteBuf, offset, length); + } + + @Override + protected boolean canIntern(final ByteBuf byteBuf, final int length) { + assert length % 2 == 0 : "length must be a multiple of 2"; + final int expectedStringLength = length >> 1; + return expectedStringLength <= maxLength; + } + + @Override + protected StringValue create(final ByteBuf byteBuf, final int length) { + return new StringValue(SimpleString.readSimpleString(byteBuf, length)); + } + } final SimpleString val; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index b0656b6926..4ebf97ed25 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.Set; +import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -42,8 +43,6 @@ import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.jboss.logging.Logger; -import io.netty.buffer.ByteBuf; - /** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple * consumers */ public class CoreMessage extends RefCountMessage implements ICoreMessage { @@ -94,7 +93,18 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { protected volatile TypedProperties properties; + private final SimpleString.Interner keysInterner; + private final TypedProperties.StringValue.Interner valuesInterner; + + public CoreMessage(final SimpleString.Interner keysInterner, + final TypedProperties.StringValue.Interner valuesInterner) { + this.keysInterner = keysInterner; + this.valuesInterner = valuesInterner; + } + public CoreMessage() { + this.keysInterner = null; + this.valuesInterner = null; } /** On core there's no delivery annotation */ @@ -318,6 +328,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public CoreMessage(long id, int bufferSize) { this.initBuffer(bufferSize); this.setMessageID(id); + this.keysInterner = null; + this.valuesInterner = null; } protected CoreMessage(CoreMessage other, TypedProperties copyProperties) { @@ -331,6 +343,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { this.timestamp = other.timestamp; this.priority = other.priority; this.userID = other.userID; + this.keysInterner = other.keysInterner; + this.valuesInterner = other.valuesInterner; if (copyProperties != null) { this.properties = new TypedProperties(copyProperties); } @@ -464,7 +478,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { if (properties == null) { TypedProperties properties = new TypedProperties(); if (buffer != null && propertiesLocation >= 0) { - properties.decode(buffer.duplicate().readerIndex(propertiesLocation)); + final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation); + properties.decode(byteBuf, keysInterner, valuesInterner); } this.properties = properties; } @@ -528,8 +543,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) { messageIDPosition = buffer.readerIndex(); messageID = buffer.readLong(); - - address = SimpleString.readNullableSimpleString(buffer); + int b = buffer.readByte(); + if (b != DataConstants.NULL) { + final int length = buffer.readInt(); + if (keysInterner != null) { + address = keysInterner.intern(buffer, length); + } else { + address = SimpleString.readSimpleString(buffer, length); + } + } else { + address = null; + } if (buffer.readByte() == DataConstants.NOT_NULL) { byte[] bytes = new byte[16]; buffer.readBytes(bytes); @@ -547,7 +571,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { propertiesLocation = buffer.readerIndex(); } else { properties = new TypedProperties(); - properties.decode(buffer); + properties.decode(buffer, keysInterner, valuesInterner); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java index 10220306bf..787e4997c6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java @@ -34,6 +34,10 @@ public class ClientPacketDecoder extends PacketDecoder { private static final long serialVersionUID = 6952614096979334582L; public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder(); + protected ClientPacketDecoder() { + + } + @Override public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) { final byte packetType = in.readByte(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index 93432b87f7..f0005ff9ce 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -409,7 +409,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { List incomingInterceptors, List outgoingInterceptors, TopologyResponseHandler topologyResponseHandler) { - this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors); + this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors); this.topologyResponseHandler = topologyResponseHandler; @@ -510,7 +510,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { } } - protected PacketDecoder getPacketDecoder() { + protected PacketDecoder createPacketDecoder() { return ClientPacketDecoder.INSTANCE; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index 05844769b0..2276fdba60 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; @@ -53,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; +import org.apache.activemq.artemis.utils.collections.TypedProperties; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; @@ -83,16 +85,34 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES public class ServerPacketDecoder extends ClientPacketDecoder { + private static final int UUID_LENGTH = 36; + private static final int DEFAULT_INTERNER_CAPACITY = 32; private static final long serialVersionUID = 3348673114388400766L; - public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder(); + private SimpleString.Interner keysInterner; + private TypedProperties.StringValue.Interner valuesInterner; - private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) { + public ServerPacketDecoder() { + this.keysInterner = null; + this.valuesInterner = null; + } + + private void initializeInternersIfNeeded() { + if (this.keysInterner == null) { + this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH); + } + if (this.valuesInterner == null) { + this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH); + } + } + + private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) { final SessionSendMessage sendMessage; + initializeInternersIfNeeded(); if (connection.isVersionBeforeAddressChange()) { - sendMessage = new SessionSendMessage_1X(new CoreMessage()); + sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner)); } else { - sendMessage = new SessionSendMessage(new CoreMessage()); + sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner)); } sendMessage.decode(in); @@ -259,5 +279,4 @@ public class ServerPacketDecoder extends ClientPacketDecoder { return packet; } - } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index c9262fa36e..af9e1316a8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -116,7 +116,7 @@ public class CoreProtocolManager implements ProtocolManager { Executor connectionExecutor = server.getExecutorFactory().getExecutor(); - final CoreRemotingConnection rc = new RemotingConnectionImpl(ServerPacketDecoder.INSTANCE, connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID()); + final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID()); Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java index 85ad3a37ad..209f68f333 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java @@ -65,8 +65,8 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM class ActiveMQReplicationProtocolManager extends ActiveMQClientProtocolManager { @Override - protected PacketDecoder getPacketDecoder() { - return ServerPacketDecoder.INSTANCE; + protected PacketDecoder createPacketDecoder() { + return new ServerPacketDecoder(); } } }