ARTEMIS-2083 Decode only the relavent portions of the message

Ensure that the Body of the message is never decoded in the partial
decode phase of the message processing and also gaurd against the
decode of ApplicationProperties which should be done lazily.  Add
lazy decode of DeliveryAnnotations as they are not used at present.
This commit is contained in:
Timothy Bish 2018-09-12 12:42:45 -04:00
parent 21190aabbc
commit 369e475af6
2 changed files with 199 additions and 72 deletions

View File

@ -53,9 +53,9 @@ import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl; import org.apache.qpid.proton.message.impl.MessageImpl;
@ -93,6 +93,7 @@ public class AMQPMessage extends RefCountMessage {
private DeliveryAnnotations _deliveryAnnotations; private DeliveryAnnotations _deliveryAnnotations;
private MessageAnnotations _messageAnnotations; private MessageAnnotations _messageAnnotations;
private Properties _properties; private Properties _properties;
private int deliveryAnnotationsPosition = -1;
private int appLocation = -1; private int appLocation = -1;
private ApplicationProperties applicationProperties; private ApplicationProperties applicationProperties;
private long scheduledTime = -1; private long scheduledTime = -1;
@ -195,16 +196,30 @@ public class AMQPMessage extends RefCountMessage {
buffer.position(appLocation); buffer.position(appLocation);
TLSEncode.getDecoder().setBuffer(buffer); TLSEncode.getDecoder().setBuffer(buffer);
Object section = TLSEncode.getDecoder().readObject(); Object section = TLSEncode.getDecoder().readObject();
if (section instanceof ApplicationProperties) { applicationProperties = (ApplicationProperties) section;
this.applicationProperties = (ApplicationProperties) section; appLocation = -1;
}
this.appLocation = -1;
TLSEncode.getDecoder().setBuffer(null); TLSEncode.getDecoder().setBuffer(null);
} }
return applicationProperties; return applicationProperties;
} }
private DeliveryAnnotations getDeliveryAnnotations() {
parseHeaders();
if (_deliveryAnnotations == null && deliveryAnnotationsPosition >= 0) {
ReadableBuffer buffer = data.duplicate();
buffer.position(deliveryAnnotationsPosition);
TLSEncode.getDecoder().setBuffer(buffer);
Object section = TLSEncode.getDecoder().readObject();
_deliveryAnnotations = (DeliveryAnnotations) section;
deliveryAnnotationsPosition = -1;
TLSEncode.getDecoder().setBuffer(null);
}
return _deliveryAnnotations;
}
private synchronized void parseHeaders() { private synchronized void parseHeaders() {
if (!parsedHeaders) { if (!parsedHeaders) {
if (data == null) { if (data == null) {
@ -380,83 +395,63 @@ public class AMQPMessage extends RefCountMessage {
decoder.setBuffer(buffer.rewind()); decoder.setBuffer(buffer.rewind());
_header = null; _header = null;
expiration = 0;
headerEnds = 0;
messagePaylodStart = 0;
_deliveryAnnotations = null; _deliveryAnnotations = null;
_messageAnnotations = null; _messageAnnotations = null;
_properties = null; _properties = null;
applicationProperties = null; applicationProperties = null;
Section section = null; appLocation = -1;
deliveryAnnotationsPosition = -1;
try { try {
if (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
section = (Section) decoder.readObject(); int constructorPos = buffer.position();
} TypeConstructor<?> constructor = decoder.readConstructor();
if (Header.class.equals(constructor.getTypeClass())) {
if (section instanceof Header) { _header = (Header) constructor.readValue();
_header = (Header) section; headerEnds = messagePaylodStart = buffer.position();
headerEnds = buffer.position(); durable = _header.getDurable();
messagePaylodStart = headerEnds;
this.durable = _header.getDurable();
if (_header.getTtl() != null) { if (_header.getTtl() != null) {
this.expiration = System.currentTimeMillis() + _header.getTtl().intValue(); expiration = System.currentTimeMillis() + _header.getTtl().intValue();
} }
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
if (buffer.hasRemaining()) { // Don't decode these as they are not used by the broker at all and are
section = (Section) decoder.readObject(); // discarded on send, mark for lazy decode if ever needed.
} else { constructor.skipValue();
section = null; deliveryAnnotationsPosition = constructorPos;
}
} else {
// meaning there is no header
headerEnds = 0;
}
if (section instanceof DeliveryAnnotations) {
_deliveryAnnotations = (DeliveryAnnotations) section;
// Advance the start beyond the delivery annotations so they are not written
// out on send of the message.
messagePaylodStart = buffer.position(); messagePaylodStart = buffer.position();
} else if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
if (buffer.hasRemaining()) { _messageAnnotations = (MessageAnnotations) constructor.readValue();
section = (Section) decoder.readObject(); } else if (Properties.class.equals(constructor.getTypeClass())) {
} else { _properties = (Properties) constructor.readValue();
section = null;
}
}
if (section instanceof MessageAnnotations) {
_messageAnnotations = (MessageAnnotations) section;
if (buffer.hasRemaining()) {
section = (Section) decoder.readObject();
} else {
section = null;
}
}
if (section instanceof Properties) {
_properties = (Properties) section;
if (_properties.getAbsoluteExpiryTime() != null && _properties.getAbsoluteExpiryTime().getTime() > 0) { if (_properties.getAbsoluteExpiryTime() != null && _properties.getAbsoluteExpiryTime().getTime() > 0) {
this.expiration = _properties.getAbsoluteExpiryTime().getTime(); expiration = _properties.getAbsoluteExpiryTime().getTime();
} }
// We don't read the next section on purpose, as we will parse ApplicationProperties // Next is either Application Properties or the rest of the message, leave it for
// lazily // lazy decode of the ApplicationProperties should there be any. Check first though
section = null; // as we don't want to actually decode the body which could be expensive.
}
if (section instanceof ApplicationProperties) {
applicationProperties = (ApplicationProperties) section;
} else {
if (buffer.hasRemaining()) { if (buffer.hasRemaining()) {
this.appLocation = buffer.position(); constructor = decoder.peekConstructor();
if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
appLocation = buffer.position();
}
}
break;
} else if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
// Lazy decoding will start at the TypeConstructor of these ApplicationProperties
appLocation = constructorPos;
break;
} else { } else {
this.appLocation = -1; break;
} }
} }
} finally { } finally {
decoder.setByteBuffer(null); decoder.setByteBuffer(null);
data.position(0); buffer.position(0);
} }
} }
@ -1082,6 +1077,7 @@ public class AMQPMessage extends RefCountMessage {
public void reencode() { public void reencode() {
parseHeaders(); parseHeaders();
getApplicationProperties(); getApplicationProperties();
getDeliveryAnnotations();
if (_header != null) getProtonMessage().setHeader(_header); if (_header != null) getProtonMessage().setHeader(_header);
if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations); if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
if (_messageAnnotations != null) getProtonMessage().setMessageAnnotations(_messageAnnotations); if (_messageAnnotations != null) getProtonMessage().setMessageAnnotations(_messageAnnotations);

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Date; import java.util.Date;
@ -32,13 +33,20 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil; import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl; import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert; import org.junit.Assert;
@ -58,7 +66,7 @@ public class AMQPMessageTest {
protonMessage.setProperties(properties); protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7)); protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE); protonMessage.getHeader().setDurable(Boolean.TRUE);
protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap())); protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
@ -76,7 +84,7 @@ public class AMQPMessageTest {
protonMessage.setProperties(properties); protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7)); protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE); protonMessage.getHeader().setDurable(Boolean.TRUE);
HashMap map = new HashMap(); HashMap<String, Object> map = new HashMap<>();
map.put("key", "string1"); map.put("key", "string1");
protonMessage.setApplicationProperties(new ApplicationProperties(map)); protonMessage.setApplicationProperties(new ApplicationProperties(map));
@ -97,7 +105,6 @@ public class AMQPMessageTest {
assertEquals(true, newDecoded.getHeader().getDurable()); assertEquals(true, newDecoded.getHeader().getDurable());
assertEquals("newAddress", newDecoded.getAddress()); assertEquals("newAddress", newDecoded.getAddress());
assertEquals("string1", newDecoded.getObjectProperty("key")); assertEquals("string1", newDecoded.getObjectProperty("key"));
} }
@Test @Test
@ -281,7 +288,131 @@ public class AMQPMessageTest {
Assert.assertEquals("someAddress", readMessage.getAddress()); Assert.assertEquals("someAddress", readMessage.getAddress());
Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name)); Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
} }
}
private static final UnsignedLong AMQPVALUE_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000077L);
private static final UnsignedLong APPLICATION_PROPERTIES_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000074L);
private static final UnsignedLong DELIVERY_ANNOTATIONS_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000071L);
@Test
public void testPartialDecodeIgnoresDeliveryAnnotationsByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(DELIVERY_ANNOTATIONS_DESCRIPTOR.byteValue());
encodedBytes.writeByte(EncodingCodes.MAP8);
encodedBytes.writeByte(2); // Size
encodedBytes.writeByte(2); // Elements
// Use bad encoding code on underlying type of map key which will fail the decode if run
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
try {
// This should perform the lazy decode of the DeliveryAnnotations portion of the message
message.reencode();
fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
@Test
public void testPartialDecodeIgnoresApplicationPropertiesByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(APPLICATION_PROPERTIES_DESCRIPTOR.byteValue());
// Use bad encoding code on underlying type
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
assertTrue(message.isDurable());
try {
// This should perform the lazy decode of the ApplicationProperties portion of the message
message.getStringProperty("test");
fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
@Test
public void testPartialDecodeIgnoresBodyByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(AMQPVALUE_DESCRIPTOR.byteValue());
// Use bad encoding code on underlying type
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
assertTrue(message.isDurable());
try {
// This will decode the body section if present in order to present it as a Proton Message object
message.getProtonMessage();
fail("Should have thrown an error when attempting to decode the body which is malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
} }
private AMQPMessage encodeAndDecodeMessage(MessageImpl message) { private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {