diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index e327ef1557..14c2869dbb 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -937,7 +937,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta @Override public void afterRollback() throws Exception { LOG.trace("rollback {}", ack, new Throwable("here")); - md.getMessage().onMessageRolledBack(); // ensure we don't filter this as a duplicate connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); @@ -956,7 +955,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); int redeliveryCounter = md.getMessage().getRedeliveryCounter(); if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES - && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) { + && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) { // We need to NACK the messages so that they get // sent to the // DLQ. @@ -986,6 +985,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } }, redeliveryDelay); } + md.getMessage().onMessageRolledBack(); } }); } diff --git a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java index 91f2b71923..e0a8f333d1 100644 --- a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java +++ b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java @@ -98,7 +98,10 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, } public long getNextRedeliveryDelay(long previousDelay) { - long nextDelay = redeliveryDelay; + long nextDelay = initialRedeliveryDelay; + if (nextDelay == 0) { + nextDelay = redeliveryDelay; + } if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) { nextDelay = (long) (previousDelay * backOffMultiplier); diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java index 5927d3ecf0..904dd188d0 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java @@ -23,6 +23,8 @@ import java.lang.reflect.Method; import java.util.Timer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; @@ -424,12 +426,18 @@ public class MDBTest extends TestCase { adapter.setServerUrl("vm://localhost?broker.persistent=false"); adapter.start(new StubBootstrapContext()); - final CountDownLatch messageDelivered = new CountDownLatch(2); + final CountDownLatch messageDelivered = new CountDownLatch(5); + final AtomicLong timeReceived = new AtomicLong(); + final AtomicBoolean failed = new AtomicBoolean(false); final StubMessageEndpoint endpoint = new StubMessageEndpoint() { public void onMessage(Message message) { super.onMessage(message); try { + long now = System.currentTimeMillis(); + if ((now - timeReceived.getAndSet(now)) > 1000) { + failed.set(true); + } messageDelivered.countDown(); if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) { throw new RuntimeException(getName() + " ex on first delivery"); @@ -463,6 +471,7 @@ public class MDBTest extends TestCase { ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec(); activationSpec.setDestinationType(Queue.class.getName()); activationSpec.setDestination("TEST"); + activationSpec.setInitialRedeliveryDelay(100); activationSpec.setResourceAdapter(adapter); activationSpec.validate(); @@ -486,7 +495,7 @@ public class MDBTest extends TestCase { } catch (Exception e) { } - + timeReceived.set(System.currentTimeMillis()); // Send the broker a message to that endpoint MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); producer.send(session.createTextMessage("Hello!")); @@ -494,6 +503,7 @@ public class MDBTest extends TestCase { // Wait for the message to be delivered twice. assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS)); + assertFalse("Delivery policy delay not working", failed.get()); // Shut the Endpoint down. adapter.endpointDeactivation(messageEndpointFactory, activationSpec); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index e2b58677c3..1f8f687e77 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -69,6 +69,20 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertEquals(500, delay); } + public void testGetNextWithInitialDelay() throws Exception { + + RedeliveryPolicy policy = new RedeliveryPolicy(); + policy.setInitialRedeliveryDelay(500); + + long delay = policy.getNextRedeliveryDelay(500); + assertEquals(500, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(500, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(500, delay); + + } + /** * @throws Exception */