mirror of https://github.com/apache/activemq.git
Fix test, remote broker only dequeues the message if it is ack'd
otherwise it remains in-flight.
This commit is contained in:
parent
6ae576cafb
commit
4fa10356f0
|
@ -26,6 +26,7 @@ import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ConnectionInfo;
|
import org.apache.activemq.command.ConnectionInfo;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.command.MessageAck;
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
import org.apache.activemq.command.ProducerInfo;
|
||||||
import org.apache.activemq.command.SessionInfo;
|
import org.apache.activemq.command.SessionInfo;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
|
@ -80,14 +81,20 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
|
||||||
|
|
||||||
// Now create remote consumer that should cause message to move to this
|
// Now create remote consumer that should cause message to move to this
|
||||||
// remote consumer.
|
// remote consumer.
|
||||||
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
|
final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
|
||||||
connection2.request(consumerInfo2);
|
connection2.request(consumerInfo2);
|
||||||
|
|
||||||
// Make sure the message was delivered via the remote.
|
// Make sure the message was delivered via the remote.
|
||||||
assertTrue("message was received", Wait.waitFor(new Wait.Condition() {
|
assertTrue("message was received", Wait.waitFor(new Wait.Condition() {
|
||||||
@Override
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
return receiveMessage(connection2) != null;
|
Message msg = receiveMessage(connection2);
|
||||||
|
if (msg != null) {
|
||||||
|
connection2.request(createAck(consumerInfo2, msg, 1, MessageAck.STANDARD_ACK_TYPE));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
@ -97,6 +104,7 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
|
||||||
return 1 == destinationStatistics.getForwards().getCount();
|
return 1 == destinationStatistics.getForwards().getCount();
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount());
|
assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount());
|
||||||
assertEquals("remote broker dest stat dequeues", 1, remoteBroker.getDestination(destination).getDestinationStatistics().getDequeues().getCount());
|
assertEquals("remote broker dest stat dequeues", 1, remoteBroker.getDestination(destination).getDestinationStatistics().getDequeues().getCount());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue