diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index caf20830c6..af1e331a10 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -601,7 +601,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId()); - DeliveryState remoteState = delivery.getRemoteState(); + final DeliveryState remoteState = delivery.getRemoteState(); if (remoteState != null && remoteState instanceof TransactionalState) { TransactionalState s = (TransactionalState) remoteState; long txid = toLong(s.getTxnId()); @@ -639,7 +639,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { receiver.flow(prefetch - receiver.getCredit()); } - delivery.disposition(Accepted.getInstance()); + if (remoteState != null && remoteState instanceof TransactionalState) { + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId()); + + delivery.disposition(txAccepted); + } else { + delivery.disposition(Accepted.getInstance()); + } + delivery.settle(); }