This commit is contained in:
Clebert Suconic 2021-09-08 15:30:03 -04:00
commit 20c1836fa2
5 changed files with 87 additions and 89 deletions

View File

@ -27,7 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
* *
* None of these methods should be caleld from Clients * None of these methods should be caleld from Clients
*/ */
public interface LargeBodyReader { public interface LargeBodyReader extends AutoCloseable {
/** /**
* This method must not be called directly by ActiveMQ Artemis clients. * This method must not be called directly by ActiveMQ Artemis clients.
@ -51,6 +51,7 @@ public interface LargeBodyReader {
/** /**
* This method must not be called directly by ActiveMQ Artemis clients. * This method must not be called directly by ActiveMQ Artemis clients.
*/ */
@Override
void close() throws ActiveMQException; void close() throws ActiveMQException;
/** /**

View File

@ -192,7 +192,15 @@ public class JDBCSequentialFile implements SequentialFile {
} }
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) { private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
return internalWrite(buffer.array(), callback, true); final byte[] data;
if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.position() == 0 && buffer.limit() == buffer.array().length) {
data = buffer.array();
} else {
byte[] copy = new byte[buffer.remaining()];
buffer.get(copy);
data = copy;
}
return internalWrite(data, callback, true);
} }
private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback, boolean append) { private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback, boolean append) {

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.message.LargeBodyReader; import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
@ -107,7 +108,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
private StorageManager storageManager; private StorageManager storageManager;
/** this is used to parse the initial packets from the buffer */ /** this is used to parse the initial packets from the buffer */
CompositeReadableBuffer parsingBuffer; private CompositeReadableBuffer parsingBuffer;
public AMQPLargeMessage(long id, public AMQPLargeMessage(long id,
long messageFormat, long messageFormat,
@ -146,16 +147,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
setMessageID(newID); setMessageID(newID);
} }
public void openLargeMessage() throws Exception {
this.parsingData = new AmqpReadableBuffer(largeBody.map());
}
public void closeLargeMessage() throws Exception {
largeBody.releaseResources(false, true);
parsingData.freeDirectBuffer();
parsingData = null;
}
public void releaseEncodedBuffer() { public void releaseEncodedBuffer() {
internalReleaseBuffer(1); internalReleaseBuffer(1);
} }
@ -347,14 +338,14 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
public void addBytes(ReadableBuffer data) throws Exception { public void addBytes(ReadableBuffer data) throws Exception {
parseLargeMessage(data); parseLargeMessage(data);
if (data.hasArray() && data.remaining() == data.array().length) { final int remaining = data.remaining();
//System.out.println("Received " + data.array().length + "::" + ByteUtil.formatGroup(ByteUtil.bytesToHex(data.array()), 8, 16)); final ByteBuf writeBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(remaining, remaining);
largeBody.addBytes(data.array()); try {
} else { // perform copy of data
byte[] bytes = new byte[data.remaining()]; data.get(new NettyWritable(writeBuffer));
data.get(bytes); largeBody.addBytes(new ChannelBufferWrapper(writeBuffer, true, true));
//System.out.println("Finishing " + bytes.length + ByteUtil.formatGroup(ByteUtil.bytesToHex(bytes), 8, 16)); } finally {
largeBody.addBytes(bytes); writeBuffer.release();
} }
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.proton; package org.apache.activemq.artemis.protocol.amqp.proton;
import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -572,7 +571,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
void deliver() { void deliver() {
// This is discounting some bytes due to Transfer payload // This is discounting some bytes due to Transfer payload
int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0); final int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
DeliveryAnnotations deliveryAnnotationsToEncode; DeliveryAnnotations deliveryAnnotationsToEncode;
@ -584,48 +583,37 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
deliveryAnnotationsToEncode = null; deliveryAnnotationsToEncode = null;
} }
LargeBodyReader context = message.getLargeBodyReader();
try { try {
final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
final NettyReadable frameView = new NettyReadable(frameBuffer);
try (LargeBodyReader context = message.getLargeBodyReader()) {
context.open(); context.open();
try {
context.position(position); context.position(position);
long bodySize = context.getSize(); long bodySize = context.getSize();
// materialize it so we can use its internal NIO buffer
frameBuffer.ensureWritable(frameSize);
ByteBuffer buf = ByteBuffer.allocate(frameSize); if (position == 0 && sender.getLocalState() != EndpointState.CLOSED && position < bodySize) {
if (!deliverInitialPacket(context, deliveryAnnotationsToEncode, frameBuffer)) {
return;
}
}
for (; sender.getLocalState() != EndpointState.CLOSED && position < bodySize; ) { for (; sender.getLocalState() != EndpointState.CLOSED && position < bodySize; ) {
if (!connection.flowControl(this::resume)) { if (!connection.flowControl(this::resume)) {
context.close();
return; return;
} }
buf.clear(); frameBuffer.clear();
int size = 0;
try { final int readSize = context.readInto(frameBuffer.internalNioBuffer(0, frameSize));
if (position == 0) {
replaceInitialHeader(deliveryAnnotationsToEncode, context, WritableBuffer.ByteBufferWrapper.wrap(buf));
}
size = context.readInto(buf);
sender.send(new ReadableBuffer.ByteBufferReader(buf)); frameBuffer.writerIndex(readSize);
position += size;
} catch (java.nio.BufferOverflowException overflowException) {
if (position == 0) {
if (log.isDebugEnabled()) {
log.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer");
}
// on the very first packet, if the initial header was replaced with a much bigger header (re-encoding)
// we could recover the situation with a retry using an expandable buffer.
// this is tested on org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
size = retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context, buf);
} else {
// if this is not the position 0, something is going on
// we just forward the exception as this is not supposed to happen
throw overflowException;
}
}
if (size > 0) { sender.send(frameView);
position += readSize;
if (readSize > 0) {
if (position < bodySize) { if (position < bodySize) {
connection.instantFlush(); connection.instantFlush();
@ -633,7 +621,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} }
} finally { } finally {
context.close(); frameBuffer.release();
} }
if (preSettle) { if (preSettle) {
@ -661,35 +649,59 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} }
private boolean deliverInitialPacket(final LargeBodyReader context,
final DeliveryAnnotations deliveryAnnotationsToEncode,
final ByteBuf frameBuffer) throws Exception {
assert position == 0 && context.position() == 0;
if (!connection.flowControl(this::resume)) {
return false;
}
frameBuffer.clear();
try {
replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(frameBuffer));
} catch (IndexOutOfBoundsException indexOutOfBoundsException) {
assert position == 0 : "this shouldn't happen unless replaceInitialHeader is updating position before modifying frameBuffer";
if (log.isDebugEnabled()) {
log.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer");
}
// on the very first packet, if the initial header was replaced with a much bigger header (re-encoding)
// we could recover the situation with a retry using an expandable buffer.
// this is tested on org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
sendAndFlushInitialPacket(deliveryAnnotationsToEncode, context);
return true;
}
final int writableBytes = frameBuffer.writableBytes();
if (writableBytes == 0) {
sender.send(new NettyReadable(frameBuffer));
connection.instantFlush();
return true;
}
final int writtenBytes = frameBuffer.writerIndex();
final int readSize = context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes));
frameBuffer.writerIndex(writtenBytes + readSize);
sender.send(new NettyReadable(frameBuffer));
position += readSize;
connection.instantFlush();
return true;
}
/** /**
* This is a retry logic when either the delivery annotations or re-encoded buffer is bigger than the frame size * This must be used when either the delivery annotations or re-encoded buffer is bigger than the frame size.<br>
* This will create one expandable buffer. * This will create one expandable buffer, send and flush it.
* It will then let Proton to do the framing correctly
*/ */
private int retryInitialPacketWithExpandableBuffer(DeliveryAnnotations deliveryAnnotationsToEncode, private void sendAndFlushInitialPacket(DeliveryAnnotations deliveryAnnotationsToEncode,
LargeBodyReader context, LargeBodyReader context) throws Exception {
ByteBuffer buf) throws Exception {
int size;
buf.clear();
// if the buffer overflow happened during the initial position // if the buffer overflow happened during the initial position
// this means the replaced headers are bigger then the frame size // this means the replaced headers are bigger then the frame size
// on this case we do with an expandable netty buffer // on this case we do with an expandable netty buffer
ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.buffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message) * 2); final ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message) * 2);
try { try {
replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(nettyBuffer)); replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(nettyBuffer));
size = context.readInto(buf); sender.send(new NettyReadable(nettyBuffer));
position += size;
nettyBuffer.writeBytes(buf);
ByteBuffer nioBuffer = nettyBuffer.nioBuffer();
nioBuffer.position(nettyBuffer.writerIndex());
nioBuffer = (ByteBuffer) nioBuffer.flip();
sender.send(new ReadableBuffer.ByteBufferReader(nioBuffer));
} finally { } finally {
nettyBuffer.release(); nettyBuffer.release();
connection.instantFlush();
} }
return size;
} }
private int replaceInitialHeader(DeliveryAnnotations deliveryAnnotationsToEncode, private int replaceInitialHeader(DeliveryAnnotations deliveryAnnotationsToEncode,
@ -697,7 +709,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
WritableBuffer buf) throws Exception { WritableBuffer buf) throws Exception {
TLSEncode.getEncoder().setByteBuffer(buf); TLSEncode.getEncoder().setByteBuffer(buf);
try { try {
int proposedPosition = writeHeaderAndAnnotations(context, deliveryAnnotationsToEncode); int proposedPosition = writeHeaderAndAnnotations(deliveryAnnotationsToEncode);
if (message.isReencoded()) { if (message.isReencoded()) {
proposedPosition = writeMessageAnnotationsPropertiesAndApplicationProperties(context, message); proposedPosition = writeMessageAnnotationsPropertiesAndApplicationProperties(context, message);
} }
@ -740,8 +752,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} }
private int writeHeaderAndAnnotations(LargeBodyReader context, private int writeHeaderAndAnnotations(DeliveryAnnotations deliveryAnnotationsToEncode) {
DeliveryAnnotations deliveryAnnotationsToEncode) throws ActiveMQException {
Header header = AMQPMessageBrokerAccessor.getCurrentHeader(message); Header header = AMQPMessageBrokerAccessor.getCurrentHeader(message);
if (header != null) { if (header != null) {
TLSEncode.getEncoder().writeObject(header); TLSEncode.getEncoder().writeObject(header);

View File

@ -74,19 +74,6 @@ public class LargeBody {
this.storageManager = storageManager; this.storageManager = storageManager;
} }
public ByteBuffer map() throws Exception {
ensureFileExists(true);
if (!file.isOpen()) {
file.open();
}
return file.map(0, file.size());
}
public LargeBody(long messageID, JournalStorageManager storageManager) {
this(null, storageManager);
this.messageID = messageID;
}
public void setMessage(LargeServerMessage message) { public void setMessage(LargeServerMessage message) {
this.message = message; this.message = message;