diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 505e229913..9bb3d70bd9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -544,8 +544,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH // We scheduled it for later, as that will work through anything that's pending on the current deliveries. runNow(() -> { - link.close(); - link.free(); ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); if (linkContext != null) { @@ -555,7 +553,15 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH log.error(e.getMessage(), e); } } - flush(); + + /// we have to perform the link.close after the linkContext.close is finished. + // linkeContext.close will perform a few executions on the netty loop, + // this has to come next + runLater(() -> { + link.close(); + link.free(); + flush(); + }); }); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index ca3d418fc7..b951b0bee0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -569,8 +569,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr */ @Override public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { + // we need to mark closed first to make sure no more adds are accepted + closed = true; + + // MessageReferences are sent to the Connection executor (Netty Loop) + // as a result the returning references have to be done later after they + // had their chance to finish and clear the runnable + connection.runLater(() -> { + try { + internalClose(remoteLinkClose); + } catch (Exception e) { + log.warn(e.getMessage(), e); + } + }); + } + + private void internalClose(boolean remoteLinkClose) throws ActiveMQAMQPException { try { - closed = true; protonSession.removeSender(sender); sessionSPI.closeSender(brokerConsumer); // if this is a link close rather than a connection close or detach, we need to delete @@ -836,6 +851,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr void resume() { connection.runNow(this::deliver); } + void deliver() { // This is discounting some bytes due to Transfer payload @@ -994,11 +1010,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } private static SimpleString createQueueName(boolean useCoreSubscriptionNaming, - String clientId, - String pubId, - boolean shared, - boolean global, - boolean isVolatile) { + String clientId, + String pubId, + boolean shared, + boolean global, + boolean isVolatile) { if (useCoreSubscriptionNaming) { final boolean durable = !isVolatile; final String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index e9238fef60..dea1478e93 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -114,7 +114,10 @@ public class MessageReferenceImpl extends LinkedListImpl.Node onDelivery) { - assert this.onDelivery == null; + // I am keeping this commented out as a documentation feature: + // a Message reference may eventually be taken back before the connection.run was finished. + // as a result it may be possible to have this.onDelivery != null here due to cancellations. + // assert this.onDelivery == null; this.onDelivery = onDelivery; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index f360ab7f0a..121137f431 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -50,14 +50,13 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { sender.send(message); sender.close(); - Wait.assertEquals(1, queueView::getMessageCount); - // Now try and get the message AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); AmqpMessage received = receiver.receiveNoWait(); assertNull(received); + Wait.assertEquals(0, queueView::getMessageCount); Wait.assertEquals(1, queueView::getMessagesExpired); connection.close();