AMQ-5738: Ensure the sender links for non-durable consumers also get unregistered, remove duplicate closed field, fix method names.

This commit is contained in:
Robert Gemmell 2015-04-22 10:17:38 +01:00
parent b313209aa2
commit 3a5f127d52
3 changed files with 10 additions and 11 deletions

View File

@ -563,11 +563,11 @@ public class AmqpConnection implements AmqpProtocolConverter {
//----- Utility methods for connection resources to use ------------------// //----- Utility methods for connection resources to use ------------------//
void regosterSender(ConsumerId consumerId, AmqpSender sender) { void registerSender(ConsumerId consumerId, AmqpSender sender) {
subscriptionsByConsumerId.put(consumerId, sender); subscriptionsByConsumerId.put(consumerId, sender);
} }
void unregosterSender(ConsumerId consumerId) { void unregisterSender(ConsumerId consumerId) {
subscriptionsByConsumerId.remove(consumerId); subscriptionsByConsumerId.remove(consumerId);
} }

View File

@ -80,7 +80,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
private final ConsumerInfo consumerInfo; private final ConsumerInfo consumerInfo;
private final boolean presettle; private final boolean presettle;
private boolean closed;
private int currentCredit; private int currentCredit;
private boolean draining; private boolean draining;
private long lastDeliveredSequenceId; private long lastDeliveredSequenceId;
@ -108,8 +107,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
@Override @Override
public void open() { public void open() {
if (!closed) { if (!isClosed()) {
session.regosterSender(getConsumerId(), this); session.registerSender(getConsumerId(), this);
} }
super.open(); super.open();
@ -142,10 +141,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
rsi.setClientId(session.getConnection().getClientId()); rsi.setClientId(session.getConnection().getClientId());
sendToActiveMQ(rsi, null); sendToActiveMQ(rsi, null);
}
session.unregisterSender(getConsumerId()); session.unregisterSender(getConsumerId());
} }
}
super.close(); super.close();
} }
@ -350,7 +349,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
//----- Internal Implementation ------------------------------------------// //----- Internal Implementation ------------------------------------------//
public void pumpOutbound() throws Exception { public void pumpOutbound() throws Exception {
while (!closed) { while (!isClosed()) {
while (currentBuffer != null) { while (currentBuffer != null) {
int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length); int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
if (sent > 0) { if (sent > 0) {

View File

@ -345,14 +345,14 @@ public class AmqpSession implements AmqpResource {
connection.pumpProtonToSocket(); connection.pumpProtonToSocket();
} }
public void regosterSender(ConsumerId consumerId, AmqpSender sender) { public void registerSender(ConsumerId consumerId, AmqpSender sender) {
consumers.put(consumerId, sender); consumers.put(consumerId, sender);
connection.regosterSender(consumerId, sender); connection.registerSender(consumerId, sender);
} }
public void unregisterSender(ConsumerId consumerId) { public void unregisterSender(ConsumerId consumerId) {
consumers.remove(consumerId); consumers.remove(consumerId);
connection.unregosterSender(consumerId); connection.unregisterSender(consumerId);
} }
//----- Configuration accessors ------------------------------------------// //----- Configuration accessors ------------------------------------------//