mirror of https://github.com/apache/activemq.git
follow up to fix for https://issues.apache.org/activemq/browse/AMQ-2877 - one test depended on the lack of pull replay to validate redeliver in a transaciton, re worked the test to make it work with this change
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@990106 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
550637b84c
commit
fafaf7d6ea
|
@ -807,7 +807,7 @@ public class FailoverTransactionTest extends TestSupport {
|
||||||
|
|
||||||
|
|
||||||
public void testPoisonOnDeliveryWhilePending() throws Exception {
|
public void testPoisonOnDeliveryWhilePending() throws Exception {
|
||||||
LOG.info("testWaitForMissingRedeliveries()");
|
LOG.info("testPoisonOnDeliveryWhilePending()");
|
||||||
broker = createBroker(true);
|
broker = createBroker(true);
|
||||||
broker.start();
|
broker.start();
|
||||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
|
||||||
|
@ -825,41 +825,51 @@ public class FailoverTransactionTest extends TestSupport {
|
||||||
}
|
}
|
||||||
assertNotNull("got message just produced", msg);
|
assertNotNull("got message just produced", msg);
|
||||||
|
|
||||||
|
// add another consumer into the mix that may get the message after restart
|
||||||
|
MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
|
||||||
|
|
||||||
broker.stop();
|
broker.stop();
|
||||||
broker = createBroker(false);
|
broker = createBroker(false);
|
||||||
broker.start();
|
broker.start();
|
||||||
|
|
||||||
final CountDownLatch commitDone = new CountDownLatch(1);
|
final CountDownLatch commitDone = new CountDownLatch(1);
|
||||||
|
|
||||||
|
final Vector<Exception> exceptions = new Vector<Exception>();
|
||||||
|
|
||||||
// with prefetch=0, it will not get redelivered as there will not be another receive
|
// commit may fail if other consumer gets the message on restart, it will be seen a a duplicate on teh connection
|
||||||
// for this consumer. so it will block till it timeout with an exception
|
// but with no transaciton and it pending on another consumer it will be posion
|
||||||
// will block pending re-deliveries
|
|
||||||
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("doing async commit...");
|
LOG.info("doing async commit...");
|
||||||
try {
|
try {
|
||||||
consumerSession.commit();
|
consumerSession.commit();
|
||||||
} catch (JMSException ignored) {
|
} catch (JMSException ex) {
|
||||||
|
exceptions.add(ex);
|
||||||
|
} finally {
|
||||||
commitDone.countDown();
|
commitDone.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// pull the pending message to this consumer where it will be poison as it is a duplicate without a tx
|
assertNull("consumer2 not get a message while pending to 1 or consumed by 1", consumer2.receive(2000));
|
||||||
MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
|
|
||||||
assertNull("consumer2 not get a message while pending to 1", consumer2.receive(2000));
|
|
||||||
|
|
||||||
assertTrue("commit completed with ex", commitDone.await(15, TimeUnit.SECONDS));
|
assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
|
||||||
assertNull("consumer should not get rolledback and non redelivered message", consumer.receive(5000));
|
|
||||||
|
|
||||||
// message should be in dlq
|
// either message consumed or sent to dlq via poison on redelivery to wrong consumer
|
||||||
MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
|
// message should not be available again in any event
|
||||||
TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
|
assertNull("consumer should not get rolledback on non redelivered message or duplicate", consumer.receive(5000));
|
||||||
assertNotNull("found message in dlq", dlqMessage);
|
|
||||||
assertEquals("text matches", "Test message", dlqMessage.getText());
|
|
||||||
consumerSession.commit();
|
|
||||||
|
|
||||||
|
// consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
|
||||||
|
if (exceptions.isEmpty()) {
|
||||||
|
// commit succeeded, message was redelivered to the correct consumer after restart so commit was fine
|
||||||
|
} else {
|
||||||
|
// message should be in dlq
|
||||||
|
MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
|
||||||
|
TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
|
||||||
|
assertNotNull("found message in dlq", dlqMessage);
|
||||||
|
assertEquals("text matches", "Test message", dlqMessage.getText());
|
||||||
|
consumerSession.commit();
|
||||||
|
}
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue