mirror of https://github.com/apache/activemq.git
recompute the timeout value and send a new pull request if the message received exceeds the configure redelivery maximum.
This commit is contained in:
parent
84ec047d2f
commit
b84413a314
|
@ -502,6 +502,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
} else if (redeliveryExceeded(md)) {
|
} else if (redeliveryExceeded(md)) {
|
||||||
LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
|
LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
|
||||||
posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
|
posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
|
||||||
|
if (timeout > 0) {
|
||||||
|
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
|
||||||
|
}
|
||||||
|
sendPullCommand(timeout);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(getConsumerId() + " received message: " + md);
|
LOG.trace(getConsumerId() + " received message: " + md);
|
||||||
|
|
|
@ -16,35 +16,47 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq;
|
package org.apache.activemq;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
/**
|
import org.apache.activemq.broker.BrokerService;
|
||||||
*
|
import org.junit.After;
|
||||||
*/
|
import org.junit.Before;
|
||||||
public class ConsumerReceiveWithTimeoutTest extends TestSupport {
|
import org.junit.Test;
|
||||||
|
|
||||||
private Connection connection;
|
public class ConsumerReceiveWithTimeoutTest {
|
||||||
|
|
||||||
|
private ActiveMQConnection connection;
|
||||||
|
private BrokerService broker;
|
||||||
|
private String connectionUri;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
createBroker();
|
||||||
|
|
||||||
protected void setUp() throws Exception {
|
|
||||||
super.setUp();
|
|
||||||
connection = createConnection();
|
connection = createConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@After
|
||||||
* @see junit.framework.TestCase#tearDown()
|
public void tearDown() throws Exception {
|
||||||
*/
|
|
||||||
protected void tearDown() throws Exception {
|
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
|
try {
|
||||||
connection.close();
|
connection.close();
|
||||||
connection = null;
|
} catch (Exception e) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (broker != null) {
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
}
|
}
|
||||||
super.tearDown();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -53,6 +65,7 @@ public class ConsumerReceiveWithTimeoutTest extends TestSupport {
|
||||||
*
|
*
|
||||||
* @throws javax.jms.JMSException
|
* @throws javax.jms.JMSException
|
||||||
*/
|
*/
|
||||||
|
@Test(timeout = 30000)
|
||||||
public void testConsumerReceiveBeforeMessageDispatched() throws JMSException {
|
public void testConsumerReceiveBeforeMessageDispatched() throws JMSException {
|
||||||
|
|
||||||
connection.start();
|
connection.start();
|
||||||
|
@ -61,6 +74,7 @@ public class ConsumerReceiveWithTimeoutTest extends TestSupport {
|
||||||
final Queue queue = session.createQueue("test");
|
final Queue queue = session.createQueue("test");
|
||||||
|
|
||||||
Thread t = new Thread() {
|
Thread t = new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
// wait for 10 seconds to allow consumer.receive to be run
|
// wait for 10 seconds to allow consumer.receive to be run
|
||||||
|
@ -81,7 +95,67 @@ public class ConsumerReceiveWithTimeoutTest extends TestSupport {
|
||||||
Message msg = consumer.receive(60000);
|
Message msg = consumer.receive(60000);
|
||||||
assertNotNull(msg);
|
assertNotNull(msg);
|
||||||
session.close();
|
session.close();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* check if receive(timeout) does timeout when prefetch=0 and redeliveries=0
|
||||||
|
* <p/>
|
||||||
|
* send a message.
|
||||||
|
* consume and rollback to ensure redeliverCount is incremented
|
||||||
|
* try to consume message with a timeout.
|
||||||
|
*/
|
||||||
|
@Test(timeout=20000)
|
||||||
|
public void testConsumerReceivePrefetchZeroRedeliveryZero() throws Exception {
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
// push message to queue
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = session.createQueue("test.prefetch.zero");
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
TextMessage textMessage = session.createTextMessage("test Message");
|
||||||
|
producer.send(textMessage);
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
// consume and rollback - increase redelivery counter on message
|
||||||
|
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
Message message = consumer.receive(2000);
|
||||||
|
assertNotNull(message);
|
||||||
|
session.rollback();
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
// Reconnect with zero prefetch and zero redeliveries allowed.
|
||||||
|
connection.close();
|
||||||
|
connection = createConnection();
|
||||||
|
connection.getPrefetchPolicy().setQueuePrefetch(0);
|
||||||
|
connection.getRedeliveryPolicy().setMaximumRedeliveries(0);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
// try consume with timeout - expect it to timeout and return NULL message
|
||||||
|
session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
consumer = session.createConsumer(queue);
|
||||||
|
message = consumer.receive(3000);
|
||||||
|
|
||||||
|
assertNull(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createBroker() throws Exception {
|
||||||
|
broker = new BrokerService();
|
||||||
|
broker.setUseJmx(false);
|
||||||
|
broker.setPersistent(false);
|
||||||
|
broker.addConnector("tcp://localhost:0");
|
||||||
|
broker.start();
|
||||||
|
broker.waitUntilStarted();
|
||||||
|
|
||||||
|
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
||||||
|
return new ActiveMQConnectionFactory(connectionUri);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ActiveMQConnection createConnection() throws Exception {
|
||||||
|
return (ActiveMQConnection) createConnectionFactory().createConnection();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue