This commit is contained in:
Clebert Suconic 2018-05-03 12:10:26 -04:00
commit 410cb9ee23
11 changed files with 918 additions and 80 deletions

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
@ -54,6 +55,7 @@ import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
@ -69,7 +71,7 @@ public class AMQPMessage extends RefCountMessage {
public static final int MAX_MESSAGE_PRIORITY = 9;
final long messageFormat;
ByteBuf data;
ReadableBuffer data;
boolean bufferValid;
Boolean durable;
long messageID;
@ -106,7 +108,11 @@ public class AMQPMessage extends RefCountMessage {
}
public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) {
this.data = Unpooled.wrappedBuffer(data);
this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), coreMessageObjectPools);
}
public AMQPMessage(long messageFormat, ReadableBuffer data, CoreMessageObjectPools coreMessageObjectPools) {
this.data = data;
this.messageFormat = messageFormat;
this.bufferValid = true;
this.coreMessageObjectPools = coreMessageObjectPools;
@ -136,8 +142,8 @@ public class AMQPMessage extends RefCountMessage {
protonMessage = (MessageImpl) Message.Factory.create();
if (data != null) {
data.readerIndex(0);
protonMessage.decode(data.nioBuffer());
data.rewind();
protonMessage.decode(data.duplicate());
this._header = protonMessage.getHeader();
protonMessage.setHeader(null);
}
@ -162,7 +168,6 @@ public class AMQPMessage extends RefCountMessage {
}
}
@SuppressWarnings("unchecked")
private Map<String, Object> getApplicationPropertiesMap() {
ApplicationProperties appMap = getApplicationProperties();
Map<String, Object> map = null;
@ -183,15 +188,15 @@ public class AMQPMessage extends RefCountMessage {
parseHeaders();
if (applicationProperties == null && appLocation >= 0) {
ByteBuffer buffer = getBuffer().nioBuffer();
ReadableBuffer buffer = data.duplicate();
buffer.position(appLocation);
TLSEncode.getDecoder().setByteBuffer(buffer);
TLSEncode.getDecoder().setBuffer(buffer);
Object section = TLSEncode.getDecoder().readObject();
if (section instanceof ApplicationProperties) {
this.applicationProperties = (ApplicationProperties) section;
}
this.appLocation = -1;
TLSEncode.getDecoder().setByteBuffer(null);
TLSEncode.getDecoder().setBuffer(null);
}
return applicationProperties;
@ -202,7 +207,7 @@ public class AMQPMessage extends RefCountMessage {
if (data == null) {
initalizeObjects();
} else {
partialDecode(data.nioBuffer());
partialDecode(data);
}
parsedHeaders = true;
}
@ -367,10 +372,9 @@ public class AMQPMessage extends RefCountMessage {
rejectedConsumers.add(consumer);
}
private synchronized void partialDecode(ByteBuffer buffer) {
private synchronized void partialDecode(ReadableBuffer buffer) {
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setByteBuffer(buffer);
buffer.position(0);
decoder.setBuffer(buffer.rewind());
_header = null;
_deliveryAnnotations = null;
@ -449,6 +453,7 @@ public class AMQPMessage extends RefCountMessage {
}
} finally {
decoder.setByteBuffer(null);
data.position(0);
}
}
@ -456,14 +461,6 @@ public class AMQPMessage extends RefCountMessage {
return messageFormat;
}
public int getLength() {
return data.array().length;
}
public byte[] getArray() {
return data.array();
}
@Override
public void messageChanged() {
bufferValid = false;
@ -475,7 +472,7 @@ public class AMQPMessage extends RefCountMessage {
if (data == null) {
return null;
} else {
return Unpooled.wrappedBuffer(data);
return Unpooled.wrappedBuffer(data.byteBuffer());
}
}
@ -489,14 +486,15 @@ public class AMQPMessage extends RefCountMessage {
public org.apache.activemq.artemis.api.core.Message copy() {
checkBuffer();
byte[] origin = data.array();
byte[] newData = new byte[data.array().length - (messagePaylodStart - headerEnds)];
ReadableBuffer view = data.duplicate();
// Copy the original header
System.arraycopy(origin, 0, newData, 0, headerEnds);
byte[] newData = new byte[view.remaining() - (messagePaylodStart - headerEnds)];
// Copy the body following the delivery annotations if present
System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart);
view.position(0).limit(headerEnds);
view.get(newData, 0, headerEnds);
view.clear();
view.position(messagePaylodStart);
view.get(newData, headerEnds, view.remaining());
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
@ -679,7 +677,7 @@ public class AMQPMessage extends RefCountMessage {
getProtonMessage().encode(new NettyWritable(buffer));
byte[] bytes = new byte[buffer.writerIndex()];
buffer.readBytes(bytes);
this.data = Unpooled.wrappedBuffer(bytes);
this.data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(bytes));
} finally {
buffer.release();
}
@ -689,7 +687,7 @@ public class AMQPMessage extends RefCountMessage {
public int getEncodeSize() {
checkBuffer();
// + 20checkBuffer is an estimate for the Header with the deliveryCount
return data.array().length - messagePaylodStart + 20;
return data.remaining() - messagePaylodStart + 20;
}
@Override
@ -715,10 +713,12 @@ public class AMQPMessage extends RefCountMessage {
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
}
} else if (headerEnds > 0) {
buffer.writeBytes(data, 0, headerEnds);
buffer.writeBytes(data.duplicate().limit(headerEnds).byteBuffer());
}
buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
data.position(messagePaylodStart);
buffer.writeBytes(data.byteBuffer());
data.position(0);
}
/**
@ -734,7 +734,7 @@ public class AMQPMessage extends RefCountMessage {
*
* @return a Netty ByteBuf containing the encoded bytes of this Message instance.
*/
public ByteBuf getSendBuffer(int deliveryCount) {
public ReadableBuffer getSendBuffer(int deliveryCount) {
checkBuffer();
if (deliveryCount > 1) {
@ -744,23 +744,28 @@ public class AMQPMessage extends RefCountMessage {
} else {
// Common case message has no delivery annotations and this is the first delivery
// so no re-encoding or section skipping needed.
return data.retainedDuplicate();
return data.duplicate();
}
}
private ByteBuf createCopyWithoutDeliveryAnnotations() {
private ReadableBuffer createCopyWithoutDeliveryAnnotations() {
assert headerEnds != messagePaylodStart;
// The original message had delivery annotations and so we must copy into a new
// buffer skipping the delivery annotations section as that is not meant to survive
// beyond this hop.
ReadableBuffer duplicate = data.duplicate();
final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
result.writeBytes(data, 0, headerEnds);
result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
return result;
result.writeBytes(duplicate.limit(headerEnds).byteBuffer());
duplicate.clear();
duplicate.position(messagePaylodStart);
result.writeBytes(duplicate.byteBuffer());
return new NettyReadable(result);
}
private ByteBuf createCopyWithNewDeliveryCount(int deliveryCount) {
private ReadableBuffer createCopyWithNewDeliveryCount(int deliveryCount) {
assert deliveryCount > 1;
final int amqpDeliveryCount = deliveryCount - 1;
@ -786,9 +791,11 @@ public class AMQPMessage extends RefCountMessage {
// This will skip any existing delivery annotations that might have been present
// in the original message.
result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
data.position(messagePaylodStart);
result.writeBytes(data.byteBuffer());
data.position(0);
return result;
return new NettyReadable(result);
}
public TypedProperties createExtraProperties() {
@ -1222,14 +1229,18 @@ public class AMQPMessage extends RefCountMessage {
}
private int internalPersistSize() {
return data.array().length;
return data.remaining();
}
@Override
public void persist(ActiveMQBuffer targetRecord) {
checkBuffer();
targetRecord.writeInt(internalPersistSize());
targetRecord.writeBytes(data.array(), 0, data.array().length );
if (data.hasArray()) {
targetRecord.writeBytes(data.array(), data.arrayOffset(), data.remaining());
} else {
targetRecord.writeBytes(data.byteBuffer());
}
}
@Override
@ -1238,7 +1249,7 @@ public class AMQPMessage extends RefCountMessage {
byte[] recordArray = new byte[size];
record.readBytes(recordArray);
this.messagePaylodStart = 0; // whatever was persisted will be sent
this.data = Unpooled.wrappedBuffer(recordArray);
this.data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray));
this.bufferValid = true;
this.durable = true; // it's coming from the journal, so it's durable
parseHeaders();

View File

@ -71,6 +71,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
@ -441,7 +442,7 @@ public class AMQPSessionCallback implements SessionCallback {
final Delivery delivery,
SimpleString address,
int messageFormat,
byte[] data) throws Exception {
ReadableBuffer data) throws Exception {
AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools);
if (address != null) {
message.setAddress(address);

View File

@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
@ -25,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@ -52,11 +57,7 @@ import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
import io.netty.buffer.ByteBuf;
public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
@ -118,12 +119,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
transport.setChannelMax(channelMax);
transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize());
transport.setMaxFrameSize(maxFrameSize);
transport.setOutboundFrameSizeLimit(maxFrameSize);
if (!isIncomingConnection && saslClientFactory != null) {
handler.createClientSASL();
}
}
public void scheduledFlush() {
handler.scheduledFlush();
}

View File

@ -40,6 +40,7 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;
@ -221,10 +222,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
receiver = ((Receiver) delivery.getLink());
Transaction tx = null;
byte[] data;
data = new byte[delivery.available()];
receiver.recv(data, 0, data.length);
ReadableBuffer data = receiver.recv();
receiver.advance();
if (delivery.getRemoteState() instanceof TransactionalState) {

View File

@ -68,14 +68,13 @@ import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
/**
* TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
*/
@ -692,10 +691,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
// Let the Message decide how to present the message bytes
ByteBuf sendBuffer = message.getSendBuffer(deliveryCount);
ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
try {
int size = sendBuffer.writerIndex();
int size = sendBuffer.remaining();
while (!connection.tryLock(1, TimeUnit.SECONDS)) {
if (closed || sender.getLocalState() == EndpointState.CLOSED) {
@ -715,12 +714,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(messageReference);
if (sendBuffer.hasArray()) {
// this will avoid a copy.. patch provided by Norman using buffer.array()
sender.send(sendBuffer.array(), sendBuffer.arrayOffset() + sendBuffer.readerIndex(), sendBuffer.readableBytes());
} else {
sender.send(new NettyReadable(sendBuffer));
}
sender.sendNoCopy(sendBuffer);
if (preSettle) {
// Presettled means the client implicitly accepts any delivery we send it.
@ -736,7 +730,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return size;
} finally {
sendBuffer.release();
if (sendBuffer instanceof NettyReadable) {
((NettyReadable) sendBuffer).getByteBuf().release();
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -14,15 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.util;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import io.netty.buffer.ByteBuf;
import org.apache.qpid.proton.codec.ReadableBuffer;
/**
* {@link ReadableBuffer} implementation that wraps a Netty {@link ByteBuf} to
* allow use of Netty buffers to be used when decoding AMQP messages.
*/
public class NettyReadable implements ReadableBuffer {
private static final Charset Charset_UTF8 = Charset.forName("UTF-8");
@ -33,9 +40,8 @@ public class NettyReadable implements ReadableBuffer {
this.buffer = buffer;
}
@Override
public void put(ReadableBuffer other) {
buffer.writeBytes(other.byteBuffer());
public ByteBuf getByteBuf() {
return this.buffer;
}
@Override
@ -93,7 +99,8 @@ public class NettyReadable implements ReadableBuffer {
@Override
public ReadableBuffer flip() {
return new NettyReadable(buffer.duplicate().setIndex(0, buffer.readerIndex()));
buffer.setIndex(0, buffer.readerIndex());
return this;
}
@Override
@ -136,4 +143,108 @@ public class NettyReadable implements ReadableBuffer {
public String readUTF8() {
return buffer.toString(Charset_UTF8);
}
@Override
public byte[] array() {
return buffer.array();
}
@Override
public int arrayOffset() {
return buffer.arrayOffset() + buffer.readerIndex();
}
@Override
public int capacity() {
return buffer.capacity();
}
@Override
public ReadableBuffer clear() {
buffer.setIndex(0, buffer.capacity());
return this;
}
@Override
public ReadableBuffer reclaimRead() {
return this;
}
@Override
public byte get(int index) {
return buffer.getByte(index);
}
@Override
public boolean hasArray() {
return buffer.hasArray();
}
@Override
public ReadableBuffer mark() {
buffer.markReaderIndex();
return this;
}
@Override
public String readString(CharsetDecoder decoder) throws CharacterCodingException {
return buffer.toString(decoder.charset());
}
@Override
public ReadableBuffer reset() {
buffer.resetReaderIndex();
return this;
}
@Override
public ReadableBuffer rewind() {
buffer.setIndex(0, buffer.writerIndex());
return this;
}
@Override
public ReadableBuffer get(WritableBuffer target) {
int start = target.position();
if (buffer.hasArray()) {
target.put(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
} else {
target.put(buffer.nioBuffer());
}
int written = target.position() - start;
buffer.readerIndex(buffer.readerIndex() + written);
return this;
}
@Override
public String toString() {
return buffer.toString();
}
@Override
public int hashCode() {
return buffer.hashCode();
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof ReadableBuffer)) {
return false;
}
ReadableBuffer readable = (ReadableBuffer) other;
if (this.remaining() != readable.remaining()) {
return false;
}
return buffer.nioBuffer().equals(readable.byteBuffer());
}
}

View File

@ -18,13 +18,15 @@ package org.apache.activemq.artemis.protocol.amqp.util;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
/**
* This is to use NettyBuffer within Proton
*/
import io.netty.buffer.ByteBuf;
/**
* {@link WritableBuffer} implementation that wraps a Netty {@link ByteBuf} to
* allow use of Netty buffers to be used when encoding AMQP messages.
*/
public class NettyWritable implements WritableBuffer {
final ByteBuf nettyBuffer;
@ -33,6 +35,10 @@ public class NettyWritable implements WritableBuffer {
this.nettyBuffer = nettyBuffer;
}
public ByteBuf getByteBuf() {
return nettyBuffer;
}
@Override
public void put(byte b) {
nettyBuffer.writeByte(b);
@ -75,7 +81,7 @@ public class NettyWritable implements WritableBuffer {
@Override
public int remaining() {
return nettyBuffer.capacity() - nettyBuffer.writerIndex();
return nettyBuffer.maxCapacity() - nettyBuffer.writerIndex();
}
@Override
@ -93,8 +99,23 @@ public class NettyWritable implements WritableBuffer {
nettyBuffer.writeBytes(payload);
}
public void put(ByteBuf payload) {
nettyBuffer.writeBytes(payload);
}
@Override
public int limit() {
return nettyBuffer.capacity();
}
@Override
public void put(ReadableBuffer buffer) {
if (buffer.hasArray()) {
nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.remaining());
} else {
while (buffer.hasRemaining()) {
nettyBuffer.writeByte(buffer.get());
}
}
}
}

View File

@ -0,0 +1,454 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.util;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* Tests for the ReadableBuffer wrapper that uses Netty ByteBuf underneath
*/
public class NettyReadableTest {
@Test
public void testWrapBuffer() {
ByteBuf byteBuffer = Unpooled.buffer(100, 100);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(100, buffer.capacity());
assertSame(byteBuffer, buffer.getByteBuf());
assertSame(buffer, buffer.reclaimRead());
}
@Test
public void testArrayAccess() {
ByteBuf byteBuffer = Unpooled.buffer(100, 100);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertTrue(buffer.hasArray());
assertSame(buffer.array(), byteBuffer.array());
assertEquals(buffer.arrayOffset(), byteBuffer.arrayOffset());
}
@Test
public void testArrayAccessWhenNoArray() {
ByteBuf byteBuffer = Unpooled.directBuffer();
NettyReadable buffer = new NettyReadable(byteBuffer);
assertFalse(buffer.hasArray());
}
@Test
public void testByteBuffer() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ByteBuffer nioBuffer = buffer.byteBuffer();
assertEquals(data.length, nioBuffer.remaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], nioBuffer.get());
}
}
@Test
public void testGet() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
assertFalse(buffer.hasRemaining());
try {
buffer.get();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetIndex() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get(i));
}
assertTrue(buffer.hasRemaining());
}
@Test
public void testGetShort() {
byte[] data = new byte[] {0, 1};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(1, buffer.getShort());
assertFalse(buffer.hasRemaining());
try {
buffer.getShort();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetInt() {
byte[] data = new byte[] {0, 0, 0, 1};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(1, buffer.getInt());
assertFalse(buffer.hasRemaining());
try {
buffer.getInt();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetLong() {
byte[] data = new byte[] {0, 0, 0, 0, 0, 0, 0, 1};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(1, buffer.getLong());
assertFalse(buffer.hasRemaining());
try {
buffer.getLong();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetFloat() {
byte[] data = new byte[] {0, 0, 0, 0};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(0, buffer.getFloat(), 0.0);
assertFalse(buffer.hasRemaining());
try {
buffer.getFloat();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetDouble() {
byte[] data = new byte[] {0, 0, 0, 0, 0, 0, 0, 0};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(0, buffer.getDouble(), 0.0);
assertFalse(buffer.hasRemaining());
try {
buffer.getDouble();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetBytes() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
byte[] target = new byte[data.length];
buffer.get(target);
assertFalse(buffer.hasRemaining());
assertArrayEquals(data, target);
try {
buffer.get(target);
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetBytesIntInt() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
byte[] target = new byte[data.length];
buffer.get(target, 0, target.length);
assertFalse(buffer.hasRemaining());
assertArrayEquals(data, target);
try {
buffer.get(target, 0, target.length);
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testGetBytesToWritableBuffer() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
NettyWritable target = new NettyWritable(targetBuffer);
buffer.get(target);
assertFalse(buffer.hasRemaining());
assertArrayEquals(targetBuffer.array(), data);
}
@Test
public void testGetBytesToWritableBufferThatIsDirect() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.directBuffer(data.length, data.length);
byteBuffer.writeBytes(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
NettyWritable target = new NettyWritable(targetBuffer);
buffer.get(target);
assertFalse(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], target.getByteBuf().readByte());
}
}
@Test
public void testDuplicate() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ReadableBuffer duplicate = buffer.duplicate();
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], duplicate.get());
}
assertFalse(duplicate.hasRemaining());
}
@Test
public void testSlice() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ReadableBuffer slice = buffer.slice();
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], slice.get());
}
assertFalse(slice.hasRemaining());
}
@Test
public void testLimit() {
byte[] data = new byte[] {1, 2};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(data.length, buffer.limit());
buffer.limit(1);
assertEquals(1, buffer.limit());
assertEquals(1, buffer.get());
assertFalse(buffer.hasRemaining());
try {
buffer.get();
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
}
@Test
public void testClear() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
byte[] target = new byte[data.length];
buffer.get(target);
assertFalse(buffer.hasRemaining());
assertArrayEquals(data, target);
try {
buffer.get(target);
fail("Should throw an IndexOutOfBoundsException");
} catch (IndexOutOfBoundsException ioe) {
}
buffer.clear();
assertTrue(buffer.hasRemaining());
assertEquals(data.length, buffer.remaining());
buffer.get(target);
assertFalse(buffer.hasRemaining());
assertArrayEquals(data, target);
}
@Test
public void testRewind() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
assertFalse(buffer.hasRemaining());
buffer.rewind();
assertTrue(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
}
@Test
public void testReset() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
buffer.mark();
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
assertFalse(buffer.hasRemaining());
buffer.reset();
assertTrue(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
}
@Test
public void testGetPosition() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(buffer.position(), 0);
for (int i = 0; i < data.length; i++) {
assertEquals(buffer.position(), i);
assertEquals(data[i], buffer.get());
assertEquals(buffer.position(), i + 1);
}
}
@Test
public void testSetPosition() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
assertFalse(buffer.hasRemaining());
buffer.position(0);
assertTrue(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
}
@Test
public void testFlip() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
buffer.mark();
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
assertFalse(buffer.hasRemaining());
buffer.flip();
assertTrue(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], buffer.get());
}
}
@Test
public void testReadUTF8() throws CharacterCodingException {
String testString = "test-string-1";
byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8);
ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(testString, buffer.readUTF8());
}
@Test
public void testReadString() throws CharacterCodingException {
String testString = "test-string-1";
byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8);
ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes);
NettyReadable buffer = new NettyReadable(byteBuffer);
assertEquals(testString, buffer.readString(StandardCharsets.UTF_8.newDecoder()));
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* Tests for behavior of NettyWritable
*/
public class NettyWritableTest {
@Test
public void testGetBuffer() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertSame(buffer, writable.getByteBuf());
}
@Test
public void testLimit() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(buffer.capacity(), writable.limit());
}
@Test
public void testRemaining() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(buffer.maxCapacity(), writable.remaining());
writable.put((byte) 0);
assertEquals(buffer.maxCapacity() - 1, writable.remaining());
}
@Test
public void testHasRemaining() {
ByteBuf buffer = Unpooled.buffer(100, 100);
NettyWritable writable = new NettyWritable(buffer);
assertTrue(writable.hasRemaining());
writable.put((byte) 0);
assertTrue(writable.hasRemaining());
buffer.writerIndex(buffer.maxCapacity());
assertFalse(writable.hasRemaining());
}
@Test
public void testGetPosition() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(0, writable.position());
writable.put((byte) 0);
assertEquals(1, writable.position());
}
@Test
public void testSetPosition() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(0, writable.position());
writable.position(1);
assertEquals(1, writable.position());
}
@Test
public void testPutByteBuffer() {
ByteBuffer input = ByteBuffer.allocate(1024);
input.put((byte) 1);
input.flip();
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(0, writable.position());
writable.put(input);
assertEquals(1, writable.position());
}
@Test
public void testPutByteBuf() {
ByteBuf input = Unpooled.buffer();
input.writeByte((byte) 1);
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(0, writable.position());
writable.put(input);
assertEquals(1, writable.position());
}
@Test
public void testPutReadableBuffer() {
doPutReadableBufferTestImpl(true);
doPutReadableBufferTestImpl(false);
}
private void doPutReadableBufferTestImpl(boolean readOnly) {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put((byte) 1);
buf.flip();
if (readOnly) {
buf = buf.asReadOnlyBuffer();
}
ReadableBuffer input = new ReadableBuffer.ByteBufferReader(buf);
if (readOnly) {
assertFalse("Expected buffer not to hasArray()", input.hasArray());
} else {
assertTrue("Expected buffer to hasArray()", input.hasArray());
}
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
assertEquals(0, writable.position());
writable.put(input);
assertEquals(1, writable.position());
}
}

View File

@ -92,10 +92,10 @@
<maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
<mockito.version>2.8.47</mockito.version>
<netty.version>4.1.22.Final</netty.version>
<proton.version>0.26.0</proton.version>
<proton.version>0.27.1</proton.version>
<resteasy.version>3.0.19.Final</resteasy.version>
<slf4j.version>1.7.21</slf4j.version>
<qpid.jms.version>0.30.0</qpid.jms.version>
<qpid.jms.version>0.32.0</qpid.jms.version>
<johnzon.version>0.9.5</johnzon.version>
<json-p.spec.version>1.0-alpha-1</json-p.spec.version>
<javax.inject.version>1</javax.inject.version>

View File

@ -17,12 +17,15 @@
package org.apache.activemq.artemis.tests.integration.amqp;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -46,11 +49,18 @@ import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpLargeMessageTest extends AmqpClientTestSupport {
private static final int FRAME_SIZE = 10024;
protected static final Logger LOG = LoggerFactory.getLogger(AmqpLargeMessageTest.class);
private final Random rand = new Random(System.currentTimeMillis());
private static final int FRAME_SIZE = 32767;
private static final int PAYLOAD = 110 * 1024;
String testQueueName = "ConnectionFrameSize";
@ -232,6 +242,89 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
receiveJMS(nMsgs, factory);
}
private byte[] createLargePayload(int sizeInBytes) {
byte[] payload = new byte[sizeInBytes];
for (int i = 0; i < sizeInBytes; i++) {
payload[i] = (byte) rand.nextInt(256);
}
LOG.debug("Created buffer with size : " + sizeInBytes + " bytes");
return payload;
}
@Test(timeout = 60000)
public void testSendSmallerMessages() throws Exception {
for (int i = 512; i <= (8 * 1024); i += 512) {
doTestSendLargeMessage(i);
}
}
@Test(timeout = 120000)
public void testSendFixedSizedMessages() throws Exception {
doTestSendLargeMessage(65536);
doTestSendLargeMessage(65536 * 2);
doTestSendLargeMessage(65536 * 4);
}
@Test(timeout = 120000)
public void testSend1MBMessage() throws Exception {
doTestSendLargeMessage(1024 * 1024);
}
@Ignore("Useful for performance testing")
@Test(timeout = 120000)
public void testSend10MBMessage() throws Exception {
doTestSendLargeMessage(1024 * 1024 * 10);
}
@Ignore("Useful for performance testing")
@Test(timeout = 120000)
public void testSend100MBMessage() throws Exception {
doTestSendLargeMessage(1024 * 1024 * 100);
}
public void doTestSendLargeMessage(int expectedSize) throws Exception {
LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
byte[] payload = createLargePayload(expectedSize);
assertEquals(expectedSize, payload.length);
ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
try (Connection connection = factory.createConnection()) {
long startTime = System.currentTimeMillis();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.getMethodName());
MessageProducer producer = session.createProducer(queue);
BytesMessage message = session.createBytesMessage();
message.writeBytes(payload);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Set this to non-default to get a Header in the encoded message.
producer.setPriority(4);
producer.send(message);
long endTime = System.currentTimeMillis();
LOG.info("Returned from send after {} ms", endTime - startTime);
startTime = System.currentTimeMillis();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
LOG.info("Calling receive");
Message received = consumer.receive();
assertNotNull(received);
assertTrue(received instanceof BytesMessage);
BytesMessage bytesMessage = (BytesMessage) received;
assertNotNull(bytesMessage);
endTime = System.currentTimeMillis();
LOG.info("Returned from receive after {} ms", endTime - startTime);
byte[] bytesReceived = new byte[expectedSize];
assertEquals(expectedSize, bytesMessage.readBytes(bytesReceived, expectedSize));
assertTrue(Arrays.equals(payload, bytesReceived));
connection.close();
}
}
private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();