This commit is contained in:
Clebert Suconic 2020-05-04 15:40:08 -04:00
commit 8e0fb3ebed
4 changed files with 36 additions and 12 deletions

View File

@ -544,8 +544,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
// We scheduled it for later, as that will work through anything that's pending on the current deliveries. // We scheduled it for later, as that will work through anything that's pending on the current deliveries.
runNow(() -> { runNow(() -> {
link.close();
link.free();
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) { if (linkContext != null) {
@ -555,7 +553,15 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }
} }
flush();
/// we have to perform the link.close after the linkContext.close is finished.
// linkeContext.close will perform a few executions on the netty loop,
// this has to come next
runLater(() -> {
link.close();
link.free();
flush();
});
}); });
} }

View File

@ -569,8 +569,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
*/ */
@Override @Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
// we need to mark closed first to make sure no more adds are accepted
closed = true;
// MessageReferences are sent to the Connection executor (Netty Loop)
// as a result the returning references have to be done later after they
// had their chance to finish and clear the runnable
connection.runLater(() -> {
try {
internalClose(remoteLinkClose);
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
});
}
private void internalClose(boolean remoteLinkClose) throws ActiveMQAMQPException {
try { try {
closed = true;
protonSession.removeSender(sender); protonSession.removeSender(sender);
sessionSPI.closeSender(brokerConsumer); sessionSPI.closeSender(brokerConsumer);
// if this is a link close rather than a connection close or detach, we need to delete // if this is a link close rather than a connection close or detach, we need to delete
@ -836,6 +851,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
void resume() { void resume() {
connection.runNow(this::deliver); connection.runNow(this::deliver);
} }
void deliver() { void deliver() {
// This is discounting some bytes due to Transfer payload // This is discounting some bytes due to Transfer payload
@ -994,11 +1010,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
private static SimpleString createQueueName(boolean useCoreSubscriptionNaming, private static SimpleString createQueueName(boolean useCoreSubscriptionNaming,
String clientId, String clientId,
String pubId, String pubId,
boolean shared, boolean shared,
boolean global, boolean global,
boolean isVolatile) { boolean isVolatile) {
if (useCoreSubscriptionNaming) { if (useCoreSubscriptionNaming) {
final boolean durable = !isVolatile; final boolean durable = !isVolatile;
final String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId; final String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId;

View File

@ -114,7 +114,10 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
@Override @Override
public void onDelivery(Consumer<? super MessageReference> onDelivery) { public void onDelivery(Consumer<? super MessageReference> onDelivery) {
assert this.onDelivery == null; // I am keeping this commented out as a documentation feature:
// a Message reference may eventually be taken back before the connection.run was finished.
// as a result it may be possible to have this.onDelivery != null here due to cancellations.
// assert this.onDelivery == null;
this.onDelivery = onDelivery; this.onDelivery = onDelivery;
} }

View File

@ -50,14 +50,13 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
sender.send(message); sender.send(message);
sender.close(); sender.close();
Wait.assertEquals(1, queueView::getMessageCount);
// Now try and get the message // Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName()); AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1); receiver.flow(1);
AmqpMessage received = receiver.receiveNoWait(); AmqpMessage received = receiver.receiveNoWait();
assertNull(received); assertNull(received);
Wait.assertEquals(0, queueView::getMessageCount);
Wait.assertEquals(1, queueView::getMessagesExpired); Wait.assertEquals(1, queueView::getMessagesExpired);
connection.close(); connection.close();