From a4c311f3681b31ae706a8843278746d8420185c6 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Fri, 27 Aug 2021 18:04:29 +0200 Subject: [PATCH] ARTEMIS-3449 Speedup AMQP large message streamig --- .../artemis/core/message/LargeBodyReader.java | 3 +- .../jdbc/store/file/JDBCSequentialFile.java | 10 +- .../amqp/broker/AMQPLargeMessage.java | 29 ++--- .../proton/ProtonServerSenderContext.java | 121 ++++++++++-------- .../persistence/impl/journal/LargeBody.java | 13 -- 5 files changed, 87 insertions(+), 89 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java index 42de11be8c..7491aa529a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java @@ -27,7 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; * * None of these methods should be caleld from Clients */ -public interface LargeBodyReader { +public interface LargeBodyReader extends AutoCloseable { /** * This method must not be called directly by ActiveMQ Artemis clients. @@ -51,6 +51,7 @@ public interface LargeBodyReader { /** * This method must not be called directly by ActiveMQ Artemis clients. */ + @Override void close() throws ActiveMQException; /** diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java index 6f0b297818..1ee9c77677 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -192,7 +192,15 @@ public class JDBCSequentialFile implements SequentialFile { } private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) { - return internalWrite(buffer.array(), callback, true); + final byte[] data; + if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.position() == 0 && buffer.limit() == buffer.array().length) { + data = buffer.array(); + } else { + byte[] copy = new byte[buffer.remaining()]; + buffer.get(copy); + data = copy; + } + return internalWrite(data, callback, true); } private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback, boolean append) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 526ad543d6..247a3ecc58 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.message.LargeBodyReader; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; @@ -107,7 +108,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage private StorageManager storageManager; /** this is used to parse the initial packets from the buffer */ - CompositeReadableBuffer parsingBuffer; + private CompositeReadableBuffer parsingBuffer; public AMQPLargeMessage(long id, long messageFormat, @@ -146,16 +147,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage setMessageID(newID); } - public void openLargeMessage() throws Exception { - this.parsingData = new AmqpReadableBuffer(largeBody.map()); - } - - public void closeLargeMessage() throws Exception { - largeBody.releaseResources(false, true); - parsingData.freeDirectBuffer(); - parsingData = null; - } - public void releaseEncodedBuffer() { internalReleaseBuffer(1); } @@ -347,14 +338,14 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage public void addBytes(ReadableBuffer data) throws Exception { parseLargeMessage(data); - if (data.hasArray() && data.remaining() == data.array().length) { - //System.out.println("Received " + data.array().length + "::" + ByteUtil.formatGroup(ByteUtil.bytesToHex(data.array()), 8, 16)); - largeBody.addBytes(data.array()); - } else { - byte[] bytes = new byte[data.remaining()]; - data.get(bytes); - //System.out.println("Finishing " + bytes.length + ByteUtil.formatGroup(ByteUtil.bytesToHex(bytes), 8, 16)); - largeBody.addBytes(bytes); + final int remaining = data.remaining(); + final ByteBuf writeBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(remaining, remaining); + try { + // perform copy of data + data.get(new NettyWritable(writeBuffer)); + largeBody.addBytes(new ChannelBufferWrapper(writeBuffer, true, true)); + } finally { + writeBuffer.release(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index b46bcc3c69..32585d7604 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -572,7 +571,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr void deliver() { // This is discounting some bytes due to Transfer payload - int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0); + final int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0); DeliveryAnnotations deliveryAnnotationsToEncode; @@ -584,48 +583,37 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr deliveryAnnotationsToEncode = null; } - LargeBodyReader context = message.getLargeBodyReader(); try { - context.open(); - try { + final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize); + final NettyReadable frameView = new NettyReadable(frameBuffer); + try (LargeBodyReader context = message.getLargeBodyReader()) { + context.open(); context.position(position); long bodySize = context.getSize(); + // materialize it so we can use its internal NIO buffer + frameBuffer.ensureWritable(frameSize); - ByteBuffer buf = ByteBuffer.allocate(frameSize); + if (position == 0 && sender.getLocalState() != EndpointState.CLOSED && position < bodySize) { + if (!deliverInitialPacket(context, deliveryAnnotationsToEncode, frameBuffer)) { + return; + } + } for (; sender.getLocalState() != EndpointState.CLOSED && position < bodySize; ) { if (!connection.flowControl(this::resume)) { - context.close(); return; } - buf.clear(); - int size = 0; + frameBuffer.clear(); - try { - if (position == 0) { - replaceInitialHeader(deliveryAnnotationsToEncode, context, WritableBuffer.ByteBufferWrapper.wrap(buf)); - } - size = context.readInto(buf); + final int readSize = context.readInto(frameBuffer.internalNioBuffer(0, frameSize)); - sender.send(new ReadableBuffer.ByteBufferReader(buf)); - position += size; - } catch (java.nio.BufferOverflowException overflowException) { - if (position == 0) { - if (log.isDebugEnabled()) { - log.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer"); - } - // on the very first packet, if the initial header was replaced with a much bigger header (re-encoding) - // we could recover the situation with a retry using an expandable buffer. - // this is tested on org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest - size = retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context, buf); - } else { - // if this is not the position 0, something is going on - // we just forward the exception as this is not supposed to happen - throw overflowException; - } - } + frameBuffer.writerIndex(readSize); - if (size > 0) { + sender.send(frameView); + + position += readSize; + + if (readSize > 0) { if (position < bodySize) { connection.instantFlush(); @@ -633,7 +621,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } } finally { - context.close(); + frameBuffer.release(); } if (preSettle) { @@ -661,35 +649,59 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } + private boolean deliverInitialPacket(final LargeBodyReader context, + final DeliveryAnnotations deliveryAnnotationsToEncode, + final ByteBuf frameBuffer) throws Exception { + assert position == 0 && context.position() == 0; + if (!connection.flowControl(this::resume)) { + return false; + } + frameBuffer.clear(); + try { + replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(frameBuffer)); + } catch (IndexOutOfBoundsException indexOutOfBoundsException) { + assert position == 0 : "this shouldn't happen unless replaceInitialHeader is updating position before modifying frameBuffer"; + if (log.isDebugEnabled()) { + log.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer"); + } + // on the very first packet, if the initial header was replaced with a much bigger header (re-encoding) + // we could recover the situation with a retry using an expandable buffer. + // this is tested on org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest + sendAndFlushInitialPacket(deliveryAnnotationsToEncode, context); + return true; + } + final int writableBytes = frameBuffer.writableBytes(); + if (writableBytes == 0) { + sender.send(new NettyReadable(frameBuffer)); + connection.instantFlush(); + return true; + } + final int writtenBytes = frameBuffer.writerIndex(); + final int readSize = context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes)); + frameBuffer.writerIndex(writtenBytes + readSize); + sender.send(new NettyReadable(frameBuffer)); + position += readSize; + connection.instantFlush(); + return true; + } + /** - * This is a retry logic when either the delivery annotations or re-encoded buffer is bigger than the frame size - * This will create one expandable buffer. - * It will then let Proton to do the framing correctly + * This must be used when either the delivery annotations or re-encoded buffer is bigger than the frame size.
+ * This will create one expandable buffer, send and flush it. */ - private int retryInitialPacketWithExpandableBuffer(DeliveryAnnotations deliveryAnnotationsToEncode, - LargeBodyReader context, - ByteBuffer buf) throws Exception { - int size; - buf.clear(); + private void sendAndFlushInitialPacket(DeliveryAnnotations deliveryAnnotationsToEncode, + LargeBodyReader context) throws Exception { // if the buffer overflow happened during the initial position // this means the replaced headers are bigger then the frame size // on this case we do with an expandable netty buffer - ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.buffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message) * 2); + final ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message) * 2); try { replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(nettyBuffer)); - size = context.readInto(buf); - position += size; - - nettyBuffer.writeBytes(buf); - - ByteBuffer nioBuffer = nettyBuffer.nioBuffer(); - nioBuffer.position(nettyBuffer.writerIndex()); - nioBuffer = (ByteBuffer) nioBuffer.flip(); - sender.send(new ReadableBuffer.ByteBufferReader(nioBuffer)); + sender.send(new NettyReadable(nettyBuffer)); } finally { nettyBuffer.release(); + connection.instantFlush(); } - return size; } private int replaceInitialHeader(DeliveryAnnotations deliveryAnnotationsToEncode, @@ -697,7 +709,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr WritableBuffer buf) throws Exception { TLSEncode.getEncoder().setByteBuffer(buf); try { - int proposedPosition = writeHeaderAndAnnotations(context, deliveryAnnotationsToEncode); + int proposedPosition = writeHeaderAndAnnotations(deliveryAnnotationsToEncode); if (message.isReencoded()) { proposedPosition = writeMessageAnnotationsPropertiesAndApplicationProperties(context, message); } @@ -740,8 +752,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - private int writeHeaderAndAnnotations(LargeBodyReader context, - DeliveryAnnotations deliveryAnnotationsToEncode) throws ActiveMQException { + private int writeHeaderAndAnnotations(DeliveryAnnotations deliveryAnnotationsToEncode) { Header header = AMQPMessageBrokerAccessor.getCurrentHeader(message); if (header != null) { TLSEncode.getEncoder().writeObject(header); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java index a2e4273122..1eacecd1bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java @@ -74,19 +74,6 @@ public class LargeBody { this.storageManager = storageManager; } - public ByteBuffer map() throws Exception { - ensureFileExists(true); - if (!file.isOpen()) { - file.open(); - } - return file.map(0, file.size()); - } - - public LargeBody(long messageID, JournalStorageManager storageManager) { - this(null, storageManager); - this.messageID = messageID; - } - public void setMessage(LargeServerMessage message) { this.message = message;