mirror of https://github.com/apache/activemq.git
AMQ-4637 fix test that fails due to this change, also remove extra call to setPoisonCause
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1504954 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ac8e38fb69
commit
0826fdcf06
|
@ -1206,7 +1206,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
// Acknowledge the last message.
|
// Acknowledge the last message.
|
||||||
|
|
||||||
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
|
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
|
||||||
ack.setPoisonCause(lastMd.getRollbackCause());
|
|
||||||
ack.setFirstMessageId(firstMsgId);
|
ack.setFirstMessageId(firstMsgId);
|
||||||
ack.setPoisonCause(new Throwable("Exceeded redelivery policy limit:" + redeliveryPolicy));
|
ack.setPoisonCause(new Throwable("Exceeded redelivery policy limit:" + redeliveryPolicy));
|
||||||
session.sendAck(ack,true);
|
session.sendAck(ack,true);
|
||||||
|
|
|
@ -47,6 +47,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
|
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
|
|
||||||
|
@Override
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
connection = createConnection();
|
connection = createConnection();
|
||||||
}
|
}
|
||||||
|
@ -54,6 +55,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
/**
|
/**
|
||||||
* @see junit.framework.TestCase#tearDown()
|
* @see junit.framework.TestCase#tearDown()
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
protected void tearDown() throws Exception {
|
protected void tearDown() throws Exception {
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
connection.close();
|
connection.close();
|
||||||
|
@ -80,12 +82,13 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
private class TestMessageListener implements MessageListener {
|
private class TestMessageListener implements MessageListener {
|
||||||
|
|
||||||
public int counter;
|
public int counter;
|
||||||
private Session session;
|
private final Session session;
|
||||||
|
|
||||||
public TestMessageListener(Session session) {
|
public TestMessageListener(Session session) {
|
||||||
this.session = session;
|
this.session = session;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
try {
|
try {
|
||||||
LOG.info("Message Received: " + message);
|
LOG.info("Message Received: " + message);
|
||||||
|
@ -125,7 +128,6 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// first try.. should get 2 since there is no delay on the
|
// first try.. should get 2 since there is no delay on the
|
||||||
|
@ -135,7 +137,6 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
||||||
}
|
}
|
||||||
// 2nd redeliver (redelivery after 1 sec)
|
// 2nd redeliver (redelivery after 1 sec)
|
||||||
assertEquals(3, listener.counter);
|
assertEquals(3, listener.counter);
|
||||||
|
@ -143,7 +144,6 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
||||||
}
|
}
|
||||||
// 3rd redeliver (redelivery after 2 seconds) - it should give up after
|
// 3rd redeliver (redelivery after 2 seconds) - it should give up after
|
||||||
// that
|
// that
|
||||||
|
@ -156,7 +156,6 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
// ignore
|
|
||||||
}
|
}
|
||||||
// it should be committed, so no redelivery
|
// it should be committed, so no redelivery
|
||||||
assertEquals(5, listener.counter);
|
assertEquals(5, listener.counter);
|
||||||
|
@ -164,7 +163,6 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1500);
|
Thread.sleep(1500);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
// ignore
|
|
||||||
}
|
}
|
||||||
// no redelivery, counter should still be 4
|
// no redelivery, counter should still be 4
|
||||||
assertEquals(5, listener.counter);
|
assertEquals(5, listener.counter);
|
||||||
|
@ -239,7 +237,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
|
|
||||||
public void testQueueSessionListenerExceptionRetry() throws Exception {
|
public void testQueueSessionListenerExceptionRetry() throws Exception {
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
Queue queue = session.createQueue("queue-" + getName());
|
Queue queue = session.createQueue("queue-" + getName());
|
||||||
MessageProducer producer = createProducer(session, queue);
|
MessageProducer producer = createProducer(session, queue);
|
||||||
|
@ -247,15 +245,15 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
message = createTextMessage(session, "2");
|
message = createTextMessage(session, "2");
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
|
||||||
|
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
final CountDownLatch gotMessage = new CountDownLatch(2);
|
final CountDownLatch gotMessage = new CountDownLatch(2);
|
||||||
final AtomicInteger count = new AtomicInteger(0);
|
final AtomicInteger count = new AtomicInteger(0);
|
||||||
final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
|
final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
|
||||||
final ArrayList<String> received = new ArrayList<String>();
|
final ArrayList<String> received = new ArrayList<String>();
|
||||||
consumer.setMessageListener(new MessageListener() {
|
consumer.setMessageListener(new MessageListener() {
|
||||||
|
@Override
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
LOG.info("Message Received: " + message);
|
LOG.info("Message Received: " + message);
|
||||||
try {
|
try {
|
||||||
|
@ -272,9 +270,9 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
gotMessage.countDown();
|
gotMessage.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
|
assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
|
||||||
|
|
||||||
for (int i=0; i<maxDeliveries; i++) {
|
for (int i=0; i<maxDeliveries; i++) {
|
||||||
assertEquals("got first redelivered: " + i, "1", received.get(i));
|
assertEquals("got first redelivered: " + i, "1", received.get(i));
|
||||||
}
|
}
|
||||||
|
@ -283,11 +281,10 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
}
|
}
|
||||||
session.close();
|
session.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void testQueueSessionListenerExceptionDlq() throws Exception {
|
public void testQueueSessionListenerExceptionDlq() throws Exception {
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
Queue queue = session.createQueue("queue-" + getName());
|
Queue queue = session.createQueue("queue-" + getName());
|
||||||
MessageProducer producer = createProducer(session, queue);
|
MessageProducer producer = createProducer(session, queue);
|
||||||
|
@ -299,6 +296,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
|
MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
|
||||||
final CountDownLatch gotDlqMessage = new CountDownLatch(1);
|
final CountDownLatch gotDlqMessage = new CountDownLatch(1);
|
||||||
dlqConsumer.setMessageListener(new MessageListener() {
|
dlqConsumer.setMessageListener(new MessageListener() {
|
||||||
|
@Override
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
LOG.info("DLQ Message Received: " + message);
|
LOG.info("DLQ Message Received: " + message);
|
||||||
dlqMessage[0] = message;
|
dlqMessage[0] = message;
|
||||||
|
@ -307,20 +305,21 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
});
|
});
|
||||||
|
|
||||||
MessageConsumer consumer = session.createConsumer(queue);
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
|
final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
|
||||||
final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);
|
final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);
|
||||||
|
|
||||||
consumer.setMessageListener(new MessageListener() {
|
consumer.setMessageListener(new MessageListener() {
|
||||||
|
@Override
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
LOG.info("Message Received: " + message);
|
LOG.info("Message Received: " + message);
|
||||||
gotMessage.countDown();
|
gotMessage.countDown();
|
||||||
throw new RuntimeException(getName() + " force a redelivery");
|
throw new RuntimeException(getName() + " force a redelivery");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
|
assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
|
||||||
|
|
||||||
// check DLQ
|
// check DLQ
|
||||||
assertTrue("got dlq message", gotDlqMessage.await(20, TimeUnit.SECONDS));
|
assertTrue("got dlq message", gotDlqMessage.await(20, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
@ -328,12 +327,11 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
message = dlqMessage[0];
|
message = dlqMessage[0];
|
||||||
assertNotNull("dlq message captured", message);
|
assertNotNull("dlq message captured", message);
|
||||||
String cause = message.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
|
String cause = message.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
|
||||||
assertTrue("cause exception is remembered", cause.contains("RuntimeException"));
|
LOG.info("DLQ'd message cause reported as: {}", cause);
|
||||||
assertTrue("is correct exception", cause.contains(getName()));
|
assertTrue("cause exception is remembered", cause.contains("Throwable"));
|
||||||
|
|
||||||
session.close();
|
session.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private TextMessage createTextMessage(Session session, String text) throws JMSException {
|
private TextMessage createTextMessage(Session session, String text) throws JMSException {
|
||||||
return session.createTextMessage(text);
|
return session.createTextMessage(text);
|
||||||
|
|
Loading…
Reference in New Issue