From ad4f6a133af9130f206a78e43ad387d4e3ee5f8f Mon Sep 17 00:00:00 2001 From: franz1981 Date: Sun, 6 Dec 2020 00:07:29 +0100 Subject: [PATCH] ARTEMIS-3021 OOM due to wrong CORE clustered message memory estimation --- .../activemq/artemis/utils/ByteUtil.java | 10 +++ .../utils/collections/TypedProperties.java | 14 +++- .../activemq/artemis/utils/ByteUtilTest.java | 28 +++++++- .../core/message/impl/CoreMessage.java | 71 +++++++++++++------ .../artemis/message/CoreMessageTest.java | 31 ++++++++ .../persistence/impl/journal/LargeBody.java | 2 + .../impl/journal/LargeServerMessageImpl.java | 16 +++-- 7 files changed, 142 insertions(+), 30 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index 11e895f852..8c5fec9e6d 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -389,6 +389,16 @@ public class ByteUtil { return true; } + /** + * This ensure a more exact resizing then {@link ByteBuf#ensureWritable(int)}, if needed.
+ * It won't try to trim a large enough buffer. + */ + public static void ensureExactWritable(ByteBuf buffer, int minWritableBytes) { + if (buffer.maxFastWritableBytes() < minWritableBytes) { + buffer.capacity(buffer.writerIndex() + minWritableBytes); + } + } + /** * Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s}, diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java index cde5e1e783..ac7a09047f 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.utils.AbstractByteBufPool; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; +import static org.apache.activemq.artemis.utils.ByteUtil.ensureExactWritable; import static org.apache.activemq.artemis.utils.DataConstants.BOOLEAN; import static org.apache.activemq.artemis.utils.DataConstants.BYTE; import static org.apache.activemq.artemis.utils.DataConstants.BYTES; @@ -546,11 +547,18 @@ public class TypedProperties { decode(buffer, null); } - - public synchronized void encode(final ByteBuf buffer) { + public synchronized int encode(final ByteBuf buffer) { + final int encodedSize; + // it's a trick to not pay the cost of buffer.writeIndex without assertions enabled + int writerIndex = 0; + assert (writerIndex = buffer.writerIndex()) >= 0 : "Always true"; if (properties == null || size == 0) { + encodedSize = DataConstants.SIZE_BYTE; + ensureExactWritable(buffer, encodedSize); buffer.writeByte(DataConstants.NULL); } else { + encodedSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size; + ensureExactWritable(buffer, encodedSize); buffer.writeByte(DataConstants.NOT_NULL); buffer.writeInt(properties.size()); @@ -563,6 +571,8 @@ public class TypedProperties { value.write(buffer); }); } + assert buffer.writerIndex() == (writerIndex + encodedSize) : "Bad encode size estimation"; + return encodedSize; } public synchronized int getEncodeSize() { diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java index 83fab14938..b4eea724ed 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java @@ -24,6 +24,8 @@ import java.util.Arrays; import java.util.LinkedHashMap; import java.util.Map; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.util.internal.PlatformDependent; import org.jboss.logging.Logger; import org.junit.Assert; @@ -37,7 +39,7 @@ import static org.junit.Assert.fail; public class ByteUtilTest { - private static Logger log = Logger.getLogger(ByteUtilTest.class); + private static final Logger log = Logger.getLogger(ByteUtilTest.class); @Test public void testBytesToString() { @@ -409,5 +411,29 @@ public class ByteUtilTest { assertArrayEquals(assertContent, convertedContent); } + @Test(expected = IllegalArgumentException.class) + public void shouldEnsureExactWritableFailToEnlargeWrappedByteBuf() { + byte[] wrapped = new byte[32]; + ByteBuf buffer = Unpooled.wrappedBuffer(wrapped); + buffer.writerIndex(wrapped.length); + ByteUtil.ensureExactWritable(buffer, 1); + } + + @Test + public void shouldEnsureExactWritableNotEnlargeBufferWithEnoughSpace() { + byte[] wrapped = new byte[32]; + ByteBuf buffer = Unpooled.wrappedBuffer(wrapped); + buffer.writerIndex(wrapped.length - 1); + ByteUtil.ensureExactWritable(buffer, 1); + Assert.assertSame(wrapped, buffer.array()); + } + + @Test + public void shouldEnsureExactWritableEnlargeBufferWithoutEnoughSpace() { + ByteBuf buffer = Unpooled.buffer(32); + buffer.writerIndex(32); + ByteUtil.ensureExactWritable(buffer, 1); + Assert.assertEquals(33, buffer.capacity()); + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 3b21011a7e..6c55c3f075 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -60,13 +60,15 @@ import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.utils.ByteUtil.ensureExactWritable; + /** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple * consumers */ public class CoreMessage extends RefCountMessage implements ICoreMessage { public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE; - private volatile int memoryEstimate = -1; + protected volatile int memoryEstimate = -1; private static final Logger logger = Logger.getLogger(CoreMessage.class); @@ -81,8 +83,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { protected volatile ResetLimitWrappedActiveMQBuffer writableBuffer; - Object body; - protected int endOfBodyPosition = -1; protected int messageIDPosition = -1; @@ -445,7 +445,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to // many subscriptions and bridging to other nodes in a cluster synchronized (other) { - this.body = other.body; this.endOfBodyPosition = other.endOfBodyPosition; internalSetMessageID(other.messageID); this.address = other.address; @@ -641,6 +640,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { @Override public int getMemoryEstimate() { if (memoryEstimate == -1) { + if (buffer != null && !isLargeMessage()) { + if (!validBuffer) { + // this can happen if a message is modified + // eg clustered messages get additional routing information + // that need to be correctly accounted in memory + checkEncode(); + } + } + final TypedProperties properties = this.properties; memoryEstimate = memoryOffset + (buffer != null ? buffer.capacity() : 0) + (properties != null ? properties.getMemoryOffset() : 0); @@ -733,11 +741,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { endOfBodyPosition = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT; } - buffer.setIndex(0, 0); - buffer.writeInt(endOfBodyPosition); - + buffer.setInt(0, endOfBodyPosition); // The end of body position - buffer.writerIndex(endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT); + buffer.setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT); encodeHeadersAndProperties(buffer); @@ -748,21 +754,42 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { public void encodeHeadersAndProperties(final ByteBuf buffer) { final TypedProperties properties = getProperties(); - messageIDPosition = buffer.writerIndex(); - buffer.writeLong(messageID); - SimpleString.writeNullableSimpleString(buffer, address); - if (userID == null) { - buffer.writeByte(DataConstants.NULL); - } else { - buffer.writeByte(DataConstants.NOT_NULL); - buffer.writeBytes(userID.asBytes()); + final int initialWriterIndex = buffer.writerIndex(); + messageIDPosition = initialWriterIndex; + final UUID userID = this.userID; + final int userIDEncodedSize = userID == null ? Byte.BYTES : Byte.BYTES + userID.asBytes().length; + final SimpleString address = this.address; + final int addressEncodedBytes = SimpleString.sizeofNullableString(address); + final int headersSize = + Long.BYTES + // messageID + addressEncodedBytes + // address + userIDEncodedSize + // userID + Byte.BYTES + // type + Byte.BYTES + // durable + Long.BYTES + // expiration + Long.BYTES + // timestamp + Byte.BYTES; // priority + synchronized (properties) { + final int propertiesEncodeSize = properties.getEncodeSize(); + final int totalEncodedSize = headersSize + propertiesEncodeSize; + ensureExactWritable(buffer, totalEncodedSize); + buffer.writeLong(messageID); + SimpleString.writeNullableSimpleString(buffer, address); + if (userID == null) { + buffer.writeByte(DataConstants.NULL); + } else { + buffer.writeByte(DataConstants.NOT_NULL); + buffer.writeBytes(userID.asBytes()); + } + buffer.writeByte(type); + buffer.writeBoolean(durable); + buffer.writeLong(expiration); + buffer.writeLong(timestamp); + buffer.writeByte(priority); + assert buffer.writerIndex() == initialWriterIndex + headersSize : "Bad Headers encode size estimation"; + final int realPropertiesEncodeSize = properties.encode(buffer); + assert realPropertiesEncodeSize == propertiesEncodeSize : "TypedProperties has a wrong encode size estimation or is being modified concurrently"; } - buffer.writeByte(type); - buffer.writeBoolean(durable); - buffer.writeLong(expiration); - buffer.writeLong(timestamp); - buffer.writeByte(priority); - properties.encode(buffer); } @Override diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java index c52daa411b..ef0ef049f6 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java @@ -38,6 +38,9 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; + public class CoreMessageTest { public static final SimpleString ADDRESS = new SimpleString("this.local.address"); @@ -359,6 +362,34 @@ public class CoreMessageTest { } + @Test + public void testMemoryEstimateChangedAfterModifiedAlreadyEncodedCopy() { + final CoreMessage msg = new CoreMessage(1, 44); + msg.getEncodeSize(); + final int memoryEstimate = msg.getMemoryEstimate(); + final CoreMessage copy = (CoreMessage) msg.copy(2); + copy.getEncodeSize(); + copy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, new byte[Long.BYTES]); + final int increasedMemoryFootprint = copy.getMemoryEstimate() - memoryEstimate; + final int increasedPropertyFootprint = copy.getProperties().getMemoryOffset() - msg.getProperties().getMemoryOffset(); + assertThat("memory estimation isn't accounting for the additional encoded property", + increasedMemoryFootprint, greaterThan(increasedPropertyFootprint)); + } + + @Test + public void testMessageBufferCapacityMatchEncodedSizeAfterModifiedCopy() { + final CoreMessage msg = new CoreMessage(1, 4155); + msg.setAddress("a"); + msg.putBytesProperty("_", new byte[4096]); + final CoreMessage copy = (CoreMessage) msg.copy(2); + Assert.assertEquals(msg.getEncodeSize(), copy.getBuffer().capacity()); + copy.setAddress("b"); + copy.setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, "a"); + copy.setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, msg.getAddressSimpleString()); + copy.setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, msg.getMessageID()); + Assert.assertEquals(copy.getEncodeSize(), copy.getBuffer().capacity()); + } + private void printVariable(String body, String encode) { System.out.println("// body = \"" + body + "\";"); System.out.println("private final String STRING_ENCODE = \"" + encode + "\";"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java index 1eacecd1bd..e394660c1b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java @@ -37,6 +37,8 @@ import org.jboss.logging.Logger; public class LargeBody { + static final int MEMORY_OFFSET = 56; + private static final Logger logger = Logger.getLogger(LargeBody.class); private long bodySize = -1; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 267e1efb61..6b9e9f1366 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -33,6 +33,11 @@ import org.jboss.logging.Logger; public final class LargeServerMessageImpl extends CoreMessage implements CoreLargeServerMessage { + // Given that LargeBody is never null it needs to be accounted on this instance footprint. + // This value has been computed using https://github.com/openjdk/jol + // with HotSpot 64-bit COOPS 8-byte align + private static final int MEMORY_OFFSET = 112 + LargeBody.MEMORY_OFFSET; + @Override public Message toMessage() { return this; @@ -73,9 +78,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar private final StorageManager storageManager; - // We cache this - private volatile int memoryEstimate = -1; - public LargeServerMessageImpl(final StorageManager storageManager) { largeBody = new LargeBody(this, storageManager); this.storageManager = storageManager; @@ -243,8 +245,12 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar public int getMemoryEstimate() { synchronized (largeBody) { if (memoryEstimate == -1) { - // The body won't be on memory (aways on-file), so we don't consider this for paging - memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT + getEncodeSize() + (16 + 4) * 2 + 1; + // The body won't be on memory (always on-file), so we don't consider this for paging + memoryEstimate = MEMORY_OFFSET + + getHeadersAndPropertiesEncodeSize() + + DataConstants.SIZE_INT + + getEncodeSize() + + (16 + 4) * 2 + 1; } return memoryEstimate;