reorganize the connection response code to only pump once all state is
process.
This commit is contained in:
Timothy Bish 2015-02-25 10:17:06 -05:00
parent 0142c4dc89
commit f988ca6e49
1 changed files with 18 additions and 14 deletions

View File

@ -530,11 +530,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
sendToActiveMQ(connectionInfo, new ResponseHandler() { sendToActiveMQ(connectionInfo, new ResponseHandler() {
@Override @Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
Throwable exception = null;
try {
protonConnection.open(); protonConnection.open();
pumpProtonToSocket();
if (response.isException()) { if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException(); exception = ((ExceptionResponse) response).getException();
if (exception instanceof SecurityException) { if (exception instanceof SecurityException) {
protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage())); protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
} else if (exception instanceof InvalidClientIDException) { } else if (exception instanceof InvalidClientIDException) {
@ -543,11 +544,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage())); protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
} }
protonConnection.close(); protonConnection.close();
pumpProtonToSocket();
amqpTransport.onException(IOExceptionSupport.create(exception));
return;
} }
} finally {
pumpProtonToSocket();
if (response.isException()) {
amqpTransport.onException(IOExceptionSupport.create(exception));
}
}
} }
}); });
} }