This commit is contained in:
Clebert Suconic 2018-09-12 17:56:48 -04:00
commit 576f67c7c2
2 changed files with 199 additions and 72 deletions

View File

@ -53,9 +53,9 @@ import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
@ -93,6 +93,7 @@ public class AMQPMessage extends RefCountMessage {
private DeliveryAnnotations _deliveryAnnotations;
private MessageAnnotations _messageAnnotations;
private Properties _properties;
private int deliveryAnnotationsPosition = -1;
private int appLocation = -1;
private ApplicationProperties applicationProperties;
private long scheduledTime = -1;
@ -195,16 +196,30 @@ public class AMQPMessage extends RefCountMessage {
buffer.position(appLocation);
TLSEncode.getDecoder().setBuffer(buffer);
Object section = TLSEncode.getDecoder().readObject();
if (section instanceof ApplicationProperties) {
this.applicationProperties = (ApplicationProperties) section;
}
this.appLocation = -1;
applicationProperties = (ApplicationProperties) section;
appLocation = -1;
TLSEncode.getDecoder().setBuffer(null);
}
return applicationProperties;
}
private DeliveryAnnotations getDeliveryAnnotations() {
parseHeaders();
if (_deliveryAnnotations == null && deliveryAnnotationsPosition >= 0) {
ReadableBuffer buffer = data.duplicate();
buffer.position(deliveryAnnotationsPosition);
TLSEncode.getDecoder().setBuffer(buffer);
Object section = TLSEncode.getDecoder().readObject();
_deliveryAnnotations = (DeliveryAnnotations) section;
deliveryAnnotationsPosition = -1;
TLSEncode.getDecoder().setBuffer(null);
}
return _deliveryAnnotations;
}
private synchronized void parseHeaders() {
if (!parsedHeaders) {
if (data == null) {
@ -380,83 +395,63 @@ public class AMQPMessage extends RefCountMessage {
decoder.setBuffer(buffer.rewind());
_header = null;
expiration = 0;
headerEnds = 0;
messagePaylodStart = 0;
_deliveryAnnotations = null;
_messageAnnotations = null;
_properties = null;
applicationProperties = null;
Section section = null;
appLocation = -1;
deliveryAnnotationsPosition = -1;
try {
if (buffer.hasRemaining()) {
section = (Section) decoder.readObject();
}
while (buffer.hasRemaining()) {
int constructorPos = buffer.position();
TypeConstructor<?> constructor = decoder.readConstructor();
if (Header.class.equals(constructor.getTypeClass())) {
_header = (Header) constructor.readValue();
headerEnds = messagePaylodStart = buffer.position();
durable = _header.getDurable();
if (_header.getTtl() != null) {
expiration = System.currentTimeMillis() + _header.getTtl().intValue();
}
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
// Don't decode these as they are not used by the broker at all and are
// discarded on send, mark for lazy decode if ever needed.
constructor.skipValue();
deliveryAnnotationsPosition = constructorPos;
messagePaylodStart = buffer.position();
} else if (MessageAnnotations.class.equals(constructor.getTypeClass())) {
_messageAnnotations = (MessageAnnotations) constructor.readValue();
} else if (Properties.class.equals(constructor.getTypeClass())) {
_properties = (Properties) constructor.readValue();
if (section instanceof Header) {
_header = (Header) section;
headerEnds = buffer.position();
messagePaylodStart = headerEnds;
this.durable = _header.getDurable();
if (_properties.getAbsoluteExpiryTime() != null && _properties.getAbsoluteExpiryTime().getTime() > 0) {
expiration = _properties.getAbsoluteExpiryTime().getTime();
}
if (_header.getTtl() != null) {
this.expiration = System.currentTimeMillis() + _header.getTtl().intValue();
}
if (buffer.hasRemaining()) {
section = (Section) decoder.readObject();
// Next is either Application Properties or the rest of the message, leave it for
// lazy decode of the ApplicationProperties should there be any. Check first though
// as we don't want to actually decode the body which could be expensive.
if (buffer.hasRemaining()) {
constructor = decoder.peekConstructor();
if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
appLocation = buffer.position();
}
}
break;
} else if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
// Lazy decoding will start at the TypeConstructor of these ApplicationProperties
appLocation = constructorPos;
break;
} else {
section = null;
}
} else {
// meaning there is no header
headerEnds = 0;
}
if (section instanceof DeliveryAnnotations) {
_deliveryAnnotations = (DeliveryAnnotations) section;
// Advance the start beyond the delivery annotations so they are not written
// out on send of the message.
messagePaylodStart = buffer.position();
if (buffer.hasRemaining()) {
section = (Section) decoder.readObject();
} else {
section = null;
}
}
if (section instanceof MessageAnnotations) {
_messageAnnotations = (MessageAnnotations) section;
if (buffer.hasRemaining()) {
section = (Section) decoder.readObject();
} else {
section = null;
}
}
if (section instanceof Properties) {
_properties = (Properties) section;
if (_properties.getAbsoluteExpiryTime() != null && _properties.getAbsoluteExpiryTime().getTime() > 0) {
this.expiration = _properties.getAbsoluteExpiryTime().getTime();
}
// We don't read the next section on purpose, as we will parse ApplicationProperties
// lazily
section = null;
}
if (section instanceof ApplicationProperties) {
applicationProperties = (ApplicationProperties) section;
} else {
if (buffer.hasRemaining()) {
this.appLocation = buffer.position();
} else {
this.appLocation = -1;
break;
}
}
} finally {
decoder.setByteBuffer(null);
data.position(0);
buffer.position(0);
}
}
@ -1082,6 +1077,7 @@ public class AMQPMessage extends RefCountMessage {
public void reencode() {
parseHeaders();
getApplicationProperties();
getDeliveryAnnotations();
if (_header != null) getProtonMessage().setHeader(_header);
if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
if (_messageAnnotations != null) getProtonMessage().setMessageAnnotations(_messageAnnotations);

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.Date;
@ -32,13 +33,20 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
@ -58,7 +66,7 @@ public class AMQPMessageTest {
protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE);
protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap()));
protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
@ -76,7 +84,7 @@ public class AMQPMessageTest {
protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE);
HashMap map = new HashMap();
HashMap<String, Object> map = new HashMap<>();
map.put("key", "string1");
protonMessage.setApplicationProperties(new ApplicationProperties(map));
@ -97,7 +105,6 @@ public class AMQPMessageTest {
assertEquals(true, newDecoded.getHeader().getDurable());
assertEquals("newAddress", newDecoded.getAddress());
assertEquals("string1", newDecoded.getObjectProperty("key"));
}
@Test
@ -281,7 +288,131 @@ public class AMQPMessageTest {
Assert.assertEquals("someAddress", readMessage.getAddress());
Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
}
}
private static final UnsignedLong AMQPVALUE_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000077L);
private static final UnsignedLong APPLICATION_PROPERTIES_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000074L);
private static final UnsignedLong DELIVERY_ANNOTATIONS_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000071L);
@Test
public void testPartialDecodeIgnoresDeliveryAnnotationsByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(DELIVERY_ANNOTATIONS_DESCRIPTOR.byteValue());
encodedBytes.writeByte(EncodingCodes.MAP8);
encodedBytes.writeByte(2); // Size
encodedBytes.writeByte(2); // Elements
// Use bad encoding code on underlying type of map key which will fail the decode if run
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
try {
// This should perform the lazy decode of the DeliveryAnnotations portion of the message
message.reencode();
fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
@Test
public void testPartialDecodeIgnoresApplicationPropertiesByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(APPLICATION_PROPERTIES_DESCRIPTOR.byteValue());
// Use bad encoding code on underlying type
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
assertTrue(message.isDurable());
try {
// This should perform the lazy decode of the ApplicationProperties portion of the message
message.getStringProperty("test");
fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
@Test
public void testPartialDecodeIgnoresBodyByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(AMQPVALUE_DESCRIPTOR.byteValue());
// Use bad encoding code on underlying type
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
assertTrue(message.isDurable());
try {
// This will decode the body section if present in order to present it as a Proton Message object
message.getProtonMessage();
fail("Should have thrown an error when attempting to decode the body which is malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {