mirror of https://github.com/apache/activemq.git
fix and test for: https://issues.apache.org/jira/browse/AMQ-4464
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1466131 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a80c711d78
commit
2aff82c47a
|
@ -18,6 +18,7 @@ package org.apache.activemq;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -289,6 +290,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
|
return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public StatsImpl getStats() {
|
public StatsImpl getStats() {
|
||||||
return stats;
|
return stats;
|
||||||
}
|
}
|
||||||
|
@ -380,6 +382,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
* @throws JMSException if the JMS provider fails to receive the next
|
* @throws JMSException if the JMS provider fails to receive the next
|
||||||
* message due to some internal error.
|
* message due to some internal error.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public String getMessageSelector() throws JMSException {
|
public String getMessageSelector() throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
return selector;
|
return selector;
|
||||||
|
@ -394,6 +397,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
* listener due to some internal error.
|
* listener due to some internal error.
|
||||||
* @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
|
* @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public MessageListener getMessageListener() throws JMSException {
|
public MessageListener getMessageListener() throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
return this.messageListener.get();
|
return this.messageListener.get();
|
||||||
|
@ -414,6 +418,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
* message due to some internal error.
|
* message due to some internal error.
|
||||||
* @see javax.jms.MessageConsumer#getMessageListener
|
* @see javax.jms.MessageConsumer#getMessageListener
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void setMessageListener(MessageListener listener) throws JMSException {
|
public void setMessageListener(MessageListener listener) throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
if (info.getPrefetchSize() == 0) {
|
if (info.getPrefetchSize() == 0) {
|
||||||
|
@ -436,6 +441,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public MessageAvailableListener getAvailableListener() {
|
public MessageAvailableListener getAvailableListener() {
|
||||||
return availableListener;
|
return availableListener;
|
||||||
}
|
}
|
||||||
|
@ -445,6 +451,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
* message available so that the {@link MessageConsumer#receiveNoWait()} can
|
* message available so that the {@link MessageConsumer#receiveNoWait()} can
|
||||||
* be called.
|
* be called.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void setAvailableListener(MessageAvailableListener availableListener) {
|
public void setAvailableListener(MessageAvailableListener availableListener) {
|
||||||
this.availableListener = availableListener;
|
this.availableListener = availableListener;
|
||||||
}
|
}
|
||||||
|
@ -514,6 +521,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
* @return the next message produced for this message consumer, or null if
|
* @return the next message produced for this message consumer, or null if
|
||||||
* this message consumer is concurrently closed
|
* this message consumer is concurrently closed
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public Message receive() throws JMSException {
|
public Message receive() throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
checkMessageListener();
|
checkMessageListener();
|
||||||
|
@ -547,6 +555,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
if (session.isClientAcknowledge()) {
|
if (session.isClientAcknowledge()) {
|
||||||
m.setAcknowledgeCallback(new Callback() {
|
m.setAcknowledgeCallback(new Callback() {
|
||||||
|
@Override
|
||||||
public void execute() throws Exception {
|
public void execute() throws Exception {
|
||||||
session.checkClosed();
|
session.checkClosed();
|
||||||
session.acknowledge();
|
session.acknowledge();
|
||||||
|
@ -554,6 +563,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
});
|
});
|
||||||
} else if (session.isIndividualAcknowledge()) {
|
} else if (session.isIndividualAcknowledge()) {
|
||||||
m.setAcknowledgeCallback(new Callback() {
|
m.setAcknowledgeCallback(new Callback() {
|
||||||
|
@Override
|
||||||
public void execute() throws Exception {
|
public void execute() throws Exception {
|
||||||
session.checkClosed();
|
session.checkClosed();
|
||||||
acknowledge(md);
|
acknowledge(md);
|
||||||
|
@ -577,6 +587,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
* the timeout expires or this message consumer is concurrently
|
* the timeout expires or this message consumer is concurrently
|
||||||
* closed
|
* closed
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public Message receive(long timeout) throws JMSException {
|
public Message receive(long timeout) throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
checkMessageListener();
|
checkMessageListener();
|
||||||
|
@ -613,6 +624,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
* @throws JMSException if the JMS provider fails to receive the next
|
* @throws JMSException if the JMS provider fails to receive the next
|
||||||
* message due to some internal error.
|
* message due to some internal error.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public Message receiveNoWait() throws JMSException {
|
public Message receiveNoWait() throws JMSException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
checkMessageListener();
|
checkMessageListener();
|
||||||
|
@ -651,6 +663,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
* @throws JMSException if the JMS provider fails to close the consumer due
|
* @throws JMSException if the JMS provider fails to close the consumer due
|
||||||
* to some internal error.
|
* to some internal error.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void close() throws JMSException {
|
public void close() throws JMSException {
|
||||||
if (!unconsumedMessages.isClosed()) {
|
if (!unconsumedMessages.isClosed()) {
|
||||||
if (session.getTransactionContext().isInTransaction()) {
|
if (session.getTransactionContext().isInTransaction()) {
|
||||||
|
@ -743,6 +756,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
executorService = Executors.newSingleThreadExecutor();
|
executorService = Executors.newSingleThreadExecutor();
|
||||||
}
|
}
|
||||||
executorService.submit(new Runnable() {
|
executorService.submit(new Runnable() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
session.sendAck(ackToSend,true);
|
session.sendAck(ackToSend,true);
|
||||||
|
@ -1197,6 +1211,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
// Adjust the window size.
|
// Adjust the window size.
|
||||||
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
|
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
|
||||||
redeliveryDelay = 0;
|
redeliveryDelay = 0;
|
||||||
|
|
||||||
|
deliveredCounter -= deliveredMessages.size();
|
||||||
|
deliveredMessages.clear();
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
// only redelivery_ack after first delivery
|
// only redelivery_ack after first delivery
|
||||||
|
@ -1213,8 +1231,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
final LinkedList<MessageDispatch> pendingRedeliveries =
|
final LinkedList<MessageDispatch> pendingRedeliveries =
|
||||||
new LinkedList<MessageDispatch>(deliveredMessages);
|
new LinkedList<MessageDispatch>(deliveredMessages);
|
||||||
|
|
||||||
|
Collections.reverse(pendingRedeliveries);
|
||||||
|
|
||||||
|
deliveredCounter -= deliveredMessages.size();
|
||||||
|
deliveredMessages.clear();
|
||||||
|
|
||||||
// Start up the delivery again a little later.
|
// Start up the delivery again a little later.
|
||||||
session.getScheduler().executeAfterDelay(new Runnable() {
|
session.getScheduler().executeAfterDelay(new Runnable() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
if (!unconsumedMessages.isClosed()) {
|
if (!unconsumedMessages.isClosed()) {
|
||||||
|
@ -1236,9 +1260,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
unconsumedMessages.enqueueFirst(md);
|
unconsumedMessages.enqueueFirst(md);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deliveredCounter -= deliveredMessages.size();
|
||||||
|
deliveredMessages.clear();
|
||||||
|
|
||||||
if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
|
if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
|
||||||
// Start up the delivery again a little later.
|
// Start up the delivery again a little later.
|
||||||
session.getScheduler().executeAfterDelay(new Runnable() {
|
session.getScheduler().executeAfterDelay(new Runnable() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
if (started.get()) {
|
if (started.get()) {
|
||||||
|
@ -1254,8 +1282,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
deliveredCounter -= deliveredMessages.size();
|
|
||||||
deliveredMessages.clear();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (messageListener.get() != null) {
|
if (messageListener.get() != null) {
|
||||||
|
@ -1304,6 +1330,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void dispatch(MessageDispatch md) {
|
public void dispatch(MessageDispatch md) {
|
||||||
MessageListener listener = this.messageListener.get();
|
MessageListener listener = this.messageListener.get();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -16,8 +16,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.usecases;
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -29,6 +32,7 @@ import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageListener;
|
import javax.jms.MessageListener;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
@ -78,6 +82,7 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
|
|
||||||
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
||||||
Wait.waitFor(new Wait.Condition(){
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
LOG.info("Consumer has received " + received.size() + " messages.");
|
LOG.info("Consumer has received " + received.size() + " messages.");
|
||||||
return received.size() == MSG_COUNT;
|
return received.size() == MSG_COUNT;
|
||||||
|
@ -91,6 +96,7 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
|
|
||||||
assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
||||||
Wait.waitFor(new Wait.Condition(){
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
LOG.info("Consumer has received " + received.size() + " messages since rollback.");
|
LOG.info("Consumer has received " + received.size() + " messages since rollback.");
|
||||||
return received.size() == MSG_COUNT;
|
return received.size() == MSG_COUNT;
|
||||||
|
@ -106,6 +112,76 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
session.commit();
|
session.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMessageDeleiveredInCorrectOrder() throws Exception {
|
||||||
|
|
||||||
|
final LinkedHashSet<Message> received = new LinkedHashSet<Message>();
|
||||||
|
final LinkedHashSet<Message> beforeRollback = new LinkedHashSet<Message>();
|
||||||
|
final LinkedHashSet<Message> afterRollback = new LinkedHashSet<Message>();
|
||||||
|
|
||||||
|
Connection connection = connectionFactory.createConnection();
|
||||||
|
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createQueue(destinationName);
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
consumer.setMessageListener(new MessageListener() {
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
received.add(message);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
sendMessages();
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
||||||
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
LOG.info("Consumer has received " + received.size() + " messages.");
|
||||||
|
return received.size() == MSG_COUNT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
));
|
||||||
|
|
||||||
|
beforeRollback.addAll(received);
|
||||||
|
received.clear();
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
||||||
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
LOG.info("Consumer has received " + received.size() + " messages since rollback.");
|
||||||
|
return received.size() == MSG_COUNT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
));
|
||||||
|
|
||||||
|
afterRollback.addAll(received);
|
||||||
|
received.clear();
|
||||||
|
|
||||||
|
assertEquals(beforeRollback.size(), afterRollback.size());
|
||||||
|
assertEquals(beforeRollback, afterRollback);
|
||||||
|
|
||||||
|
Iterator<Message> after = afterRollback.iterator();
|
||||||
|
Iterator<Message> before = beforeRollback.iterator();
|
||||||
|
|
||||||
|
while (before.hasNext() && after.hasNext()) {
|
||||||
|
TextMessage original = (TextMessage) before.next();
|
||||||
|
TextMessage rolledBack = (TextMessage) after.next();
|
||||||
|
|
||||||
|
int originalInt = Integer.parseInt(original.getText());
|
||||||
|
int rolledbackInt = Integer.parseInt(rolledBack.getText());
|
||||||
|
|
||||||
|
assertEquals(originalInt, rolledbackInt);
|
||||||
|
}
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMessageDeleiveryDoesntStop() throws Exception {
|
public void testMessageDeleiveryDoesntStop() throws Exception {
|
||||||
|
|
||||||
|
@ -130,6 +206,7 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
|
|
||||||
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
||||||
Wait.waitFor(new Wait.Condition(){
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
LOG.info("Consumer has received " + received.size() + " messages.");
|
LOG.info("Consumer has received " + received.size() + " messages.");
|
||||||
return received.size() == MSG_COUNT;
|
return received.size() == MSG_COUNT;
|
||||||
|
@ -145,6 +222,7 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
|
|
||||||
assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
||||||
Wait.waitFor(new Wait.Condition(){
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
LOG.info("Consumer has received " + received.size() + " messages since rollback.");
|
LOG.info("Consumer has received " + received.size() + " messages since rollback.");
|
||||||
return received.size() == MSG_COUNT * 2;
|
return received.size() == MSG_COUNT * 2;
|
||||||
|
@ -182,6 +260,7 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
|
|
||||||
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
||||||
Wait.waitFor(new Wait.Condition(){
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
LOG.info("Consumer has received " + received.size() + " messages.");
|
LOG.info("Consumer has received " + received.size() + " messages.");
|
||||||
return received.size() == MSG_COUNT;
|
return received.size() == MSG_COUNT;
|
||||||
|
@ -194,6 +273,7 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
|
|
||||||
assertFalse("Delayed redelivery test not expecting any messages yet.",
|
assertFalse("Delayed redelivery test not expecting any messages yet.",
|
||||||
Wait.waitFor(new Wait.Condition(){
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
return received.size() > 0;
|
return received.size() > 0;
|
||||||
}
|
}
|
||||||
|
@ -225,6 +305,7 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
|
|
||||||
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
||||||
Wait.waitFor(new Wait.Condition(){
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
LOG.info("Consumer has received " + received.size() + " messages.");
|
LOG.info("Consumer has received " + received.size() + " messages.");
|
||||||
return received.size() == MSG_COUNT;
|
return received.size() == MSG_COUNT;
|
||||||
|
@ -264,6 +345,7 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
|
|
||||||
assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
||||||
Wait.waitFor(new Wait.Condition(){
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
LOG.info("Consumer has received " + received.size() + " messages since rollback.");
|
LOG.info("Consumer has received " + received.size() + " messages since rollback.");
|
||||||
return received.size() == MSG_COUNT;
|
return received.size() == MSG_COUNT;
|
||||||
|
@ -307,6 +389,7 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
|
|
||||||
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
|
||||||
Wait.waitFor(new Wait.Condition(){
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
LOG.info("Consumer has received " + received.size() + " messages.");
|
LOG.info("Consumer has received " + received.size() + " messages.");
|
||||||
return received.size() == MSG_COUNT;
|
return received.size() == MSG_COUNT;
|
||||||
|
@ -329,6 +412,7 @@ public class NonBlockingConsumerRedeliveryTest {
|
||||||
|
|
||||||
assertTrue("Post-Rollback expects to DLQ: " + MSG_COUNT + " messages.",
|
assertTrue("Post-Rollback expects to DLQ: " + MSG_COUNT + " messages.",
|
||||||
Wait.waitFor(new Wait.Condition(){
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
public boolean isSatisified() throws Exception {
|
public boolean isSatisified() throws Exception {
|
||||||
LOG.info("Consumer has received " + dlqed.size() + " messages in DLQ.");
|
LOG.info("Consumer has received " + dlqed.size() + " messages in DLQ.");
|
||||||
return dlqed.size() == MSG_COUNT;
|
return dlqed.size() == MSG_COUNT;
|
||||||
|
|
Loading…
Reference in New Issue