From 369e475af629d2159abae3a7b3e4162b924f7a3a Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 12 Sep 2018 12:42:45 -0400 Subject: [PATCH] ARTEMIS-2083 Decode only the relavent portions of the message Ensure that the Body of the message is never decoded in the partial decode phase of the message processing and also gaurd against the decode of ApplicationProperties which should be done lazily. Add lazy decode of DeliveryAnnotations as they are not used at present. --- .../protocol/amqp/broker/AMQPMessage.java | 134 +++++++++-------- .../amqp/message/AMQPMessageTest.java | 137 +++++++++++++++++- 2 files changed, 199 insertions(+), 72 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index c0f9d102c1..cff52290c2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -53,9 +53,9 @@ import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; -import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.ReadableBuffer; +import org.apache.qpid.proton.codec.TypeConstructor; import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; @@ -93,6 +93,7 @@ public class AMQPMessage extends RefCountMessage { private DeliveryAnnotations _deliveryAnnotations; private MessageAnnotations _messageAnnotations; private Properties _properties; + private int deliveryAnnotationsPosition = -1; private int appLocation = -1; private ApplicationProperties applicationProperties; private long scheduledTime = -1; @@ -195,16 +196,30 @@ public class AMQPMessage extends RefCountMessage { buffer.position(appLocation); TLSEncode.getDecoder().setBuffer(buffer); Object section = TLSEncode.getDecoder().readObject(); - if (section instanceof ApplicationProperties) { - this.applicationProperties = (ApplicationProperties) section; - } - this.appLocation = -1; + applicationProperties = (ApplicationProperties) section; + appLocation = -1; TLSEncode.getDecoder().setBuffer(null); } return applicationProperties; } + private DeliveryAnnotations getDeliveryAnnotations() { + parseHeaders(); + + if (_deliveryAnnotations == null && deliveryAnnotationsPosition >= 0) { + ReadableBuffer buffer = data.duplicate(); + buffer.position(deliveryAnnotationsPosition); + TLSEncode.getDecoder().setBuffer(buffer); + Object section = TLSEncode.getDecoder().readObject(); + _deliveryAnnotations = (DeliveryAnnotations) section; + deliveryAnnotationsPosition = -1; + TLSEncode.getDecoder().setBuffer(null); + } + + return _deliveryAnnotations; + } + private synchronized void parseHeaders() { if (!parsedHeaders) { if (data == null) { @@ -380,83 +395,63 @@ public class AMQPMessage extends RefCountMessage { decoder.setBuffer(buffer.rewind()); _header = null; + expiration = 0; + headerEnds = 0; + messagePaylodStart = 0; _deliveryAnnotations = null; _messageAnnotations = null; _properties = null; applicationProperties = null; - Section section = null; + appLocation = -1; + deliveryAnnotationsPosition = -1; try { - if (buffer.hasRemaining()) { - section = (Section) decoder.readObject(); - } + while (buffer.hasRemaining()) { + int constructorPos = buffer.position(); + TypeConstructor constructor = decoder.readConstructor(); + if (Header.class.equals(constructor.getTypeClass())) { + _header = (Header) constructor.readValue(); + headerEnds = messagePaylodStart = buffer.position(); + durable = _header.getDurable(); + if (_header.getTtl() != null) { + expiration = System.currentTimeMillis() + _header.getTtl().intValue(); + } + } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) { + // Don't decode these as they are not used by the broker at all and are + // discarded on send, mark for lazy decode if ever needed. + constructor.skipValue(); + deliveryAnnotationsPosition = constructorPos; + messagePaylodStart = buffer.position(); + } else if (MessageAnnotations.class.equals(constructor.getTypeClass())) { + _messageAnnotations = (MessageAnnotations) constructor.readValue(); + } else if (Properties.class.equals(constructor.getTypeClass())) { + _properties = (Properties) constructor.readValue(); - if (section instanceof Header) { - _header = (Header) section; - headerEnds = buffer.position(); - messagePaylodStart = headerEnds; - this.durable = _header.getDurable(); + if (_properties.getAbsoluteExpiryTime() != null && _properties.getAbsoluteExpiryTime().getTime() > 0) { + expiration = _properties.getAbsoluteExpiryTime().getTime(); + } - if (_header.getTtl() != null) { - this.expiration = System.currentTimeMillis() + _header.getTtl().intValue(); - } - - if (buffer.hasRemaining()) { - section = (Section) decoder.readObject(); + // Next is either Application Properties or the rest of the message, leave it for + // lazy decode of the ApplicationProperties should there be any. Check first though + // as we don't want to actually decode the body which could be expensive. + if (buffer.hasRemaining()) { + constructor = decoder.peekConstructor(); + if (ApplicationProperties.class.equals(constructor.getTypeClass())) { + appLocation = buffer.position(); + } + } + break; + } else if (ApplicationProperties.class.equals(constructor.getTypeClass())) { + // Lazy decoding will start at the TypeConstructor of these ApplicationProperties + appLocation = constructorPos; + break; } else { - section = null; - } - - } else { - // meaning there is no header - headerEnds = 0; - } - if (section instanceof DeliveryAnnotations) { - _deliveryAnnotations = (DeliveryAnnotations) section; - - // Advance the start beyond the delivery annotations so they are not written - // out on send of the message. - messagePaylodStart = buffer.position(); - - if (buffer.hasRemaining()) { - section = (Section) decoder.readObject(); - } else { - section = null; - } - } - if (section instanceof MessageAnnotations) { - _messageAnnotations = (MessageAnnotations) section; - - if (buffer.hasRemaining()) { - section = (Section) decoder.readObject(); - } else { - section = null; - } - } - if (section instanceof Properties) { - _properties = (Properties) section; - - if (_properties.getAbsoluteExpiryTime() != null && _properties.getAbsoluteExpiryTime().getTime() > 0) { - this.expiration = _properties.getAbsoluteExpiryTime().getTime(); - } - - // We don't read the next section on purpose, as we will parse ApplicationProperties - // lazily - section = null; - } - - if (section instanceof ApplicationProperties) { - applicationProperties = (ApplicationProperties) section; - } else { - if (buffer.hasRemaining()) { - this.appLocation = buffer.position(); - } else { - this.appLocation = -1; + break; } } } finally { decoder.setByteBuffer(null); - data.position(0); + buffer.position(0); } } @@ -1082,6 +1077,7 @@ public class AMQPMessage extends RefCountMessage { public void reencode() { parseHeaders(); getApplicationProperties(); + getDeliveryAnnotations(); if (_header != null) getProtonMessage().setHeader(_header); if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations); if (_messageAnnotations != null) getProtonMessage().setMessageAnnotations(_messageAnnotations); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java index 42ffaeeca9..a6a29a0e3f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.nio.charset.StandardCharsets; import java.util.Date; @@ -32,13 +33,20 @@ import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2; +import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.UnsignedLong; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.EncodingCodes; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; @@ -58,7 +66,7 @@ public class AMQPMessageTest { protonMessage.setProperties(properties); protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7)); protonMessage.getHeader().setDurable(Boolean.TRUE); - protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap())); + protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>())); AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); @@ -76,7 +84,7 @@ public class AMQPMessageTest { protonMessage.setProperties(properties); protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7)); protonMessage.getHeader().setDurable(Boolean.TRUE); - HashMap map = new HashMap(); + HashMap map = new HashMap<>(); map.put("key", "string1"); protonMessage.setApplicationProperties(new ApplicationProperties(map)); @@ -97,7 +105,6 @@ public class AMQPMessageTest { assertEquals(true, newDecoded.getHeader().getDurable()); assertEquals("newAddress", newDecoded.getAddress()); assertEquals("string1", newDecoded.getObjectProperty("key")); - } @Test @@ -281,7 +288,131 @@ public class AMQPMessageTest { Assert.assertEquals("someAddress", readMessage.getAddress()); Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name)); } + } + private static final UnsignedLong AMQPVALUE_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000077L); + private static final UnsignedLong APPLICATION_PROPERTIES_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000074L); + private static final UnsignedLong DELIVERY_ANNOTATIONS_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000071L); + + @Test + public void testPartialDecodeIgnoresDeliveryAnnotationsByDefault() { + Header header = new Header(); + header.setDurable(true); + header.setPriority(UnsignedByte.valueOf((byte) 6)); + + ByteBuf encodedBytes = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(encodedBytes); + + EncoderImpl encoder = TLSEncode.getEncoder(); + encoder.setByteBuffer(writable); + encoder.writeObject(header); + + // Signal body of AmqpValue but write corrupt underlying type info + encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR); + encodedBytes.writeByte(EncodingCodes.SMALLULONG); + encodedBytes.writeByte(DELIVERY_ANNOTATIONS_DESCRIPTOR.byteValue()); + encodedBytes.writeByte(EncodingCodes.MAP8); + encodedBytes.writeByte(2); // Size + encodedBytes.writeByte(2); // Elements + // Use bad encoding code on underlying type of map key which will fail the decode if run + encodedBytes.writeByte(255); + + ReadableBuffer readable = new NettyReadable(encodedBytes); + + AMQPMessage message = null; + try { + message = new AMQPMessage(0, readable, null, null); + } catch (Exception decodeError) { + fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage()); + } + + try { + // This should perform the lazy decode of the DeliveryAnnotations portion of the message + message.reencode(); + fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed."); + } catch (Exception ex) { + // Expected decode to fail when building full message. + } + } + + @Test + public void testPartialDecodeIgnoresApplicationPropertiesByDefault() { + Header header = new Header(); + header.setDurable(true); + header.setPriority(UnsignedByte.valueOf((byte) 6)); + + ByteBuf encodedBytes = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(encodedBytes); + + EncoderImpl encoder = TLSEncode.getEncoder(); + encoder.setByteBuffer(writable); + encoder.writeObject(header); + + // Signal body of AmqpValue but write corrupt underlying type info + encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR); + encodedBytes.writeByte(EncodingCodes.SMALLULONG); + encodedBytes.writeByte(APPLICATION_PROPERTIES_DESCRIPTOR.byteValue()); + // Use bad encoding code on underlying type + encodedBytes.writeByte(255); + + ReadableBuffer readable = new NettyReadable(encodedBytes); + + AMQPMessage message = null; + try { + message = new AMQPMessage(0, readable, null, null); + } catch (Exception decodeError) { + fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage()); + } + + assertTrue(message.isDurable()); + + try { + // This should perform the lazy decode of the ApplicationProperties portion of the message + message.getStringProperty("test"); + fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed."); + } catch (Exception ex) { + // Expected decode to fail when building full message. + } + } + + @Test + public void testPartialDecodeIgnoresBodyByDefault() { + Header header = new Header(); + header.setDurable(true); + header.setPriority(UnsignedByte.valueOf((byte) 6)); + + ByteBuf encodedBytes = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(encodedBytes); + + EncoderImpl encoder = TLSEncode.getEncoder(); + encoder.setByteBuffer(writable); + encoder.writeObject(header); + + // Signal body of AmqpValue but write corrupt underlying type info + encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR); + encodedBytes.writeByte(EncodingCodes.SMALLULONG); + encodedBytes.writeByte(AMQPVALUE_DESCRIPTOR.byteValue()); + // Use bad encoding code on underlying type + encodedBytes.writeByte(255); + + ReadableBuffer readable = new NettyReadable(encodedBytes); + + AMQPMessage message = null; + try { + message = new AMQPMessage(0, readable, null, null); + } catch (Exception decodeError) { + fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage()); + } + + assertTrue(message.isDurable()); + + try { + // This will decode the body section if present in order to present it as a Proton Message object + message.getProtonMessage(); + fail("Should have thrown an error when attempting to decode the body which is malformed."); + } catch (Exception ex) { + // Expected decode to fail when building full message. + } } private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {