Wait on broker response for async broker commands.
This commit is contained in:
Timothy Bish 2016-06-29 20:13:34 -04:00
parent 4e23adfcc9
commit 83827f2770
3 changed files with 44 additions and 22 deletions

View File

@ -90,10 +90,16 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
@Override
public void close() {
if (!isClosed() && isOpened()) {
sendToActiveMQ(new RemoveInfo(getProducerId()));
}
sendToActiveMQ(new RemoveInfo(getProducerId()), new ResponseHandler() {
super.close();
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
AmqpReceiver.super.close();
}
});
} else {
super.close();
}
}
//----- Configuration accessors ------------------------------------------//

View File

@ -118,12 +118,18 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
if (!isClosed() && isOpened()) {
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand);
session.unregisterSender(getConsumerId());
sendToActiveMQ(removeCommand, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
session.unregisterSender(getConsumerId());
AmqpSender.super.detach();
}
});
} else {
super.detach();
}
super.detach();
}
@Override
@ -131,21 +137,27 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
if (!isClosed() && isOpened()) {
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand);
if (consumerInfo.isDurable()) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(session.getConnection().getConnectionId());
rsi.setSubscriptionName(getEndpoint().getName());
rsi.setClientId(session.getConnection().getClientId());
sendToActiveMQ(removeCommand, new ResponseHandler() {
sendToActiveMQ(rsi);
}
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (consumerInfo.isDurable()) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(session.getConnection().getConnectionId());
rsi.setSubscriptionName(getEndpoint().getName());
rsi.setClientId(session.getConnection().getClientId());
session.unregisterSender(getConsumerId());
sendToActiveMQ(rsi);
}
session.unregisterSender(getConsumerId());
AmqpSender.super.close();
}
});
} else {
super.close();
}
super.close();
}
@Override

View File

@ -108,11 +108,15 @@ public class AmqpSession implements AmqpResource {
public void close() {
LOG.debug("Session {} closed", getSessionId());
getEndpoint().setContext(null);
getEndpoint().close();
getEndpoint().free();
connection.sendToActiveMQ(new RemoveInfo(getSessionId()), new ResponseHandler() {
connection.sendToActiveMQ(new RemoveInfo(getSessionId()));
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
getEndpoint().setContext(null);
getEndpoint().close();
getEndpoint().free();
}
});
}
/**