ARTEMIS-2659 / ARTEMIS-2673 Fix AmqpSendReceiveTest.testAcceptWithoutSettling

This commit is contained in:
Clebert Suconic 2020-04-07 22:43:37 -04:00
parent b2342b626e
commit 858a8240f9
2 changed files with 11 additions and 33 deletions

View File

@ -629,7 +629,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// this can happen in the twice ack mode, that is the receiver accepts and settles separately // this can happen in the twice ack mode, that is the receiver accepts and settles separately
// acking again would show an exception but would have no negative effect but best to handle anyway. // acking again would show an exception but would have no negative effect but best to handle anyway.
if (!delivery.isSettled()) { if (!delivery.isSettled()) {
inSessionACK(delivery, message); // we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order from dealer, a performance hit but a must
try {
sessionSPI.ack(null, brokerConsumer, message);
} catch (Exception e) {
log.warn(e.toString(), e);
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
delivery.settle();
} }
} else { } else {
handleExtendedDeliveryOutcomes(message, delivery, remoteState); handleExtendedDeliveryOutcomes(message, delivery, remoteState);
@ -644,37 +653,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
} }
private void inSessionACK(Delivery delivery, Message message) throws ActiveMQAMQPIllegalStateException {
OperationContext oldContext = sessionSPI.recoverContext();
try {
// we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order from dealer, a performance hit but a must
try {
sessionSPI.ack(null, brokerConsumer, message);
} catch (Exception e) {
log.warn(e.toString(), e);
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
sessionSPI.afterIO(new IOCallback() {
@Override
public void done() {
connection.runLater(() -> {
delivery.settle();
connection.instantFlush();
});
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
} finally {
sessionSPI.resetContext(oldContext);
}
}
private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException { private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException {
boolean settleImmediate = true; boolean settleImmediate = true;
boolean handled = true; boolean handled = true;

View File

@ -131,7 +131,7 @@ public class JMSSelectorTest extends JMSClientTestSupport {
TextMessage msg = (TextMessage) consumer.receive(1000); TextMessage msg = (TextMessage) consumer.receive(1000);
assertNotNull(msg); assertNotNull(msg);
assertEquals("how are you", msg.getText()); assertEquals("how are you", msg.getText());
assertNull(consumer.receive(1000)); assertNull(consumer.receiveNoWait());
consumer.close(); consumer.close();
} }
} }