mirror of https://github.com/apache/activemq.git
Fix for XARecoveryBrokerTest
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@545202 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
415f35b208
commit
12dba4d65c
|
@ -346,7 +346,7 @@ public class Queue implements Destination, Task {
|
|||
if (log.isDebugEnabled()) {
|
||||
log.debug("Expired message: " + message);
|
||||
}
|
||||
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
|
||||
if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
|
||||
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
|
||||
context.getConnection().dispatchAsync(ack);
|
||||
}
|
||||
|
@ -370,7 +370,7 @@ public class Queue implements Destination, Task {
|
|||
log.debug("Expired message: " + message);
|
||||
}
|
||||
|
||||
if( !message.isResponseRequired() ) {
|
||||
if( !message.isResponseRequired() && !context.isInRecoveryMode() ) {
|
||||
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
|
||||
context.getConnection().dispatchAsync(ack);
|
||||
}
|
||||
|
@ -381,7 +381,7 @@ public class Queue implements Destination, Task {
|
|||
try {
|
||||
doMessageSend(producerExchange, message);
|
||||
} catch (Exception e) {
|
||||
if( message.isResponseRequired() ) {
|
||||
if( message.isResponseRequired() && !context.isInRecoveryMode() ) {
|
||||
ExceptionResponse response = new ExceptionResponse(e);
|
||||
response.setCorrelationId(message.getCommandId());
|
||||
context.getConnection().dispatchAsync(response);
|
||||
|
@ -427,7 +427,7 @@ public class Queue implements Destination, Task {
|
|||
if(store!=null&&message.isPersistent()){
|
||||
store.addMessage(context,message);
|
||||
}
|
||||
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
|
||||
if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
|
||||
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
|
||||
context.getConnection().dispatchAsync(ack);
|
||||
}
|
||||
|
|
|
@ -264,7 +264,7 @@ public class Topic implements Destination {
|
|||
if (log.isDebugEnabled()) {
|
||||
log.debug("Expired message: " + message);
|
||||
}
|
||||
if( producerExchange.getProducerState().getInfo().getWindowSize() > 0 || !message.isResponseRequired() ) {
|
||||
if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
|
||||
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
|
||||
context.getConnection().dispatchAsync(ack);
|
||||
}
|
||||
|
@ -289,7 +289,7 @@ public class Topic implements Destination {
|
|||
log.debug("Expired message: " + message);
|
||||
}
|
||||
|
||||
if( !message.isResponseRequired() ) {
|
||||
if( !message.isResponseRequired() && !context.isInRecoveryMode() ) {
|
||||
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
|
||||
context.getConnection().dispatchAsync(ack);
|
||||
}
|
||||
|
@ -300,7 +300,7 @@ public class Topic implements Destination {
|
|||
try {
|
||||
doMessageSend(producerExchange, message);
|
||||
} catch (Exception e) {
|
||||
if( message.isResponseRequired() ) {
|
||||
if( message.isResponseRequired() && !context.isInRecoveryMode() ) {
|
||||
ExceptionResponse response = new ExceptionResponse(e);
|
||||
response.setCorrelationId(message.getCommandId());
|
||||
context.getConnection().dispatchAsync(response);
|
||||
|
|
Loading…
Reference in New Issue