This commit is contained in:
Clebert Suconic 2017-04-11 11:48:30 -04:00
commit f282dff57c
2 changed files with 37 additions and 50 deletions

View File

@ -16,11 +16,12 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.proton.transaction; package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Accepted;
@ -32,6 +33,7 @@ import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl; import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -48,6 +50,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
final AMQPSessionCallback sessionSPI; final AMQPSessionCallback sessionSPI;
final AMQPConnectionContext connection; final AMQPConnectionContext connection;
private final ByteBuffer DECODE_BUFFER = ByteBuffer.allocate(64);
public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) { public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) {
this.sessionSPI = sessionSPI; this.sessionSPI = sessionSPI;
this.connection = connection; this.connection = connection;
@ -65,7 +69,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
return; return;
} }
byte[] buffer; ByteBuffer buffer;
MessageImpl msg;
synchronized (connection.getLock()) { synchronized (connection.getLock()) {
// Replenish coordinator receiver credit on exhaustion so sender can continue // Replenish coordinator receiver credit on exhaustion so sender can continue
@ -74,14 +79,22 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
receiver.flow(amqpCredit); receiver.flow(amqpCredit);
} }
buffer = new byte[delivery.available()]; // Declare is generally 7 bytes and discharge is around 48 depending on the
receiver.recv(buffer, 0, buffer.length); // encoded size of the TXN ID. Decode buffer has a bit of extra space but if
receiver.advance(); // the incoming request is to big just use a scratch buffer.
if (delivery.available() > DECODE_BUFFER.capacity()) {
buffer = ByteBuffer.allocate(delivery.available());
} else {
buffer = (ByteBuffer) DECODE_BUFFER.clear();
} }
// Update Buffer for the next incoming command.
buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity()));
receiver.advance();
MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer); msg = decodeMessage(buffer);
}
Object action = ((AmqpValue) msg.getBody()).getValue(); Object action = ((AmqpValue) msg.getBody()).getValue();
@ -133,6 +146,18 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
} }
} }
@Override
public void onFlow(int credits, boolean drain) {
}
@Override
public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
}
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
}
private Rejected createRejected(Symbol amqpError, String message) { private Rejected createRejected(Symbol amqpError, String message) {
Rejected rejected = new Rejected(); Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition(); ErrorCondition condition = new ErrorCondition();
@ -142,17 +167,9 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
return rejected; return rejected;
} }
@Override private MessageImpl decodeMessage(ByteBuffer encoded) {
public void onFlow(int credits, boolean drain) { MessageImpl message = (MessageImpl) Message.Factory.create();
} message.decode(encoded);
return message;
@Override
public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
// no op
}
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
// no op
} }
} }

View File

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.util;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
public class DeliveryUtil {
public static MessageImpl decodeMessageImpl(byte[] data) {
MessageImpl message = (MessageImpl) Message.Factory.create();
message.decode(data, 0, data.length);
return message;
}
}