mirror of https://github.com/apache/activemq.git
Applied and tested, all tests still passing after this change.
This commit is contained in:
parent
7ca25965db
commit
c70d75213e
|
@ -601,7 +601,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
||||||
|
|
||||||
LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
|
LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
|
||||||
|
|
||||||
DeliveryState remoteState = delivery.getRemoteState();
|
final DeliveryState remoteState = delivery.getRemoteState();
|
||||||
if (remoteState != null && remoteState instanceof TransactionalState) {
|
if (remoteState != null && remoteState instanceof TransactionalState) {
|
||||||
TransactionalState s = (TransactionalState) remoteState;
|
TransactionalState s = (TransactionalState) remoteState;
|
||||||
long txid = toLong(s.getTxnId());
|
long txid = toLong(s.getTxnId());
|
||||||
|
@ -639,7 +639,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
||||||
receiver.flow(prefetch - receiver.getCredit());
|
receiver.flow(prefetch - receiver.getCredit());
|
||||||
}
|
}
|
||||||
|
|
||||||
delivery.disposition(Accepted.getInstance());
|
if (remoteState != null && remoteState instanceof TransactionalState) {
|
||||||
|
TransactionalState txAccepted = new TransactionalState();
|
||||||
|
txAccepted.setOutcome(Accepted.getInstance());
|
||||||
|
txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
|
||||||
|
|
||||||
|
delivery.disposition(txAccepted);
|
||||||
|
} else {
|
||||||
|
delivery.disposition(Accepted.getInstance());
|
||||||
|
}
|
||||||
|
|
||||||
delivery.settle();
|
delivery.settle();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue