diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 4b26d79185..0add7b7ae5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -567,8 +567,7 @@ public class AMQPSessionCallback implements SessionCallback { ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); try { - invokeOutgoing((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()); - return plugSender.deliverMessage(ref, deliveryCount); + return plugSender.deliverMessage(ref, deliveryCount, transportConnection); } catch (Exception e) { connection.lock(); try { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index f2f94d55c9..6dcf41a2c1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; @@ -47,6 +48,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransa import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; +import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; @@ -657,13 +659,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr /** * handle an out going message from ActiveMQ Artemis, send via the Proton Sender */ - public int deliverMessage(MessageReference messageReference, int deliveryCount) throws Exception { - AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage()); + public int deliverMessage(MessageReference messageReference, int deliveryCount, Connection transportConnection) throws Exception { if (closed) { return 0; } + AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage()); + sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()); + // presettle means we can settle the message on the dealer side before we send it, i.e. // for browsers boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;