ARTEMIS-3021 OOM due to wrong CORE clustered message memory estimation

This commit is contained in:
franz1981 2020-12-06 00:07:29 +01:00
parent 12a93e3c7e
commit ad4f6a133a
7 changed files with 142 additions and 30 deletions

View File

@ -389,6 +389,16 @@ public class ByteUtil {
return true;
}
/**
* This ensure a more exact resizing then {@link ByteBuf#ensureWritable(int)}, if needed.<br>
* It won't try to trim a large enough buffer.
*/
public static void ensureExactWritable(ByteBuf buffer, int minWritableBytes) {
if (buffer.maxFastWritableBytes() < minWritableBytes) {
buffer.capacity(buffer.writerIndex() + minWritableBytes);
}
}
/**
* Returns {@code true} if the {@link SimpleString} encoded content into {@code bytes} is equals to {@code s},

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.utils.AbstractByteBufPool;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import static org.apache.activemq.artemis.utils.ByteUtil.ensureExactWritable;
import static org.apache.activemq.artemis.utils.DataConstants.BOOLEAN;
import static org.apache.activemq.artemis.utils.DataConstants.BYTE;
import static org.apache.activemq.artemis.utils.DataConstants.BYTES;
@ -546,11 +547,18 @@ public class TypedProperties {
decode(buffer, null);
}
public synchronized void encode(final ByteBuf buffer) {
public synchronized int encode(final ByteBuf buffer) {
final int encodedSize;
// it's a trick to not pay the cost of buffer.writeIndex without assertions enabled
int writerIndex = 0;
assert (writerIndex = buffer.writerIndex()) >= 0 : "Always true";
if (properties == null || size == 0) {
encodedSize = DataConstants.SIZE_BYTE;
ensureExactWritable(buffer, encodedSize);
buffer.writeByte(DataConstants.NULL);
} else {
encodedSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + size;
ensureExactWritable(buffer, encodedSize);
buffer.writeByte(DataConstants.NOT_NULL);
buffer.writeInt(properties.size());
@ -563,6 +571,8 @@ public class TypedProperties {
value.write(buffer);
});
}
assert buffer.writerIndex() == (writerIndex + encodedSize) : "Bad encode size estimation";
return encodedSize;
}
public synchronized int getEncodeSize() {

View File

@ -24,6 +24,8 @@ import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;
import org.jboss.logging.Logger;
import org.junit.Assert;
@ -37,7 +39,7 @@ import static org.junit.Assert.fail;
public class ByteUtilTest {
private static Logger log = Logger.getLogger(ByteUtilTest.class);
private static final Logger log = Logger.getLogger(ByteUtilTest.class);
@Test
public void testBytesToString() {
@ -409,5 +411,29 @@ public class ByteUtilTest {
assertArrayEquals(assertContent, convertedContent);
}
@Test(expected = IllegalArgumentException.class)
public void shouldEnsureExactWritableFailToEnlargeWrappedByteBuf() {
byte[] wrapped = new byte[32];
ByteBuf buffer = Unpooled.wrappedBuffer(wrapped);
buffer.writerIndex(wrapped.length);
ByteUtil.ensureExactWritable(buffer, 1);
}
@Test
public void shouldEnsureExactWritableNotEnlargeBufferWithEnoughSpace() {
byte[] wrapped = new byte[32];
ByteBuf buffer = Unpooled.wrappedBuffer(wrapped);
buffer.writerIndex(wrapped.length - 1);
ByteUtil.ensureExactWritable(buffer, 1);
Assert.assertSame(wrapped, buffer.array());
}
@Test
public void shouldEnsureExactWritableEnlargeBufferWithoutEnoughSpace() {
ByteBuf buffer = Unpooled.buffer(32);
buffer.writerIndex(32);
ByteUtil.ensureExactWritable(buffer, 1);
Assert.assertEquals(33, buffer.capacity());
}
}

View File

@ -60,13 +60,15 @@ import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.utils.ByteUtil.ensureExactWritable;
/** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple
* consumers */
public class CoreMessage extends RefCountMessage implements ICoreMessage {
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
private volatile int memoryEstimate = -1;
protected volatile int memoryEstimate = -1;
private static final Logger logger = Logger.getLogger(CoreMessage.class);
@ -81,8 +83,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
protected volatile ResetLimitWrappedActiveMQBuffer writableBuffer;
Object body;
protected int endOfBodyPosition = -1;
protected int messageIDPosition = -1;
@ -445,7 +445,6 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
// many subscriptions and bridging to other nodes in a cluster
synchronized (other) {
this.body = other.body;
this.endOfBodyPosition = other.endOfBodyPosition;
internalSetMessageID(other.messageID);
this.address = other.address;
@ -641,6 +640,15 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
@Override
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
if (buffer != null && !isLargeMessage()) {
if (!validBuffer) {
// this can happen if a message is modified
// eg clustered messages get additional routing information
// that need to be correctly accounted in memory
checkEncode();
}
}
final TypedProperties properties = this.properties;
memoryEstimate = memoryOffset +
(buffer != null ? buffer.capacity() : 0) +
(properties != null ? properties.getMemoryOffset() : 0);
@ -733,11 +741,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
endOfBodyPosition = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
}
buffer.setIndex(0, 0);
buffer.writeInt(endOfBodyPosition);
buffer.setInt(0, endOfBodyPosition);
// The end of body position
buffer.writerIndex(endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
buffer.setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
encodeHeadersAndProperties(buffer);
@ -748,21 +754,42 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
public void encodeHeadersAndProperties(final ByteBuf buffer) {
final TypedProperties properties = getProperties();
messageIDPosition = buffer.writerIndex();
buffer.writeLong(messageID);
SimpleString.writeNullableSimpleString(buffer, address);
if (userID == null) {
buffer.writeByte(DataConstants.NULL);
} else {
buffer.writeByte(DataConstants.NOT_NULL);
buffer.writeBytes(userID.asBytes());
final int initialWriterIndex = buffer.writerIndex();
messageIDPosition = initialWriterIndex;
final UUID userID = this.userID;
final int userIDEncodedSize = userID == null ? Byte.BYTES : Byte.BYTES + userID.asBytes().length;
final SimpleString address = this.address;
final int addressEncodedBytes = SimpleString.sizeofNullableString(address);
final int headersSize =
Long.BYTES + // messageID
addressEncodedBytes + // address
userIDEncodedSize + // userID
Byte.BYTES + // type
Byte.BYTES + // durable
Long.BYTES + // expiration
Long.BYTES + // timestamp
Byte.BYTES; // priority
synchronized (properties) {
final int propertiesEncodeSize = properties.getEncodeSize();
final int totalEncodedSize = headersSize + propertiesEncodeSize;
ensureExactWritable(buffer, totalEncodedSize);
buffer.writeLong(messageID);
SimpleString.writeNullableSimpleString(buffer, address);
if (userID == null) {
buffer.writeByte(DataConstants.NULL);
} else {
buffer.writeByte(DataConstants.NOT_NULL);
buffer.writeBytes(userID.asBytes());
}
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
buffer.writeLong(timestamp);
buffer.writeByte(priority);
assert buffer.writerIndex() == initialWriterIndex + headersSize : "Bad Headers encode size estimation";
final int realPropertiesEncodeSize = properties.encode(buffer);
assert realPropertiesEncodeSize == propertiesEncodeSize : "TypedProperties has a wrong encode size estimation or is being modified concurrently";
}
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
buffer.writeLong(timestamp);
buffer.writeByte(priority);
properties.encode(buffer);
}
@Override

View File

@ -38,6 +38,9 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
public class CoreMessageTest {
public static final SimpleString ADDRESS = new SimpleString("this.local.address");
@ -359,6 +362,34 @@ public class CoreMessageTest {
}
@Test
public void testMemoryEstimateChangedAfterModifiedAlreadyEncodedCopy() {
final CoreMessage msg = new CoreMessage(1, 44);
msg.getEncodeSize();
final int memoryEstimate = msg.getMemoryEstimate();
final CoreMessage copy = (CoreMessage) msg.copy(2);
copy.getEncodeSize();
copy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, new byte[Long.BYTES]);
final int increasedMemoryFootprint = copy.getMemoryEstimate() - memoryEstimate;
final int increasedPropertyFootprint = copy.getProperties().getMemoryOffset() - msg.getProperties().getMemoryOffset();
assertThat("memory estimation isn't accounting for the additional encoded property",
increasedMemoryFootprint, greaterThan(increasedPropertyFootprint));
}
@Test
public void testMessageBufferCapacityMatchEncodedSizeAfterModifiedCopy() {
final CoreMessage msg = new CoreMessage(1, 4155);
msg.setAddress("a");
msg.putBytesProperty("_", new byte[4096]);
final CoreMessage copy = (CoreMessage) msg.copy(2);
Assert.assertEquals(msg.getEncodeSize(), copy.getBuffer().capacity());
copy.setAddress("b");
copy.setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, "a");
copy.setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, msg.getAddressSimpleString());
copy.setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, msg.getMessageID());
Assert.assertEquals(copy.getEncodeSize(), copy.getBuffer().capacity());
}
private void printVariable(String body, String encode) {
System.out.println("// body = \"" + body + "\";");
System.out.println("private final String STRING_ENCODE = \"" + encode + "\";");

View File

@ -37,6 +37,8 @@ import org.jboss.logging.Logger;
public class LargeBody {
static final int MEMORY_OFFSET = 56;
private static final Logger logger = Logger.getLogger(LargeBody.class);
private long bodySize = -1;

View File

@ -33,6 +33,11 @@ import org.jboss.logging.Logger;
public final class LargeServerMessageImpl extends CoreMessage implements CoreLargeServerMessage {
// Given that LargeBody is never null it needs to be accounted on this instance footprint.
// This value has been computed using https://github.com/openjdk/jol
// with HotSpot 64-bit COOPS 8-byte align
private static final int MEMORY_OFFSET = 112 + LargeBody.MEMORY_OFFSET;
@Override
public Message toMessage() {
return this;
@ -73,9 +78,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
private final StorageManager storageManager;
// We cache this
private volatile int memoryEstimate = -1;
public LargeServerMessageImpl(final StorageManager storageManager) {
largeBody = new LargeBody(this, storageManager);
this.storageManager = storageManager;
@ -243,8 +245,12 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
public int getMemoryEstimate() {
synchronized (largeBody) {
if (memoryEstimate == -1) {
// The body won't be on memory (aways on-file), so we don't consider this for paging
memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT + getEncodeSize() + (16 + 4) * 2 + 1;
// The body won't be on memory (always on-file), so we don't consider this for paging
memoryEstimate = MEMORY_OFFSET +
getHeadersAndPropertiesEncodeSize() +
DataConstants.SIZE_INT +
getEncodeSize() +
(16 + 4) * 2 + 1;
}
return memoryEstimate;