This commit is contained in:
Dejan Bosanac 2015-03-18 17:07:45 +01:00
parent ca456c4601
commit 20832f1f1b
4 changed files with 32 additions and 5 deletions

View File

@ -937,7 +937,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
@Override @Override
public void afterRollback() throws Exception { public void afterRollback() throws Exception {
LOG.trace("rollback {}", ack, new Throwable("here")); LOG.trace("rollback {}", ack, new Throwable("here"));
md.getMessage().onMessageRolledBack();
// ensure we don't filter this as a duplicate // ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
@ -956,7 +955,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter(); int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) { && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get // We need to NACK the messages so that they get
// sent to the // sent to the
// DLQ. // DLQ.
@ -986,6 +985,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} }
}, redeliveryDelay); }, redeliveryDelay);
} }
md.getMessage().onMessageRolledBack();
} }
}); });
} }

View File

@ -98,7 +98,10 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
} }
public long getNextRedeliveryDelay(long previousDelay) { public long getNextRedeliveryDelay(long previousDelay) {
long nextDelay = redeliveryDelay; long nextDelay = initialRedeliveryDelay;
if (nextDelay == 0) {
nextDelay = redeliveryDelay;
}
if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) { if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
nextDelay = (long) (previousDelay * backOffMultiplier); nextDelay = (long) (previousDelay * backOffMultiplier);

View File

@ -23,6 +23,8 @@ import java.lang.reflect.Method;
import java.util.Timer; import java.util.Timer;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection; import javax.jms.Connection;
@ -424,12 +426,18 @@ public class MDBTest extends TestCase {
adapter.setServerUrl("vm://localhost?broker.persistent=false"); adapter.setServerUrl("vm://localhost?broker.persistent=false");
adapter.start(new StubBootstrapContext()); adapter.start(new StubBootstrapContext());
final CountDownLatch messageDelivered = new CountDownLatch(2); final CountDownLatch messageDelivered = new CountDownLatch(5);
final AtomicLong timeReceived = new AtomicLong();
final AtomicBoolean failed = new AtomicBoolean(false);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() { final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
public void onMessage(Message message) { public void onMessage(Message message) {
super.onMessage(message); super.onMessage(message);
try { try {
long now = System.currentTimeMillis();
if ((now - timeReceived.getAndSet(now)) > 1000) {
failed.set(true);
}
messageDelivered.countDown(); messageDelivered.countDown();
if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) { if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
throw new RuntimeException(getName() + " ex on first delivery"); throw new RuntimeException(getName() + " ex on first delivery");
@ -463,6 +471,7 @@ public class MDBTest extends TestCase {
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec(); ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestinationType(Queue.class.getName()); activationSpec.setDestinationType(Queue.class.getName());
activationSpec.setDestination("TEST"); activationSpec.setDestination("TEST");
activationSpec.setInitialRedeliveryDelay(100);
activationSpec.setResourceAdapter(adapter); activationSpec.setResourceAdapter(adapter);
activationSpec.validate(); activationSpec.validate();
@ -486,7 +495,7 @@ public class MDBTest extends TestCase {
} catch (Exception e) { } catch (Exception e) {
} }
timeReceived.set(System.currentTimeMillis());
// Send the broker a message to that endpoint // Send the broker a message to that endpoint
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
producer.send(session.createTextMessage("Hello!")); producer.send(session.createTextMessage("Hello!"));
@ -494,6 +503,7 @@ public class MDBTest extends TestCase {
// Wait for the message to be delivered twice. // Wait for the message to be delivered twice.
assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS)); assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
assertFalse("Delivery policy delay not working", failed.get());
// Shut the Endpoint down. // Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec); adapter.endpointDeactivation(messageEndpointFactory, activationSpec);

View File

@ -69,6 +69,20 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals(500, delay); assertEquals(500, delay);
} }
public void testGetNextWithInitialDelay() throws Exception {
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
long delay = policy.getNextRedeliveryDelay(500);
assertEquals(500, delay);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(500, delay);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(500, delay);
}
/** /**
* @throws Exception * @throws Exception
*/ */