All message dispatching should occur from the session's executor. Also, we should dispatch any messages in the consumers queue before dispatching messages in the session's queues.

http://issues.apache.org/activemq/browse/AMQ-1031
http://issues.apache.org/activemq/browse/AMQ-1032



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@472345 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-11-08 00:14:56 +00:00
parent c636b3798f
commit da5139c24b
8 changed files with 127 additions and 71 deletions

View File

@ -254,8 +254,6 @@
<!-- TODO need to get the JUnit test configured to create SSL sockets nicely via system properties -->
<exclude>**/StompSslTest.*</exclude>
<!-- TODO reproduces a bad ack bug -->
<exclude>**/RollbacksWhileConsumingLargeQueueTest.*</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -776,6 +776,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
if(deliveredMessages.isEmpty())
return;
// Only increase the redlivery delay after the first redelivery..
if( rollbackCounter > 0 )
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
rollbackCounter++;
if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
// We need to NACK the messages so that they get sent to the
@ -791,23 +796,29 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}else{
// stop the delivery of messages.
unconsumedMessages.stop();
// Start up the delivery again a little later.
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
Scheduler.executeAfterDelay(new Runnable(){
public void run(){
try{
if(started.get())
start();
}catch(JMSException e){
session.connection.onAsyncException(e);
}
}
},redeliveryDelay);
for(Iterator iter=deliveredMessages.iterator();iter.hasNext();){
MessageDispatch md=(MessageDispatch) iter.next();
md.getMessage().onMessageRolledBack();
unconsumedMessages.enqueueFirst(md);
}
if( redeliveryDelay > 0 ) {
// Start up the delivery again a little later.
Scheduler.executeAfterDelay(new Runnable(){
public void run(){
try{
if(started.get())
start();
}catch(JMSException e){
session.connection.onAsyncException(e);
}
}
},redeliveryDelay);
} else {
start();
}
}
deliveredCounter-=deliveredMessages.size();
deliveredMessages.clear();
@ -820,31 +831,33 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener;
try {
if (!unconsumedMessages.isClosed()) {
if (listener != null && unconsumedMessages.isRunning() ) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try {
listener.onMessage(message);
afterMessageIsConsumed(md, false);
} catch (RuntimeException e) {
if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
// Redeliver the message
} else {
// Transacted or Client ack: Deliver the next message.
afterMessageIsConsumed(md, false);
}
log.warn("Exception while processing message: " + e, e);
}
} else {
unconsumedMessages.enqueue(md);
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
}
synchronized(unconsumedMessages.getMutex()){
if (!unconsumedMessages.isClosed()) {
if (listener != null && unconsumedMessages.isRunning() ) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try {
listener.onMessage(message);
afterMessageIsConsumed(md, false);
} catch (RuntimeException e) {
if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
// Redeliver the message
} else {
// Transacted or Client ack: Deliver the next message.
afterMessageIsConsumed(md, false);
}
log.warn("Exception while processing message: " + e, e);
}
} else {
unconsumedMessages.enqueue(md);
if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
}
}
}
} catch (Exception e) {
log.warn("could not process message: " + md, e);
session.connection.onAsyncException(e);
}
}
@ -853,18 +866,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
public void start() throws JMSException {
if (unconsumedMessages.isClosed()) {
return;
}
started.set(true);
unconsumedMessages.start();
MessageListener listener = this.messageListener;
if( listener!=null ) {
MessageDispatch md;
while( (md = unconsumedMessages.dequeueNoWait())!=null ) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
listener.onMessage(message);
afterMessageIsConsumed(md, false);
}
}
session.executor.wakeup();
}
public void stop() {
@ -876,4 +883,28 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return "ActiveMQMessageConsumer { value=" +info.getConsumerId()+", started=" +started.get()+" }";
}
/**
* Delivers a message to the message listener.
* @return
* @throws JMSException
*/
public boolean iterate() {
MessageListener listener = this.messageListener;
if( listener!=null ) {
MessageDispatch md = unconsumedMessages.dequeueNoWait();
if( md!=null ) {
try {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
listener.onMessage(message);
afterMessageIsConsumed(md, false);
} catch (JMSException e) {
session.connection.onAsyncException(e);
}
return true;
}
}
return false;
}
}

View File

@ -61,8 +61,8 @@ public class ActiveMQSessionExecutor implements Task {
}
}
private void wakeup() {
if( !dispatchedBySessionPool && hasUncomsumedMessages() ) {
public void wakeup() {
if( !dispatchedBySessionPool ) {
if( taskRunner!=null ) {
try {
taskRunner.wakeup();
@ -148,6 +148,16 @@ public class ActiveMQSessionExecutor implements Task {
}
public boolean iterate() {
// Deliver any messages queued on the consumer to their listeners.
for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
if( consumer.iterate() ) {
return true;
}
}
// No messages left queued on the listeners.. so now dispatch messages queued on the session
MessageDispatch message = messageQueue.dequeueNoWait();
if( message==null ) {
return false;

View File

@ -32,7 +32,7 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
// +/-15% for a 30% spread -cgs
protected double collisionAvoidanceFactor = 0.15d;
protected int maximumRedeliveries = 5;
protected int maximumRedeliveries = 6;
protected long initialRedeliveryDelay = 1000L;
protected static Random randomNumberGenerator;
protected boolean useCollisionAvoidance = false;

View File

@ -183,6 +183,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
prefetchExtension--;
}
}
public void afterRollback() throws Exception {
super.afterRollback();
}
});
}
index++;

View File

@ -57,7 +57,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
protected RedeliveryPolicy getRedeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(1000);
redeliveryPolicy.setMaximumRedeliveries(2);
redeliveryPolicy.setMaximumRedeliveries(3);
redeliveryPolicy.setBackOffMultiplier((short) 2);
redeliveryPolicy.setUseExponentialBackOff(true);
return redeliveryPolicy;
@ -82,7 +82,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
try {
log.info("Message Received: " + message);
counter++;
if (counter <= 3) {
if (counter <= 4) {
log.info("Message Rollback.");
session.rollback();
} else {
@ -119,24 +119,26 @@ public class MessageListenerRedeliveryTest extends TestCase {
} catch (InterruptedException e) {
}
// first try
assertEquals(1, listener.counter);
// first try.. should get 2 since there is no delay on the
// first redeliver..
assertEquals(2, listener.counter);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
// second try (redelivery after 1 sec)
assertEquals(2, listener.counter);
// 2nd redeliver (redelivery after 1 sec)
assertEquals(3, listener.counter);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
// third try (redelivery after 2 seconds) - it should give up after that
assertEquals(3, listener.counter);
// 3rd redeliver (redelivery after 2 seconds) - it should give up after that
assertEquals(4, listener.counter);
// create new message
producer.send(createTextMessage(session));
@ -148,7 +150,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
// ignore
}
// it should be committed, so no redelivery
assertEquals(4, listener.counter);
assertEquals(5, listener.counter);
try {
Thread.sleep(1500);
@ -156,7 +158,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
// ignore
}
// no redelivery, counter should still be 4
assertEquals(4, listener.counter);
assertEquals(5, listener.counter);
session.close();
}
@ -185,7 +187,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
}
// first try
assertEquals(1, listener.counter);
assertEquals(2, listener.counter);
try {
Thread.sleep(1000);
@ -193,7 +195,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
}
// second try (redelivery after 1 sec)
assertEquals(2, listener.counter);
assertEquals(3, listener.counter);
try {
Thread.sleep(2000);
@ -201,7 +203,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
}
// third try (redelivery after 2 seconds) - it should give up after that
assertEquals(3, listener.counter);
assertEquals(4, listener.counter);
// create new message
producer.send(createTextMessage(session));
@ -213,7 +215,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
// ignore
}
// it should be committed, so no redelivery
assertEquals(4, listener.counter);
assertEquals(5, listener.counter);
try {
Thread.sleep(1500);
@ -221,7 +223,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
// ignore
}
// no redelivery, counter should still be 4
assertEquals(4, listener.counter);
assertEquals(5, listener.counter);
session.close();
}

View File

@ -70,9 +70,15 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals("1st", m.getText());
session.rollback();
// Show re-delivery delay is incrementing.
// No delay on first rollback..
m = (TextMessage)consumer.receive(100);
assertNotNull(m);
session.rollback();
// Show subsequent re-delivery delay is incrementing.
m = (TextMessage)consumer.receive(100);
assertNull(m);
m = (TextMessage)consumer.receive(500);
assertNotNull(m);
assertEquals("1st", m.getText());
@ -117,7 +123,12 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals("1st", m.getText());
session.rollback();
// Show re-delivery delay is incrementing.
// No delay on first rollback..
m = (TextMessage)consumer.receive(100);
assertNotNull(m);
session.rollback();
// Show subsequent re-delivery delay is incrementing.
m = (TextMessage)consumer.receive(100);
assertNull(m);
m = (TextMessage)consumer.receive(500);

View File

@ -45,7 +45,7 @@ public class RollbacksWhileConsumingLargeQueueTest extends
private CountDownLatch latch;
private Throwable failure;
public void xtestWithReciever() throws Throwable {
public void testWithReciever() throws Throwable {
latch = new CountDownLatch(numberOfMessagesOnQueue);
Session session = connection.createSession(true, 0);
MessageConsumer consumer = session.createConsumer(destination);
@ -148,11 +148,11 @@ public class RollbacksWhileConsumingLargeQueueTest extends
int value = deliveryCounter.incrementAndGet();
if (value % 2 == 0) {
log.info("Rolling Back message: " + value + " id: " + msgId + ", content: " + msgText);
log.info("Rolling Back message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText);
throw new RuntimeException("Dummy exception on message: " + value);
}
log.info("Received message: " + value + " id: " + msgId + ", content: " + msgText);
log.info("Received message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText);
ackCounter.incrementAndGet();
latch.countDown();
}