From ec3ed04d0868ab9123926cbff7f3baab5280e5ff Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Wed, 15 Mar 2017 16:59:57 +0100 Subject: [PATCH] ARTEMIS-1036 Streaming huge messages between cluster nodes causes java.lang.OutOfMemoryError: Direct buffer memory (cherry picked from commit 1686b3545d14cdf591e00e6d04228b48b2b74a9f) --- .../SessionContinuationMessage.java | 50 ++++++++++++++++++- .../SessionReceiveContinuationMessage.java | 9 +++- .../SessionSendContinuationMessage.java | 6 +++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java index fcdd943457..faeed08ae1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java @@ -18,8 +18,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import java.util.Arrays; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; public abstract class SessionContinuationMessage extends PacketImpl { @@ -61,6 +64,51 @@ public abstract class SessionContinuationMessage extends PacketImpl { return continues; } + /** + * Returns the exact expected encoded size of {@code this} packet. + * It will be used to allocate the proper encoding buffer in {@link #createPacket}, hence any + * wrong value will result in a thrown exception or a resize of the encoding + * buffer during the encoding process, depending to the implementation of {@link #createPacket}. + * Any child of {@code this} class are required to override this method if their encoded size is changed + * from the base class. + * + * @return the size in bytes of the expected encoded packet + */ + protected int expectedEncodedSize() { + return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length); + } + + @Override + public final ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) { + final ActiveMQBuffer buffer = createPacket(connection, usePooled); + + // The standard header fields + + buffer.writeInt(0); // The length gets filled in at the end + buffer.writeByte(getType()); + buffer.writeLong(channelID); + + encodeRest(buffer); + + size = buffer.writerIndex(); + + // The length doesn't include the actual length byte + int len = size - DataConstants.SIZE_INT; + + buffer.setInt(0, len); + + return buffer; + } + + protected final ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { + final int expectedEncodedSize = expectedEncodedSize(); + if (connection == null) { + return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize)); + } else { + return connection.createTransportBuffer(expectedEncodedSize, usePooled); + } + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeInt(body.length); @@ -110,4 +158,4 @@ public abstract class SessionContinuationMessage extends PacketImpl { return true; } -} +} \ No newline at end of file diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java index 9141ae12b0..44ad1bbf49 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java @@ -67,6 +67,13 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag return consumerID; } + // Protected ----------------------------------------------------- + + @Override + protected final int expectedEncodedSize() { + return super.expectedEncodedSize() + DataConstants.SIZE_LONG; + } + // Public -------------------------------------------------------- @Override @@ -121,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag return true; } -} +} \ No newline at end of file diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java index b4ec027e4b..e718b3dc93 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.utils.DataConstants; /** * A SessionSendContinuationMessage
@@ -91,6 +92,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { return message; } + @Override + protected final int expectedEncodedSize() { + return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN; + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer);