diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index 248b240355..249e637dec 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -254,8 +254,6 @@ **/StompSslTest.* - - **/RollbacksWhileConsumingLargeQueueTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index db072db337..52aa3a9aa3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -776,6 +776,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } if(deliveredMessages.isEmpty()) return; + + // Only increase the redlivery delay after the first redelivery.. + if( rollbackCounter > 0 ) + redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); + rollbackCounter++; if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){ // We need to NACK the messages so that they get sent to the @@ -791,23 +796,29 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC }else{ // stop the delivery of messages. unconsumedMessages.stop(); - // Start up the delivery again a little later. - redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); - Scheduler.executeAfterDelay(new Runnable(){ - public void run(){ - try{ - if(started.get()) - start(); - }catch(JMSException e){ - session.connection.onAsyncException(e); - } - } - },redeliveryDelay); + for(Iterator iter=deliveredMessages.iterator();iter.hasNext();){ MessageDispatch md=(MessageDispatch) iter.next(); md.getMessage().onMessageRolledBack(); unconsumedMessages.enqueueFirst(md); } + + if( redeliveryDelay > 0 ) { + // Start up the delivery again a little later. + Scheduler.executeAfterDelay(new Runnable(){ + public void run(){ + try{ + if(started.get()) + start(); + }catch(JMSException e){ + session.connection.onAsyncException(e); + } + } + },redeliveryDelay); + } else { + start(); + } + } deliveredCounter-=deliveredMessages.size(); deliveredMessages.clear(); @@ -820,31 +831,33 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC public void dispatch(MessageDispatch md) { MessageListener listener = this.messageListener; try { - if (!unconsumedMessages.isClosed()) { - if (listener != null && unconsumedMessages.isRunning() ) { - ActiveMQMessage message = createActiveMQMessage(md); - beforeMessageIsConsumed(md); - try { - listener.onMessage(message); - afterMessageIsConsumed(md, false); - } catch (RuntimeException e) { - if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) { - // Redeliver the message - } else { - // Transacted or Client ack: Deliver the next message. - afterMessageIsConsumed(md, false); - } - log.warn("Exception while processing message: " + e, e); - } - } else { - unconsumedMessages.enqueue(md); - if (availableListener != null) { - availableListener.onMessageAvailable(this); - } - } + synchronized(unconsumedMessages.getMutex()){ + if (!unconsumedMessages.isClosed()) { + if (listener != null && unconsumedMessages.isRunning() ) { + ActiveMQMessage message = createActiveMQMessage(md); + beforeMessageIsConsumed(md); + try { + listener.onMessage(message); + afterMessageIsConsumed(md, false); + } catch (RuntimeException e) { + if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) { + // Redeliver the message + } else { + // Transacted or Client ack: Deliver the next message. + afterMessageIsConsumed(md, false); + } + log.warn("Exception while processing message: " + e, e); + } + } else { + unconsumedMessages.enqueue(md); + if (availableListener != null) { + availableListener.onMessageAvailable(this); + } + } + } } } catch (Exception e) { - log.warn("could not process message: " + md, e); + session.connection.onAsyncException(e); } } @@ -853,18 +866,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } public void start() throws JMSException { + if (unconsumedMessages.isClosed()) { + return; + } started.set(true); unconsumedMessages.start(); - MessageListener listener = this.messageListener; - if( listener!=null ) { - MessageDispatch md; - while( (md = unconsumedMessages.dequeueNoWait())!=null ) { - ActiveMQMessage message = createActiveMQMessage(md); - beforeMessageIsConsumed(md); - listener.onMessage(message); - afterMessageIsConsumed(md, false); - } - } + session.executor.wakeup(); } public void stop() { @@ -876,4 +883,28 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC return "ActiveMQMessageConsumer { value=" +info.getConsumerId()+", started=" +started.get()+" }"; } + /** + * Delivers a message to the message listener. + * @return + * @throws JMSException + */ + public boolean iterate() { + MessageListener listener = this.messageListener; + if( listener!=null ) { + MessageDispatch md = unconsumedMessages.dequeueNoWait(); + if( md!=null ) { + try { + ActiveMQMessage message = createActiveMQMessage(md); + beforeMessageIsConsumed(md); + listener.onMessage(message); + afterMessageIsConsumed(md, false); + } catch (JMSException e) { + session.connection.onAsyncException(e); + } + return true; + } + } + return false; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java index b34840caa4..88b6817c3f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java @@ -61,8 +61,8 @@ public class ActiveMQSessionExecutor implements Task { } } - private void wakeup() { - if( !dispatchedBySessionPool && hasUncomsumedMessages() ) { + public void wakeup() { + if( !dispatchedBySessionPool ) { if( taskRunner!=null ) { try { taskRunner.wakeup(); @@ -148,6 +148,16 @@ public class ActiveMQSessionExecutor implements Task { } public boolean iterate() { + + // Deliver any messages queued on the consumer to their listeners. + for (Iterator i = this.session.consumers.iterator(); i.hasNext();) { + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next(); + if( consumer.iterate() ) { + return true; + } + } + + // No messages left queued on the listeners.. so now dispatch messages queued on the session MessageDispatch message = messageQueue.dequeueNoWait(); if( message==null ) { return false; diff --git a/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java index aed940d79c..4314b30132 100644 --- a/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java @@ -32,7 +32,7 @@ public class RedeliveryPolicy implements Cloneable, Serializable { // +/-15% for a 30% spread -cgs protected double collisionAvoidanceFactor = 0.15d; - protected int maximumRedeliveries = 5; + protected int maximumRedeliveries = 6; protected long initialRedeliveryDelay = 1000L; protected static Random randomNumberGenerator; protected boolean useCollisionAvoidance = false; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 99b9ac62fb..fc2caf5175 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -183,6 +183,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ prefetchExtension--; } } + + public void afterRollback() throws Exception { + super.afterRollback(); + } }); } index++; diff --git a/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java b/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java index 6db9b879c6..acbd4c9fe0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java @@ -57,7 +57,7 @@ public class MessageListenerRedeliveryTest extends TestCase { protected RedeliveryPolicy getRedeliveryPolicy() { RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); redeliveryPolicy.setInitialRedeliveryDelay(1000); - redeliveryPolicy.setMaximumRedeliveries(2); + redeliveryPolicy.setMaximumRedeliveries(3); redeliveryPolicy.setBackOffMultiplier((short) 2); redeliveryPolicy.setUseExponentialBackOff(true); return redeliveryPolicy; @@ -82,7 +82,7 @@ public class MessageListenerRedeliveryTest extends TestCase { try { log.info("Message Received: " + message); counter++; - if (counter <= 3) { + if (counter <= 4) { log.info("Message Rollback."); session.rollback(); } else { @@ -119,24 +119,26 @@ public class MessageListenerRedeliveryTest extends TestCase { } catch (InterruptedException e) { } - // first try - assertEquals(1, listener.counter); + + // first try.. should get 2 since there is no delay on the + // first redeliver.. + assertEquals(2, listener.counter); try { Thread.sleep(1000); } catch (InterruptedException e) { } - // second try (redelivery after 1 sec) - assertEquals(2, listener.counter); + // 2nd redeliver (redelivery after 1 sec) + assertEquals(3, listener.counter); try { Thread.sleep(2000); } catch (InterruptedException e) { } - // third try (redelivery after 2 seconds) - it should give up after that - assertEquals(3, listener.counter); + // 3rd redeliver (redelivery after 2 seconds) - it should give up after that + assertEquals(4, listener.counter); // create new message producer.send(createTextMessage(session)); @@ -148,7 +150,7 @@ public class MessageListenerRedeliveryTest extends TestCase { // ignore } // it should be committed, so no redelivery - assertEquals(4, listener.counter); + assertEquals(5, listener.counter); try { Thread.sleep(1500); @@ -156,7 +158,7 @@ public class MessageListenerRedeliveryTest extends TestCase { // ignore } // no redelivery, counter should still be 4 - assertEquals(4, listener.counter); + assertEquals(5, listener.counter); session.close(); } @@ -184,8 +186,8 @@ public class MessageListenerRedeliveryTest extends TestCase { } catch (InterruptedException e) { } - // first try - assertEquals(1, listener.counter); + // first try + assertEquals(2, listener.counter); try { Thread.sleep(1000); @@ -193,7 +195,7 @@ public class MessageListenerRedeliveryTest extends TestCase { } // second try (redelivery after 1 sec) - assertEquals(2, listener.counter); + assertEquals(3, listener.counter); try { Thread.sleep(2000); @@ -201,7 +203,7 @@ public class MessageListenerRedeliveryTest extends TestCase { } // third try (redelivery after 2 seconds) - it should give up after that - assertEquals(3, listener.counter); + assertEquals(4, listener.counter); // create new message producer.send(createTextMessage(session)); @@ -213,7 +215,7 @@ public class MessageListenerRedeliveryTest extends TestCase { // ignore } // it should be committed, so no redelivery - assertEquals(4, listener.counter); + assertEquals(5, listener.counter); try { Thread.sleep(1500); @@ -221,7 +223,7 @@ public class MessageListenerRedeliveryTest extends TestCase { // ignore } // no redelivery, counter should still be 4 - assertEquals(4, listener.counter); + assertEquals(5, listener.counter); session.close(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index c7d8ec13f9..6a7ebe396d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -70,9 +70,15 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertEquals("1st", m.getText()); session.rollback(); - // Show re-delivery delay is incrementing. + // No delay on first rollback.. + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + session.rollback(); + + // Show subsequent re-delivery delay is incrementing. m = (TextMessage)consumer.receive(100); assertNull(m); + m = (TextMessage)consumer.receive(500); assertNotNull(m); assertEquals("1st", m.getText()); @@ -117,7 +123,12 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertEquals("1st", m.getText()); session.rollback(); - // Show re-delivery delay is incrementing. + // No delay on first rollback.. + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + session.rollback(); + + // Show subsequent re-delivery delay is incrementing. m = (TextMessage)consumer.receive(100); assertNull(m); m = (TextMessage)consumer.receive(500); diff --git a/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java index 8c8deac79d..cbf1badf76 100644 --- a/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java @@ -45,7 +45,7 @@ public class RollbacksWhileConsumingLargeQueueTest extends private CountDownLatch latch; private Throwable failure; - public void xtestWithReciever() throws Throwable { + public void testWithReciever() throws Throwable { latch = new CountDownLatch(numberOfMessagesOnQueue); Session session = connection.createSession(true, 0); MessageConsumer consumer = session.createConsumer(destination); @@ -148,11 +148,11 @@ public class RollbacksWhileConsumingLargeQueueTest extends int value = deliveryCounter.incrementAndGet(); if (value % 2 == 0) { - log.info("Rolling Back message: " + value + " id: " + msgId + ", content: " + msgText); + log.info("Rolling Back message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText); throw new RuntimeException("Dummy exception on message: " + value); } - log.info("Received message: " + value + " id: " + msgId + ", content: " + msgText); + log.info("Received message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText); ackCounter.incrementAndGet(); latch.countDown(); }