ARTEMIS-1036 Streaming huge messages would cause OME
This commit is contained in:
parent
f798178c6c
commit
759d3b78d9
|
@ -18,8 +18,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.utils.DataConstants;
|
import org.apache.activemq.artemis.utils.DataConstants;
|
||||||
|
|
||||||
public abstract class SessionContinuationMessage extends PacketImpl {
|
public abstract class SessionContinuationMessage extends PacketImpl {
|
||||||
|
@ -61,6 +64,30 @@ public abstract class SessionContinuationMessage extends PacketImpl {
|
||||||
return continues;
|
return continues;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the exact expected encoded size of {@code this} packet.
|
||||||
|
* It will be used to allocate the proper encoding buffer in {@link #createPacket}, hence any
|
||||||
|
* wrong value will result in a thrown exception or a resize of the encoding
|
||||||
|
* buffer during the encoding process, depending to the implementation of {@link #createPacket}.
|
||||||
|
* Any child of {@code this} class are required to override this method if their encoded size is changed
|
||||||
|
* from the base class.
|
||||||
|
*
|
||||||
|
* @return the size in bytes of the expected encoded packet
|
||||||
|
*/
|
||||||
|
protected int expectedEncodedSize() {
|
||||||
|
return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
|
||||||
|
final int expectedEncodedSize = expectedEncodedSize();
|
||||||
|
if (connection == null) {
|
||||||
|
return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize));
|
||||||
|
} else {
|
||||||
|
return connection.createTransportBuffer(expectedEncodedSize, usePooled);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||||
buffer.writeInt(body.length);
|
buffer.writeInt(body.length);
|
||||||
|
|
|
@ -67,6 +67,13 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
|
||||||
return consumerID;
|
return consumerID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Protected -----------------------------------------------------
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final int expectedEncodedSize() {
|
||||||
|
return super.expectedEncodedSize() + DataConstants.SIZE_LONG;
|
||||||
|
}
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||||
|
import org.apache.activemq.artemis.utils.DataConstants;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A SessionSendContinuationMessage<br>
|
* A SessionSendContinuationMessage<br>
|
||||||
|
@ -91,6 +92,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final int expectedEncodedSize() {
|
||||||
|
return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||||
super.encodeRest(buffer);
|
super.encodeRest(buffer);
|
||||||
|
|
Loading…
Reference in New Issue