AMQ-6362 - merge duplicated code - fix regression in AMQ4889Test and ExceptionListenerTest

This commit is contained in:
gtully 2016-07-25 14:27:40 +01:00
parent ad657cc202
commit a65f5e7c20
2 changed files with 8 additions and 30 deletions

View File

@ -685,7 +685,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
RemoveInfo removeCommand = info.createRemoveCommand(); RemoveInfo removeCommand = info.createRemoveCommand();
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
try { try {
doSyncSendPacket(removeCommand, closeTimeout); syncSendPacket(removeCommand, closeTimeout);
} catch (JMSException e) { } catch (JMSException e) {
if (e.getCause() instanceof RequestTimedOutIOException) { if (e.getCause() instanceof RequestTimedOutIOException) {
// expected // expected
@ -1377,13 +1377,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
onException(new IOException("Force close due to SecurityException on connect", exception)); onException(new IOException("Force close due to SecurityException on connect", exception));
} }
public Response syncSendPacket(Command command) throws JMSException { public Response syncSendPacket(Command command, int timeout) throws JMSException {
if (isClosed()) { if (isClosed()) {
throw new ConnectionClosedException(); throw new ConnectionClosedException();
} else { } else {
try { try {
Response response = (Response)this.transport.request(command); Response response = (Response)(timeout > 0
? this.transport.request(command, timeout)
: this.transport.request(command));
if (response.isException()) { if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response; ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) { if (er.getException() instanceof JMSException) {
@ -1422,32 +1424,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* *
* @throws JMSException * @throws JMSException
*/ */
public Response syncSendPacket(Command command, int timeout) throws JMSException { public Response syncSendPacket(Command command) throws JMSException {
if (isClosed() || closing.get()) { return syncSendPacket(command, 0);
throw new ConnectionClosedException();
} else {
return doSyncSendPacket(command, timeout);
}
}
protected Response doSyncSendPacket(Command command, int timeout)
throws JMSException {
try {
Response response = (Response) (timeout > 0
? this.transport.request(command, timeout)
: this.transport.request(command));
if (response != null && response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException();
} else {
throw JMSExceptionSupport.create(er.getException());
}
}
return response;
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
} }
/** /**

View File

@ -277,7 +277,7 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
this.transactionId = null; this.transactionId = null;
//make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
this.connection.doSyncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0); this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0);
// Notify the listener that the tx was rolled back // Notify the listener that the tx was rolled back
if (localTransactionEventListener != null) { if (localTransactionEventListener != null) {
localTransactionEventListener.rollbackEvent(); localTransactionEventListener.rollbackEvent();