diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java index 07abb42c70..3ae018eab7 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java @@ -90,10 +90,16 @@ public class AmqpReceiver extends AmqpAbstractReceiver { @Override public void close() { if (!isClosed() && isOpened()) { - sendToActiveMQ(new RemoveInfo(getProducerId())); - } + sendToActiveMQ(new RemoveInfo(getProducerId()), new ResponseHandler() { - super.close(); + @Override + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + AmqpReceiver.super.close(); + } + }); + } else { + super.close(); + } } //----- Configuration accessors ------------------------------------------// diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 9fb85a39e0..12bd6277d5 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -118,12 +118,18 @@ public class AmqpSender extends AmqpAbstractLink { if (!isClosed() && isOpened()) { RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); - sendToActiveMQ(removeCommand); - session.unregisterSender(getConsumerId()); + sendToActiveMQ(removeCommand, new ResponseHandler() { + + @Override + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + session.unregisterSender(getConsumerId()); + AmqpSender.super.detach(); + } + }); + } else { + super.detach(); } - - super.detach(); } @Override @@ -131,21 +137,27 @@ public class AmqpSender extends AmqpAbstractLink { if (!isClosed() && isOpened()) { RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); - sendToActiveMQ(removeCommand); - if (consumerInfo.isDurable()) { - RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); - rsi.setConnectionId(session.getConnection().getConnectionId()); - rsi.setSubscriptionName(getEndpoint().getName()); - rsi.setClientId(session.getConnection().getClientId()); + sendToActiveMQ(removeCommand, new ResponseHandler() { - sendToActiveMQ(rsi); - } + @Override + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + if (consumerInfo.isDurable()) { + RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); + rsi.setConnectionId(session.getConnection().getConnectionId()); + rsi.setSubscriptionName(getEndpoint().getName()); + rsi.setClientId(session.getConnection().getClientId()); - session.unregisterSender(getConsumerId()); + sendToActiveMQ(rsi); + } + + session.unregisterSender(getConsumerId()); + AmqpSender.super.close(); + } + }); + } else { + super.close(); } - - super.close(); } @Override diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java index 20a8b9f480..c390b8cce5 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java @@ -108,11 +108,15 @@ public class AmqpSession implements AmqpResource { public void close() { LOG.debug("Session {} closed", getSessionId()); - getEndpoint().setContext(null); - getEndpoint().close(); - getEndpoint().free(); + connection.sendToActiveMQ(new RemoveInfo(getSessionId()), new ResponseHandler() { - connection.sendToActiveMQ(new RemoveInfo(getSessionId())); + @Override + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + getEndpoint().setContext(null); + getEndpoint().close(); + getEndpoint().free(); + } + }); } /**