This commit is contained in:
Justin Bertram 2020-12-10 09:10:17 -06:00
commit d1738cd165
7 changed files with 269 additions and 29 deletions

View File

@ -87,6 +87,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
}
private boolean reencoded = false;
/**
* AMQPLargeMessagePersister will save the buffer here.
* */
@ -622,7 +624,15 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
@Override
public void reencode() {
reencoded = true;
}
public void setReencoded(boolean reencoded) {
this.reencoded = reencoded;
}
public boolean isReencoded() {
return reencoded;
}
@Override

View File

@ -59,7 +59,8 @@ public class AMQPLargeMessagePersister extends MessagePersister {
ByteBuf buf = msgEncode.getSavedEncodeBuffer();
try {
int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex();
int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex() +
DataConstants.SIZE_BOOLEAN; // this last one is for is Reencoded
TypedProperties properties = ((AMQPMessage) record).getExtraProperties();
@ -92,6 +93,7 @@ public class AMQPLargeMessagePersister extends MessagePersister {
ByteBuf savedEncodeBuffer = msgEncode.getSavedEncodeBuffer();
buffer.writeBytes(savedEncodeBuffer, 0, savedEncodeBuffer.writerIndex());
buffer.writeBoolean(msgEncode.isReencoded());
msgEncode.releaseEncodedBufferAfterWrite(); // we need two releases, as getSavedEncodedBuffer will keep 1 for himself until encoding has happened
// which this is the expected event where we need to release the extra refCounter
}
@ -124,6 +126,11 @@ public class AMQPLargeMessagePersister extends MessagePersister {
largeMessage.readSavedEncoding(buffer.byteBuf());
if (buffer.readable()) {
boolean reEncoded = buffer.readBoolean();
largeMessage.setReencoded(reEncoded);
}
return largeMessage;
}

View File

@ -242,7 +242,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
/** This will return application properties without attempting to decode it.
* That means, if applicationProperties were never parsed before, this will return null, even if there is application properties.
* This was created as an internal method for testing, as we need to validate if the application properties are not decoded until needed. */
public ApplicationProperties getDecodedApplicationProperties() {
protected ApplicationProperties getDecodedApplicationProperties() {
return applicationProperties;
}

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties;
@ -40,6 +41,16 @@ public class AMQPMessageBrokerAccessor {
return message.getCurrentHeader();
}
/** Warning: this is a method specific to the broker. Do not use it on user's application. */
public static ApplicationProperties getDecodedApplicationProperties(AMQPMessage message) {
return message.getDecodedApplicationProperties();
}
/** Warning: this is a method specific to the broker. Do not use it on user's application. */
public static int getRemainingBodyPosition(AMQPMessage message) {
return message.remainingBodyPosition;
}
/** Warning: this is a method specific to the broker. Do not use it on user's application. */
public static Properties getCurrentProperties(AMQPMessage message) {
return message.getCurrentProperties();

View File

@ -24,6 +24,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
@ -55,6 +57,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResource
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
@ -64,10 +67,12 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@ -571,7 +576,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
deliveryAnnotationsToEncode = null;
}
// Let the Message decide how to present the message bytes
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
@ -579,8 +583,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
context.position(position);
long bodySize = context.getSize();
// TODO: it would be nice to use pooled buffer here,
// however I would need a version of ReadableBuffer for Netty
ByteBuffer buf = ByteBuffer.allocate(frameSize);
for (; position < bodySize; ) {
@ -589,20 +591,37 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return;
}
buf.clear();
int size = 0;
try {
if (position == 0) {
replaceInitialHeader(deliveryAnnotationsToEncode, context, WritableBuffer.ByteBufferWrapper.wrap(buf));
}
size = context.readInto(buf);
if (position == 0) {
writeHeaderAndAnnotations(context, buf, deliveryAnnotationsToEncode);
sender.send(new ReadableBuffer.ByteBufferReader(buf));
position += size;
} catch (java.nio.BufferOverflowException overflowException) {
if (position == 0) {
if (log.isDebugEnabled()) {
log.debug("Delivery of message failed with an overFlowException, retrying again with expandable buffer");
}
// on the very first packet, if the initial header was replaced with a much bigger header (re-encoding)
// we could recover the situation with a retry using an expandable buffer.
// this is tested on org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
size = retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context, buf);
} else {
// if this is not the position 0, something is going on
// we just forward the exception as this is not supposed to happen
throw overflowException;
}
}
int size = context.readInto(buf);
if (size > 0) {
sender.send(new ReadableBuffer.ByteBufferReader(buf));
position += size;
if (position < bodySize) {
connection.instantFlush();
if (position < bodySize) {
connection.instantFlush();
}
}
}
} finally {
@ -634,24 +653,90 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
private void writeHeaderAndAnnotations(LargeBodyReader context,
ByteBuffer buf,
DeliveryAnnotations deliveryAnnotationsToEncode) throws ActiveMQException {
TLSEncode.getEncoder().setByteBuffer(WritableBuffer.ByteBufferWrapper.wrap(buf));
/**
* This is a retry logic when either the delivery annotations or re-encoded buffer is bigger than the frame size
* This will create one expandable buffer.
* It will then let Proton to do the framing correctly
*/
private int retryInitialPacketWithExpandableBuffer(DeliveryAnnotations deliveryAnnotationsToEncode,
LargeBodyReader context,
ByteBuffer buf) throws Exception {
int size;
buf.clear();
// if the buffer overflow happened during the initial position
// this means the replaced headers are bigger then the frame size
// on this case we do with an expandable netty buffer
ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.buffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message) * 2);
try {
Header header = AMQPMessageBrokerAccessor.getCurrentHeader(message);
if (header != null) {
TLSEncode.getEncoder().writeObject(header);
}
if (deliveryAnnotationsToEncode != null) {
TLSEncode.getEncoder().writeObject(deliveryAnnotationsToEncode);
}
context.position(message.getPositionAfterDeliveryAnnotations());
position = message.getPositionAfterDeliveryAnnotations();
replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(nettyBuffer));
size = context.readInto(buf);
position += size;
nettyBuffer.writeBytes(buf);
ByteBuffer nioBuffer = nettyBuffer.nioBuffer();
nioBuffer.position(nettyBuffer.writerIndex());
nioBuffer = (ByteBuffer) nioBuffer.flip();
sender.send(new ReadableBuffer.ByteBufferReader(nioBuffer));
} finally {
nettyBuffer.release();
}
return size;
}
private int replaceInitialHeader(DeliveryAnnotations deliveryAnnotationsToEncode,
LargeBodyReader context,
WritableBuffer buf) throws Exception {
TLSEncode.getEncoder().setByteBuffer(buf);
try {
int proposedPosition = writeHeaderAndAnnotations(context, deliveryAnnotationsToEncode);
if (message.isReencoded()) {
proposedPosition = writePropertiesAndApplicationProperties(context, message);
}
context.position(proposedPosition);
position = proposedPosition;
return (int)position;
} finally {
TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
}
}
/**
* Write properties and application properties when the message is flagged as re-encoded.
*/
private int writePropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception {
int bodyPosition = AMQPMessageBrokerAccessor.getRemainingBodyPosition(message);
assert bodyPosition > 0;
writePropertiesAndApplicationPropertiesInternal(message);
return bodyPosition;
}
private void writePropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) {
Properties amqpProperties = AMQPMessageBrokerAccessor.getCurrentProperties(message);
if (amqpProperties != null) {
TLSEncode.getEncoder().writeObject(amqpProperties);
}
ApplicationProperties applicationProperties = AMQPMessageBrokerAccessor.getDecodedApplicationProperties(message);
if (applicationProperties != null) {
TLSEncode.getEncoder().writeObject(applicationProperties);
}
}
private int writeHeaderAndAnnotations(LargeBodyReader context,
DeliveryAnnotations deliveryAnnotationsToEncode) throws ActiveMQException {
Header header = AMQPMessageBrokerAccessor.getCurrentHeader(message);
if (header != null) {
TLSEncode.getEncoder().writeObject(header);
}
if (deliveryAnnotationsToEncode != null) {
TLSEncode.getEncoder().writeObject(deliveryAnnotationsToEncode);
}
return message.getPositionAfterDeliveryAnnotations();
}
}
private void finishLargeMessage() {

View File

@ -17,20 +17,41 @@
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
public class AmqpMessageDivertsTest extends AmqpClientTestSupport {
public class AmqpMessageDivertsTest extends AmqpClientTestSupport implements Transformer {
static final AtomicInteger divertCount = new AtomicInteger(0);
String largeString = createLargeString();
protected String createLargeString() {
StringBuffer bufferLarge = new StringBuffer();
for (int i = 0; i < 500 * 1024; i++) {
bufferLarge.append((char) ('a' + (i % 20)));
}
String largeString = bufferLarge.toString();
return largeString;
}
@Test(timeout = 60000)
public void testQueueReceiverReadMessageWithDivert() throws Exception {
@ -67,4 +88,109 @@ public class AmqpMessageDivertsTest extends AmqpClientTestSupport {
connection.close();
}
@Test
public void testDivertTransformerWithProperties() throws Exception {
testDivertTransformerWithProperties(false);
}
@Test
public void testDivertTransformerWithPropertiesRebootServer() throws Exception {
testDivertTransformerWithProperties(true);
}
public void testDivertTransformerWithProperties(boolean rebootServer) throws Exception {
divertCount.set(0);
final String forwardingAddress = getQueueName() + "Divert";
final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress);
server.createQueue(new QueueConfiguration(simpleForwardingAddress).setRoutingType(RoutingType.ANYCAST));
server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(),
forwardingAddress, true, null, AmqpMessageDivertsTest.class.getName(),
ComponentConfigurationRoutingType.ANYCAST.toString());
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
Queue queueView = getProxyToQueue(forwardingAddress);
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
message.setApplicationProperty("addLarge", false);
message.setApplicationProperty("always", "here");
message.setBytes(new byte[10]); // one small
sender.send(message);
Wait.assertEquals(1, queueView::getMessageCount);
message = new AmqpMessage();
message.setDurable(true);
message.setApplicationProperty("addLarge", false);
message.setApplicationProperty("always", "here");
message.setBytes(new byte[300 * 1024]); // one large
sender.send(message);
Wait.assertEquals(2, queueView::getMessageCount);
if (rebootServer) {
Wait.assertEquals(2, divertCount::get);
connection.close();
server.stop();
server.start();
// reopen connections
client = createAmqpClient();
connection = addConnection(client.connect());
session = connection.createSession();
} else {
message = new AmqpMessage();
message.setDurable(false);
message.setBytes(new byte[300 * 1024]); // one large
message.setApplicationProperty("addLarge", true);
message.setApplicationProperty("always", "here");
sender.send(message);
Wait.assertEquals(3, divertCount::get);
}
AmqpReceiver receiver = session.createReceiver(forwardingAddress);
queueView = getProxyToQueue(forwardingAddress);
assertEquals(rebootServer ? 2 : 3, queueView.getMessageCount());
receiver.flow(2);
for (int i = 0; i < 2; i++) {
AmqpMessage receivedMessage = receiver.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(receivedMessage);
Assert.assertEquals("here", receivedMessage.getApplicationProperty("always"));
Assert.assertEquals("mundo", receivedMessage.getApplicationProperty("oi"));
receivedMessage.accept();
}
if (!rebootServer) {
// if we did not reboot the server a third message was sent
receiver.flow(1);
AmqpMessage receivedMessage = receiver.receive(5, TimeUnit.SECONDS);
receivedMessage.accept();
Assert.assertEquals("mundo", receivedMessage.getApplicationProperty("oi"));
Assert.assertEquals(largeString, receivedMessage.getApplicationProperty("largeString"));
}
receiver.close();
Wait.assertEquals(0, queueView::getMessageCount);
connection.close();
}
@Override
public Message transform(Message message) {
divertCount.incrementAndGet();
if (message.getBooleanProperty("addLarge")) {
message.putStringProperty("largeString", largeString);
}
message.putBooleanProperty("oi", true);
message.putStringProperty("oi", "mundo");
message.reencode();
return message;
}
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -93,7 +94,7 @@ public class PropertyParseOptimizationTest extends AmqpClientTestSupport {
// if this rule fails it means something is requesting the application property for the message,
// or the optimization is gone.
// be careful if you decide to change this rule, as we have done extensive test to get this in place.
Assert.assertNull("Application properties on AMQP Messages should only be parsed over demand", message.getDecodedApplicationProperties());
Assert.assertNull("Application properties on AMQP Messages should only be parsed over demand", AMQPMessageBrokerAccessor.getDecodedApplicationProperties(message));
}
AmqpReceiver receiver = session.createReceiver(getQueueName(), "odd=true");