This commit is contained in:
Clebert Suconic 2017-08-22 22:02:42 -04:00
commit 8d3f484387
1 changed files with 45 additions and 21 deletions

View File

@ -75,8 +75,13 @@ public class AMQPMessage extends RefCountMessage {
MessageImpl protonMessage;
private volatile int memoryEstimate = -1;
private long expiration = 0;
// this is to store where to start sending bytes, ignoring header and delivery annotations.
private int sendFrom = 0;
// Records where the Header section ends if present.
private int headerEnds = 0;
// Records where the message payload starts, ignoring DeliveryAnnotations if present
private int messagePaylodStart = 0;
private boolean parsedHeaders = false;
private Header _header;
private DeliveryAnnotations _deliveryAnnotations;
@ -130,14 +135,15 @@ public class AMQPMessage extends RefCountMessage {
private void initalizeObjects() {
if (protonMessage == null) {
if (data == null) {
this.sendFrom = 0;
headerEnds = 0;
messagePaylodStart = 0;
_header = new Header();
_deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
_properties = new Properties();
this.applicationProperties = new ApplicationProperties(new HashMap<>());
this.protonMessage = (MessageImpl) Message.Factory.create();
this.protonMessage.setApplicationProperties(applicationProperties);
this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
applicationProperties = new ApplicationProperties(new HashMap<>());
protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setApplicationProperties(applicationProperties);
protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
}
}
}
@ -364,8 +370,9 @@ public class AMQPMessage extends RefCountMessage {
}
if (section instanceof Header) {
sendFrom = buffer.position();
_header = (Header) section;
headerEnds = buffer.position();
messagePaylodStart = headerEnds;
if (_header.getTtl() != null) {
this.expiration = System.currentTimeMillis() + _header.getTtl().intValue();
@ -376,13 +383,17 @@ public class AMQPMessage extends RefCountMessage {
} else {
section = null;
}
} else {
// meaning there is no header
sendFrom = 0;
headerEnds = 0;
}
if (section instanceof DeliveryAnnotations) {
_deliveryAnnotations = (DeliveryAnnotations) section;
sendFrom = buffer.position();
// Advance the start beyond the delivery annotations so they are not written
// out on send of the message.
messagePaylodStart = buffer.position();
if (buffer.hasRemaining()) {
section = (Section) decoder.readObject();
@ -463,10 +474,14 @@ public class AMQPMessage extends RefCountMessage {
checkBuffer();
byte[] origin = data.array();
byte[] newData = new byte[data.array().length - sendFrom];
for (int i = 0; i < newData.length; i++) {
newData[i] = origin[i + sendFrom];
}
byte[] newData = new byte[data.array().length - (messagePaylodStart - headerEnds)];
// Copy the original header
System.arraycopy(origin, 0, newData, 0, headerEnds);
// Copy the body following the delivery annotations if present
System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart);
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
newEncode.setDurable(isDurable());
return newEncode;
@ -550,7 +565,7 @@ public class AMQPMessage extends RefCountMessage {
}
if (getHeader() != null && getHeader().getDurable() != null) {
durable = getHeader().getDurable().booleanValue();
durable = getHeader().getDurable();
return durable;
} else {
return durable != null ? durable : false;
@ -652,26 +667,35 @@ public class AMQPMessage extends RefCountMessage {
public int getEncodeSize() {
checkBuffer();
// + 20checkBuffer is an estimate for the Header with the deliveryCount
return data.array().length - sendFrom + 20;
return data.array().length - messagePaylodStart + 20;
}
@Override
public void sendBuffer(ByteBuf buffer, int deliveryCount) {
checkBuffer();
int amqpDeliveryCount = deliveryCount - 1;
Header header = getHeader();
if (header == null && deliveryCount > 0) {
if (header == null && (amqpDeliveryCount > 0)) {
header = new Header();
header.setDurable(durable);
}
if (header != null) {
// If the re-delivering the message then the header must be re-encoded
// otherwise we want to write the original header if present.
if (amqpDeliveryCount > 0) {
synchronized (header) {
header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1));
header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
TLSEncode.getEncoder().writeObject(header);
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
}
} else if (headerEnds > 0) {
buffer.writeBytes(data, 0, headerEnds);
}
buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom);
buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
}
@Override
@ -1045,7 +1069,7 @@ public class AMQPMessage extends RefCountMessage {
int size = record.readInt();
byte[] recordArray = new byte[size];
record.readBytes(recordArray);
this.sendFrom = 0; // whatever was persisted will be sent
this.messagePaylodStart = 0; // whatever was persisted will be sent
this.data = Unpooled.wrappedBuffer(recordArray);
this.bufferValid = true;
this.durable = true; // it's coming from the journal, so it's durable