ARTEMIS-1586 Reduce GC pressure due to String allocations on Core protocol

The commit contains:
- a general purpose interner implementation
- StringValue/SimpleString internrs specializations
- TypedProperties keys/values string interning for SessionSendMessage decoding
This commit is contained in:
Francesco Nigro 2018-01-04 15:22:05 +01:00 committed by Michael Pearce
parent 00bd989f9f
commit 8d776eddfc
9 changed files with 404 additions and 30 deletions

View File

@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.utils.AbstractInterner;
import org.apache.activemq.artemis.utils.DataConstants;
/**
@ -31,6 +33,129 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/
public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> {
public static final class Interner extends AbstractInterner<SimpleString> {
private final int maxLength;
public Interner(final int capacity, final int maxCharsLength) {
super(capacity);
this.maxLength = maxCharsLength;
}
@Override
protected boolean isEqual(final SimpleString entry, final ByteBuf byteBuf, final int offset, final int length) {
return SimpleString.isEqual(entry, byteBuf, offset, length);
}
@Override
protected boolean canIntern(final ByteBuf byteBuf, final int length) {
assert length % 2 == 0 : "length must be a multiple of 2";
final int expectedStringLength = length >> 1;
return expectedStringLength <= maxLength;
}
@Override
protected SimpleString create(final ByteBuf byteBuf, final int length) {
return readSimpleString(byteBuf, length);
}
}
/**
* Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},
* {@code false} otherwise.
* <p>
* It assumes that the {@code bytes} content is read using {@link SimpleString#readSimpleString(ByteBuf, int)} ie starting right after the
* length field.
*/
public static boolean isEqual(final SimpleString s, final ByteBuf bytes, final int offset, final int length) {
if (s == null) {
return false;
}
final byte[] chars = s.getData();
if (chars.length != length)
return false;
if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
if ((offset + length) > bytes.writerIndex()) {
throw new IndexOutOfBoundsException();
}
if (bytes.hasArray()) {
return batchOnHeapIsEqual(chars, bytes.array(), bytes.arrayOffset() + offset, length);
} else if (bytes.hasMemoryAddress()) {
return batchOffHeapIsEqual(chars, bytes.memoryAddress(), offset, length);
}
}
return byteBufIsEqual(chars, bytes, offset, length);
}
private static boolean byteBufIsEqual(final byte[] chars, final ByteBuf bytes, final int offset, final int length) {
for (int i = 0; i < length; i++)
if (chars[i] != bytes.getByte(offset + i))
return false;
return true;
}
private static boolean batchOnHeapIsEqual(final byte[] chars,
final byte[] array,
final int arrayOffset,
final int length) {
final int longCount = length >>> 3;
final int bytesCount = length & 7;
int bytesIndex = arrayOffset;
int charsIndex = 0;
for (int i = 0; i < longCount; i++) {
final long charsLong = PlatformDependent.getLong(chars, charsIndex);
final long bytesLong = PlatformDependent.getLong(array, bytesIndex);
if (charsLong != bytesLong) {
return false;
}
bytesIndex += 8;
charsIndex += 8;
}
for (int i = 0; i < bytesCount; i++) {
final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
final byte bytesByte = PlatformDependent.getByte(array, bytesIndex);
if (charsByte != bytesByte) {
return false;
}
bytesIndex++;
charsIndex++;
}
return true;
}
private static boolean batchOffHeapIsEqual(final byte[] chars,
final long address,
final int offset,
final int length) {
final int longCount = length >>> 3;
final int bytesCount = length & 7;
long bytesAddress = address + offset;
int charsIndex = 0;
for (int i = 0; i < longCount; i++) {
final long charsLong = PlatformDependent.getLong(chars, charsIndex);
final long bytesLong = PlatformDependent.getLong(bytesAddress);
if (charsLong != bytesLong) {
return false;
}
bytesAddress += 8;
charsIndex += 8;
}
for (int i = 0; i < bytesCount; i++) {
final byte charsByte = PlatformDependent.getByte(chars, charsIndex);
final byte bytesByte = PlatformDependent.getByte(bytesAddress);
if (charsByte != bytesByte) {
return false;
}
bytesAddress++;
charsIndex++;
}
return true;
}
private static final long serialVersionUID = 4204223851422244307L;
// Attributes
@ -134,7 +259,6 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
return subSeq(start, end);
}
public static SimpleString readNullableSimpleString(ByteBuf buffer) {
int b = buffer.readByte();
if (b == DataConstants.NULL) {
@ -143,13 +267,13 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
return readSimpleString(buffer);
}
public static SimpleString readSimpleString(ByteBuf buffer) {
int len = buffer.readInt();
if (len > buffer.readableBytes()) {
throw new IndexOutOfBoundsException();
}
byte[] data = new byte[len];
return readSimpleString(buffer, len);
}
public static SimpleString readSimpleString(final ByteBuf buffer, final int length) {
byte[] data = new byte[length];
buffer.readBytes(data);
return new SimpleString(data);
}
@ -169,8 +293,6 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
buffer.writeBytes(data);
}
public SimpleString subSeq(final int start, final int end) {
int len = data.length >> 1;

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.PlatformDependent;
/**
* Thread-safe {@code <T>} interner.
* <p>
* Differently from {@link String#intern()} it contains a fixed amount of entries and
* when used by concurrent threads it doesn't ensure the uniqueness of the entries ie
* the same entry could be allocated multiple times by concurrent calls.
*/
public abstract class AbstractInterner<T> {
private final T[] entries;
private final int mask;
private final int shift;
public AbstractInterner(final int capacity) {
entries = (T[]) new Object[MathUtil.findNextPositivePowerOfTwo(capacity)];
mask = entries.length - 1;
//log2 of entries.length
shift = 31 - Integer.numberOfLeadingZeros(entries.length);
}
/**
* Batch hash code implementation that works at its best if {@code bytes}
* contains a {@link org.apache.activemq.artemis.api.core.SimpleString} encoded.
*/
private static int hashCode(final ByteBuf bytes, final int offset, final int length) {
if (PlatformDependent.isUnaligned() && PlatformDependent.hasUnsafe()) {
//if the platform allows it, the hash code could be computed without bounds checking
if (bytes.hasArray()) {
return onHeapHashCode(bytes.array(), bytes.arrayOffset() + offset, length);
} else if (bytes.hasMemoryAddress()) {
return offHeapHashCode(bytes.memoryAddress(), offset, length);
}
}
return byteBufHashCode(bytes, offset, length);
}
private static int onHeapHashCode(final byte[] bytes, final int offset, final int length) {
final int intCount = length >>> 1;
final int byteCount = length & 1;
int hashCode = 1;
int arrayIndex = offset;
for (int i = 0; i < intCount; i++) {
hashCode = 31 * hashCode + PlatformDependent.getShort(bytes, arrayIndex);
arrayIndex += 2;
}
for (int i = 0; i < byteCount; i++) {
hashCode = 31 * hashCode + PlatformDependent.getByte(bytes, arrayIndex++);
}
return hashCode;
}
private static int offHeapHashCode(final long address, final int offset, final int length) {
final int intCount = length >>> 1;
final int byteCount = length & 1;
int hashCode = 1;
int arrayIndex = offset;
for (int i = 0; i < intCount; i++) {
hashCode = 31 * hashCode + PlatformDependent.getShort(address + arrayIndex);
arrayIndex += 2;
}
for (int i = 0; i < byteCount; i++) {
hashCode = 31 * hashCode + PlatformDependent.getByte(address + arrayIndex++);
}
return hashCode;
}
private static int byteBufHashCode(final ByteBuf byteBuf, final int offset, final int length) {
final int intCount = length >>> 1;
final int byteCount = length & 1;
int hashCode = 1;
int arrayIndex = offset;
for (int i = 0; i < intCount; i++) {
final short shortLE = byteBuf.getShortLE(arrayIndex);
final short nativeShort = PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? Short.reverseBytes(shortLE) : shortLE;
hashCode = 31 * hashCode + nativeShort;
arrayIndex += 2;
}
for (int i = 0; i < byteCount; i++) {
hashCode = 31 * hashCode + byteBuf.getByte(arrayIndex++);
}
return hashCode;
}
/**
* Returns {@code true} if {@code length}'s {@code byteBuf} content from {@link ByteBuf#readerIndex()} can be interned,
* {@code false} otherwise.
*/
protected abstract boolean canIntern(ByteBuf byteBuf, int length);
/**
* Create a new entry.
*/
protected abstract T create(ByteBuf byteBuf, int length);
/**
* Returns {@code true} if the {@code entry} content is the same of {@code byteBuf} at the specified {@code offset}
* and {@code length} {@code false} otherwise.
*/
protected abstract boolean isEqual(T entry, ByteBuf byteBuf, int offset, int length);
/**
* Returns and interned entry if possible, a new one otherwise.
* <p>
* The {@code byteBuf}'s {@link ByteBuf#readerIndex()} is incremented by {@code length} after it.
*/
public final T intern(final ByteBuf byteBuf, final int length) {
if (!canIntern(byteBuf, length)) {
return create(byteBuf, length);
} else {
if (!byteBuf.isReadable(length)) {
throw new IndexOutOfBoundsException();
}
final int bytesOffset = byteBuf.readerIndex();
final int hashCode = hashCode(byteBuf, bytesOffset, length);
//fast % operation with power of 2 entries.length
final int firstIndex = hashCode & mask;
final T firstEntry = entries[firstIndex];
if (isEqual(firstEntry, byteBuf, bytesOffset, length)) {
byteBuf.skipBytes(length);
return firstEntry;
}
final int secondIndex = (hashCode >> shift) & mask;
final T secondEntry = entries[secondIndex];
if (isEqual(secondEntry, byteBuf, bytesOffset, length)) {
byteBuf.skipBytes(length);
return secondEntry;
}
final T internedEntry = create(byteBuf, length);
final int entryIndex = firstEntry == null ? firstIndex : secondIndex;
entries[entryIndex] = internedEntry;
return internedEntry;
}
}
}

View File

@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
import org.apache.activemq.artemis.utils.AbstractInterner;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
@ -93,6 +94,7 @@ public class TypedProperties {
}
public void putByteProperty(final SimpleString key, final byte value) {
checkCreateProperties();
checkCreateProperties();
doPutValue(key, ByteValue.valueOf(value));
}
@ -329,7 +331,9 @@ public class TypedProperties {
}
}
public synchronized void decode(final ByteBuf buffer) {
public synchronized void decode(final ByteBuf buffer,
final SimpleString.Interner keyInterner,
final StringValue.Interner valueInterner) {
byte b = buffer.readByte();
if (b == DataConstants.NULL) {
@ -342,10 +346,15 @@ public class TypedProperties {
size = 0;
for (int i = 0; i < numHeaders; i++) {
final SimpleString key;
int len = buffer.readInt();
byte[] data = new byte[len];
buffer.readBytes(data);
SimpleString key = new SimpleString(data);
if (keyInterner != null) {
key = keyInterner.intern(buffer, len);
} else {
byte[] data = new byte[len];
buffer.readBytes(data);
key = new SimpleString(data);
}
byte type = buffer.readByte();
@ -403,7 +412,12 @@ public class TypedProperties {
break;
}
case STRING: {
val = new StringValue(buffer);
if (valueInterner != null) {
final int length = buffer.readInt();
val = valueInterner.intern(buffer, length);
} else {
val = new StringValue(buffer);
}
doPutValue(key, val);
break;
}
@ -415,6 +429,10 @@ public class TypedProperties {
}
}
public synchronized void decode(final ByteBuf buffer) {
decode(buffer, null, null);
}
public synchronized void encode(final ByteBuf buffer) {
if (properties == null) {
buffer.writeByte(DataConstants.NULL);
@ -881,7 +899,37 @@ public class TypedProperties {
}
}
private static final class StringValue extends PropertyValue {
public static final class StringValue extends PropertyValue {
public static final class Interner extends AbstractInterner<StringValue> {
private final int maxLength;
public Interner(final int capacity, final int maxCharsLength) {
super(capacity);
this.maxLength = maxCharsLength;
}
@Override
protected boolean isEqual(final StringValue entry, final ByteBuf byteBuf, final int offset, final int length) {
if (entry == null) {
return false;
}
return SimpleString.isEqual(entry.val, byteBuf, offset, length);
}
@Override
protected boolean canIntern(final ByteBuf byteBuf, final int length) {
assert length % 2 == 0 : "length must be a multiple of 2";
final int expectedStringLength = length >> 1;
return expectedStringLength <= maxLength;
}
@Override
protected StringValue create(final ByteBuf byteBuf, final int length) {
return new StringValue(SimpleString.readSimpleString(byteBuf, length));
}
}
final SimpleString val;

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -42,8 +43,6 @@ import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
/** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
* consumers */
public class CoreMessage extends RefCountMessage implements ICoreMessage {
@ -94,7 +93,18 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
protected volatile TypedProperties properties;
private final SimpleString.Interner keysInterner;
private final TypedProperties.StringValue.Interner valuesInterner;
public CoreMessage(final SimpleString.Interner keysInterner,
final TypedProperties.StringValue.Interner valuesInterner) {
this.keysInterner = keysInterner;
this.valuesInterner = valuesInterner;
}
public CoreMessage() {
this.keysInterner = null;
this.valuesInterner = null;
}
/** On core there's no delivery annotation */
@ -318,6 +328,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public CoreMessage(long id, int bufferSize) {
this.initBuffer(bufferSize);
this.setMessageID(id);
this.keysInterner = null;
this.valuesInterner = null;
}
protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
@ -331,6 +343,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
this.timestamp = other.timestamp;
this.priority = other.priority;
this.userID = other.userID;
this.keysInterner = other.keysInterner;
this.valuesInterner = other.valuesInterner;
if (copyProperties != null) {
this.properties = new TypedProperties(copyProperties);
}
@ -464,7 +478,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
if (properties == null) {
TypedProperties properties = new TypedProperties();
if (buffer != null && propertiesLocation >= 0) {
properties.decode(buffer.duplicate().readerIndex(propertiesLocation));
final ByteBuf byteBuf = buffer.duplicate().readerIndex(propertiesLocation);
properties.decode(byteBuf, keysInterner, valuesInterner);
}
this.properties = properties;
}
@ -528,8 +543,17 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProperties) {
messageIDPosition = buffer.readerIndex();
messageID = buffer.readLong();
address = SimpleString.readNullableSimpleString(buffer);
int b = buffer.readByte();
if (b != DataConstants.NULL) {
final int length = buffer.readInt();
if (keysInterner != null) {
address = keysInterner.intern(buffer, length);
} else {
address = SimpleString.readSimpleString(buffer, length);
}
} else {
address = null;
}
if (buffer.readByte() == DataConstants.NOT_NULL) {
byte[] bytes = new byte[16];
buffer.readBytes(bytes);
@ -547,7 +571,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
propertiesLocation = buffer.readerIndex();
} else {
properties = new TypedProperties();
properties.decode(buffer);
properties.decode(buffer, keysInterner, valuesInterner);
}
}

View File

@ -34,6 +34,10 @@ public class ClientPacketDecoder extends PacketDecoder {
private static final long serialVersionUID = 6952614096979334582L;
public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();
protected ClientPacketDecoder() {
}
@Override
public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final byte packetType = in.readByte();

View File

@ -409,7 +409,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
List<Interceptor> incomingInterceptors,
List<Interceptor> outgoingInterceptors,
TopologyResponseHandler topologyResponseHandler) {
this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);
this.topologyResponseHandler = topologyResponseHandler;
@ -510,7 +510,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
}
}
protected PacketDecoder getPacketDecoder() {
protected PacketDecoder createPacketDecoder() {
return ClientPacketDecoder.INSTANCE;
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
@ -53,6 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@ -83,16 +85,34 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
public class ServerPacketDecoder extends ClientPacketDecoder {
private static final int UUID_LENGTH = 36;
private static final int DEFAULT_INTERNER_CAPACITY = 32;
private static final long serialVersionUID = 3348673114388400766L;
public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder();
private SimpleString.Interner keysInterner;
private TypedProperties.StringValue.Interner valuesInterner;
private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
public ServerPacketDecoder() {
this.keysInterner = null;
this.valuesInterner = null;
}
private void initializeInternersIfNeeded() {
if (this.keysInterner == null) {
this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
}
if (this.valuesInterner == null) {
this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
}
}
private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final SessionSendMessage sendMessage;
initializeInternersIfNeeded();
if (connection.isVersionBeforeAddressChange()) {
sendMessage = new SessionSendMessage_1X(new CoreMessage());
sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner));
} else {
sendMessage = new SessionSendMessage(new CoreMessage());
sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner));
}
sendMessage.decode(in);
@ -259,5 +279,4 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
return packet;
}
}

View File

@ -116,7 +116,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
Executor connectionExecutor = server.getExecutorFactory().getExecutor();
final CoreRemotingConnection rc = new RemotingConnectionImpl(ServerPacketDecoder.INSTANCE, connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID());
final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(), connection, incomingInterceptors, outgoingInterceptors, config.isAsyncConnectionExecutionEnabled() ? connectionExecutor : null, server.getNodeID());
Channel channel1 = rc.getChannel(CHANNEL_ID.SESSION.id, -1);

View File

@ -65,8 +65,8 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
class ActiveMQReplicationProtocolManager extends ActiveMQClientProtocolManager {
@Override
protected PacketDecoder getPacketDecoder() {
return ServerPacketDecoder.INSTANCE;
protected PacketDecoder createPacketDecoder() {
return new ServerPacketDecoder();
}
}
}