diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 30dd10aeea..cdd1362d43 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -205,23 +205,20 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } /* - * called when Proton receives a message to be delivered via a Delivery. - * - * This may be called more than once per deliver so we have to cache the buffer until we have received it all. - * - * */ + * called when Proton receives a message to be delivered via a Delivery. + * + * This may be called more than once per deliver so we have to cache the buffer until we have received it all. + */ @Override public void onMessage(Delivery delivery) throws ActiveMQAMQPException { - Receiver receiver; try { + Receiver receiver = ((Receiver) delivery.getLink()); - if (!delivery.isReadable()) { + if (receiver.current() != delivery) { return; } if (delivery.isAborted()) { - receiver = ((Receiver) delivery.getLink()); - // Aborting implicitly remotely settles, so advance // receiver to the next delivery and settle locally. receiver.advance(); @@ -233,16 +230,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } return; - } - - if (delivery.isPartial()) { + } else if (delivery.isPartial()) { return; } - receiver = ((Receiver) delivery.getLink()); - Transaction tx = null; - ReadableBuffer data = receiver.recv(); receiver.advance(); @@ -267,13 +259,9 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements condition.setDescription(e.getMessage()); rejected.setError(condition); - connection.lock(); - try { - delivery.disposition(rejected); - delivery.settle(); - } finally { - connection.unlock(); - } + + delivery.disposition(rejected); + delivery.settle(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 0b40ee2140..24dcff03aa 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -87,6 +87,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private static final Symbol SHARED = Symbol.valueOf("shared"); private static final Symbol GLOBAL = Symbol.valueOf("global"); + private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback(); + private Consumer brokerConsumer; protected final AMQPSessionContext protonSession; @@ -101,6 +103,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private boolean shared = false; private boolean global = false; private boolean isVolatile = false; + private boolean preSettle; private SimpleString tempQueueName; public ProtonServerSenderContext(AMQPConnectionContext connection, @@ -417,6 +420,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } + // Detect if sender is in pre-settle mode. + preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; + // We need to update the source with any filters we support otherwise the client // is free to consider the attach as having failed if we don't send back what we // do support or if we send something we don't support the client won't know we @@ -538,17 +544,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr try { Message message = ((MessageReference) delivery.getContext()).getMessage(); - - boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; - - DeliveryState remoteState; - - connection.lock(); - try { - remoteState = delivery.getRemoteState(); - } finally { - connection.unlock(); - } + DeliveryState remoteState = delivery.getRemoteState(); boolean settleImmediate = true; if (remoteState instanceof Accepted) { @@ -558,8 +554,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return; } // we have to individual ack as we can't guarantee we will get the delivery updates - // (including acks) in order - // from dealer, a perf hit but a must + // (including acks) in order from dealer, a performance hit but a must try { sessionSPI.ack(null, brokerConsumer, message); } catch (Exception e) { @@ -580,16 +575,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr TransactionalState txAccepted = new TransactionalState(); txAccepted.setOutcome(Accepted.getInstance()); txAccepted.setTxnId(txState.getTxnId()); - connection.lock(); - try { - delivery.disposition(txAccepted); - } finally { - connection.unlock(); - } + delivery.disposition(txAccepted); } // we have to individual ack as we can't guarantee we will get the delivery - // updates (including acks) in order - // from dealer, a perf hit but a must + // (including acks) in order from dealer, a performance hit but a must try { sessionSPI.ack(tx, brokerConsumer, message); tx.addDelivery(delivery, this); @@ -636,26 +625,27 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } if (settleImmediate) { - settle(delivery); + delivery.settle(); } } finally { - sessionSPI.afterIO(new IOCallback() { - @Override - public void done() { - connection.flush(); - } - - @Override - public void onError(int errorCode, String errorMessage) { - connection.flush(); - } - }); - + sessionSPI.afterIO(connectionFlusher); sessionSPI.resetContext(oldContext); } } + private final class ConnectionFlushIOCallback implements IOCallback { + @Override + public void done() { + connection.flush(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + connection.flush(); + } + } + public void settle(Delivery delivery) { connection.lock(); try { @@ -681,16 +671,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage()); sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()); - // presettle means we can settle the message on the dealer side before we send it, i.e. - // for browsers - boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; - // we only need a tag if we are going to settle later byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); // Let the Message decide how to present the message bytes - boolean attemptRelease = true; ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount); + boolean releaseRequired = sendBuffer instanceof NettyReadable; try { int size = sendBuffer.remaining(); @@ -713,14 +699,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr delivery.setMessageFormat((int) message.getMessageFormat()); delivery.setContext(messageReference); - if (sendBuffer instanceof NettyReadable) { + if (releaseRequired) { sender.send(sendBuffer); // Above send copied, so release now if needed - attemptRelease = false; + releaseRequired = false; ((NettyReadable) sendBuffer).getByteBuf().release(); } else { // Don't have pooled content, no need to release or copy. - attemptRelease = false; sender.sendNoCopy(sendBuffer); } @@ -731,6 +716,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else { sender.advance(); } + connection.flush(); } finally { connection.unlock(); @@ -738,7 +724,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return size; } finally { - if (attemptRelease && sendBuffer instanceof NettyReadable) { + if (releaseRequired) { ((NettyReadable) sendBuffer).getByteBuf().release(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 38ca7a7978..694c1d36e7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -243,24 +243,9 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { int capacity = transport.capacity(); if (!receivedFirstPacket) { - try { - byte auth = buffer.getByte(4); - if (auth == SASL || auth == BARE) { - if (isServer) { - dispatchAuth(auth == SASL); - } else if (auth == BARE && clientSASLMechanism == null) { - dispatchAuthSuccess(); - } - /* - * there is a chance that if SASL Handshake has been carried out that the capacity may change. - * */ - capacity = transport.capacity(); - } - } catch (Throwable e) { - log.warn(e.getMessage(), e); - } - - receivedFirstPacket = true; + handleFirstPacket(buffer); + // there is a chance that if SASL Handshake has been carried out that the capacity may change. + capacity = transport.capacity(); } if (capacity > 0) { @@ -537,4 +522,21 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { sasl.client(); sasl.setListener(this); } + + private void handleFirstPacket(ByteBuf buffer) { + try { + byte auth = buffer.getByte(4); + if (auth == SASL || auth == BARE) { + if (isServer) { + dispatchAuth(auth == SASL); + } else if (auth == BARE && clientSASLMechanism == null) { + dispatchAuthSuccess(); + } + } + } catch (Throwable e) { + log.warn(e.getMessage(), e); + } + + receivedFirstPacket = true; + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 28573e0664..9ccc1964e3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -73,33 +73,28 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { ByteBuffer buffer; MessageImpl msg; - connection.lock(); - try { - // Replenish coordinator receiver credit on exhaustion so sender can continue - // transaction declare and discahrge operations. - if (receiver.getCredit() < amqpLowMark) { - receiver.flow(amqpCredit); - } - - // Declare is generally 7 bytes and discharge is around 48 depending on the - // encoded size of the TXN ID. Decode buffer has a bit of extra space but if - // the incoming request is to big just use a scratch buffer. - if (delivery.available() > DECODE_BUFFER.capacity()) { - buffer = ByteBuffer.allocate(delivery.available()); - } else { - buffer = (ByteBuffer) DECODE_BUFFER.clear(); - } - - // Update Buffer for the next incoming command. - buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity())); - - receiver.advance(); - - msg = decodeMessage(buffer); - } finally { - connection.unlock(); + // Replenish coordinator receiver credit on exhaustion so sender can continue + // transaction declare and discahrge operations. + if (receiver.getCredit() < amqpLowMark) { + receiver.flow(amqpCredit); } + // Declare is generally 7 bytes and discharge is around 48 depending on the + // encoded size of the TXN ID. Decode buffer has a bit of extra space but if + // the incoming request is to big just use a scratch buffer. + if (delivery.available() > DECODE_BUFFER.capacity()) { + buffer = ByteBuffer.allocate(delivery.available()); + } else { + buffer = (ByteBuffer) DECODE_BUFFER.clear(); + } + + // Update Buffer for the next incoming command. + buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity())); + + receiver.advance(); + + msg = decodeMessage(buffer); + Object action = ((AmqpValue) msg.getBody()).getValue(); if (action instanceof Declare) { Binary txID = sessionSPI.newTransaction(); @@ -160,23 +155,13 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } } catch (ActiveMQAMQPException amqpE) { log.warn(amqpE.getMessage(), amqpE); - connection.lock(); - try { - delivery.settle(); - delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); - } finally { - connection.unlock(); - } + delivery.settle(); + delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); connection.flush(); } catch (Throwable e) { log.warn(e.getMessage(), e); - connection.lock(); - try { - delivery.settle(); - delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); - } finally { - connection.unlock(); - } + delivery.settle(); + delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); connection.flush(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java index 88dfe3a9d3..a157ef1435 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java @@ -49,17 +49,19 @@ public class ProtonServerReceiverContextTest { ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver); Delivery mockDelivery = mock(Delivery.class); - when(mockDelivery.isReadable()).thenReturn(true); when(mockDelivery.isAborted()).thenReturn(true); when(mockDelivery.isPartial()).thenReturn(true); when(mockDelivery.getLink()).thenReturn(mockReceiver); + when(mockReceiver.current()).thenReturn(mockDelivery); + if (drain) { when(mockReceiver.getDrain()).thenReturn(true); } rc.onMessage(mockDelivery); + verify(mockReceiver, times(1)).current(); verify(mockReceiver, times(1)).advance(); verify(mockDelivery, times(1)).settle();