From c1cf9ef12d010cec89d6228b29f3459c3d51ee0d Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 27 Mar 2018 17:09:48 -0400 Subject: [PATCH] ARTEMIS-1843 Update Qpid JMS 0.32.0 and Proton-j 0.27.1 Use new no copy variants for the delivery send and receive and make use of the ReadableBuffer type that is now used to convery tranfer payloads without a copy. Also set max outgoing frame size to match the configured maxFrameSize for the AMQP protocol head to avoid the case where an overly large frame can be written instead of chunking a large message. --- .../protocol/amqp/broker/AMQPMessage.java | 97 ++-- .../amqp/broker/AMQPSessionCallback.java | 3 +- .../amqp/proton/AMQPConnectionContext.java | 15 +- .../proton/ProtonServerReceiverContext.java | 5 +- .../proton/ProtonServerSenderContext.java | 18 +- .../protocol/amqp/util/NettyReadable.java | 125 ++++- .../protocol/amqp/util/NettyWritable.java | 31 +- .../protocol/amqp/util/NettyReadableTest.java | 454 ++++++++++++++++++ .../protocol/amqp/util/NettyWritableTest.java | 151 ++++++ pom.xml | 4 +- .../amqp/AmqpLargeMessageTest.java | 95 +++- 11 files changed, 918 insertions(+), 80 deletions(-) create mode 100644 artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadableTest.java create mode 100644 artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 5f7dbac8a4..2775f774f9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; +import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.reader.MessageUtil; @@ -54,6 +55,7 @@ import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; @@ -69,7 +71,7 @@ public class AMQPMessage extends RefCountMessage { public static final int MAX_MESSAGE_PRIORITY = 9; final long messageFormat; - ByteBuf data; + ReadableBuffer data; boolean bufferValid; Boolean durable; long messageID; @@ -106,7 +108,11 @@ public class AMQPMessage extends RefCountMessage { } public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) { - this.data = Unpooled.wrappedBuffer(data); + this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), coreMessageObjectPools); + } + + public AMQPMessage(long messageFormat, ReadableBuffer data, CoreMessageObjectPools coreMessageObjectPools) { + this.data = data; this.messageFormat = messageFormat; this.bufferValid = true; this.coreMessageObjectPools = coreMessageObjectPools; @@ -136,8 +142,8 @@ public class AMQPMessage extends RefCountMessage { protonMessage = (MessageImpl) Message.Factory.create(); if (data != null) { - data.readerIndex(0); - protonMessage.decode(data.nioBuffer()); + data.rewind(); + protonMessage.decode(data.duplicate()); this._header = protonMessage.getHeader(); protonMessage.setHeader(null); } @@ -162,7 +168,6 @@ public class AMQPMessage extends RefCountMessage { } } - @SuppressWarnings("unchecked") private Map getApplicationPropertiesMap() { ApplicationProperties appMap = getApplicationProperties(); Map map = null; @@ -183,15 +188,15 @@ public class AMQPMessage extends RefCountMessage { parseHeaders(); if (applicationProperties == null && appLocation >= 0) { - ByteBuffer buffer = getBuffer().nioBuffer(); + ReadableBuffer buffer = data.duplicate(); buffer.position(appLocation); - TLSEncode.getDecoder().setByteBuffer(buffer); + TLSEncode.getDecoder().setBuffer(buffer); Object section = TLSEncode.getDecoder().readObject(); if (section instanceof ApplicationProperties) { this.applicationProperties = (ApplicationProperties) section; } this.appLocation = -1; - TLSEncode.getDecoder().setByteBuffer(null); + TLSEncode.getDecoder().setBuffer(null); } return applicationProperties; @@ -202,7 +207,7 @@ public class AMQPMessage extends RefCountMessage { if (data == null) { initalizeObjects(); } else { - partialDecode(data.nioBuffer()); + partialDecode(data); } parsedHeaders = true; } @@ -367,10 +372,9 @@ public class AMQPMessage extends RefCountMessage { rejectedConsumers.add(consumer); } - private synchronized void partialDecode(ByteBuffer buffer) { + private synchronized void partialDecode(ReadableBuffer buffer) { DecoderImpl decoder = TLSEncode.getDecoder(); - decoder.setByteBuffer(buffer); - buffer.position(0); + decoder.setBuffer(buffer.rewind()); _header = null; _deliveryAnnotations = null; @@ -449,6 +453,7 @@ public class AMQPMessage extends RefCountMessage { } } finally { decoder.setByteBuffer(null); + data.position(0); } } @@ -456,14 +461,6 @@ public class AMQPMessage extends RefCountMessage { return messageFormat; } - public int getLength() { - return data.array().length; - } - - public byte[] getArray() { - return data.array(); - } - @Override public void messageChanged() { bufferValid = false; @@ -475,7 +472,7 @@ public class AMQPMessage extends RefCountMessage { if (data == null) { return null; } else { - return Unpooled.wrappedBuffer(data); + return Unpooled.wrappedBuffer(data.byteBuffer()); } } @@ -489,14 +486,15 @@ public class AMQPMessage extends RefCountMessage { public org.apache.activemq.artemis.api.core.Message copy() { checkBuffer(); - byte[] origin = data.array(); - byte[] newData = new byte[data.array().length - (messagePaylodStart - headerEnds)]; + ReadableBuffer view = data.duplicate(); - // Copy the original header - System.arraycopy(origin, 0, newData, 0, headerEnds); + byte[] newData = new byte[view.remaining() - (messagePaylodStart - headerEnds)]; - // Copy the body following the delivery annotations if present - System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart); + view.position(0).limit(headerEnds); + view.get(newData, 0, headerEnds); + view.clear(); + view.position(messagePaylodStart); + view.get(newData, headerEnds, view.remaining()); AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData); newEncode.setDurable(isDurable()).setMessageID(this.getMessageID()); @@ -679,7 +677,7 @@ public class AMQPMessage extends RefCountMessage { getProtonMessage().encode(new NettyWritable(buffer)); byte[] bytes = new byte[buffer.writerIndex()]; buffer.readBytes(bytes); - this.data = Unpooled.wrappedBuffer(bytes); + this.data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(bytes)); } finally { buffer.release(); } @@ -689,7 +687,7 @@ public class AMQPMessage extends RefCountMessage { public int getEncodeSize() { checkBuffer(); // + 20checkBuffer is an estimate for the Header with the deliveryCount - return data.array().length - messagePaylodStart + 20; + return data.remaining() - messagePaylodStart + 20; } @Override @@ -715,10 +713,12 @@ public class AMQPMessage extends RefCountMessage { TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); } } else if (headerEnds > 0) { - buffer.writeBytes(data, 0, headerEnds); + buffer.writeBytes(data.duplicate().limit(headerEnds).byteBuffer()); } - buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart); + data.position(messagePaylodStart); + buffer.writeBytes(data.byteBuffer()); + data.position(0); } /** @@ -734,7 +734,7 @@ public class AMQPMessage extends RefCountMessage { * * @return a Netty ByteBuf containing the encoded bytes of this Message instance. */ - public ByteBuf getSendBuffer(int deliveryCount) { + public ReadableBuffer getSendBuffer(int deliveryCount) { checkBuffer(); if (deliveryCount > 1) { @@ -744,23 +744,28 @@ public class AMQPMessage extends RefCountMessage { } else { // Common case message has no delivery annotations and this is the first delivery // so no re-encoding or section skipping needed. - return data.retainedDuplicate(); + return data.duplicate(); } } - private ByteBuf createCopyWithoutDeliveryAnnotations() { + private ReadableBuffer createCopyWithoutDeliveryAnnotations() { assert headerEnds != messagePaylodStart; // The original message had delivery annotations and so we must copy into a new // buffer skipping the delivery annotations section as that is not meant to survive // beyond this hop. + ReadableBuffer duplicate = data.duplicate(); + final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize()); - result.writeBytes(data, 0, headerEnds); - result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart); - return result; + result.writeBytes(duplicate.limit(headerEnds).byteBuffer()); + duplicate.clear(); + duplicate.position(messagePaylodStart); + result.writeBytes(duplicate.byteBuffer()); + + return new NettyReadable(result); } - private ByteBuf createCopyWithNewDeliveryCount(int deliveryCount) { + private ReadableBuffer createCopyWithNewDeliveryCount(int deliveryCount) { assert deliveryCount > 1; final int amqpDeliveryCount = deliveryCount - 1; @@ -786,9 +791,11 @@ public class AMQPMessage extends RefCountMessage { // This will skip any existing delivery annotations that might have been present // in the original message. - result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart); + data.position(messagePaylodStart); + result.writeBytes(data.byteBuffer()); + data.position(0); - return result; + return new NettyReadable(result); } public TypedProperties createExtraProperties() { @@ -1222,14 +1229,18 @@ public class AMQPMessage extends RefCountMessage { } private int internalPersistSize() { - return data.array().length; + return data.remaining(); } @Override public void persist(ActiveMQBuffer targetRecord) { checkBuffer(); targetRecord.writeInt(internalPersistSize()); - targetRecord.writeBytes(data.array(), 0, data.array().length ); + if (data.hasArray()) { + targetRecord.writeBytes(data.array(), data.arrayOffset(), data.remaining()); + } else { + targetRecord.writeBytes(data.byteBuffer()); + } } @Override @@ -1238,7 +1249,7 @@ public class AMQPMessage extends RefCountMessage { byte[] recordArray = new byte[size]; record.readBytes(recordArray); this.messagePaylodStart = 0; // whatever was persisted will be sent - this.data = Unpooled.wrappedBuffer(recordArray); + this.data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray)); this.bufferValid = true; this.durable = true; // it's coming from the journal, so it's durable parseHeaders(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 7134d3bf9c..105d58adc1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -71,6 +71,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; @@ -441,7 +442,7 @@ public class AMQPSessionCallback implements SessionCallback { final Delivery delivery, SimpleString address, int messageFormat, - byte[] data) throws Exception { + ReadableBuffer data) throws Exception { AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools); if (address != null) { message.setAddress(address); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 39f960994f..4788d0d436 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME; + import java.net.URI; import java.util.Arrays; import java.util.HashMap; @@ -25,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; @@ -52,11 +57,7 @@ import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; import org.jboss.logging.Logger; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME; +import io.netty.buffer.ByteBuf; public class AMQPConnectionContext extends ProtonInitializable implements EventHandler { @@ -118,12 +119,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH transport.setChannelMax(channelMax); transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize()); transport.setMaxFrameSize(maxFrameSize); + transport.setOutboundFrameSizeLimit(maxFrameSize); if (!isIncomingConnection && saslClientFactory != null) { handler.createClientSASL(); } } - public void scheduledFlush() { handler.scheduledFlush(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 3c35d763c4..0036004c4d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -40,6 +40,7 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.jboss.logging.Logger; @@ -221,10 +222,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements receiver = ((Receiver) delivery.getLink()); Transaction tx = null; - byte[] data; - data = new byte[delivery.available()]; - receiver.recv(data, 0, data.length); + ReadableBuffer data = receiver.recv(); receiver.advance(); if (delivery.getRemoteState() instanceof TransactionalState) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 990a2172f9..9b4704f765 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -68,14 +68,13 @@ import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Sender; import org.jboss.logging.Logger; -import io.netty.buffer.ByteBuf; - /** * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links */ @@ -692,10 +691,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); // Let the Message decide how to present the message bytes - ByteBuf sendBuffer = message.getSendBuffer(deliveryCount); + ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount); try { - int size = sendBuffer.writerIndex(); + int size = sendBuffer.remaining(); while (!connection.tryLock(1, TimeUnit.SECONDS)) { if (closed || sender.getLocalState() == EndpointState.CLOSED) { @@ -715,12 +714,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr delivery.setMessageFormat((int) message.getMessageFormat()); delivery.setContext(messageReference); - if (sendBuffer.hasArray()) { - // this will avoid a copy.. patch provided by Norman using buffer.array() - sender.send(sendBuffer.array(), sendBuffer.arrayOffset() + sendBuffer.readerIndex(), sendBuffer.readableBytes()); - } else { - sender.send(new NettyReadable(sendBuffer)); - } + sender.sendNoCopy(sendBuffer); if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. @@ -736,7 +730,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return size; } finally { - sendBuffer.release(); + if (sendBuffer instanceof NettyReadable) { + ((NettyReadable) sendBuffer).getByteBuf().release(); + } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java index e0705b458a..096d4a65de 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,15 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.artemis.protocol.amqp.util; import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +import org.apache.qpid.proton.codec.ReadableBuffer; +import org.apache.qpid.proton.codec.WritableBuffer; import io.netty.buffer.ByteBuf; -import org.apache.qpid.proton.codec.ReadableBuffer; +/** + * {@link ReadableBuffer} implementation that wraps a Netty {@link ByteBuf} to + * allow use of Netty buffers to be used when decoding AMQP messages. + */ public class NettyReadable implements ReadableBuffer { private static final Charset Charset_UTF8 = Charset.forName("UTF-8"); @@ -33,9 +40,8 @@ public class NettyReadable implements ReadableBuffer { this.buffer = buffer; } - @Override - public void put(ReadableBuffer other) { - buffer.writeBytes(other.byteBuffer()); + public ByteBuf getByteBuf() { + return this.buffer; } @Override @@ -93,7 +99,8 @@ public class NettyReadable implements ReadableBuffer { @Override public ReadableBuffer flip() { - return new NettyReadable(buffer.duplicate().setIndex(0, buffer.readerIndex())); + buffer.setIndex(0, buffer.readerIndex()); + return this; } @Override @@ -136,4 +143,108 @@ public class NettyReadable implements ReadableBuffer { public String readUTF8() { return buffer.toString(Charset_UTF8); } + + @Override + public byte[] array() { + return buffer.array(); + } + + @Override + public int arrayOffset() { + return buffer.arrayOffset() + buffer.readerIndex(); + } + + @Override + public int capacity() { + return buffer.capacity(); + } + + @Override + public ReadableBuffer clear() { + buffer.setIndex(0, buffer.capacity()); + return this; + } + + @Override + public ReadableBuffer reclaimRead() { + return this; + } + + @Override + public byte get(int index) { + return buffer.getByte(index); + } + + @Override + public boolean hasArray() { + return buffer.hasArray(); + } + + @Override + public ReadableBuffer mark() { + buffer.markReaderIndex(); + return this; + } + + @Override + public String readString(CharsetDecoder decoder) throws CharacterCodingException { + return buffer.toString(decoder.charset()); + } + + @Override + public ReadableBuffer reset() { + buffer.resetReaderIndex(); + return this; + } + + @Override + public ReadableBuffer rewind() { + buffer.setIndex(0, buffer.writerIndex()); + return this; + } + + @Override + public ReadableBuffer get(WritableBuffer target) { + int start = target.position(); + + if (buffer.hasArray()) { + target.put(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes()); + } else { + target.put(buffer.nioBuffer()); + } + + int written = target.position() - start; + + buffer.readerIndex(buffer.readerIndex() + written); + + return this; + } + + @Override + public String toString() { + return buffer.toString(); + } + + @Override + public int hashCode() { + return buffer.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (!(other instanceof ReadableBuffer)) { + return false; + } + + ReadableBuffer readable = (ReadableBuffer) other; + if (this.remaining() != readable.remaining()) { + return false; + } + + return buffer.nioBuffer().equals(readable.byteBuffer()); + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java index 75d39b6366..659f35f705 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java @@ -18,13 +18,15 @@ package org.apache.activemq.artemis.protocol.amqp.util; import java.nio.ByteBuffer; -import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.WritableBuffer; -/** - * This is to use NettyBuffer within Proton - */ +import io.netty.buffer.ByteBuf; +/** + * {@link WritableBuffer} implementation that wraps a Netty {@link ByteBuf} to + * allow use of Netty buffers to be used when encoding AMQP messages. + */ public class NettyWritable implements WritableBuffer { final ByteBuf nettyBuffer; @@ -33,6 +35,10 @@ public class NettyWritable implements WritableBuffer { this.nettyBuffer = nettyBuffer; } + public ByteBuf getByteBuf() { + return nettyBuffer; + } + @Override public void put(byte b) { nettyBuffer.writeByte(b); @@ -75,7 +81,7 @@ public class NettyWritable implements WritableBuffer { @Override public int remaining() { - return nettyBuffer.capacity() - nettyBuffer.writerIndex(); + return nettyBuffer.maxCapacity() - nettyBuffer.writerIndex(); } @Override @@ -93,8 +99,23 @@ public class NettyWritable implements WritableBuffer { nettyBuffer.writeBytes(payload); } + public void put(ByteBuf payload) { + nettyBuffer.writeBytes(payload); + } + @Override public int limit() { return nettyBuffer.capacity(); } + + @Override + public void put(ReadableBuffer buffer) { + if (buffer.hasArray()) { + nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.remaining()); + } else { + while (buffer.hasRemaining()) { + nettyBuffer.writeByte(buffer.get()); + } + } + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadableTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadableTest.java new file mode 100644 index 0000000000..437d57b549 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadableTest.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.util; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.StandardCharsets; + +import org.apache.qpid.proton.codec.ReadableBuffer; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * Tests for the ReadableBuffer wrapper that uses Netty ByteBuf underneath + */ +public class NettyReadableTest { + + @Test + public void testWrapBuffer() { + ByteBuf byteBuffer = Unpooled.buffer(100, 100); + + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertEquals(100, buffer.capacity()); + assertSame(byteBuffer, buffer.getByteBuf()); + assertSame(buffer, buffer.reclaimRead()); + } + + @Test + public void testArrayAccess() { + ByteBuf byteBuffer = Unpooled.buffer(100, 100); + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertTrue(buffer.hasArray()); + assertSame(buffer.array(), byteBuffer.array()); + assertEquals(buffer.arrayOffset(), byteBuffer.arrayOffset()); + } + + @Test + public void testArrayAccessWhenNoArray() { + ByteBuf byteBuffer = Unpooled.directBuffer(); + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertFalse(buffer.hasArray()); + } + + @Test + public void testByteBuffer() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + ByteBuffer nioBuffer = buffer.byteBuffer(); + assertEquals(data.length, nioBuffer.remaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], nioBuffer.get()); + } + } + + @Test + public void testGet() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + + assertFalse(buffer.hasRemaining()); + + try { + buffer.get(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) { + } + } + + @Test + public void testGetIndex() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get(i)); + } + + assertTrue(buffer.hasRemaining()); + } + + @Test + public void testGetShort() { + byte[] data = new byte[] {0, 1}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertEquals(1, buffer.getShort()); + assertFalse(buffer.hasRemaining()); + + try { + buffer.getShort(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) { + } + } + + @Test + public void testGetInt() { + byte[] data = new byte[] {0, 0, 0, 1}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertEquals(1, buffer.getInt()); + assertFalse(buffer.hasRemaining()); + + try { + buffer.getInt(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) { + } + } + + @Test + public void testGetLong() { + byte[] data = new byte[] {0, 0, 0, 0, 0, 0, 0, 1}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertEquals(1, buffer.getLong()); + assertFalse(buffer.hasRemaining()); + + try { + buffer.getLong(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) { + } + } + + @Test + public void testGetFloat() { + byte[] data = new byte[] {0, 0, 0, 0}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertEquals(0, buffer.getFloat(), 0.0); + assertFalse(buffer.hasRemaining()); + + try { + buffer.getFloat(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) { + } + } + + @Test + public void testGetDouble() { + byte[] data = new byte[] {0, 0, 0, 0, 0, 0, 0, 0}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertEquals(0, buffer.getDouble(), 0.0); + assertFalse(buffer.hasRemaining()); + + try { + buffer.getDouble(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) { + } + } + + @Test + public void testGetBytes() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + byte[] target = new byte[data.length]; + + buffer.get(target); + assertFalse(buffer.hasRemaining()); + assertArrayEquals(data, target); + + try { + buffer.get(target); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) { + } + } + + @Test + public void testGetBytesIntInt() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + byte[] target = new byte[data.length]; + + buffer.get(target, 0, target.length); + assertFalse(buffer.hasRemaining()); + assertArrayEquals(data, target); + + try { + buffer.get(target, 0, target.length); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) { + } + } + + @Test + public void testGetBytesToWritableBuffer() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length); + NettyWritable target = new NettyWritable(targetBuffer); + + buffer.get(target); + assertFalse(buffer.hasRemaining()); + assertArrayEquals(targetBuffer.array(), data); + } + + @Test + public void testGetBytesToWritableBufferThatIsDirect() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.directBuffer(data.length, data.length); + byteBuffer.writeBytes(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length); + NettyWritable target = new NettyWritable(targetBuffer); + + buffer.get(target); + assertFalse(buffer.hasRemaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], target.getByteBuf().readByte()); + } + } + + @Test + public void testDuplicate() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + ReadableBuffer duplicate = buffer.duplicate(); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], duplicate.get()); + } + + assertFalse(duplicate.hasRemaining()); + } + + @Test + public void testSlice() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + ReadableBuffer slice = buffer.slice(); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], slice.get()); + } + + assertFalse(slice.hasRemaining()); + } + + @Test + public void testLimit() { + byte[] data = new byte[] {1, 2}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertEquals(data.length, buffer.limit()); + buffer.limit(1); + assertEquals(1, buffer.limit()); + assertEquals(1, buffer.get()); + assertFalse(buffer.hasRemaining()); + + try { + buffer.get(); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) { + } + } + + @Test + public void testClear() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + byte[] target = new byte[data.length]; + + buffer.get(target); + assertFalse(buffer.hasRemaining()); + assertArrayEquals(data, target); + + try { + buffer.get(target); + fail("Should throw an IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException ioe) { + } + + buffer.clear(); + assertTrue(buffer.hasRemaining()); + assertEquals(data.length, buffer.remaining()); + buffer.get(target); + assertFalse(buffer.hasRemaining()); + assertArrayEquals(data, target); + } + + @Test + public void testRewind() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + + assertFalse(buffer.hasRemaining()); + buffer.rewind(); + assertTrue(buffer.hasRemaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + } + + @Test + public void testReset() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + buffer.mark(); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + + assertFalse(buffer.hasRemaining()); + buffer.reset(); + assertTrue(buffer.hasRemaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + } + + @Test + public void testGetPosition() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertEquals(buffer.position(), 0); + for (int i = 0; i < data.length; i++) { + assertEquals(buffer.position(), i); + assertEquals(data[i], buffer.get()); + assertEquals(buffer.position(), i + 1); + } + } + + @Test + public void testSetPosition() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + + assertFalse(buffer.hasRemaining()); + buffer.position(0); + assertTrue(buffer.hasRemaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + } + + @Test + public void testFlip() { + byte[] data = new byte[] {0, 1, 2, 3, 4}; + ByteBuf byteBuffer = Unpooled.wrappedBuffer(data); + NettyReadable buffer = new NettyReadable(byteBuffer); + + buffer.mark(); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + + assertFalse(buffer.hasRemaining()); + buffer.flip(); + assertTrue(buffer.hasRemaining()); + + for (int i = 0; i < data.length; i++) { + assertEquals(data[i], buffer.get()); + } + } + + @Test + public void testReadUTF8() throws CharacterCodingException { + String testString = "test-string-1"; + byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8); + ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes); + + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertEquals(testString, buffer.readUTF8()); + } + + @Test + public void testReadString() throws CharacterCodingException { + String testString = "test-string-1"; + byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8); + ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes); + + NettyReadable buffer = new NettyReadable(byteBuffer); + + assertEquals(testString, buffer.readString(StandardCharsets.UTF_8.newDecoder())); + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java new file mode 100644 index 0000000000..f0de51a3cd --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; + +import org.apache.qpid.proton.codec.ReadableBuffer; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * Tests for behavior of NettyWritable + */ +public class NettyWritableTest { + + @Test + public void testGetBuffer() { + ByteBuf buffer = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(buffer); + + assertSame(buffer, writable.getByteBuf()); + } + + @Test + public void testLimit() { + ByteBuf buffer = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(buffer); + + assertEquals(buffer.capacity(), writable.limit()); + } + + @Test + public void testRemaining() { + ByteBuf buffer = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(buffer); + + assertEquals(buffer.maxCapacity(), writable.remaining()); + writable.put((byte) 0); + assertEquals(buffer.maxCapacity() - 1, writable.remaining()); + } + + @Test + public void testHasRemaining() { + ByteBuf buffer = Unpooled.buffer(100, 100); + NettyWritable writable = new NettyWritable(buffer); + + assertTrue(writable.hasRemaining()); + writable.put((byte) 0); + assertTrue(writable.hasRemaining()); + buffer.writerIndex(buffer.maxCapacity()); + assertFalse(writable.hasRemaining()); + } + + @Test + public void testGetPosition() { + ByteBuf buffer = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(buffer); + + assertEquals(0, writable.position()); + writable.put((byte) 0); + assertEquals(1, writable.position()); + } + + @Test + public void testSetPosition() { + ByteBuf buffer = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(buffer); + + assertEquals(0, writable.position()); + writable.position(1); + assertEquals(1, writable.position()); + } + + @Test + public void testPutByteBuffer() { + ByteBuffer input = ByteBuffer.allocate(1024); + input.put((byte) 1); + input.flip(); + + ByteBuf buffer = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(buffer); + + assertEquals(0, writable.position()); + writable.put(input); + assertEquals(1, writable.position()); + } + + @Test + public void testPutByteBuf() { + ByteBuf input = Unpooled.buffer(); + input.writeByte((byte) 1); + + ByteBuf buffer = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(buffer); + + assertEquals(0, writable.position()); + writable.put(input); + assertEquals(1, writable.position()); + } + + @Test + public void testPutReadableBuffer() { + doPutReadableBufferTestImpl(true); + doPutReadableBufferTestImpl(false); + } + + private void doPutReadableBufferTestImpl(boolean readOnly) { + ByteBuffer buf = ByteBuffer.allocate(1024); + buf.put((byte) 1); + buf.flip(); + if (readOnly) { + buf = buf.asReadOnlyBuffer(); + } + + ReadableBuffer input = new ReadableBuffer.ByteBufferReader(buf); + + if (readOnly) { + assertFalse("Expected buffer not to hasArray()", input.hasArray()); + } else { + assertTrue("Expected buffer to hasArray()", input.hasArray()); + } + + ByteBuf buffer = Unpooled.buffer(1024); + NettyWritable writable = new NettyWritable(buffer); + + assertEquals(0, writable.position()); + writable.put(input); + assertEquals(1, writable.position()); + } +} diff --git a/pom.xml b/pom.xml index 2fa7b0baef..b1e133e8c2 100644 --- a/pom.xml +++ b/pom.xml @@ -92,10 +92,10 @@ 2.4 2.8.47 4.1.22.Final - 0.26.0 + 0.27.1 3.0.19.Final 1.7.21 - 0.30.0 + 0.32.0 0.9.5 1.0-alpha-1 1 diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java index d70c700fda..7e80c8ff74 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java @@ -17,12 +17,15 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -46,11 +49,18 @@ import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AmqpLargeMessageTest extends AmqpClientTestSupport { - private static final int FRAME_SIZE = 10024; + protected static final Logger LOG = LoggerFactory.getLogger(AmqpLargeMessageTest.class); + + private final Random rand = new Random(System.currentTimeMillis()); + + private static final int FRAME_SIZE = 32767; private static final int PAYLOAD = 110 * 1024; String testQueueName = "ConnectionFrameSize"; @@ -232,6 +242,89 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { receiveJMS(nMsgs, factory); } + private byte[] createLargePayload(int sizeInBytes) { + byte[] payload = new byte[sizeInBytes]; + for (int i = 0; i < sizeInBytes; i++) { + payload[i] = (byte) rand.nextInt(256); + } + + LOG.debug("Created buffer with size : " + sizeInBytes + " bytes"); + return payload; + } + + @Test(timeout = 60000) + public void testSendSmallerMessages() throws Exception { + for (int i = 512; i <= (8 * 1024); i += 512) { + doTestSendLargeMessage(i); + } + } + + @Test(timeout = 120000) + public void testSendFixedSizedMessages() throws Exception { + doTestSendLargeMessage(65536); + doTestSendLargeMessage(65536 * 2); + doTestSendLargeMessage(65536 * 4); + } + + @Test(timeout = 120000) + public void testSend1MBMessage() throws Exception { + doTestSendLargeMessage(1024 * 1024); + } + + @Ignore("Useful for performance testing") + @Test(timeout = 120000) + public void testSend10MBMessage() throws Exception { + doTestSendLargeMessage(1024 * 1024 * 10); + } + + @Ignore("Useful for performance testing") + @Test(timeout = 120000) + public void testSend100MBMessage() throws Exception { + doTestSendLargeMessage(1024 * 1024 * 100); + } + + public void doTestSendLargeMessage(int expectedSize) throws Exception { + LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize); + byte[] payload = createLargePayload(expectedSize); + assertEquals(expectedSize, payload.length); + + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + + long startTime = System.currentTimeMillis(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(payload); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + // Set this to non-default to get a Header in the encoded message. + producer.setPriority(4); + producer.send(message); + long endTime = System.currentTimeMillis(); + + LOG.info("Returned from send after {} ms", endTime - startTime); + startTime = System.currentTimeMillis(); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + LOG.info("Calling receive"); + Message received = consumer.receive(); + assertNotNull(received); + assertTrue(received instanceof BytesMessage); + BytesMessage bytesMessage = (BytesMessage) received; + assertNotNull(bytesMessage); + endTime = System.currentTimeMillis(); + + LOG.info("Returned from receive after {} ms", endTime - startTime); + byte[] bytesReceived = new byte[expectedSize]; + assertEquals(expectedSize, bytesMessage.readBytes(bytesReceived, expectedSize)); + assertTrue(Arrays.equals(payload, bytesReceived)); + connection.close(); + } + } + private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception { try (Connection connection = factory.createConnection()) { Session session = connection.createSession();