diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index a3dae25d0e..f817ed4420 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton.transaction; +import java.nio.ByteBuffer; + import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler; -import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -32,6 +33,7 @@ import org.apache.qpid.proton.amqp.transaction.Discharge; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; import org.jboss.logging.Logger; @@ -48,6 +50,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { final AMQPSessionCallback sessionSPI; final AMQPConnectionContext connection; + private final ByteBuffer DECODE_BUFFER = ByteBuffer.allocate(64); + public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) { this.sessionSPI = sessionSPI; this.connection = connection; @@ -65,7 +69,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { return; } - byte[] buffer; + ByteBuffer buffer; + MessageImpl msg; synchronized (connection.getLock()) { // Replenish coordinator receiver credit on exhaustion so sender can continue @@ -74,15 +79,23 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { receiver.flow(amqpCredit); } - buffer = new byte[delivery.available()]; - receiver.recv(buffer, 0, buffer.length); + // Declare is generally 7 bytes and discharge is around 48 depending on the + // encoded size of the TXN ID. Decode buffer has a bit of extra space but if + // the incoming request is to big just use a scratch buffer. + if (delivery.available() > DECODE_BUFFER.capacity()) { + buffer = ByteBuffer.allocate(delivery.available()); + } else { + buffer = (ByteBuffer) DECODE_BUFFER.clear(); + } + + // Update Buffer for the next incoming command. + buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity())); + receiver.advance(); + + msg = decodeMessage(buffer); } - - - MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer); - Object action = ((AmqpValue) msg.getBody()).getValue(); if (action instanceof Declare) { @@ -133,6 +146,18 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } } + @Override + public void onFlow(int credits, boolean drain) { + } + + @Override + public void close(boolean linkRemoteClose) throws ActiveMQAMQPException { + } + + @Override + public void close(ErrorCondition condition) throws ActiveMQAMQPException { + } + private Rejected createRejected(Symbol amqpError, String message) { Rejected rejected = new Rejected(); ErrorCondition condition = new ErrorCondition(); @@ -142,17 +167,9 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { return rejected; } - @Override - public void onFlow(int credits, boolean drain) { - } - - @Override - public void close(boolean linkRemoteClose) throws ActiveMQAMQPException { - // no op - } - - @Override - public void close(ErrorCondition condition) throws ActiveMQAMQPException { - // no op + private MessageImpl decodeMessage(ByteBuffer encoded) { + MessageImpl message = (MessageImpl) Message.Factory.create(); + message.decode(encoded); + return message; } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java deleted file mode 100644 index 4267b85b28..0000000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.util; - -import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.impl.MessageImpl; - -public class DeliveryUtil { - - public static MessageImpl decodeMessageImpl(byte[] data) { - MessageImpl message = (MessageImpl) Message.Factory.create(); - message.decode(data, 0, data.length); - return message; - } - -}