This commit is contained in:
Clebert Suconic 2018-03-06 18:44:22 -05:00
commit cc4a13a377
2 changed files with 105 additions and 25 deletions

View File

@ -665,16 +665,20 @@ public class AMQPMessage extends RefCountMessage {
private synchronized void checkBuffer() {
if (!bufferValid) {
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
try {
getProtonMessage().encode(new NettyWritable(buffer));
byte[] bytes = new byte[buffer.writerIndex()];
buffer.readBytes(bytes);
this.data = Unpooled.wrappedBuffer(bytes);
} finally {
buffer.release();
}
encodeProtonMessage();
}
}
private void encodeProtonMessage() {
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
try {
getProtonMessage().encode(new NettyWritable(buffer));
byte[] bytes = new byte[buffer.writerIndex()];
buffer.readBytes(bytes);
this.data = Unpooled.wrappedBuffer(bytes);
} finally {
buffer.release();
}
}
@ -691,15 +695,16 @@ public class AMQPMessage extends RefCountMessage {
int amqpDeliveryCount = deliveryCount - 1;
Header header = getHeader();
if (header == null && (amqpDeliveryCount > 0)) {
header = new Header();
header.setDurable(durable);
}
// If the re-delivering the message then the header must be re-encoded
// otherwise we want to write the original header if present.
if (amqpDeliveryCount > 0) {
Header header = getHeader();
if (header == null) {
header = new Header();
header.setDurable(durable);
}
synchronized (header) {
header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
@ -713,6 +718,76 @@ public class AMQPMessage extends RefCountMessage {
buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
}
/**
* Gets a ByteBuf from the Message that contains the encoded bytes to be sent on the wire.
* <p>
* When possible this method will present the bytes to the caller without copying them into
* another buffer copy. If copying is needed a new Netty buffer is created and returned. The
* caller should ensure that the reference count on the returned buffer is always decremented
* to avoid a leak in the case of a copied buffer being returned.
*
* @param deliveryCount
* The new delivery count for this message.
*
* @return a Netty ByteBuf containing the encoded bytes of this Message instance.
*/
public ByteBuf getSendBuffer(int deliveryCount) {
checkBuffer();
if (deliveryCount > 1) {
return createCopyWithNewDeliveryCount(deliveryCount);
} else if (headerEnds != messagePaylodStart) {
return createCopyWithoutDeliveryAnnotations();
} else {
// Common case message has no delivery annotations and this is the first delivery
// so no re-encoding or section skipping needed.
return data.retainedDuplicate();
}
}
private ByteBuf createCopyWithoutDeliveryAnnotations() {
assert headerEnds != messagePaylodStart;
// The original message had delivery annotations and so we must copy into a new
// buffer skipping the delivery annotations section as that is not meant to survive
// beyond this hop.
final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
result.writeBytes(data, 0, headerEnds);
result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
return result;
}
private ByteBuf createCopyWithNewDeliveryCount(int deliveryCount) {
assert deliveryCount > 1;
final int amqpDeliveryCount = deliveryCount - 1;
// If the re-delivering the message then the header must be re-encoded
// (or created if not previously present). Any delivery annotations should
// be skipped as well in the resulting buffer.
final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
Header header = getHeader();
if (header == null) {
header = new Header();
header.setDurable(durable);
}
synchronized (header) {
// Updates or adds a Header section with the correct delivery count
header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result));
TLSEncode.getEncoder().writeObject(header);
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
}
// This will skip any existing delivery annotations that might have been present
// in the original message.
result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
return result;
}
public TypedProperties createExtraProperties() {
if (extraProperties == null) {
extraProperties = new TypedProperties();

View File

@ -22,8 +22,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@ -48,6 +46,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
@ -75,6 +74,8 @@ import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
/**
* TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
*/
@ -690,11 +691,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// we only need a tag if we are going to settle later
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(message.getEncodeSize());
try {
message.sendBuffer(nettyBuffer, deliveryCount);
// Let the Message decide how to present the message bytes
ByteBuf sendBuffer = message.getSendBuffer(deliveryCount);
int size = nettyBuffer.writerIndex();
try {
int size = sendBuffer.writerIndex();
while (!connection.tryLock(1, TimeUnit.SECONDS)) {
if (closed || sender.getLocalState() == EndpointState.CLOSED) {
@ -714,8 +715,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(messageReference);
// this will avoid a copy.. patch provided by Norman using buffer.array()
sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
if (sendBuffer.hasArray()) {
// this will avoid a copy.. patch provided by Norman using buffer.array()
sender.send(sendBuffer.array(), sendBuffer.arrayOffset() + sendBuffer.readerIndex(), sendBuffer.readableBytes());
} else {
sender.send(new NettyReadable(sendBuffer));
}
if (preSettle) {
// Presettled means the client implicitly accepts any delivery we send it.
@ -731,7 +736,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return size;
} finally {
nettyBuffer.release();
sendBuffer.release();
}
}