diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 56b0e6cf0c..ba87ae688a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -665,16 +665,20 @@ public class AMQPMessage extends RefCountMessage { private synchronized void checkBuffer() { if (!bufferValid) { - int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0); - ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated); - try { - getProtonMessage().encode(new NettyWritable(buffer)); - byte[] bytes = new byte[buffer.writerIndex()]; - buffer.readBytes(bytes); - this.data = Unpooled.wrappedBuffer(bytes); - } finally { - buffer.release(); - } + encodeProtonMessage(); + } + } + + private void encodeProtonMessage() { + int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0); + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated); + try { + getProtonMessage().encode(new NettyWritable(buffer)); + byte[] bytes = new byte[buffer.writerIndex()]; + buffer.readBytes(bytes); + this.data = Unpooled.wrappedBuffer(bytes); + } finally { + buffer.release(); } } @@ -691,15 +695,16 @@ public class AMQPMessage extends RefCountMessage { int amqpDeliveryCount = deliveryCount - 1; - Header header = getHeader(); - if (header == null && (amqpDeliveryCount > 0)) { - header = new Header(); - header.setDurable(durable); - } - // If the re-delivering the message then the header must be re-encoded // otherwise we want to write the original header if present. if (amqpDeliveryCount > 0) { + + Header header = getHeader(); + if (header == null) { + header = new Header(); + header.setDurable(durable); + } + synchronized (header) { header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount)); TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer)); @@ -713,6 +718,76 @@ public class AMQPMessage extends RefCountMessage { buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart); } + /** + * Gets a ByteBuf from the Message that contains the encoded bytes to be sent on the wire. + *

+ * When possible this method will present the bytes to the caller without copying them into + * another buffer copy. If copying is needed a new Netty buffer is created and returned. The + * caller should ensure that the reference count on the returned buffer is always decremented + * to avoid a leak in the case of a copied buffer being returned. + * + * @param deliveryCount + * The new delivery count for this message. + * + * @return a Netty ByteBuf containing the encoded bytes of this Message instance. + */ + public ByteBuf getSendBuffer(int deliveryCount) { + checkBuffer(); + + if (deliveryCount > 1) { + return createCopyWithNewDeliveryCount(deliveryCount); + } else if (headerEnds != messagePaylodStart) { + return createCopyWithoutDeliveryAnnotations(); + } else { + // Common case message has no delivery annotations and this is the first delivery + // so no re-encoding or section skipping needed. + return data.retainedDuplicate(); + } + } + + private ByteBuf createCopyWithoutDeliveryAnnotations() { + assert headerEnds != messagePaylodStart; + + // The original message had delivery annotations and so we must copy into a new + // buffer skipping the delivery annotations section as that is not meant to survive + // beyond this hop. + final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize()); + result.writeBytes(data, 0, headerEnds); + result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart); + return result; + } + + private ByteBuf createCopyWithNewDeliveryCount(int deliveryCount) { + assert deliveryCount > 1; + + final int amqpDeliveryCount = deliveryCount - 1; + // If the re-delivering the message then the header must be re-encoded + // (or created if not previously present). Any delivery annotations should + // be skipped as well in the resulting buffer. + + final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize()); + + Header header = getHeader(); + if (header == null) { + header = new Header(); + header.setDurable(durable); + } + + synchronized (header) { + // Updates or adds a Header section with the correct delivery count + header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount)); + TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result)); + TLSEncode.getEncoder().writeObject(header); + TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); + } + + // This will skip any existing delivery annotations that might have been present + // in the original message. + result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart); + + return result; + } + public TypedProperties createExtraProperties() { if (extraProperties == null) { extraProperties = new TypedProperties(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 1823168f34..990a2172f9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -22,8 +22,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; @@ -48,6 +46,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; +import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; @@ -75,6 +74,8 @@ import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Sender; import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; + /** * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links */ @@ -690,11 +691,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // we only need a tag if we are going to settle later byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); - ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(message.getEncodeSize()); - try { - message.sendBuffer(nettyBuffer, deliveryCount); + // Let the Message decide how to present the message bytes + ByteBuf sendBuffer = message.getSendBuffer(deliveryCount); - int size = nettyBuffer.writerIndex(); + try { + int size = sendBuffer.writerIndex(); while (!connection.tryLock(1, TimeUnit.SECONDS)) { if (closed || sender.getLocalState() == EndpointState.CLOSED) { @@ -714,8 +715,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr delivery.setMessageFormat((int) message.getMessageFormat()); delivery.setContext(messageReference); - // this will avoid a copy.. patch provided by Norman using buffer.array() - sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); + if (sendBuffer.hasArray()) { + // this will avoid a copy.. patch provided by Norman using buffer.array() + sender.send(sendBuffer.array(), sendBuffer.arrayOffset() + sendBuffer.readerIndex(), sendBuffer.readableBytes()); + } else { + sender.send(new NettyReadable(sendBuffer)); + } if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. @@ -731,7 +736,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return size; } finally { - nettyBuffer.release(); + sendBuffer.release(); } }