ARTEMIS-1843 Update Qpid JMS 0.32.0 and Proton-j 0.27.1

Use new no copy variants for the delivery send and receive and make
use of the ReadableBuffer type that is now used to convery tranfer
payloads without a copy.  Also set max outgoing frame size to match
the configured maxFrameSize for the AMQP protocol head to avoid the
case where an overly large frame can be written instead of chunking
a large message.
This commit is contained in:
Timothy Bish 2018-03-27 17:09:48 -04:00 committed by Clebert Suconic
parent b60f6489f3
commit c1cf9ef12d
11 changed files with 918 additions and 80 deletions

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
@ -54,6 +55,7 @@ import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
@ -69,7 +71,7 @@ public class AMQPMessage extends RefCountMessage {
public static final int MAX_MESSAGE_PRIORITY = 9;
final long messageFormat;
ByteBuf data;
ReadableBuffer data;
boolean bufferValid;
Boolean durable;
long messageID;
@ -106,7 +108,11 @@ public class AMQPMessage extends RefCountMessage {
}
public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) {
this.data = Unpooled.wrappedBuffer(data);
this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), coreMessageObjectPools);
}
public AMQPMessage(long messageFormat, ReadableBuffer data, CoreMessageObjectPools coreMessageObjectPools) {
this.data = data;
this.messageFormat = messageFormat;
this.bufferValid = true;
this.coreMessageObjectPools = coreMessageObjectPools;
@ -136,8 +142,8 @@ public class AMQPMessage extends RefCountMessage {
protonMessage = (MessageImpl) Message.Factory.create();
if (data != null) {
data.readerIndex(0);
protonMessage.decode(data.nioBuffer());
data.rewind();
protonMessage.decode(data.duplicate());
this._header = protonMessage.getHeader();
protonMessage.setHeader(null);
}
@ -162,7 +168,6 @@ public class AMQPMessage extends RefCountMessage {
}
}
@SuppressWarnings("unchecked")
private Map<String, Object> getApplicationPropertiesMap() {
ApplicationProperties appMap = getApplicationProperties();
Map<String, Object> map = null;
@ -183,15 +188,15 @@ public class AMQPMessage extends RefCountMessage {
parseHeaders();
if (applicationProperties == null && appLocation >= 0) {
ByteBuffer buffer = getBuffer().nioBuffer();
ReadableBuffer buffer = data.duplicate();
buffer.position(appLocation);
TLSEncode.getDecoder().setByteBuffer(buffer);
TLSEncode.getDecoder().setBuffer(buffer);
Object section = TLSEncode.getDecoder().readObject();
if (section instanceof ApplicationProperties) {
this.applicationProperties = (ApplicationProperties) section;
}
this.appLocation = -1;
TLSEncode.getDecoder().setByteBuffer(null);
TLSEncode.getDecoder().setBuffer(null);
}
return applicationProperties;
@ -202,7 +207,7 @@ public class AMQPMessage extends RefCountMessage {
if (data == null) {
initalizeObjects();
} else {
partialDecode(data.nioBuffer());
partialDecode(data);
}
parsedHeaders = true;
}
@ -367,10 +372,9 @@ public class AMQPMessage extends RefCountMessage {
rejectedConsumers.add(consumer);
}
private synchronized void partialDecode(ByteBuffer buffer) {
private synchronized void partialDecode(ReadableBuffer buffer) {
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setByteBuffer(buffer);
buffer.position(0);
decoder.setBuffer(buffer.rewind());
_header = null;
_deliveryAnnotations = null;
@ -449,6 +453,7 @@ public class AMQPMessage extends RefCountMessage {
}
} finally {
decoder.setByteBuffer(null);
data.position(0);
}
}
@ -456,14 +461,6 @@ public class AMQPMessage extends RefCountMessage {
return messageFormat;
}
public int getLength() {
return data.array().length;
}
public byte[] getArray() {
return data.array();
}
@Override
public void messageChanged() {
bufferValid = false;
@ -475,7 +472,7 @@ public class AMQPMessage extends RefCountMessage {
if (data == null) {
return null;
} else {
return Unpooled.wrappedBuffer(data);
return Unpooled.wrappedBuffer(data.byteBuffer());
}
}
@ -489,14 +486,15 @@ public class AMQPMessage extends RefCountMessage {
public org.apache.activemq.artemis.api.core.Message copy() {
checkBuffer();
byte[] origin = data.array();
byte[] newData = new byte[data.array().length - (messagePaylodStart - headerEnds)];
ReadableBuffer view = data.duplicate();
// Copy the original header
System.arraycopy(origin, 0, newData, 0, headerEnds);
byte[] newData = new byte[view.remaining() - (messagePaylodStart - headerEnds)];
// Copy the body following the delivery annotations if present
System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart);
view.position(0).limit(headerEnds);
view.get(newData, 0, headerEnds);
view.clear();
view.position(messagePaylodStart);
view.get(newData, headerEnds, view.remaining());
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
@ -679,7 +677,7 @@ public class AMQPMessage extends RefCountMessage {
getProtonMessage().encode(new NettyWritable(buffer));
byte[] bytes = new byte[buffer.writerIndex()];
buffer.readBytes(bytes);
this.data = Unpooled.wrappedBuffer(bytes);
this.data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(bytes));
} finally {
buffer.release();
}
@ -689,7 +687,7 @@ public class AMQPMessage extends RefCountMessage {
public int getEncodeSize() {
checkBuffer();
// + 20checkBuffer is an estimate for the Header with the deliveryCount
return data.array().length - messagePaylodStart + 20;
return data.remaining() - messagePaylodStart + 20;
}
@Override
@ -715,10 +713,12 @@ public class AMQPMessage extends RefCountMessage {
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
}
} else if (headerEnds > 0) {
buffer.writeBytes(data, 0, headerEnds);
buffer.writeBytes(data.duplicate().limit(headerEnds).byteBuffer());
}
buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
data.position(messagePaylodStart);
buffer.writeBytes(data.byteBuffer());
data.position(0);
}
/**
@ -734,7 +734,7 @@ public class AMQPMessage extends RefCountMessage {
*
* @return a Netty ByteBuf containing the encoded bytes of this Message instance.
*/
public ByteBuf getSendBuffer(int deliveryCount) {
public ReadableBuffer getSendBuffer(int deliveryCount) {
checkBuffer();
if (deliveryCount > 1) {
@ -744,23 +744,28 @@ public class AMQPMessage extends RefCountMessage {
} else {
// Common case message has no delivery annotations and this is the first delivery
// so no re-encoding or section skipping needed.
return data.retainedDuplicate();
return data.duplicate();
}
}
private ByteBuf createCopyWithoutDeliveryAnnotations() {
private ReadableBuffer createCopyWithoutDeliveryAnnotations() {
assert headerEnds != messagePaylodStart;
// The original message had delivery annotations and so we must copy into a new
// buffer skipping the delivery annotations section as that is not meant to survive
// beyond this hop.
ReadableBuffer duplicate = data.duplicate();
final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
result.writeBytes(data, 0, headerEnds);
result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
return result;
result.writeBytes(duplicate.limit(headerEnds).byteBuffer());
duplicate.clear();
duplicate.position(messagePaylodStart);
result.writeBytes(duplicate.byteBuffer());
return new NettyReadable(result);
}
private ByteBuf createCopyWithNewDeliveryCount(int deliveryCount) {
private ReadableBuffer createCopyWithNewDeliveryCount(int deliveryCount) {
assert deliveryCount > 1;
final int amqpDeliveryCount = deliveryCount - 1;
@ -786,9 +791,11 @@ public class AMQPMessage extends RefCountMessage {
// This will skip any existing delivery annotations that might have been present
// in the original message.
result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
data.position(messagePaylodStart);
result.writeBytes(data.byteBuffer());
data.position(0);
return result;
return new NettyReadable(result);
}
public TypedProperties createExtraProperties() {
@ -1222,14 +1229,18 @@ public class AMQPMessage extends RefCountMessage {
}
private int internalPersistSize() {
return data.array().length;
return data.remaining();
}
@Override
public void persist(ActiveMQBuffer targetRecord) {
checkBuffer();
targetRecord.writeInt(internalPersistSize());
targetRecord.writeBytes(data.array(), 0, data.array().length );
if (data.hasArray()) {
targetRecord.writeBytes(data.array(), data.arrayOffset(), data.remaining());
} else {
targetRecord.writeBytes(data.byteBuffer());
}
}
@Override
@ -1238,7 +1249,7 @@ public class AMQPMessage extends RefCountMessage {
byte[] recordArray = new byte[size];
record.readBytes(recordArray);
this.messagePaylodStart = 0; // whatever was persisted will be sent
this.data = Unpooled.wrappedBuffer(recordArray);
this.data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray));
this.bufferValid = true;
this.durable = true; // it's coming from the journal, so it's durable
parseHeaders();

View File

@ -71,6 +71,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
@ -441,7 +442,7 @@ public class AMQPSessionCallback implements SessionCallback {
final Delivery delivery,
SimpleString address,
int messageFormat,
byte[] data) throws Exception {
ReadableBuffer data) throws Exception {
AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools);
if (address != null) {
message.setAddress(address);

View File

@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
@ -25,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@ -52,11 +57,7 @@ import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
import io.netty.buffer.ByteBuf;
public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
@ -118,12 +119,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
transport.setChannelMax(channelMax);
transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize());
transport.setMaxFrameSize(maxFrameSize);
transport.setOutboundFrameSizeLimit(maxFrameSize);
if (!isIncomingConnection && saslClientFactory != null) {
handler.createClientSASL();
}
}
public void scheduledFlush() {
handler.scheduledFlush();
}

View File

@ -40,6 +40,7 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;
@ -221,10 +222,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
receiver = ((Receiver) delivery.getLink());
Transaction tx = null;
byte[] data;
data = new byte[delivery.available()];
receiver.recv(data, 0, data.length);
ReadableBuffer data = receiver.recv();
receiver.advance();
if (delivery.getRemoteState() instanceof TransactionalState) {

View File

@ -68,14 +68,13 @@ import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
/**
* TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
*/
@ -692,10 +691,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
// Let the Message decide how to present the message bytes
ByteBuf sendBuffer = message.getSendBuffer(deliveryCount);
ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
try {
int size = sendBuffer.writerIndex();
int size = sendBuffer.remaining();
while (!connection.tryLock(1, TimeUnit.SECONDS)) {
if (closed || sender.getLocalState() == EndpointState.CLOSED) {
@ -715,12 +714,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(messageReference);
if (sendBuffer.hasArray()) {
// this will avoid a copy.. patch provided by Norman using buffer.array()
sender.send(sendBuffer.array(), sendBuffer.arrayOffset() + sendBuffer.readerIndex(), sendBuffer.readableBytes());
} else {
sender.send(new NettyReadable(sendBuffer));
}
sender.sendNoCopy(sendBuffer);
if (preSettle) {
// Presettled means the client implicitly accepts any delivery we send it.
@ -736,7 +730,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return size;
} finally {
sendBuffer.release();
if (sendBuffer instanceof NettyReadable) {
((NettyReadable) sendBuffer).getByteBuf().release();
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -14,15 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.util;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import io.netty.buffer.ByteBuf;
import org.apache.qpid.proton.codec.ReadableBuffer;
/**
* {@link ReadableBuffer} implementation that wraps a Netty {@link ByteBuf} to
* allow use of Netty buffers to be used when decoding AMQP messages.
*/
public class NettyReadable implements ReadableBuffer {
private static final Charset Charset_UTF8 = Charset.forName("UTF-8");
@ -33,9 +40,8 @@ public class NettyReadable implements ReadableBuffer {
this.buffer = buffer;
}
@Override
public void put(ReadableBuffer other) {
buffer.writeBytes(other.byteBuffer());
public ByteBuf getByteBuf() {
return this.buffer;
}
@Override
@ -93,7 +99,8 @@ public class NettyReadable implements ReadableBuffer {
@Override
public ReadableBuffer flip() {
return new NettyReadable(buffer.duplicate().setIndex(0, buffer.readerIndex()));
buffer.setIndex(0, buffer.readerIndex());
return this;
}
@Override
@ -136,4 +143,108 @@ public class NettyReadable implements ReadableBuffer {
public String readUTF8() {
return buffer.toString(Charset_UTF8);
}
@Override
public byte[] array() {
return buffer.array();
}
@Override
public int arrayOffset() {
return buffer.arrayOffset() + buffer.readerIndex();
}
@Override
public int capacity() {
return buffer.capacity();
}
@Override
public ReadableBuffer clear() {
buffer.setIndex(0, buffer.capacity());
return this;
}
@Override
public ReadableBuffer reclaimRead() {
return this;
}
@Override
public byte get(int index) {
return buffer.getByte(index);
}
@Override
public boolean hasArray() {
return buffer.hasArray();
}
@Override
public ReadableBuffer mark() {
buffer.markReaderIndex();
return this;
}
@Override
public String readString(CharsetDecoder decoder) throws CharacterCodingException {
return buffer.toString(decoder.charset());
}
@Override
public ReadableBuffer reset() {
buffer.resetReaderIndex();
return this;
}
@Override
public ReadableBuffer rewind() {
buffer.setIndex(0, buffer.writerIndex());
return this;
}
@Override
public ReadableBuffer get(WritableBuffer target) {
int start = target.position();
if (buffer.hasArray()) {
target.put(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
} else {
target.put(buffer.nioBuffer());
}
int written = target.position() - start;
buffer.readerIndex(buffer.readerIndex() + written);
return this;
}
@Override
public String toString() {
return buffer.toString();
}
@Override
public int hashCode() {
return buffer.hashCode();
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof ReadableBuffer)) {
return false;
}
ReadableBuffer readable = (ReadableBuffer) other;
if (this.remaining() != readable.remaining()) {
return false;
}
return buffer.nioBuffer().equals(readable.byteBuffer());
}
}

View File

@ -18,13 +18,15 @@ package org.apache.activemq.artemis.protocol.amqp.util;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
/**
* This is to use NettyBuffer within Proton
*/
import io.netty.buffer.ByteBuf;
/**
* {@link WritableBuffer} implementation that wraps a Netty {@link ByteBuf} to
* allow use of Netty buffers to be used when encoding AMQP messages.
*/
public class NettyWritable implements WritableBuffer {
final ByteBuf nettyBuffer;
@ -33,6 +35,10 @@ public class NettyWritable implements WritableBuffer {
this.nettyBuffer = nettyBuffer;
}
public ByteBuf getByteBuf() {
return nettyBuffer;
}
@Override
public void put(byte b) {
nettyBuffer.writeByte(b);
@ -75,7 +81,7 @@ public class NettyWritable implements WritableBuffer {
@Override
public int remaining() {
return nettyBuffer.capacity() - nettyBuffer.writerIndex();
return nettyBuffer.maxCapacity() - nettyBuffer.writerIndex();
}
@Override
@ -93,8 +99,23 @@ public class NettyWritable implements WritableBuffer {
nettyBuffer.writeBytes(payload);
}
public void put(ByteBuf payload) {
nettyBuffer.writeBytes(payload);
}
@Override
public int limit() {
return nettyBuffer.capacity();
}
@Override
public void put(ReadableBuffer buffer) {
if (buffer.hasArray()) {
nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.remaining());
} else {
while (buffer.hasRemaining()) {
nettyBuffer.writeByte(buffer.get());
}
}
}
}

View File

@ -0,0 +1,454 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.util;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* Tests for the ReadableBuffer wrapper that uses Netty ByteBuf underneath
*/
public class NettyReadableTest {
@Test
public void testWrapBuffer() {
ByteBuf byteBuffer = Unpooled.buffer(100, 100);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(100, buffer.capacity());
assertSame(byteBuffer, buffer.getByteBuf());
assertSame(buffer, buffer.reclaimRead());
}
@Test
public void testArrayAccess() {
ByteBuf byteBuffer = Unpooled.buffer(100, 100);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertTrue(buffer.hasArray());
assertSame(buffer.array(), byteBuffer.array());
assertEquals(buffer.arrayOffset(), byteBuffer.arrayOffset());
}
@Test
public void testArrayAccessWhenNoArray() {
ByteBuf byteBuffer = Unpooled.directBuffer();
NettyReadable buffer = new NettyReadable(byteBuffer);
assertFalse(buffer.hasArray());
}
@Test
public void testByteBuffer() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ByteBuffer nioBuffer = buffer.byteBuffer();
assertEquals(data.length, nioBuffer.remaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], nioBuffer.get());
}
}
@Test
public void testGet() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
assertFalse(buffer.hasRemaining());
try {
buffer.get();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetIndex() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get(i));
}
assertTrue(buffer.hasRemaining());
}
@Test
public void testGetShort() {
byte[] data = new byte[] {0, 1};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(1, buffer.getShort());
assertFalse(buffer.hasRemaining());
try {
buffer.getShort();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetInt() {
byte[] data = new byte[] {0, 0, 0, 1};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(1, buffer.getInt());
assertFalse(buffer.hasRemaining());
try {
buffer.getInt();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetLong() {
byte[] data = new byte[] {0, 0, 0, 0, 0, 0, 0, 1};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(1, buffer.getLong());
assertFalse(buffer.hasRemaining());
try {
buffer.getLong();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetFloat() {
byte[] data = new byte[] {0, 0, 0, 0};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(0, buffer.getFloat(), 0.0);
assertFalse(buffer.hasRemaining());
try {
buffer.getFloat();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetDouble() {
byte[] data = new byte[] {0, 0, 0, 0, 0, 0, 0, 0};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(0, buffer.getDouble(), 0.0);
assertFalse(buffer.hasRemaining());
try {
buffer.getDouble();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetBytes() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
byte[] target = new byte[data.length];
buffer.get(target);
assertFalse(buffer.hasRemaining());
assertArrayEquals(data, target);
try {
buffer.get(target);
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetBytesIntInt() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
byte[] target = new byte[data.length];
buffer.get(target, 0, target.length);
assertFalse(buffer.hasRemaining());
assertArrayEquals(data, target);
try {
buffer.get(target, 0, target.length);
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetBytesToWritableBuffer() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
NettyWritable target = new NettyWritable(targetBuffer);
buffer.get(target);
assertFalse(buffer.hasRemaining());
assertArrayEquals(targetBuffer.array(), data);
}
@Test
public void testGetBytesToWritableBufferThatIsDirect() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.directBuffer(data.length, data.length);
byteBuffer.writeBytes(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
NettyWritable target = new NettyWritable(targetBuffer);
buffer.get(target);
assertFalse(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], target.getByteBuf().readByte());
}
}
@Test
public void testDuplicate() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ReadableBuffer duplicate = buffer.duplicate();
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], duplicate.get());
}
assertFalse(duplicate.hasRemaining());
}
@Test
public void testSlice() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ReadableBuffer slice = buffer.slice();
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], slice.get());
}
assertFalse(slice.hasRemaining());
}
@Test
public void testLimit() {
byte[] data = new byte[] {1, 2};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(data.length, buffer.limit());
buffer.limit(1);
assertEquals(1, buffer.limit());
assertEquals(1, buffer.get());
assertFalse(buffer.hasRemaining());
try {
buffer.get();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testClear() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
byte[] target = new byte[data.length];
buffer.get(target);
assertFalse(buffer.hasRemaining());
assertArrayEquals(data, target);
try {
buffer.get(target);
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
buffer.clear();
assertTrue(buffer.hasRemaining());
assertEquals(data.length, buffer.remaining());
buffer.get(target);
assertFalse(buffer.hasRemaining());
assertArrayEquals(data, target);
}
@Test
public void testRewind() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
assertFalse(buffer.hasRemaining());
buffer.rewind();
assertTrue(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
}
@Test
public void testReset() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
buffer.mark();
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
assertFalse(buffer.hasRemaining());
buffer.reset();
assertTrue(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
}
@Test
public void testGetPosition() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(buffer.position(), 0);
for (int i = 0; i < data.length; i++) {
assertEquals(buffer.position(), i);
assertEquals(data[i], buffer.get());
assertEquals(buffer.position(), i + 1);
}
}
@Test
public void testSetPosition() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
assertFalse(buffer.hasRemaining());
buffer.position(0);
assertTrue(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
}
@Test
public void testFlip() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
buffer.mark();
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
assertFalse(buffer.hasRemaining());
buffer.flip();
assertTrue(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
}
@Test
public void testReadUTF8() throws CharacterCodingException {
String testString = "test-string-1";
byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8);
ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(testString, buffer.readUTF8());
}
@Test
public void testReadString() throws CharacterCodingException {
String testString = "test-string-1";
byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8);
ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(testString, buffer.readString(StandardCharsets.UTF_8.newDecoder()));
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* Tests for behavior of NettyWritable
*/
public class NettyWritableTest {
@Test
public void testGetBuffer() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertSame(buffer, writable.getByteBuf());
}
@Test
public void testLimit() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(buffer.capacity(), writable.limit());
}
@Test
public void testRemaining() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(buffer.maxCapacity(), writable.remaining());
writable.put((byte) 0);
assertEquals(buffer.maxCapacity() - 1, writable.remaining());
}
@Test
public void testHasRemaining() {
ByteBuf buffer = Unpooled.buffer(100, 100);
NettyWritable writable = new NettyWritable(buffer);
assertTrue(writable.hasRemaining());
writable.put((byte) 0);
assertTrue(writable.hasRemaining());
buffer.writerIndex(buffer.maxCapacity());
assertFalse(writable.hasRemaining());
}
@Test
public void testGetPosition() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(0, writable.position());
writable.put((byte) 0);
assertEquals(1, writable.position());
}
@Test
public void testSetPosition() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(0, writable.position());
writable.position(1);
assertEquals(1, writable.position());
}
@Test
public void testPutByteBuffer() {
ByteBuffer input = ByteBuffer.allocate(1024);
input.put((byte) 1);
input.flip();
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(0, writable.position());
writable.put(input);
assertEquals(1, writable.position());
}
@Test
public void testPutByteBuf() {
ByteBuf input = Unpooled.buffer();
input.writeByte((byte) 1);
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(0, writable.position());
writable.put(input);
assertEquals(1, writable.position());
}
@Test
public void testPutReadableBuffer() {
doPutReadableBufferTestImpl(true);
doPutReadableBufferTestImpl(false);
}
private void doPutReadableBufferTestImpl(boolean readOnly) {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put((byte) 1);
buf.flip();
if (readOnly) {
buf = buf.asReadOnlyBuffer();
}
ReadableBuffer input = new ReadableBuffer.ByteBufferReader(buf);
if (readOnly) {
assertFalse("Expected buffer not to hasArray()", input.hasArray());
} else {
assertTrue("Expected buffer to hasArray()", input.hasArray());
}
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(0, writable.position());
writable.put(input);
assertEquals(1, writable.position());
}
}

View File

@ -92,10 +92,10 @@
<maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
<mockito.version>2.8.47</mockito.version>
<netty.version>4.1.22.Final</netty.version>
<proton.version>0.26.0</proton.version>
<proton.version>0.27.1</proton.version>
<resteasy.version>3.0.19.Final</resteasy.version>
<slf4j.version>1.7.21</slf4j.version>
<qpid.jms.version>0.30.0</qpid.jms.version>
<qpid.jms.version>0.32.0</qpid.jms.version>
<johnzon.version>0.9.5</johnzon.version>
<json-p.spec.version>1.0-alpha-1</json-p.spec.version>
<javax.inject.version>1</javax.inject.version>

View File

@ -17,12 +17,15 @@
package org.apache.activemq.artemis.tests.integration.amqp;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -46,11 +49,18 @@ import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpLargeMessageTest extends AmqpClientTestSupport {
private static final int FRAME_SIZE = 10024;
protected static final Logger LOG = LoggerFactory.getLogger(AmqpLargeMessageTest.class);
private final Random rand = new Random(System.currentTimeMillis());
private static final int FRAME_SIZE = 32767;
private static final int PAYLOAD = 110 * 1024;
String testQueueName = "ConnectionFrameSize";
@ -232,6 +242,89 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
receiveJMS(nMsgs, factory);
}
private byte[] createLargePayload(int sizeInBytes) {
byte[] payload = new byte[sizeInBytes];
for (int i = 0; i < sizeInBytes; i++) {
payload[i] = (byte) rand.nextInt(256);
}
LOG.debug("Created buffer with size : " + sizeInBytes + " bytes");
return payload;
}
@Test(timeout = 60000)
public void testSendSmallerMessages() throws Exception {
for (int i = 512; i <= (8 * 1024); i += 512) {
doTestSendLargeMessage(i);
}
}
@Test(timeout = 120000)
public void testSendFixedSizedMessages() throws Exception {
doTestSendLargeMessage(65536);
doTestSendLargeMessage(65536 * 2);
doTestSendLargeMessage(65536 * 4);
}
@Test(timeout = 120000)
public void testSend1MBMessage() throws Exception {
doTestSendLargeMessage(1024 * 1024);
}
@Ignore("Useful for performance testing")
@Test(timeout = 120000)
public void testSend10MBMessage() throws Exception {
doTestSendLargeMessage(1024 * 1024 * 10);
}
@Ignore("Useful for performance testing")
@Test(timeout = 120000)
public void testSend100MBMessage() throws Exception {
doTestSendLargeMessage(1024 * 1024 * 100);
}
public void doTestSendLargeMessage(int expectedSize) throws Exception {
LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
byte[] payload = createLargePayload(expectedSize);
assertEquals(expectedSize, payload.length);
ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
try (Connection connection = factory.createConnection()) {
long startTime = System.currentTimeMillis();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.getMethodName());
MessageProducer producer = session.createProducer(queue);
BytesMessage message = session.createBytesMessage();
message.writeBytes(payload);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Set this to non-default to get a Header in the encoded message.
producer.setPriority(4);
producer.send(message);
long endTime = System.currentTimeMillis();
LOG.info("Returned from send after {} ms", endTime - startTime);
startTime = System.currentTimeMillis();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
LOG.info("Calling receive");
Message received = consumer.receive();
assertNotNull(received);
assertTrue(received instanceof BytesMessage);
BytesMessage bytesMessage = (BytesMessage) received;
assertNotNull(bytesMessage);
endTime = System.currentTimeMillis();
LOG.info("Returned from receive after {} ms", endTime - startTime);
byte[] bytesReceived = new byte[expectedSize];
assertEquals(expectedSize, bytesMessage.readBytes(bytesReceived, expectedSize));
assertTrue(Arrays.equals(payload, bytesReceived));
connection.close();
}
}
private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();