diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java index 71d4f5f2f0..54fb53facc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java @@ -29,6 +29,11 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; */ public interface ICoreMessage extends Message { + /** The buffer will belong to this message, until release is called. */ + Message setBuffer(ByteBuf buffer); + + ByteBuf getBuffer(); + LargeBodyReader getLargeBodyReader() throws ActiveMQException; int getHeadersAndPropertiesEncodeSize(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 5b53be8012..aba66d6f52 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -298,11 +298,6 @@ public interface Message { Message setReplyTo(SimpleString address); - /** The buffer will belong to this message, until release is called. */ - Message setBuffer(ByteBuf buffer); - - ByteBuf getBuffer(); - /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */ Message copy(); @@ -462,14 +457,6 @@ public interface Message { void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools); - default void releaseBuffer() { - ByteBuf buffer = getBuffer(); - if (buffer != null) { - buffer.release(); - } - setBuffer(null); - } - default void reencode() { // only valid probably on AMQP } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java index 936e0932ca..bdcb170189 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java @@ -130,12 +130,10 @@ public class MessageInternalImpl implements MessageInternal { * * @param buffer */ - @Override public Message setBuffer(ByteBuf buffer) { throw new UnsupportedOperationException(); } - @Override public ByteBuf getBuffer() { return message.getBuffer(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 247a3ecc58..4f0c4d6005 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -103,8 +103,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage */ private Boolean fileDurable; - private volatile AmqpReadableBuffer parsingData; - private StorageManager storageManager; /** this is used to parse the initial packets from the buffer */ @@ -306,11 +304,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage @Override public ReadableBuffer getData() { - if (parsingData == null) { - throw new RuntimeException("AMQP Large Message is not open"); - } - - return parsingData; + throw new UnsupportedOperationException("Method not supported with Large Messages"); } public void parseHeader(ReadableBuffer buffer) { @@ -406,11 +400,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } } - @Override - public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference reference) { - return getData().rewind(); - } - @Override public Message toMessage() { return this; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 38d48528d5..3926fa5d94 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -26,7 +26,6 @@ import java.util.Set; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; @@ -812,26 +811,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. modified = true; } - @Override - public final ByteBuf getBuffer() { - if (getData() == null) { - return null; - } else { - if (getData() instanceof NettyReadable) { - return ((NettyReadable) getData()).getByteBuf(); - } else { - return Unpooled.wrappedBuffer(getData().byteBuffer()); - } - } - } - - @Override - public final AMQPMessage setBuffer(ByteBuf buffer) { - // If this is ever called we would be in a highly unfortunate state - //this.data = null; - return this; - } - @Override public abstract int getEncodeSize(); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java index a2a6f23c13..01149cd024 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManagerTest.java @@ -258,7 +258,7 @@ public class MQTTRetainMessageManagerTest { final LinkedListIterator browserIterator = queue.browserIterator(); browserIterator.forEachRemaining(messageReference -> { final Message message = messageReference.getMessage(); - final String body = message.getBuffer().toString(StandardCharsets.UTF_8); + final String body = message.toCore().getBuffer().toString(StandardCharsets.UTF_8); log.infof("[MQTT][%s][%s][%s]", retainAddress, message, body); }); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java index 1d88a40abe..506b9f5503 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java @@ -68,21 +68,11 @@ public class OpenwireMessage implements Message { return null; } - @Override - public Message setBuffer(ByteBuf buffer) { - return null; - } - @Override public int getDurableCount() { return 0; } - @Override - public ByteBuf getBuffer() { - return null; - } - @Override public Message copy() { return null; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index f3b215f349..a9e506f095 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -431,15 +431,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { return null; } - @Override - public Message setBuffer(ByteBuf buffer) { - return null; - } - - @Override - public ByteBuf getBuffer() { - return null; - } @Override public Message setAddress(String address) { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index 6b6eb10552..237e770b01 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -500,16 +500,6 @@ public class AcknowledgeTest extends ActiveMQTestBase { return null; } - @Override - public Message setBuffer(ByteBuf buffer) { - return null; - } - - @Override - public ByteBuf getBuffer() { - return null; - } - @Override public Message setAddress(String address) { return null;