This commit is contained in:
Clebert Suconic 2017-11-09 11:52:44 -05:00
commit c2a21c9743
1 changed files with 21 additions and 10 deletions

View File

@ -1156,7 +1156,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
try {
closeChildren();
synchronized (this) {
synchronized (producerCreditManager) {
producerCreditManager.close();
}
inClose = true;
@ -1177,7 +1177,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return;
}
producerCreditManager.close();
synchronized (producerCreditManager) {
producerCreditManager.close();
}
cleanUpChildren();
@ -1282,7 +1284,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
}
if (resetCreditManager) {
producerCreditManager.reset();
synchronized (producerCreditManager) {
producerCreditManager.reset();
}
// Also need to send more credits for consumers, otherwise the system could hand with the server
// not having any credits to send
@ -1343,25 +1347,32 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
}
@Override
public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon) {
ClientProducerCredits credits = producerCreditManager.getCredits(address, anon, sessionContext);
return credits;
public ClientProducerCredits getCredits(final SimpleString address, final boolean anon) {
synchronized (producerCreditManager) {
ClientProducerCredits credits = producerCreditManager.getCredits(address, anon, sessionContext);
return credits;
}
}
@Override
public void returnCredits(final SimpleString address) {
producerCreditManager.returnCredits(address);
synchronized (producerCreditManager) {
producerCreditManager.returnCredits(address);
}
}
@Override
public void handleReceiveProducerCredits(final SimpleString address, final int credits) {
producerCreditManager.receiveCredits(address, credits);
synchronized (producerCreditManager) {
producerCreditManager.receiveCredits(address, credits);
}
}
@Override
public void handleReceiveProducerFailCredits(final SimpleString address, int credits) {
producerCreditManager.receiveFailCredits(address, credits);
synchronized (producerCreditManager) {
producerCreditManager.receiveFailCredits(address, credits);
}
}
@Override