mirror of https://github.com/apache/activemq.git
Apply patch from gemmellr to ensure that TX messages accepted retain the TX state until commit.
This commit is contained in:
parent
5cd56e7fb1
commit
c5f183548e
|
@ -72,6 +72,7 @@ import org.apache.qpid.proton.amqp.Symbol;
|
|||
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
import org.apache.qpid.proton.amqp.messaging.Modified;
|
||||
import org.apache.qpid.proton.amqp.messaging.Outcome;
|
||||
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||
import org.apache.qpid.proton.amqp.messaging.Released;
|
||||
import org.apache.qpid.proton.amqp.messaging.Target;
|
||||
|
@ -1148,12 +1149,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
|||
|
||||
if (state instanceof TransactionalState) {
|
||||
TransactionalState txState = (TransactionalState) state;
|
||||
if (txState.getOutcome() instanceof DeliveryState) {
|
||||
LOG.trace("onDelivery: TX delivery state = {}", state);
|
||||
state = (DeliveryState) txState.getOutcome();
|
||||
if (state instanceof Accepted) {
|
||||
LOG.trace("onDelivery: TX delivery state = {}", state);
|
||||
if (txState.getOutcome() != null) {
|
||||
Outcome outcome = txState.getOutcome();
|
||||
if (outcome instanceof Accepted) {
|
||||
if (!delivery.remotelySettled()) {
|
||||
delivery.disposition(new Accepted());
|
||||
TransactionalState txAccepted = new TransactionalState();
|
||||
txAccepted.setOutcome(Accepted.getInstance());
|
||||
txAccepted.setTxnId(((TransactionalState) state).getTxnId());
|
||||
|
||||
delivery.disposition(txAccepted);
|
||||
}
|
||||
settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue