This commit is contained in:
Clebert Suconic 2017-03-29 09:59:09 -04:00
commit ea01aeb65e
3 changed files with 63 additions and 2 deletions

View File

@ -18,8 +18,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.util.Arrays; import java.util.Arrays;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
public abstract class SessionContinuationMessage extends PacketImpl { public abstract class SessionContinuationMessage extends PacketImpl {
@ -61,6 +64,51 @@ public abstract class SessionContinuationMessage extends PacketImpl {
return continues; return continues;
} }
/**
* Returns the exact expected encoded size of {@code this} packet.
* It will be used to allocate the proper encoding buffer in {@link #createPacket}, hence any
* wrong value will result in a thrown exception or a resize of the encoding
* buffer during the encoding process, depending to the implementation of {@link #createPacket}.
* Any child of {@code this} class are required to override this method if their encoded size is changed
* from the base class.
*
* @return the size in bytes of the expected encoded packet
*/
protected int expectedEncodedSize() {
return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length);
}
@Override
public final ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) {
final ActiveMQBuffer buffer = createPacket(connection, usePooled);
// The standard header fields
buffer.writeInt(0); // The length gets filled in at the end
buffer.writeByte(getType());
buffer.writeLong(channelID);
encodeRest(buffer);
size = buffer.writerIndex();
// The length doesn't include the actual length byte
int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
return buffer;
}
protected final ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
final int expectedEncodedSize = expectedEncodedSize();
if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize));
} else {
return connection.createTransportBuffer(expectedEncodedSize, usePooled);
}
}
@Override @Override
public void encodeRest(final ActiveMQBuffer buffer) { public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(body.length); buffer.writeInt(body.length);
@ -110,4 +158,4 @@ public abstract class SessionContinuationMessage extends PacketImpl {
return true; return true;
} }
} }

View File

@ -67,6 +67,13 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
return consumerID; return consumerID;
} }
// Protected -----------------------------------------------------
@Override
protected final int expectedEncodedSize() {
return super.expectedEncodedSize() + DataConstants.SIZE_LONG;
}
// Public -------------------------------------------------------- // Public --------------------------------------------------------
@Override @Override
@ -121,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
return true; return true;
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.utils.DataConstants;
/** /**
* A SessionSendContinuationMessage<br> * A SessionSendContinuationMessage<br>
@ -91,6 +92,11 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
return message; return message;
} }
@Override
protected final int expectedEncodedSize() {
return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN;
}
@Override @Override
public void encodeRest(final ActiveMQBuffer buffer) { public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer); super.encodeRest(buffer);