diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index d5fc1966a3..f08c1fc4e2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -19,8 +19,6 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.Arrays;
import java.util.List;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -28,7 +26,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
-import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@@ -134,7 +131,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
@Override
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
Receiver receiver;
- ByteBuf buffer = null;
try {
receiver = ((Receiver) delivery.getLink());
@@ -145,20 +141,17 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (delivery.isPartial()) {
return;
}
- // This should be used if getDataLength was avilable
-// byte[] data = new byte[delivery.getDataLength()];
- buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024);
Transaction tx = null;
+ byte[] data;
+
synchronized (connection.getLock()) {
- DeliveryUtil.readDelivery(receiver, buffer);
+ data = new byte[delivery.available()];
+ receiver.recv(data, 0, data.length);
receiver.advance();
}
- byte[] data = new byte[buffer.writerIndex()];
- buffer.readBytes(data);
-
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
@@ -179,10 +172,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
rejected.setError(condition);
delivery.disposition(rejected);
delivery.settle();
- } finally {
- if (buffer != null) {
- buffer.release();
- }
}
}
diff --git a/pom.xml b/pom.xml
index 37f6ca16bd..8c0553749b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@
3.6.9.Final
2.4
4.1.6.Final
- 0.16.0
+ 0.18.0
3.0.19.Final
1.7.21
0.20.0
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
index d9bddcbc6a..5545884245 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java
@@ -57,6 +57,11 @@ public class UnmodifiableDelivery implements Delivery {
return delivery.getDataLength();
} */
+ @Override
+ public int available() {
+ return delivery.available();
+ }
+
@Override
public DeliveryState getLocalState() {
return delivery.getLocalState();