diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 688c637fc6..4e5e60cf19 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -87,6 +87,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } } + private boolean reencoded = false; + /** * AMQPLargeMessagePersister will save the buffer here. * */ @@ -622,7 +624,15 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage @Override public void reencode() { + reencoded = true; + } + public void setReencoded(boolean reencoded) { + this.reencoded = reencoded; + } + + public boolean isReencoded() { + return reencoded; } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java index 61b41c501e..0a059ce760 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java @@ -59,7 +59,8 @@ public class AMQPLargeMessagePersister extends MessagePersister { ByteBuf buf = msgEncode.getSavedEncodeBuffer(); try { - int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex(); + int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex() + + DataConstants.SIZE_BOOLEAN; // this last one is for is Reencoded TypedProperties properties = ((AMQPMessage) record).getExtraProperties(); @@ -92,6 +93,7 @@ public class AMQPLargeMessagePersister extends MessagePersister { ByteBuf savedEncodeBuffer = msgEncode.getSavedEncodeBuffer(); buffer.writeBytes(savedEncodeBuffer, 0, savedEncodeBuffer.writerIndex()); + buffer.writeBoolean(msgEncode.isReencoded()); msgEncode.releaseEncodedBufferAfterWrite(); // we need two releases, as getSavedEncodedBuffer will keep 1 for himself until encoding has happened // which this is the expected event where we need to release the extra refCounter } @@ -124,6 +126,11 @@ public class AMQPLargeMessagePersister extends MessagePersister { largeMessage.readSavedEncoding(buffer.byteBuf()); + if (buffer.readable()) { + boolean reEncoded = buffer.readBoolean(); + largeMessage.setReencoded(reEncoded); + } + return largeMessage; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 6c9e6fb9fb..c10f1a7fb3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -242,7 +242,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. /** This will return application properties without attempting to decode it. * That means, if applicationProperties were never parsed before, this will return null, even if there is application properties. * This was created as an internal method for testing, as we need to validate if the application properties are not decoded until needed. */ - public ApplicationProperties getDecodedApplicationProperties() { + protected ApplicationProperties getDecodedApplicationProperties() { return applicationProperties; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java index 9b68f4e08d..a2cd44d90c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.Properties; @@ -40,6 +41,16 @@ public class AMQPMessageBrokerAccessor { return message.getCurrentHeader(); } + /** Warning: this is a method specific to the broker. Do not use it on user's application. */ + public static ApplicationProperties getDecodedApplicationProperties(AMQPMessage message) { + return message.getDecodedApplicationProperties(); + } + + /** Warning: this is a method specific to the broker. Do not use it on user's application. */ + public static int getRemainingBodyPosition(AMQPMessage message) { + return message.remainingBodyPosition; + } + /** Warning: this is a method specific to the broker. Do not use it on user's application. */ public static Properties getCurrentProperties(AMQPMessage message) { return message.getCurrentProperties(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 2ec4742ab3..fbe503bf3b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; @@ -55,6 +57,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResource import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.selector.filter.FilterException; @@ -64,10 +67,12 @@ import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.amqp.messaging.Outcome; +import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; @@ -571,7 +576,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr deliveryAnnotationsToEncode = null; } - // Let the Message decide how to present the message bytes LargeBodyReader context = message.getLargeBodyReader(); try { context.open(); @@ -579,8 +583,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr context.position(position); long bodySize = context.getSize(); - // TODO: it would be nice to use pooled buffer here, - // however I would need a version of ReadableBuffer for Netty ByteBuffer buf = ByteBuffer.allocate(frameSize); for (; position < bodySize; ) { @@ -589,20 +591,37 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return; } buf.clear(); + int size = 0; + try { + if (position == 0) { + replaceInitialHeader(deliveryAnnotationsToEncode, context, WritableBuffer.ByteBufferWrapper.wrap(buf)); + } + size = context.readInto(buf); - if (position == 0) { - writeHeaderAndAnnotations(context, buf, deliveryAnnotationsToEncode); + sender.send(new ReadableBuffer.ByteBufferReader(buf)); + position += size; + } catch (java.nio.BufferOverflowException overflowException) { + if (position == 0) { + if (log.isDebugEnabled()) { + log.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer"); + } + // on the very first packet, if the initial header was replaced with a much bigger header (re-encoding) + // we could recover the situation with a retry using an expandable buffer. + // this is tested on org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest + size = retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context, buf); + } else { + // if this is not the position 0, something is going on + // we just forward the exception as this is not supposed to happen + throw overflowException; + } } - int size = context.readInto(buf); + if (size > 0) { - sender.send(new ReadableBuffer.ByteBufferReader(buf)); - - position += size; - - if (position < bodySize) { - connection.instantFlush(); + if (position < bodySize) { + connection.instantFlush(); + } } } } finally { @@ -634,24 +653,90 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - private void writeHeaderAndAnnotations(LargeBodyReader context, - ByteBuffer buf, - DeliveryAnnotations deliveryAnnotationsToEncode) throws ActiveMQException { - TLSEncode.getEncoder().setByteBuffer(WritableBuffer.ByteBufferWrapper.wrap(buf)); + /** + * This is a retry logic when either the delivery annotations or re-encoded buffer is bigger than the frame size + * This will create one expandable buffer. + * It will then let Proton to do the framing correctly + */ + private int retryInitialPacketWithExpandableBuffer(DeliveryAnnotations deliveryAnnotationsToEncode, + LargeBodyReader context, + ByteBuffer buf) throws Exception { + int size; + buf.clear(); + // if the buffer overflow happened during the initial position + // this means the replaced headers are bigger then the frame size + // on this case we do with an expandable netty buffer + ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.buffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message) * 2); try { - Header header = AMQPMessageBrokerAccessor.getCurrentHeader(message); - if (header != null) { - TLSEncode.getEncoder().writeObject(header); - } - if (deliveryAnnotationsToEncode != null) { - TLSEncode.getEncoder().writeObject(deliveryAnnotationsToEncode); - } - context.position(message.getPositionAfterDeliveryAnnotations()); - position = message.getPositionAfterDeliveryAnnotations(); + replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(nettyBuffer)); + size = context.readInto(buf); + position += size; + + nettyBuffer.writeBytes(buf); + + ByteBuffer nioBuffer = nettyBuffer.nioBuffer(); + nioBuffer.position(nettyBuffer.writerIndex()); + nioBuffer = (ByteBuffer) nioBuffer.flip(); + sender.send(new ReadableBuffer.ByteBufferReader(nioBuffer)); } finally { + nettyBuffer.release(); + } + return size; + } + + private int replaceInitialHeader(DeliveryAnnotations deliveryAnnotationsToEncode, + LargeBodyReader context, + WritableBuffer buf) throws Exception { + TLSEncode.getEncoder().setByteBuffer(buf); + try { + int proposedPosition = writeHeaderAndAnnotations(context, deliveryAnnotationsToEncode); + if (message.isReencoded()) { + proposedPosition = writePropertiesAndApplicationProperties(context, message); + } + + context.position(proposedPosition); + position = proposedPosition; + return (int)position; + } finally { + TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null); } } + + /** + * Write properties and application properties when the message is flagged as re-encoded. + */ + private int writePropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception { + int bodyPosition = AMQPMessageBrokerAccessor.getRemainingBodyPosition(message); + assert bodyPosition > 0; + writePropertiesAndApplicationPropertiesInternal(message); + return bodyPosition; + } + + private void writePropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) { + Properties amqpProperties = AMQPMessageBrokerAccessor.getCurrentProperties(message); + if (amqpProperties != null) { + TLSEncode.getEncoder().writeObject(amqpProperties); + } + + ApplicationProperties applicationProperties = AMQPMessageBrokerAccessor.getDecodedApplicationProperties(message); + + if (applicationProperties != null) { + TLSEncode.getEncoder().writeObject(applicationProperties); + } + } + + private int writeHeaderAndAnnotations(LargeBodyReader context, + DeliveryAnnotations deliveryAnnotationsToEncode) throws ActiveMQException { + Header header = AMQPMessageBrokerAccessor.getCurrentHeader(message); + if (header != null) { + TLSEncode.getEncoder().writeObject(header); + } + if (deliveryAnnotationsToEncode != null) { + TLSEncode.getEncoder().writeObject(deliveryAnnotationsToEncode); + } + return message.getPositionAfterDeliveryAnnotations(); + } } private void finishLargeMessage() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java index 483d557fcc..1f19fd1c38 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java @@ -17,20 +17,41 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.transformer.Transformer; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Assert; import org.junit.Test; -public class AmqpMessageDivertsTest extends AmqpClientTestSupport { +public class AmqpMessageDivertsTest extends AmqpClientTestSupport implements Transformer { + + static final AtomicInteger divertCount = new AtomicInteger(0); + + String largeString = createLargeString(); + + protected String createLargeString() { + StringBuffer bufferLarge = new StringBuffer(); + for (int i = 0; i < 500 * 1024; i++) { + bufferLarge.append((char) ('a' + (i % 20))); + } + String largeString = bufferLarge.toString(); + return largeString; + } + @Test(timeout = 60000) public void testQueueReceiverReadMessageWithDivert() throws Exception { @@ -67,4 +88,109 @@ public class AmqpMessageDivertsTest extends AmqpClientTestSupport { connection.close(); } + + @Test + public void testDivertTransformerWithProperties() throws Exception { + testDivertTransformerWithProperties(false); + } + + @Test + public void testDivertTransformerWithPropertiesRebootServer() throws Exception { + testDivertTransformerWithProperties(true); + } + + public void testDivertTransformerWithProperties(boolean rebootServer) throws Exception { + divertCount.set(0); + final String forwardingAddress = getQueueName() + "Divert"; + final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress); + server.createQueue(new QueueConfiguration(simpleForwardingAddress).setRoutingType(RoutingType.ANYCAST)); + server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), + forwardingAddress, true, null, AmqpMessageDivertsTest.class.getName(), + ComponentConfigurationRoutingType.ANYCAST.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Queue queueView = getProxyToQueue(forwardingAddress); + AmqpSender sender = session.createSender(getQueueName()); + AmqpMessage message = new AmqpMessage(); + message.setDurable(true); + message.setApplicationProperty("addLarge", false); + message.setApplicationProperty("always", "here"); + message.setBytes(new byte[10]); // one small + sender.send(message); + Wait.assertEquals(1, queueView::getMessageCount); + + message = new AmqpMessage(); + message.setDurable(true); + message.setApplicationProperty("addLarge", false); + message.setApplicationProperty("always", "here"); + message.setBytes(new byte[300 * 1024]); // one large + sender.send(message); + Wait.assertEquals(2, queueView::getMessageCount); + + if (rebootServer) { + Wait.assertEquals(2, divertCount::get); + connection.close(); + server.stop(); + server.start(); + + // reopen connections + client = createAmqpClient(); + connection = addConnection(client.connect()); + session = connection.createSession(); + } else { + message = new AmqpMessage(); + message.setDurable(false); + message.setBytes(new byte[300 * 1024]); // one large + message.setApplicationProperty("addLarge", true); + message.setApplicationProperty("always", "here"); + sender.send(message); + Wait.assertEquals(3, divertCount::get); + } + + + AmqpReceiver receiver = session.createReceiver(forwardingAddress); + + queueView = getProxyToQueue(forwardingAddress); + assertEquals(rebootServer ? 2 : 3, queueView.getMessageCount()); + + receiver.flow(2); + for (int i = 0; i < 2; i++) { + AmqpMessage receivedMessage = receiver.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(receivedMessage); + Assert.assertEquals("here", receivedMessage.getApplicationProperty("always")); + Assert.assertEquals("mundo", receivedMessage.getApplicationProperty("oi")); + receivedMessage.accept(); + } + + if (!rebootServer) { + // if we did not reboot the server a third message was sent + receiver.flow(1); + AmqpMessage receivedMessage = receiver.receive(5, TimeUnit.SECONDS); + receivedMessage.accept(); + Assert.assertEquals("mundo", receivedMessage.getApplicationProperty("oi")); + Assert.assertEquals(largeString, receivedMessage.getApplicationProperty("largeString")); + + } + + receiver.close(); + + Wait.assertEquals(0, queueView::getMessageCount); + + connection.close(); + } + + @Override + public Message transform(Message message) { + divertCount.incrementAndGet(); + if (message.getBooleanProperty("addLarge")) { + message.putStringProperty("largeString", largeString); + } + message.putBooleanProperty("oi", true); + message.putStringProperty("oi", "mundo"); + message.reencode(); + return message; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/PropertyParseOptimizationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/PropertyParseOptimizationTest.java index 8df3112e4f..bd269dde0b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/PropertyParseOptimizationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/PropertyParseOptimizationTest.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -93,7 +94,7 @@ public class PropertyParseOptimizationTest extends AmqpClientTestSupport { // if this rule fails it means something is requesting the application property for the message, // or the optimization is gone. // be careful if you decide to change this rule, as we have done extensive test to get this in place. - Assert.assertNull("Application properties on AMQP Messages should only be parsed over demand", message.getDecodedApplicationProperties()); + Assert.assertNull("Application properties on AMQP Messages should only be parsed over demand", AMQPMessageBrokerAccessor.getDecodedApplicationProperties(message)); } AmqpReceiver receiver = session.createReceiver(getQueueName(), "odd=true");