From 858a8240f97f7cd2fc3bca052ff74a97c0bf886c Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 7 Apr 2020 22:43:37 -0400 Subject: [PATCH] ARTEMIS-2659 / ARTEMIS-2673 Fix AmqpSendReceiveTest.testAcceptWithoutSettling --- .../proton/ProtonServerSenderContext.java | 42 +++++-------------- .../integration/amqp/JMSSelectorTest.java | 2 +- 2 files changed, 11 insertions(+), 33 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index b9385bc3e8..24d0bcbe18 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -629,7 +629,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // this can happen in the twice ack mode, that is the receiver accepts and settles separately // acking again would show an exception but would have no negative effect but best to handle anyway. if (!delivery.isSettled()) { - inSessionACK(delivery, message); + // we have to individual ack as we can't guarantee we will get the delivery updates + // (including acks) in order from dealer, a performance hit but a must + try { + sessionSPI.ack(null, brokerConsumer, message); + } catch (Exception e) { + log.warn(e.toString(), e); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + } + + delivery.settle(); } } else { handleExtendedDeliveryOutcomes(message, delivery, remoteState); @@ -644,37 +653,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - private void inSessionACK(Delivery delivery, Message message) throws ActiveMQAMQPIllegalStateException { - OperationContext oldContext = sessionSPI.recoverContext(); - try { - // we have to individual ack as we can't guarantee we will get the delivery updates - // (including acks) in order from dealer, a performance hit but a must - try { - sessionSPI.ack(null, brokerConsumer, message); - } catch (Exception e) { - log.warn(e.toString(), e); - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); - } - - sessionSPI.afterIO(new IOCallback() { - @Override - public void done() { - connection.runLater(() -> { - delivery.settle(); - connection.instantFlush(); - }); - } - - @Override - public void onError(int errorCode, String errorMessage) { - - } - }); - } finally { - sessionSPI.resetContext(oldContext); - } - } - private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException { boolean settleImmediate = true; boolean handled = true; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java index c61898f47c..56c83d234d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java @@ -131,7 +131,7 @@ public class JMSSelectorTest extends JMSClientTestSupport { TextMessage msg = (TextMessage) consumer.receive(1000); assertNotNull(msg); assertEquals("how are you", msg.getText()); - assertNull(consumer.receive(1000)); + assertNull(consumer.receiveNoWait()); consumer.close(); } }