This commit is contained in:
Justin Bertram 2018-12-17 10:30:31 -06:00
commit 969983cf1b
2 changed files with 126 additions and 13 deletions

View File

@ -60,6 +60,7 @@ import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.DroppingWritableBuffer;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
@ -121,6 +122,7 @@ public class AMQPMessage extends RefCountMessage {
private String connectionID;
private final CoreMessageObjectPools coreMessageObjectPools;
private Set<Object> rejectedConsumers;
private DeliveryAnnotations deliveryAnnotationsForSendBuffer;
// These are properties set at the broker level and carried only internally by broker storage.
private volatile TypedProperties extraProperties;
@ -239,6 +241,21 @@ public class AMQPMessage extends RefCountMessage {
return scanForMessageSection(deliveryAnnotationsPosition, DeliveryAnnotations.class);
}
/**
* Sets the delivery annotations to be included when encoding the message for sending it on the wire.
*
* The broker can add additional message annotations as long as the annotations being added follow the
* rules from the spec. If the user adds something that the remote doesn't understand and it is not
* prefixed with "x-opt" the remote can just kill the link. See:
*
* http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-annotations
*
* @param deliveryAnnotations delivery annotations used in the sendBuffer() method
*/
public void setDeliveryAnnotationsForSendBuffer(DeliveryAnnotations deliveryAnnotations) {
this.deliveryAnnotationsForSendBuffer = deliveryAnnotations;
}
/**
* Returns a copy of the DeliveryAnnotations in the message if present or null. Changes to the
* returned MessageAnnotations instance do not affect the original Message.
@ -545,26 +562,27 @@ public class AMQPMessage extends RefCountMessage {
if (deliveryCount > 1) {
return createCopyWithNewDeliveryCount(deliveryCount);
} else if (deliveryAnnotationsPosition != VALUE_NOT_PRESENT) {
return createCopyWithoutDeliveryAnnotations();
} else if (deliveryAnnotationsPosition != VALUE_NOT_PRESENT
|| (deliveryAnnotationsForSendBuffer != null && !deliveryAnnotationsForSendBuffer.getValue().isEmpty())) {
return createCopyWithSkippedOrExplicitlySetDeliveryAnnotations();
} else {
// Common case message has no delivery annotations and this is the first delivery
// so no re-encoding or section skipping needed.
// Common case message has no delivery annotations, no delivery annotations for the send buffer were set
// and this is the first delivery so no re-encoding or section skipping needed.
return data.duplicate();
}
}
private ReadableBuffer createCopyWithoutDeliveryAnnotations() {
assert deliveryAnnotationsPosition != VALUE_NOT_PRESENT;
// The original message had delivery annotations and so we must copy into a new
// buffer skipping the delivery annotations section as that is not meant to survive
// beyond this hop.
private ReadableBuffer createCopyWithSkippedOrExplicitlySetDeliveryAnnotations() {
// The original message had delivery annotations, or delivery annotations for the send buffer are set.
// That means we must copy into a new buffer skipping the original delivery annotations section
// (not meant to survive beyond this hop) and including the delivery annotations for the send buffer if set.
ReadableBuffer duplicate = data.duplicate();
final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
result.writeBytes(duplicate.limit(encodedHeaderSize).byteBuffer());
writeDeliveryAnnotationsForSendBuffer(result);
duplicate.clear();
// skip existing delivery annotations of the original message
duplicate.position(encodedHeaderSize + encodedDeliveryAnnotationsSize);
result.writeBytes(duplicate.byteBuffer());
@ -594,8 +612,8 @@ public class AMQPMessage extends RefCountMessage {
TLSEncode.getEncoder().writeObject(header);
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
// This will skip any existing delivery annotations that might have been present
// in the original message.
writeDeliveryAnnotationsForSendBuffer(result);
// skip existing delivery annotations of the original message
data.position(encodedHeaderSize + encodedDeliveryAnnotationsSize);
result.writeBytes(data.byteBuffer());
data.position(0);
@ -603,6 +621,25 @@ public class AMQPMessage extends RefCountMessage {
return new NettyReadable(result);
}
private void writeDeliveryAnnotationsForSendBuffer(ByteBuf result) {
if (deliveryAnnotationsForSendBuffer != null && !deliveryAnnotationsForSendBuffer.getValue().isEmpty()) {
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result));
TLSEncode.getEncoder().writeObject(deliveryAnnotationsForSendBuffer);
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
}
}
private int getDeliveryAnnotationsForSendBufferSize() {
if (deliveryAnnotationsForSendBuffer == null || deliveryAnnotationsForSendBuffer.getValue().isEmpty()) {
return 0;
}
DroppingWritableBuffer droppingWritableBuffer = new DroppingWritableBuffer();
TLSEncode.getEncoder().setByteBuffer(droppingWritableBuffer);
TLSEncode.getEncoder().writeObject(deliveryAnnotationsForSendBuffer);
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
return droppingWritableBuffer.position() + 1;
}
@Override
public void messageChanged() {
modified = true;
@ -632,7 +669,7 @@ public class AMQPMessage extends RefCountMessage {
public int getEncodeSize() {
ensureDataIsValid();
// The encoded size will exclude any delivery annotations that are present as we will clip them.
return data.remaining() - encodedDeliveryAnnotationsSize;
return data.remaining() - encodedDeliveryAnnotationsSize + getDeliveryAnnotationsForSendBufferSize();
}
@Override

View File

@ -1920,6 +1920,82 @@ public class AMQPMessageTest {
}
}
@Test
public void testGetSendBufferWithoutDeliveryAnnotations() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Header header = new Header();
header.setDeliveryCount(new UnsignedInteger(1));
protonMessage.setHeader(header);
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.setBody(new AmqpValue("Sample payload"));
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
final String annotationKey = "annotationKey";
final String annotationValue = "annotationValue";
deliveryAnnotations.getValue().put(Symbol.getSymbol(annotationKey), annotationValue);
protonMessage.setDeliveryAnnotations(deliveryAnnotations);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
ReadableBuffer sendBuffer = decoded.getSendBuffer(1);
assertEquals(decoded.getEncodeSize(), sendBuffer.capacity());
AMQPMessage msgFromSendBuffer = new AMQPMessage(0, sendBuffer, null, null);
assertEquals("someNiceLocal", msgFromSendBuffer.getAddress());
assertNull(msgFromSendBuffer.getDeliveryAnnotations());
// again with higher deliveryCount
ReadableBuffer sendBuffer2 = decoded.getSendBuffer(5);
assertEquals(decoded.getEncodeSize(), sendBuffer2.capacity());
AMQPMessage msgFromSendBuffer2 = new AMQPMessage(0, sendBuffer2, null, null);
assertEquals("someNiceLocal", msgFromSendBuffer2.getAddress());
assertNull(msgFromSendBuffer2.getDeliveryAnnotations());
}
@Test
public void testGetSendBufferWithDeliveryAnnotations() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Header header = new Header();
header.setDeliveryCount(new UnsignedInteger(1));
protonMessage.setHeader(header);
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.setBody(new AmqpValue("Sample payload"));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
DeliveryAnnotations newDeliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
final String annotationKey = "annotationKey";
final String annotationValue = "annotationValue";
newDeliveryAnnotations.getValue().put(Symbol.getSymbol(annotationKey), annotationValue);
decoded.setDeliveryAnnotationsForSendBuffer(newDeliveryAnnotations);
ReadableBuffer sendBuffer = decoded.getSendBuffer(1);
assertEquals(decoded.getEncodeSize(), sendBuffer.capacity());
AMQPMessage msgFromSendBuffer = new AMQPMessage(0, sendBuffer, null, null);
assertEquals("someNiceLocal", msgFromSendBuffer.getAddress());
assertNotNull(msgFromSendBuffer.getDeliveryAnnotations());
assertEquals(1, msgFromSendBuffer.getDeliveryAnnotations().getValue().size());
assertEquals(annotationValue, msgFromSendBuffer.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey)));
// again with higher deliveryCount
DeliveryAnnotations newDeliveryAnnotations2 = new DeliveryAnnotations(new HashMap<>());
final String annotationKey2 = "annotationKey2";
final String annotationValue2 = "annotationValue2";
newDeliveryAnnotations2.getValue().put(Symbol.getSymbol(annotationKey2), annotationValue2);
decoded.setDeliveryAnnotationsForSendBuffer(newDeliveryAnnotations2);
ReadableBuffer sendBuffer2 = decoded.getSendBuffer(5);
assertEquals(decoded.getEncodeSize(), sendBuffer2.capacity());
AMQPMessage msgFromSendBuffer2 = new AMQPMessage(0, sendBuffer2, null, null);
assertEquals("someNiceLocal", msgFromSendBuffer2.getAddress());
assertNotNull(msgFromSendBuffer2.getDeliveryAnnotations());
assertEquals(1, msgFromSendBuffer2.getDeliveryAnnotations().getValue().size());
assertEquals(annotationValue2, msgFromSendBuffer2.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey2)));
}
//----- Test Support ------------------------------------------------------//
private MessageImpl createProtonMessage() {