This commit is contained in:
Clebert Suconic 2020-04-07 23:11:01 -04:00
commit 7febd8c49d
2 changed files with 11 additions and 33 deletions

View File

@ -629,7 +629,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// this can happen in the twice ack mode, that is the receiver accepts and settles separately
// acking again would show an exception but would have no negative effect but best to handle anyway.
if (!delivery.isSettled()) {
inSessionACK(delivery, message);
// we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order from dealer, a performance hit but a must
try {
sessionSPI.ack(null, brokerConsumer, message);
} catch (Exception e) {
log.warn(e.toString(), e);
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
delivery.settle();
}
} else {
handleExtendedDeliveryOutcomes(message, delivery, remoteState);
@ -644,37 +653,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
private void inSessionACK(Delivery delivery, Message message) throws ActiveMQAMQPIllegalStateException {
OperationContext oldContext = sessionSPI.recoverContext();
try {
// we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order from dealer, a performance hit but a must
try {
sessionSPI.ack(null, brokerConsumer, message);
} catch (Exception e) {
log.warn(e.toString(), e);
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
}
sessionSPI.afterIO(new IOCallback() {
@Override
public void done() {
connection.runLater(() -> {
delivery.settle();
connection.instantFlush();
});
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
} finally {
sessionSPI.resetContext(oldContext);
}
}
private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException {
boolean settleImmediate = true;
boolean handled = true;

View File

@ -131,7 +131,7 @@ public class JMSSelectorTest extends JMSClientTestSupport {
TextMessage msg = (TextMessage) consumer.receive(1000);
assertNotNull(msg);
assertEquals("how are you", msg.getText());
assertNull(consumer.receive(1000));
assertNull(consumer.receiveNoWait());
consumer.close();
}
}