From 759d3b78d98001e3709fdca94676d021909eb328 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Wed, 15 Mar 2017 16:59:57 +0100 Subject: [PATCH] ARTEMIS-1036 Streaming huge messages would cause OME --- .../SessionContinuationMessage.java | 29 ++++++++++++++++++- .../SessionReceiveContinuationMessage.java | 9 +++++- .../SessionSendContinuationMessage.java | 8 ++++- 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java index fcdd943457..a57cdb4c02 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java @@ -18,8 +18,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import java.util.Arrays; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; public abstract class SessionContinuationMessage extends PacketImpl { @@ -61,6 +64,30 @@ public abstract class SessionContinuationMessage extends PacketImpl { return continues; } + /** + * Returns the exact expected encoded size of {@code this} packet. + * It will be used to allocate the proper encoding buffer in {@link #createPacket}, hence any + * wrong value will result in a thrown exception or a resize of the encoding + * buffer during the encoding process, depending to the implementation of {@link #createPacket}. + * Any child of {@code this} class are required to override this method if their encoded size is changed + * from the base class. + * + * @return the size in bytes of the expected encoded packet + */ + protected int expectedEncodedSize() { + return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length); + } + + @Override + protected final ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) { + final int expectedEncodedSize = expectedEncodedSize(); + if (connection == null) { + return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize)); + } else { + return connection.createTransportBuffer(expectedEncodedSize, usePooled); + } + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeInt(body.length); @@ -110,4 +137,4 @@ public abstract class SessionContinuationMessage extends PacketImpl { return true; } -} +} \ No newline at end of file diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java index 9141ae12b0..44ad1bbf49 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java @@ -67,6 +67,13 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag return consumerID; } + // Protected ----------------------------------------------------- + + @Override + protected final int expectedEncodedSize() { + return super.expectedEncodedSize() + DataConstants.SIZE_LONG; + } + // Public -------------------------------------------------------- @Override @@ -121,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag return true; } -} +} \ No newline at end of file diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java index 0ecfe33389..1c600e9e18 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.utils.DataConstants; /** * A SessionSendContinuationMessage
@@ -91,6 +92,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { return message; } + @Override + protected final int expectedEncodedSize() { + return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN; + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); @@ -154,4 +160,4 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { public SendAcknowledgementHandler getHandler() { return handler; } -} +} \ No newline at end of file