mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-21 01:15:50 +00:00
ARTEMIS-3482 Removing non used methods and some reorg on message methods
This commit is contained in:
parent
6ec4ab6766
commit
c3b403a980
@ -29,6 +29,11 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
*/
|
||||
public interface ICoreMessage extends Message {
|
||||
|
||||
/** The buffer will belong to this message, until release is called. */
|
||||
Message setBuffer(ByteBuf buffer);
|
||||
|
||||
ByteBuf getBuffer();
|
||||
|
||||
LargeBodyReader getLargeBodyReader() throws ActiveMQException;
|
||||
|
||||
int getHeadersAndPropertiesEncodeSize();
|
||||
|
@ -298,11 +298,6 @@ public interface Message {
|
||||
|
||||
Message setReplyTo(SimpleString address);
|
||||
|
||||
/** The buffer will belong to this message, until release is called. */
|
||||
Message setBuffer(ByteBuf buffer);
|
||||
|
||||
ByteBuf getBuffer();
|
||||
|
||||
/** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
|
||||
Message copy();
|
||||
|
||||
@ -462,14 +457,6 @@ public interface Message {
|
||||
|
||||
void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);
|
||||
|
||||
default void releaseBuffer() {
|
||||
ByteBuf buffer = getBuffer();
|
||||
if (buffer != null) {
|
||||
buffer.release();
|
||||
}
|
||||
setBuffer(null);
|
||||
}
|
||||
|
||||
default void reencode() {
|
||||
// only valid probably on AMQP
|
||||
}
|
||||
|
@ -130,12 +130,10 @@ public class MessageInternalImpl implements MessageInternal {
|
||||
*
|
||||
* @param buffer
|
||||
*/
|
||||
@Override
|
||||
public Message setBuffer(ByteBuf buffer) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf getBuffer() {
|
||||
return message.getBuffer();
|
||||
}
|
||||
|
@ -103,8 +103,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
||||
*/
|
||||
private Boolean fileDurable;
|
||||
|
||||
private volatile AmqpReadableBuffer parsingData;
|
||||
|
||||
private StorageManager storageManager;
|
||||
|
||||
/** this is used to parse the initial packets from the buffer */
|
||||
@ -306,11 +304,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
||||
|
||||
@Override
|
||||
public ReadableBuffer getData() {
|
||||
if (parsingData == null) {
|
||||
throw new RuntimeException("AMQP Large Message is not open");
|
||||
}
|
||||
|
||||
return parsingData;
|
||||
throw new UnsupportedOperationException("Method not supported with Large Messages");
|
||||
}
|
||||
|
||||
public void parseHeader(ReadableBuffer buffer) {
|
||||
@ -406,11 +400,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference reference) {
|
||||
return getData().rewind();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message toMessage() {
|
||||
return this;
|
||||
|
@ -26,7 +26,6 @@ import java.util.Set;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
|
||||
@ -812,26 +811,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||
modified = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ByteBuf getBuffer() {
|
||||
if (getData() == null) {
|
||||
return null;
|
||||
} else {
|
||||
if (getData() instanceof NettyReadable) {
|
||||
return ((NettyReadable) getData()).getByteBuf();
|
||||
} else {
|
||||
return Unpooled.wrappedBuffer(getData().byteBuffer());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final AMQPMessage setBuffer(ByteBuf buffer) {
|
||||
// If this is ever called we would be in a highly unfortunate state
|
||||
//this.data = null;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract int getEncodeSize();
|
||||
|
||||
|
@ -258,7 +258,7 @@ public class MQTTRetainMessageManagerTest {
|
||||
final LinkedListIterator<MessageReference> browserIterator = queue.browserIterator();
|
||||
browserIterator.forEachRemaining(messageReference -> {
|
||||
final Message message = messageReference.getMessage();
|
||||
final String body = message.getBuffer().toString(StandardCharsets.UTF_8);
|
||||
final String body = message.toCore().getBuffer().toString(StandardCharsets.UTF_8);
|
||||
log.infof("[MQTT][%s][%s][%s]", retainAddress, message, body);
|
||||
});
|
||||
}
|
||||
|
@ -68,21 +68,11 @@ public class OpenwireMessage implements Message {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message setBuffer(ByteBuf buffer) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDurableCount() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf getBuffer() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message copy() {
|
||||
return null;
|
||||
|
@ -431,15 +431,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message setBuffer(ByteBuf buffer) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf getBuffer() {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public Message setAddress(String address) {
|
||||
return null;
|
||||
|
@ -500,16 +500,6 @@ public class AcknowledgeTest extends ActiveMQTestBase {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message setBuffer(ByteBuf buffer) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf getBuffer() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message setAddress(String address) {
|
||||
return null;
|
||||
|
Loading…
x
Reference in New Issue
Block a user